Some ideas on improving the speed of HTTP requests:

  1. Importance of sessions / HTTP keep-alive
  2. Importance of Concurrency/Asynchronous Programming
  3. Importance of Multi-Tasking
  4. Importance of adapting code to situation

1 – Importance of sessions / HTTP keep-alive

Let’s start with the simplest example – we submit a requests.get() for each URL we want to query and save the processed response.

Example A: 220 requests per second (RPS)

import time
import requests

def process_response(url, qid):
    try:
        response = requests.get(url)
        s = float(response.status_code)
        if s == 200:
            json_geocode = response.json()
            tot_time_s = json_geocode['paths'][0]['time']
            tot_dist_m = json_geocode['paths'][0]['distance']
            return [qid, s, tot_time_s, tot_dist_m]
        elif s == 400:
            print("Done but no route for row: ", qid)
            return [qid, 999, 0, 0]
        else:
            print("Done but unknown error for: ", s)
            return [qid, 999, 0, 0]
    except Exception as err:
        print("Not done - unknown error: ", err)
        return [qid, 999, 0, 0]

stime = time.time()
calc_routes = []
for url, qid in url_routes:
    calc_routes.append(process_response(url, qid))
dur = time.time() - stime
print("Calculated %d distances in %.2f seconds:"
      " %.0f per second" % (len(calc_routes), dur, len(calc_routes) / dur))

If we run “netstat -n” we get the following output:

[Figure: netstat -n output – a new socket opened for every request]

This means the script opens a new socket for each request. This has two disadvantages: first, it is costly to open a new connection to the (same) server each time; second, we have a limited supply of sockets and risk exhausting them, at which point we get the following error:

[WinError 10048] Only one usage of each socket address (protocol/network address/port) is normally permitted

The problem is described in more detail here:

TCP/IP port exhaustion can occur on a client computer if the client computer is engaging in an unusually high number of TCP/IP socket connections. This can occur if many client applications are initiating connections.

If all of the available ephemeral ports are allocated to client applications then the client experiences a condition known as TCP/IP port exhaustion. When TCP/IP port exhaustion occurs, client port reservations cannot be made and errors will occur in client applications that attempt to connect to a server via TCP/IP sockets.

Let’s try using the requests.Session() object to persist a session; in their words: “if you’re making several requests to the same host, the underlying TCP connection will be reused, which can result in a significant performance increase (see HTTP persistent connection)”. However, it is worth noting that not all servers may recognise the keep-alive request.

Example B: 280 RPS

def process_response(sess, url, qid):
    try:
        response = sess.get(url)
        s = float(response.status_code)
        if s == 200:
            json_geocode = response.json()
            tot_time_s = json_geocode['paths'][0]['time']
            tot_dist_m = json_geocode['paths'][0]['distance']
            return [qid, s, tot_time_s, tot_dist_m]
        elif s == 400:
            print("Done but no route for row: ", qid)
            return [qid, 999, 0, 0]
        else:
            print("Done but unknown error for: ", s)
            return [qid, 999, 0, 0]
    except Exception as err:
        print("Not done - unknown error: ", err)
        return [qid, 999, 0, 0]

session = requests.Session()
calc_routes = []
for url, qid in url_routes:
    calc_routes.append(process_response(session, url, qid))

We can verify that we have kept to just one socket:

[Figure: netstat -n output – a single persistent socket]

Another approach is to use urllib3.HTTPConnectionPool (“If you need to make requests to the same host repeatedly, then you should use a HTTPConnectionPool”).

Example C: 350 RPS

import json
import time
from urllib3 import HTTPConnectionPool

def process_response(sess, url, qid):
    try:
        response = sess.request('GET', url)
        s = float(response.status)
        if s == 200:
            json_geocode = json.loads(response.data.decode('utf-8'))
            tot_time_s = json_geocode['paths'][0]['time']
            tot_dist_m = json_geocode['paths'][0]['distance']
            return [qid, s, tot_time_s, tot_dist_m]
        elif s == 400:
            print("Done but no route for row: ", qid)
            return [qid, 999, 0, 0]
        else:
            print("Done but unknown error for: ", s)
            return [qid, 999, 0, 0]
    except Exception as err:
        print("Not done - unknown error: ", err)
        return [qid, 999, 0, 0]

stime = time.time()
session = HTTPConnectionPool(host='localhost', port=8989, maxsize=1)
calc_routes = []
for url, qid in url_routes:
    calc_routes.append(process_response(session, url, qid))

This is noticeably faster than requests.Session() – most likely because requests layers extra per-request work (hook dispatch, redirect handling, building its richer Response object) on top of urllib3, which we are now calling directly.
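
Since every example repeats the same timing boilerplate, a small helper keeps the comparisons consistent – a sketch, assuming url_routes holds (url, qid) pairs and process_response is one of the functions above (the name benchmark is mine):

import time

def benchmark(label, fn, work_items):
    # Time fn over every (url, qid) pair and report requests per second
    stime = time.time()
    results = [fn(url, qid) for url, qid in work_items]
    dur = time.time() - stime
    print("%s: %d requests in %.2f s (%.0f RPS)"
          % (label, len(results), dur, len(results) / dur))
    return results

# e.g. benchmark("Example C", lambda url, qid: process_response(session, url, qid), url_routes)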

2 – Importance of Concurrency/Asynchronous Programming

First, asynchronous programming is not multi-tasking. Asynchronous programming can be improved with more threads, but it does not rely on having multiple threads and can run on one. Asynchronous (‘event-driven’) programming is when the main programme flow continues immediately while another function executes (I/O operations do not block any of the threads). To see how asynchronous code differs from multi-tasking, it may help to consider the function below, which is not asynchronous but is multi-threaded (a general form of multi-tasking):

from threading import Thread, current_thread
from queue import Queue
import time

def do_something_with_io_lag(in_work):
    out = in_work
    # Imagine we do some work that involves sending
    # something over the internet and processing the output
    # once it arrives
    time.sleep(0.5) # simulate IO lag
    print("Hello, bee number: ",
          str(current_thread().name).replace("Thread-",""))

class WorkerBee(Thread):
    def __init__(self, q):
        Thread.__init__(self)
        self.q = q
        # Daemon threads let the programme exit cleanly even though
        # the workers loop forever in run() below
        self.daemon = True

    def run(self):
        while True:
            # Get some work from the queue
            work_todo = self.q.get()
            # This function will simulate I/O lag
            do_something_with_io_lag(work_todo)
            # Remove task from the queue
            self.q.task_done()

if __name__ == '__main__':
    def time_me(nmbr):
        number_of_worker_bees = nmbr
        worktodo = ['some input for work'] * 50

        # Create a queue
        q = Queue()
        # Fill with work
        [q.put(onework) for onework in worktodo]
        # Launch processes
        for _ in range(number_of_worker_bees):
            t = WorkerBee(q)
            t.start()
        # Block until queue is empty
        q.join()

    # Run this code in serial mode (just one worker)
    %time time_me(nmbr=1)
    # Wall time: 25 s
    # Basically 50 requests * 0.5 seconds IO lag
    # For me everything gets processed by bee number: 59

    # Run this code using multi-tasking (launch 50 workers)
    %time time_me(nmbr=50)
    # Wall time: 507 ms
    # Basically the 0.5 second IO lag + 0.07 seconds it took to launch them
    # Now everything gets processed by different bees

[Figure: execution-model diagrams – the ‘Fig 3’ and ‘Fig 4’ referenced below]

The code above is synchronous (and fits into ‘Fig 3’ above) because the flow of execution pauses when do_something_with_io_lag() is called. An operation is asynchronous if it is non-blocking – this means the operation returns straight away (without pausing our code). When the operation completes, an event fires to inform our programme that the response is ready to process – while that is happening we are free to run other bits of code. The way concurrency works in Python is with an ‘event loop’, which is simply a list of functions that will be run – programming takes two forms: callbacks or futures.

  • Callback – the asynchronous function (once completed) calls the callback function with the result
  • Futures – the asynchronous function returns a promise of a future result (we can wait for this future to be filled by either doing a ‘yield’ or by explicitly waiting) – a minimal sketch of both styles follows below
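
As a minimal sketch of the two styles (using concurrent.futures directly rather than an event loop; fetch is an illustrative stand-in for an I/O-bound call):

from concurrent.futures import ThreadPoolExecutor

def fetch(x):
    # Stand-in for an I/O-bound operation
    return x * 2

with ThreadPoolExecutor(max_workers=2) as ex:
    # Futures style: submit() returns immediately with a promise of a result
    fut = ex.submit(fetch, 21)
    # Callback style: run a function once the result is ready
    fut.add_done_callback(lambda f: print("callback got:", f.result()))
    # ...or wait explicitly for the future to be filled
    print("waited for:", fut.result())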

Here is an example using requests-futures – technically we also have a callback function, however it can be removed and put into the second loop (it is only for style and hence called background_callback).

Requests-futures is basically a wrapper around concurrent.futures, which:

The concurrent.futures module provides a high-level interface for asynchronously executing callables.

The asynchronous execution can be performed with threads, using ThreadPoolExecutor, or separate processes, using ProcessPoolExecutor. Both implement the same interface, which is defined by the abstract Executor class.

We can see this if we look at the source code of requests-futures: it takes an asynchronous method and enhances it with more threads (Fig 4 in Example 1):

from concurrent.futures import ThreadPoolExecutor
from requests import Session
from requests.adapters import DEFAULT_POOLSIZE, HTTPAdapter

class FuturesSession(Session):

    def __init__(self, executor=None, max_workers=2, *args, **kwargs):
        super(FuturesSession, self).__init__(*args, **kwargs)
        if executor is None:
            executor = ThreadPoolExecutor(max_workers=max_workers)
            # set connection pool size equal to max_workers if needed
            if max_workers > DEFAULT_POOLSIZE:
                adapter_kwargs = dict(pool_connections=max_workers,
                                      pool_maxsize=max_workers)
                self.mount('https://', HTTPAdapter(**adapter_kwargs))
                self.mount('http://', HTTPAdapter(**adapter_kwargs))

        self.executor = executor

    def request(self, *args, **kwargs):
        func = sup = super(FuturesSession, self).request

        background_callback = kwargs.pop('background_callback', None)
        if background_callback:
            def wrap(*args_, **kwargs_):
                resp = sup(*args_, **kwargs_)
                background_callback(self, resp)
                return resp

            func = wrap

        return self.executor.submit(func, *args, **kwargs) # This returns a concurrent.futures.Future

Example D: 670 RPS

from requests_futures.sessions import FuturesSession
from concurrent.futures import as_completed

def process_response(sess, resp):
	""" This is our call-back function (once the future is filled) """
    try:
        json_geocode = resp.json()
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        out = [200, tot_time_s, tot_dist_m]
    except Exception as err:
        out = [999, 0, 0]
    resp.data = out

concurrent = 200  # How many requests to send off at one time
with FuturesSession(max_workers=concurrent) as session:
    # First: submit (concurrent) number of requests and generate a future
    futures = {}
    for i in range(len(url_routes)):
        url, qid = url_routes[i]
        # Generate a future
        future = session.get(url,
             background_callback=lambda sess, resp: process_response(sess, resp))
        futures[future] = qid

    # Second: once we have sent off our requests,
    # process the futures as soon as they are available
    calc_routes = []
    for future in as_completed(futures):
        r = future.result()
        try:
            row = [futures[future]] + r.data
        except Exception as err:
            row = [futures[future], 999, 0, 0]
        calc_routes.append(row)

Here is an example using asyncio (with aiohttp for an asynchronous http get) which is even faster:

Example E: 1400 RPS

import json
import aiohttp
import asyncio

concurrent = 200

def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    tot_time_s = json_geocode['paths'][0]['time']
    tot_dist_m = json_geocode['paths'][0]['distance']
    return [qid, 200, tot_time_s, tot_dist_m]

def chunked_http_client(num_chunks, s):
    # Use semaphore to limit number of requests
    semaphore = asyncio.Semaphore(num_chunks)
    # Return a co-routine that will work asynchronously and respect
    # the locking of the semaphore
    @asyncio.coroutine
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            response = yield from s.get(url)
            body = yield from response.content.read()
            yield from response.wait_for_close()
        return body, qid
    return http_get

@asyncio.coroutine
def run_experiment(urls, _session):
    http_client = chunked_http_client(num_chunks=concurrent, s=_session)
    # http_client returns futures, save all the futures to a list
    tasks = [http_client(url, qid) for url, qid in urls]
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid,err))
            out = [qid, 999, 0, 0]
        response.append(out)
    return response

# Run:
with aiohttp.ClientSession() as session:  # We create a persistent connection
    loop = asyncio.get_event_loop()
    calc_routes = loop.run_until_complete(run_experiment(url_routes, session))

3 – Importance of Multi-Tasking

Multi-tasking comes in the form of multi-threading or multi-processing. Threads run in the same memory space (care must be taken that threads don’t write to the same memory at the same time, i.e. an operation should be thread-safe), while processes have separate memory (everything has to be pickled across – it’s a bit harder to share).
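
As a minimal sketch of what thread-safe means (counter and add_many are illustrative names): without the Lock, the read-modify-write inside counter += 1 can interleave across threads and lose updates.

from threading import Thread, Lock

counter = 0
lock = Lock()

def add_many(n):
    global counter
    for _ in range(n):
        with lock:  # remove the lock and the final count can come up short
            counter += 1

threads = [Thread(target=add_many, args=(100000,)) for _ in range(4)]
[t.start() for t in threads]
[t.join() for t in threads]
print(counter)  # reliably 400000 because of the lock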

In CPython, threading is subject to the Global Interpreter Lock [https://wiki.python.org/moin/GlobalInterpreterLock] and thus we are restricted to only one CPU core; with multiprocessing, however, we can take full advantage of multiple cores and multiple CPUs.
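
A quick sketch to see the GIL in action (cpu_bound is an illustrative pure-Python workload): the thread pool gives little or no speed-up on a CPU-bound task, while the process pool scales with the cores available.

import time
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

def cpu_bound(n):
    # Pure-Python arithmetic holds the GIL for its whole duration
    return sum(i * i for i in range(n))

if __name__ == '__main__':
    for pool_cls in (ThreadPoolExecutor, ProcessPoolExecutor):
        stime = time.time()
        with pool_cls(max_workers=4) as ex:
            list(ex.map(cpu_bound, [2000000] * 4))
        print(pool_cls.__name__, "took %.2f s" % (time.time() - stime))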

Similar to the example above, here is a version using multi-threading with an HTTPConnectionPool to persist 200 connections at a time:

Example F: 1500 RPS

import json
import threading
from urllib3 import HTTPConnectionPool
from multiprocessing import Queue, JoinableQueue

class Consumer(threading.Thread):
    def __init__(self, qin, qout):
        threading.Thread.__init__(self)
        self._qin = qin
        self._qout = qout
        # Daemon threads let the programme exit once the work is done
        # (run() below loops forever)
        self.daemon = True

    def run(self):
        while True:
            msg = self._qin.get()
            ul, qid = msg
            try:
                response = conn_pool.request('GET', ul)
                s = float(response.status)
                if s == 200:
                    json_geocode = json.loads(response.data.decode('utf-8'))
                    tot_time_s = json_geocode['paths'][0]['time']
                    tot_dist_m = json_geocode['paths'][0]['distance']
                    out = [qid, s, tot_time_s, tot_dist_m]
                elif s == 400:
                    print("Done but no route for row: ", qid)
                    out = [qid, 999, 0, 0]
                else:
                    print("Done but unknown error for: ", s)
                    out = [qid, 999, 0, 0]
            except Exception as err:
                print(err)
                out = [qid, 999, 0, 0]
            self._qout.put(out)
            self._qin.task_done()

num_threads = 200
# Create thread-safe connection pool
# (ghost and gport are the server host and port, defined elsewhere)
conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=num_threads)
# Create queue and fill
qout = Queue() # Collects output
qin = JoinableQueue()  # We will fill with inputs
[qin.put(url_q) for url_q in url_routes]
[Consumer(qin, qout).start() for _ in range(num_threads)]
# Block until all urls in qin are processed
qin.join()
# Get the results
calc_routes = []
while not qout.empty():
    calc_routes.append(qout.get())

Here is an example using multiprocessing which is just a tiny bit faster:

Example G: 1544 RPS

import json
from urllib3 import HTTPConnectionPool
from multiprocessing import Process, cpu_count, Queue, JoinableQueue

class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self._qin = qin
        self._qout = qout

    def run(self):
        # Connection pool per process
        conn_pool = HTTPConnectionPool(host=ghost, port=gport, maxsize=1)
        # Close once queue empty (otherwise process will linger)
        while not self._qin.empty():
            msg = self._qin.get()
            ul, qid = msg
            try:
                response = conn_pool.request('GET', ul)
                s = float(response.status)
                if s == 200:
                    json_geocode = json.loads(response.data.decode('utf-8'))
                    tot_time_s = json_geocode['paths'][0]['time']
                    tot_dist_m = json_geocode['paths'][0]['distance']
                    out = [qid, s, tot_time_s, tot_dist_m]
                elif s == 400:
                    # print("Done but no route for row: ", qid)
                    out = [qid, 999, 0, 0]
                else:
                    print("Done but unknown error for: ", s)
                    out = [qid, 999, 0, 0]
            except Exception as err:
                print(err)
                out = [qid, 999, 0, 0]
            # print(out)
            self._qout.put(out)
            self._qin.task_done()

if __name__ == '__main__':
    # The above line is necessary to protect the execution

    # Create queue and fill
    qout = Queue()  # Collects output
    qin = JoinableQueue()  # We will fill with inputs
    [qin.put(url_q) for url_q in url_routes]
    # Start cpu_count number of processes
    [Worker(qin, qout).start() for _ in range(cpu_count())]
    # Block until all urls in qin are processed
    qin.join()
    # Fill routes
    calc_routes = []
    while not qout.empty():
        calc_routes.append(qout.get())

Perhaps a (much) simpler way of writing the above is to use multiprocessing.Pool(), which avoids creating queues and worrying about closing – and it appears a bit faster (though I prefer the structure of the above):

Example H: 1620 RPS

import json
from urllib3 import HTTPConnectionPool
from multiprocessing import Pool

conn_pool = None
def makePool(host, port):
    global conn_pool
    conn_pool = HTTPConnectionPool(host=host, port=port, maxsize=1)

def ReqOsrm(data):
    url, qid = data
    try:
        response = conn_pool.request('GET', url)
        json_geocode = json.loads(response.data.decode('utf-8'))
        tot_time_s = json_geocode['paths'][0]['time']
        tot_dist_m = json_geocode['paths'][0]['distance']
        return [qid, 200, tot_time_s, tot_dist_m]
    except Exception as err:
        print("Error: ", err, qid, url)
        return [qid, 999, 0, 0]

if __name__ == '__main__':
    pool = Pool(initializer=makePool, initargs=(ghost, gport))
    # ReqOsrm is repeatedly called with values from the iterable
    calc_routes = pool.map(ReqOsrm, url_routes)
    pool.close()
    pool.join()

At this point (since I am hosting the server too) I don’t think it is possible to go much above 1500 RPS – the same machine has to process the requests! If I get a chance I will try a more powerful computer (or host the server on a different machine).

However, just for fun – to attempt to replicate Intel’s hyper-threading – here is an implementation that uses multi-processing and multi-threading together:

Example I: 1700 RPS

import json
from threading import Thread
from urllib3 import HTTPConnectionPool
from multiprocessing import Process, cpu_count, Queue, JoinableQueue, Event

class Consumer(Thread):
    def __init__(self, qin, qout, conn_pool):
        Thread.__init__(self)
        self.__qin = qin
        self.__qout = qout
        self.__conn_pool = conn_pool

    def run(self):
        # Close once queue empty (otherwise process will linger)
        while not self.__qin.empty():
            msg = self.__qin.get()
            ul, qid = msg
            try:
                response = self.__conn_pool.request('GET', ul)
                s = float(response.status)
                if s == 200:
                    json_geocode = json.loads(response.data.decode('utf-8'))
                    tot_time_s = json_geocode['paths'][0]['time']
                    tot_dist_m = json_geocode['paths'][0]['distance']
                    out = [qid, s, tot_time_s, tot_dist_m]
                elif s == 400:
                    #print("Done but no route for row: ", qid)
                    out = [qid, 999, 0, 0]
                else:
                    print("Done but unknown error for: ", s)
                    out = [qid, 999, 0, 0]
            except Exception as err:
                print(err)
                out = [qid, 999, 0, 0]
            #print(out)
            self.__qout.put(out)
            self.__qin.task_done()
        return


class Worker(Process):
    def __init__(self, qin, qout, *args, **kwargs):
        super(Worker, self).__init__(*args, **kwargs)
        self._qin = qin
        self._qout = qout
        self.exit = Event()

    def run(self):
        # Create thread-safe connection pool
        concurrent = 10
        with HTTPConnectionPool(host=ghost, port=gport, maxsize=concurrent) as conn_pool:
            num_threads = concurrent
            # Start threads (concurrent) per process
            for _ in range(num_threads):
                Consumer(self._qin, self._qout, conn_pool).start()
            # Block until all urls in self._qin are processed
            self._qin.join()
        return

if __name__ == '__main__':
    # Fill queue input
    qin = JoinableQueue()
    for url_q in url_routes:
        qin.put(url_q)
    # Queue to collect output
    qout = Queue()
    # Start cpu_count number of processes (which will launch threads and sessions)
    for _ in range(cpu_count()):
        Worker(qin, qout).start()
    # Block until all urls in qin are processed
    qin.join()
    # Fill routes
    calc_routes = []
    while not qout.empty():
        calc_routes.append(qout.get())

4 – Importance of adapting code to situation

The best code in this case may not be the best in another. For example, we won’t get higher RPS with threads or async approaches unless I/O takes more time than the calculations (e.g. high network latency, large responses) – which is not the case here, since I am hosting the server! Threads are constrained by the GIL, and since the bulk of the cost in this example is creating and parsing the requests (not the I/O in the middle), it makes sense that the multiprocessing code is fastest. Although threads or async libraries can improve performance on their own, running the same threaded or asynchronous code in multiple processes will give even more performance anyway.

However, these different approaches can be applied to many different situations. For example, if we don’t have a local server (contrary to the above) and instead query something like the Google Maps API, where we are limited to 1000 requests in 10 seconds, then instead of multi-processing we want the asynchronous technique from example E, with a delay introduced so that we can control how many requests we generate per second:

yield from asyncio.sleep(concurrent/desired_rps)

Each of the (concurrent) semaphore slots is now held for at least concurrent/desired_rps seconds per request, so throughput averages out at roughly desired_rps.

For example:

Target RPS = 234 – achieved 209
Target RPS = 44 – achieved 42
Target RPS = 10 – achieved 10

Here is the full code (which is basically example E with the above line added):

import json
import asyncio
import aiohttp

concurrent = 200
desired_rps = 100  # This will define the maximum (average) requests per second

def handle_req(data, qid):
    json_geocode = json.loads(data.decode('utf-8'))
    tot_time_s = json_geocode['rows'][0]['elements'][0]['duration']['value']
    tot_dist_m = json_geocode['rows'][0]['elements'][0]['distance']['value']
    out = [qid, 200, tot_time_s, tot_dist_m]
    print(out)
    return out


def chunked_http_client(num_chunks, s):
    # Use semaphore to limit number of requests
    semaphore = asyncio.Semaphore(num_chunks)
    # Return a co-routine that will work asynchronously and respect
    # the locking of the semaphore
    @asyncio.coroutine
    def http_get(url, qid):
        nonlocal semaphore
        with (yield from semaphore):
            response = yield from s.get(url)
            body = yield from response.content.read()
            yield from response.wait_for_close()
            # Introduce a delay to control requests-per-second
            yield from asyncio.sleep(concurrent/desired_rps)
        return body, qid
    return http_get


@asyncio.coroutine
def run_experiment(urls, _session):
    http_client = chunked_http_client(num_chunks=concurrent, s=_session)
    # http_client returns futures, save all the futures to a list
    tasks = [http_client(url, qid) for url, qid, mode in urls]  # rows here also carry a (unused) mode field
    response = []
    # wait for futures to be ready then iterate over them
    for future in asyncio.as_completed(tasks):
        data, qid = yield from future
        try:
            out = handle_req(data, qid)
        except Exception as err:
            print("Error for {0} - {1}".format(qid,err))
            out = [qid, 999, 0, 0]
        response.append(out)
    return response
	
if __name__ == '__main__':
    # Use asynchronous methods
    with aiohttp.ClientSession() as session:  # We create a persistent connection
        loop = asyncio.get_event_loop()
        calc_routes = loop.run_until_complete(run_experiment(url_routes, session))