Using requests with Threading

New in version 0.4.0.

The toolbelt provides a simple API for using requests with threading.

A requests Session is documented as threadsafe but there are still a couple corner cases where it isn’t perfectly threadsafe. The best way to use a Session is to use one per thread.

The implementation provided by the toolbelt is naïve. This means that we use one session per thread and we make no effort to synchronize attributes (e.g., authentication, cookies, etc.). It also means that we make no attempt to direct a request to a session that has already handled a request to the same domain. In other words, if you’re making requests to multiple domains, the toolbelt’s Pool will not try to send requests to the same domain to the same thread.

This module provides three classes:

In 98% of the situations you’ll want to just use a Pool and you’ll treat a ThreadResponse as if it were a regular requests.Response.

Here’s an example:

# This example assumes Python 3
import queue
from requests_toolbelt.threaded import pool

jobs = queue.Queue()
urls = [
    # My list of URLs to get
]

for url in urls:
    queue.put({'method': 'GET', 'url': url})

p = pool.Pool(job_queue=q)
p.join_all()

for response in p.responses():
    print('GET {0}. Returned {1}.'.format(response.request_kwargs['url'],
                                          response.status_code))

This is clearly a bit underwhelming. This is why there’s a short-cut class method to create a Pool from a list of URLs.

from requests_toolbelt.threaded import pool

urls = [
    # My list of URLs to get
]

p = pool.Pool.from_urls(urls)
p.join_all()

for response in p.responses():
    print('GET {0}. Returned {1}.'.format(response.request_kwargs['url'],
                                          response.status_code))

If one of the URLs in your list throws an exception, it will be accessible from the exceptions() generator.

from requests_toolbelt.threaded import pool

urls = [
    # My list of URLs to get
]

p = pool.Pool.from_urls(urls)
p.join_all()

for exc in p.exceptions():
    print('GET {0}. Raised {1}.'.format(exc.request_kwargs['url'],
                                        exc.message))

If instead, you want to retry the exceptions that have been raised you can do the following:

from requests_toolbelt.threaded import pool

urls = [
    # My list of URLs to get
]

p = pool.Pool.from_urls(urls)
p.join_all()

new_pool = pool.Pool.from_exceptions(p.exceptions())
new_pool.join_all()

Not all requests are advisable to retry without checking if they should be retried. You would normally check if you want to retry it.

The Pool object takes 4 other keyword arguments:

  • initializer

    This is a callback that will initialize things on every session created. The callback must return the session.

  • auth_generator

    This is a callback that is called after the initializer callback has modified the session. This callback must also return the session.

  • num_processes

    By passing a positive integer that indicates how many threads to use. It is None by default, and will use the result of multiproccessing.cpu_count().

  • session

    You can pass an alternative constructor or any callable that returns a requests.Sesssion like object. It will not be passed any arguments because a requests.Session does not accept any arguments.

Finally, if you don’t want to worry about Queue or Pool management, you can try the following:

from requests_toolbelt import threaded

requests = [{
    'method': 'GET',
    'url': 'https://httpbin.org/get',
    # ...
}, {
    # ...
}, {
    # ...
}]

responses_generator, exceptions_generator = threaded.map(requests)
for response in responses_generator:
   # Do something

API and Module Auto-Generated Documentation

This module provides the API for requests_toolbelt.threaded.

The module provides a clean and simple API for making requests via a thread pool. The thread pool will use sessions for increased performance.

A simple use-case is:

from requests_toolbelt import threaded

urls_to_get = [{
    'url': 'https://api.github.com/users/sigmavirus24',
    'method': 'GET',
}, {
    'url': 'https://api.github.com/repos/requests/toolbelt',
    'method': 'GET',
}, {
    'url': 'https://google.com',
    'method': 'GET',
}]
responses, errors = threaded.map(urls_to_get)

By default, the threaded submodule will detect the number of CPUs your computer has and use that if no other number of processes is selected. To change this, always use the keyword argument num_processes. Using the above example, we would expand it like so:

responses, errors = threaded.map(urls_to_get, num_processes=10)

You can also customize how a requests.Session is initialized by creating a callback function:

from requests_toolbelt import user_agent

def initialize_session(session):
    session.headers['User-Agent'] = user_agent('my-scraper', '0.1')
    session.headers['Accept'] = 'application/json'

responses, errors = threaded.map(urls_to_get,
                                 initializer=initialize_session)
requests_toolbelt.threaded.map(requests, **kwargs)

Simple interface to the threaded Pool object.

This function takes a list of dictionaries representing requests to make using Sessions in threads and returns a tuple where the first item is a generator of successful responses and the second is a generator of exceptions.

Parameters
  • requests (list) – Collection of dictionaries representing requests to make with the Pool object.

  • **kwargs – Keyword arguments that are passed to the Pool object.

Returns

Tuple of responses and exceptions from the pool

Return type

(ThreadResponse, ThreadException)

Inspiration is blatantly drawn from the standard library’s multiprocessing library. See the following references:

class requests_toolbelt.threaded.pool.Pool(job_queue, initializer=None, auth_generator=None, num_processes=None, session=<class 'requests.sessions.Session'>)

Pool that manages the threads containing sessions.

Parameters
  • queue (queue.Queue) – The queue you’re expected to use to which you should add items.

  • initializer (collections.Callable) – Function used to initialize an instance of session.

  • auth_generator (collections.Callable) – Function used to generate new auth credentials for the session.

  • num_process (int) – Number of threads to create.

  • session (requests.Session) –

exceptions()

Iterate over all the exceptions in the pool.

Returns

Generator of ThreadException

classmethod from_exceptions(exceptions, **kwargs)

Create a Pool from an ThreadExceptions.

Provided an iterable that provides ThreadException objects, this classmethod will generate a new pool to retry the requests that caused the exceptions.

Parameters
  • exceptions (iterable) – Iterable that returns ThreadException

  • kwargs – Keyword arguments passed to the Pool initializer.

Returns

An initialized Pool object.

Return type

Pool

classmethod from_urls(urls, request_kwargs=None, **kwargs)

Create a Pool from an iterable of URLs.

Parameters
  • urls (iterable) – Iterable that returns URLs with which we create a pool.

  • request_kwargs (dict) – Dictionary of other keyword arguments to provide to the request method.

  • kwargs – Keyword arguments passed to the Pool initializer.

Returns

An initialized Pool object.

Return type

Pool

get_exception()

Get an exception from the pool.

Return type

ThreadException

get_response()

Get a response from the pool.

Return type

ThreadResponse

join_all()

Join all the threads to the master thread.

responses()

Iterate over all the responses in the pool.

Returns

Generator of ThreadResponse

class requests_toolbelt.threaded.pool.ThreadResponse(request_kwargs, response)

A wrapper around a requests Response object.

This will proxy most attribute access actions to the Response object. For example, if you wanted the parsed JSON from the response, you might do:

thread_response = pool.get_response()
json = thread_response.json()
request_kwargs

The original keyword arguments provided to the queue

response

The wrapped response

class requests_toolbelt.threaded.pool.ThreadException(request_kwargs, exception)

A wrapper around an exception raised during a request.

This will proxy most attribute access actions to the exception object. For example, if you wanted the message from the exception, you might do:

thread_exc = pool.get_exception()
msg = thread_exc.message
exception

The captured and wrapped exception

request_kwargs

The original keyword arguments provided to the queue