Initial implementation of PEP 3148

This commit is contained in:
Brian Quinlan
2010-09-18 22:35:02 +00:00
parent 679e0f2328
commit 81c4d36928
8 changed files with 2217 additions and 0 deletions

View File

@@ -0,0 +1,366 @@
:mod:`concurrent.futures` --- Concurrent computation
====================================================
.. module:: concurrent.futures
:synopsis: Execute computations concurrently using threads or processes.
The :mod:`concurrent.futures` module provides a high-level interface for
asynchronously executing callables.
The asynchronous execution can be be performed by threads using
:class:`ThreadPoolExecutor` or seperate processes using
:class:`ProcessPoolExecutor`. Both implement the same interface, which is
defined by the abstract :class:`Executor` class.
Executor Objects
^^^^^^^^^^^^^^^^
:class:`Executor` is an abstract class that provides methods to execute calls
asynchronously. It should not be used directly, but through its two
subclasses: :class:`ThreadPoolExecutor` and :class:`ProcessPoolExecutor`.
.. class:: Executor()
An abstract class that provides methods to execute calls asynchronously. It
should not be used directly, but through its two subclasses:
:class:`ThreadPoolExecutor` and :class:`ProcessPoolExecutor`.
.. method:: submit(fn, *args, **kwargs)
Schedules the callable to be executed as *fn*(*\*args*, *\*\*kwargs*) and
returns a :class:`Future` representing the execution of the callable.
::
with ThreadPoolExecutor(max_workers=1) as executor:
future = executor.submit(pow, 323, 1235)
print(future.result())
.. method:: map(func, *iterables, timeout=None)
Equivalent to `map(*func*, *\*iterables*)` but func is executed
asynchronously and several calls to *func* may be made concurrently. The
returned iterator raises a :exc:`TimeoutError` if :meth:`__next__()` is
called and the result isn't available after *timeout* seconds from the
original call to :meth:`Executor.map()`. *timeout* can be an int or
float. If *timeout* is not specified or ``None`` then there is no limit
to the wait time. If a call raises an exception then that exception will
be raised when its value is retrieved from the iterator.
.. method:: shutdown(wait=True)
Signal the executor that it should free any resources that it is using
when the currently pending futures are done executing. Calls to
:meth:`Executor.submit` and :meth:`Executor.map` made after shutdown will
raise :exc:`RuntimeError`.
If *wait* is `True` then this method will not return until all the
pending futures are done executing and the resources associated with the
executor have been freed. If *wait* is `False` then this method will
return immediately and the resources associated with the executor will
be freed when all pending futures are done executing. Regardless of the
value of *wait*, the entire Python program will not exit until all
pending futures are done executing.
You can avoid having to call this method explicitly if you use the `with`
statement, which will shutdown the `Executor` (waiting as if
`Executor.shutdown` were called with *wait* set to `True`):
::
import shutil
with ThreadPoolExecutor(max_workers=4) as e:
e.submit(shutil.copy, 'src1.txt', 'dest1.txt')
e.submit(shutil.copy, 'src2.txt', 'dest2.txt')
e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
e.submit(shutil.copy, 'src3.txt', 'dest4.txt')
ThreadPoolExecutor
^^^^^^^^^^^^^^^^^^
The :class:`ThreadPoolExecutor` class is an :class:`Executor` subclass that uses
a pool of threads to execute calls asynchronously.
Deadlock can occur when the callable associated with a :class:`Future` waits on
the results of another :class:`Future`. For example:
::
import time
def wait_on_b():
time.sleep(5)
print(b.result()) # b will never complete because it is waiting on a.
return 5
def wait_on_a():
time.sleep(5)
print(a.result()) # a will never complete because it is waiting on b.
return 6
executor = ThreadPoolExecutor(max_workers=2)
a = executor.submit(wait_on_b)
b = executor.submit(wait_on_a)
And:
::
def wait_on_future():
f = executor.submit(pow, 5, 2)
# This will never complete because there is only one worker thread and
# it is executing this function.
print(f.result())
executor = ThreadPoolExecutor(max_workers=1)
executor.submit(wait_on_future)
.. class:: ThreadPoolExecutor(max_workers)
An :class:`Executor` subclass that uses a pool of at most *max_workers*
threads to execute calls asynchronously.
Deadlock can occur when the callable associated with a :class:`Future` waits
on the results of another :class:`Future`.
.. _threadpoolexecutor-example:
ThreadPoolExecutor Example
^^^^^^^^^^^^^^^^^^^^^^^^^^
::
import concurrent.futures
import urllib.request
URLS = ['http://www.foxnews.com/',
'http://www.cnn.com/',
'http://europe.wsj.com/',
'http://www.bbc.co.uk/',
'http://some-made-up-domain.com/']
def load_url(url, timeout):
return urllib.request.urlopen(url, timeout=timeout).read()
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
future_to_url = dict((executor.submit(load_url, url, 60), url)
for url in URLS)
for future in concurrent.futures.as_completed(future_to_url):
url = future_to_url[future]
if future.exception() is not None:
print('%r generated an exception: %s' % (url,
future.exception()))
else:
print('%r page is %d bytes' % (url, len(future.result())))
ProcessPoolExecutor
^^^^^^^^^^^^^^^^^^^
The :class:`ProcessPoolExecutor` class is an :class:`Executor` subclass that
uses a pool of processes to execute calls asynchronously.
:class:`ProcessPoolExecutor` uses the :mod:`multiprocessing` module, which
allows it to side-step the :term:`Global Interpreter Lock` but also means that
only picklable objects can be executed and returned.
Calling :class:`Executor` or :class:`Future` methods from a callable submitted
to a :class:`ProcessPoolExecutor` will result in deadlock.
.. class:: ProcessPoolExecutor(max_workers=None)
An :class:`Executor` subclass that executes calls asynchronously using a
pool of at most *max_workers* processes. If *max_workers* is ``None`` or
not given then as many worker processes will be created as the machine has
processors.
.. _processpoolexecutor-example:
ProcessPoolExecutor Example
^^^^^^^^^^^^^^^^^^^^^^^^^^^
::
import concurrent.futures
import math
PRIMES = [
112272535095293,
112582705942171,
112272535095293,
115280095190773,
115797848077099,
1099726899285419]
def is_prime(n):
if n % 2 == 0:
return False
sqrt_n = int(math.floor(math.sqrt(n)))
for i in range(3, sqrt_n + 1, 2):
if n % i == 0:
return False
return True
def main():
with concurrent.futures.ProcessPoolExecutor() as executor:
for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
print('%d is prime: %s' % (number, prime))
if __name__ == '__main__':
main()
Future Objects
^^^^^^^^^^^^^^
The :class:`Future` class encapulates the asynchronous execution of a callable.
:class:`Future` instances are created by :meth:`Executor.submit`.
.. class:: Future()
Encapulates the asynchronous execution of a callable. :class:`Future`
instances are created by :meth:`Executor.submit` and should not be created
directly except for testing.
.. method:: cancel()
Attempt to cancel the call. If the call is currently being executed then
it cannot be cancelled and the method will return `False`, otherwise the
call will be cancelled and the method will return `True`.
.. method:: cancelled()
Return `True` if the call was successfully cancelled.
.. method:: running()
Return `True` if the call is currently being executed and cannot be
cancelled.
.. method:: done()
Return `True` if the call was successfully cancelled or finished running.
.. method:: result(timeout=None)
Return the value returned by the call. If the call hasn't yet completed
then this method will wait up to *timeout* seconds. If the call hasn't
completed in *timeout* seconds then a :exc:`TimeoutError` will be
raised. *timeout* can be an int or float.If *timeout* is not specified
or ``None`` then there is no limit to the wait time.
If the future is cancelled before completing then :exc:`CancelledError`
will be raised.
If the call raised then this method will raise the same exception.
.. method:: exception(timeout=None)
Return the exception raised by the call. If the call hasn't yet completed
then this method will wait up to *timeout* seconds. If the call hasn't
completed in *timeout* seconds then a :exc:`TimeoutError` will be raised.
*timeout* can be an int or float. If *timeout* is not specified or
``None`` then there is no limit to the wait time.
If the future is cancelled before completing then :exc:`CancelledError`
will be raised.
If the call completed without raising then ``None`` is returned.
.. method:: add_done_callback(fn)
Attaches the callable *fn* to the future. *fn* will be called, with the
future as its only argument, when the future is cancelled or finishes
running.
Added callables are called in the order that they were added and are
always called in a thread belonging to the process that added them. If
the callable raises an :exc:`Exception` then it will be logged and
ignored. If the callable raises another :exc:`BaseException` then the
behavior is not defined.
If the future has already completed or been cancelled then *fn* will be
called immediately.
The following :class:`Future` methods are meant for use in unit tests and
:class:`Executor` implementations.
.. method:: set_running_or_notify_cancel()
This method should only be called by :class:`Executor` implementations
before executing the work associated with the :class:`Future` and by
unit tests.
If the method returns `False` then the :class:`Future` was cancelled i.e.
:meth:`Future.cancel` was called and returned `True`. Any threads waiting
on the :class:`Future` completing (i.e. through :func:`as_completed` or
:func:`wait`) will be woken up.
If the method returns `True` then the :class:`Future` was not cancelled
and has been put in the running state i.e. calls to
:meth:`Future.running` will return `True`.
This method can only be called once and cannot be called after
:meth:`Future.set_result` or :meth:`Future.set_exception` have been
called.
.. method:: set_result(result)
Sets the result of the work associated with the :class:`Future` to
*result*.
This method should only be used by :class:`Executor` implementations and
unit tests.
.. method:: set_exception(exception)
Sets the result of the work associated with the :class:`Future` to the
:class:`Exception` *exception*.
This method should only be used by :class:`Executor` implementations and
unit tests.
Module Functions
^^^^^^^^^^^^^^^^
.. function:: wait(fs, timeout=None, return_when=ALL_COMPLETED)
Wait for the :class:`Future` instances (possibly created by different
:class:`Executor` instances) given by *fs* to complete. Returns a named
2-tuple of sets. The first set, named "done", contains the futures that
completed (finished or were cancelled) before the wait completed. The second
set, named "not_done", contains uncompleted futures.
*timeout* can be used to control the maximum number of seconds to wait before
returning. *timeout* can be an int or float. If *timeout* is not specified or
``None`` then there is no limit to the wait time.
*return_when* indicates when this function should return. It must be one of
the following constants:
+-----------------------------+----------------------------------------+
| Constant | Description |
+=============================+========================================+
| :const:`FIRST_COMPLETED` | The function will return when any |
| | future finishes or is cancelled. |
+-----------------------------+----------------------------------------+
| :const:`FIRST_EXCEPTION` | The function will return when any |
| | future finishes by raising an |
| | exception. If no future raises an |
| | exception then it is equivalent to |
| | `ALL_COMPLETED`. |
+-----------------------------+----------------------------------------+
| :const:`ALL_COMPLETED` | The function will return when all |
| | futures finish or are cancelled. |
+-----------------------------+----------------------------------------+
.. function:: as_completed(fs, timeout=None)
Returns an iterator over the :class:`Future` instances (possibly created
by different :class:`Executor` instances) given by *fs* that yields futures
as they complete (finished or were cancelled). Any futures that completed
before :func:`as_completed()` was called will be yielded first. The returned
iterator raises a :exc:`TimeoutError` if :meth:`__next__()` is called and
the result isn't available after *timeout* seconds from the original call
to :func:`as_completed()`. *timeout* can be an int or float. If *timeout*
is not specified or ``None`` then there is no limit to the wait time.

View File

@@ -17,6 +17,7 @@ some other systems as well (e.g. Windows). Here's an overview:
dummy_threading.rst
_thread.rst
_dummy_thread.rst
concurrent.futures.rst
multiprocessing.rst
mmap.rst
readline.rst

View File

@@ -0,0 +1 @@
# This directory is a Python package.

View File

@@ -0,0 +1,18 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Execute computations asynchronously using threads or processes."""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
from concurrent.futures._base import (FIRST_COMPLETED,
FIRST_EXCEPTION,
ALL_COMPLETED,
CancelledError,
TimeoutError,
Future,
Executor,
wait,
as_completed)
from concurrent.futures.process import ProcessPoolExecutor
from concurrent.futures.thread import ThreadPoolExecutor

View File

@@ -0,0 +1,541 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
import collections
import functools
import logging
import threading
import time
FIRST_COMPLETED = 'FIRST_COMPLETED'
FIRST_EXCEPTION = 'FIRST_EXCEPTION'
ALL_COMPLETED = 'ALL_COMPLETED'
# Possible future states (for internal use by the futures package).
PENDING = 'PENDING'
RUNNING = 'RUNNING'
# The future was cancelled by the user...
CANCELLED = 'CANCELLED'
# ...and _Waiter.add_cancelled() was called by a worker.
CANCELLED_AND_NOTIFIED = 'CANCELLED_AND_NOTIFIED'
FINISHED = 'FINISHED'
_FUTURE_STATES = [
PENDING,
RUNNING,
CANCELLED,
CANCELLED_AND_NOTIFIED,
FINISHED
]
_STATE_TO_DESCRIPTION_MAP = {
PENDING: "pending",
RUNNING: "running",
CANCELLED: "cancelled",
CANCELLED_AND_NOTIFIED: "cancelled",
FINISHED: "finished"
}
# Logger for internal use by the futures package.
LOGGER = logging.getLogger("concurrent.futures")
_handler = logging.StreamHandler()
LOGGER.addHandler(_handler)
del _handler
class Error(Exception):
"""Base class for all future-related exceptions."""
pass
class CancelledError(Error):
"""The Future was cancelled."""
pass
class TimeoutError(Error):
"""The operation exceeded the given deadline."""
pass
class _Waiter(object):
"""Provides the event that wait() and as_completed() block on."""
def __init__(self):
self.event = threading.Event()
self.finished_futures = []
def add_result(self, future):
self.finished_futures.append(future)
def add_exception(self, future):
self.finished_futures.append(future)
def add_cancelled(self, future):
self.finished_futures.append(future)
class _FirstCompletedWaiter(_Waiter):
"""Used by wait(return_when=FIRST_COMPLETED) and as_completed()."""
def add_result(self, future):
super().add_result(future)
self.event.set()
def add_exception(self, future):
super().add_exception(future)
self.event.set()
def add_cancelled(self, future):
super().add_cancelled(future)
self.event.set()
class _AllCompletedWaiter(_Waiter):
"""Used by wait(return_when=FIRST_EXCEPTION and ALL_COMPLETED)."""
def __init__(self, num_pending_calls, stop_on_exception):
self.num_pending_calls = num_pending_calls
self.stop_on_exception = stop_on_exception
super().__init__()
def _decrement_pending_calls(self):
self.num_pending_calls -= 1
if not self.num_pending_calls:
self.event.set()
def add_result(self, future):
super().add_result(future)
self._decrement_pending_calls()
def add_exception(self, future):
super().add_exception(future)
if self.stop_on_exception:
self.event.set()
else:
self._decrement_pending_calls()
def add_cancelled(self, future):
super().add_cancelled(future)
self._decrement_pending_calls()
class _AcquireFutures(object):
"""A context manager that does an ordered acquire of Future conditions."""
def __init__(self, futures):
self.futures = sorted(futures, key=id)
def __enter__(self):
for future in self.futures:
future._condition.acquire()
def __exit__(self, *args):
for future in self.futures:
future._condition.release()
def _create_and_install_waiters(fs, return_when):
if return_when == FIRST_COMPLETED:
waiter = _FirstCompletedWaiter()
else:
pending_count = sum(
f._state not in [CANCELLED_AND_NOTIFIED, FINISHED] for f in fs)
if return_when == FIRST_EXCEPTION:
waiter = _AllCompletedWaiter(pending_count, stop_on_exception=True)
elif return_when == ALL_COMPLETED:
waiter = _AllCompletedWaiter(pending_count, stop_on_exception=False)
else:
raise ValueError("Invalid return condition: %r" % return_when)
for f in fs:
f._waiters.append(waiter)
return waiter
def as_completed(fs, timeout=None):
"""An iterator over the given futures that yields each as it completes.
Args:
fs: The sequence of Futures (possibly created by different Executors) to
iterate over.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
Returns:
An iterator that yields the given Futures as they complete (finished or
cancelled).
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
"""
if timeout is not None:
end_time = timeout + time.time()
with _AcquireFutures(fs):
finished = set(
f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
pending = set(fs) - finished
waiter = _create_and_install_waiters(fs, FIRST_COMPLETED)
try:
for future in finished:
yield future
while pending:
if timeout is None:
wait_timeout = None
else:
wait_timeout = end_time - time.time()
if wait_timeout < 0:
raise TimeoutError(
'%d (of %d) futures unfinished' % (
len(pending), len(fs)))
waiter.event.wait(timeout)
for future in waiter.finished_futures[:]:
yield future
waiter.finished_futures.remove(future)
pending.remove(future)
finally:
for f in fs:
f._waiters.remove(waiter)
DoneAndNotDoneFutures = collections.namedtuple(
'DoneAndNotDoneFutures', 'done not_done')
def wait(fs, timeout=None, return_when=ALL_COMPLETED):
"""Wait for the futures in the given sequence to complete.
Args:
fs: The sequence of Futures (possibly created by different Executors) to
wait upon.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
return_when: Indicates when this function should return. The options
are:
FIRST_COMPLETED - Return when any future finishes or is
cancelled.
FIRST_EXCEPTION - Return when any future finishes by raising an
exception. If no future raises an exception
then it is equivalent to ALL_COMPLETED.
ALL_COMPLETED - Return when all futures finish or are cancelled.
Returns:
A named 2-tuple of sets. The first set, named 'done', contains the
futures that completed (is finished or cancelled) before the wait
completed. The second set, named 'not_done', contains uncompleted
futures.
"""
with _AcquireFutures(fs):
done = set(f for f in fs
if f._state in [CANCELLED_AND_NOTIFIED, FINISHED])
not_done = set(fs) - done
if (return_when == FIRST_COMPLETED) and done:
return DoneAndNotDoneFutures(done, not_done)
elif (return_when == FIRST_EXCEPTION) and done:
if any(f for f in done
if not f.cancelled() and f.exception() is not None):
return DoneAndNotDoneFutures(done, not_done)
if len(done) == len(fs):
return DoneAndNotDoneFutures(done, not_done)
waiter = _create_and_install_waiters(fs, return_when)
waiter.event.wait(timeout)
for f in fs:
f._waiters.remove(waiter)
done.update(waiter.finished_futures)
return DoneAndNotDoneFutures(done, set(fs) - done)
class Future(object):
"""Represents the result of an asynchronous computation."""
def __init__(self):
"""Initializes the future. Should not be called by clients."""
self._condition = threading.Condition()
self._state = PENDING
self._result = None
self._exception = None
self._waiters = []
self._done_callbacks = []
def _invoke_callbacks(self):
for callback in self._done_callbacks:
try:
callback(self)
except Exception:
LOGGER.exception('exception calling callback for %r', self)
def __repr__(self):
with self._condition:
if self._state == FINISHED:
if self._exception:
return '<Future at %s state=%s raised %s>' % (
hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._exception.__class__.__name__)
else:
return '<Future at %s state=%s returned %s>' % (
hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state],
self._result.__class__.__name__)
return '<Future at %s state=%s>' % (
hex(id(self)),
_STATE_TO_DESCRIPTION_MAP[self._state])
def cancel(self):
"""Cancel the future if possible.
Returns True if the future was cancelled, False otherwise. A future
cannot be cancelled if it is running or has already completed.
"""
with self._condition:
if self._state in [RUNNING, FINISHED]:
return False
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
return True
self._state = CANCELLED
self._condition.notify_all()
self._invoke_callbacks()
return True
def cancelled(self):
"""Return True if the future has cancelled."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]
def running(self):
"""Return True if the future is currently executing."""
with self._condition:
return self._state == RUNNING
def done(self):
"""Return True of the future was cancelled or finished executing."""
with self._condition:
return self._state in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]
def __get_result(self):
if self._exception:
raise self._exception
else:
return self._result
def add_done_callback(self, fn):
"""Attaches a callable that will be called when the future finishes.
Args:
fn: A callable that will be called with this future as its only
argument when the future completes or is cancelled. The callable
will always be called by a thread in the same process in which
it was added. If the future has already completed or been
cancelled then the callable will be called immediately. These
callables are called in the order that they were added.
"""
with self._condition:
if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
self._done_callbacks.append(fn)
return
fn(self)
def result(self, timeout=None):
"""Return the result of the call that the future represents.
Args:
timeout: The number of seconds to wait for the result if the future
isn't done. If None, then there is no limit on the wait time.
Returns:
The result of the call that the future represents.
Raises:
CancelledError: If the future was cancelled.
TimeoutError: If the future didn't finish executing before the given
timeout.
Exception: If the call raised then that exception will be raised.
"""
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self.__get_result()
else:
raise TimeoutError()
def exception(self, timeout=None):
"""Return the exception raised by the call that the future represents.
Args:
timeout: The number of seconds to wait for the exception if the
future isn't done. If None, then there is no limit on the wait
time.
Returns:
The exception raised by the call that the future represents or None
if the call completed without raising.
Raises:
CancelledError: If the future was cancelled.
TimeoutError: If the future didn't finish executing before the given
timeout.
"""
with self._condition:
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self._exception
self._condition.wait(timeout)
if self._state in [CANCELLED, CANCELLED_AND_NOTIFIED]:
raise CancelledError()
elif self._state == FINISHED:
return self._exception
else:
raise TimeoutError()
# The following methods should only be used by Executors and in tests.
def set_running_or_notify_cancel(self):
"""Mark the future as running or process any cancel notifications.
Should only be used by Executor implementations and unit tests.
If the future has been cancelled (cancel() was called and returned
True) then any threads waiting on the future completing (though calls
to as_completed() or wait()) are notified and False is returned.
If the future was not cancelled then it is put in the running state
(future calls to running() will return True) and True is returned.
This method should be called by Executor implementations before
executing the work associated with this future. If this method returns
False then the work should not be executed.
Returns:
False if the Future was cancelled, True otherwise.
Raises:
RuntimeError: if this method was already called or if set_result()
or set_exception() was called.
"""
with self._condition:
if self._state == CANCELLED:
self._state = CANCELLED_AND_NOTIFIED
for waiter in self._waiters:
waiter.add_cancelled(self)
# self._condition.notify_all() is not necessary because
# self.cancel() triggers a notification.
return False
elif self._state == PENDING:
self._state = RUNNING
return True
else:
LOGGER.critical('Future %s in unexpected state: %s',
id(self.future),
self.future._state)
raise RuntimeError('Future in unexpected state')
def set_result(self, result):
"""Sets the return value of work associated with the future.
Should only be used by Executor implementations and unit tests.
"""
with self._condition:
self._result = result
self._state = FINISHED
for waiter in self._waiters:
waiter.add_result(self)
self._condition.notify_all()
self._invoke_callbacks()
def set_exception(self, exception):
"""Sets the result of the future as being the given exception.
Should only be used by Executor implementations and unit tests.
"""
with self._condition:
self._exception = exception
self._state = FINISHED
for waiter in self._waiters:
waiter.add_exception(self)
self._condition.notify_all()
self._invoke_callbacks()
class Executor(object):
"""This is an abstract base class for concrete asynchronous executors."""
def submit(self, fn, *args, **kwargs):
"""Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.
Returns:
A Future representing the given call.
"""
raise NotImplementedError()
def map(self, fn, *iterables, timeout=None):
"""Returns a iterator equivalent to map(fn, iter).
Args:
fn: A callable that will take take as many arguments as there are
passed iterables.
timeout: The maximum number of seconds to wait. If None, then there
is no limit on the wait time.
Returns:
An iterator equivalent to: map(func, *iterables) but the calls may
be evaluated out-of-order.
Raises:
TimeoutError: If the entire result iterator could not be generated
before the given timeout.
Exception: If fn(*args) raises for any values.
"""
if timeout is not None:
end_time = timeout + time.time()
fs = [self.submit(fn, *args) for args in zip(*iterables)]
try:
for future in fs:
if timeout is None:
yield future.result()
else:
yield future.result(end_time - time.time())
finally:
for future in fs:
future.cancel()
def shutdown(self, wait=True):
"""Clean-up the resources associated with the Executor.
It is safe to call this method several times. Otherwise, no other
methods can be called after this one.
Args:
wait: If True then shutdown will not return until all running
futures have finished executing and the resources used by the
executor have been reclaimed.
"""
pass
def __enter__(self):
return self
def __exit__(self, exc_type, exc_val, exc_tb):
self.shutdown(wait=True)
return False

View File

@@ -0,0 +1,337 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Implements ProcessPoolExecutor.
The follow diagram and text describe the data-flow through the system:
|======================= In-process =====================|== Out-of-process ==|
+----------+ +----------+ +--------+ +-----------+ +---------+
| | => | Work Ids | => | | => | Call Q | => | |
| | +----------+ | | +-----------+ | |
| | | ... | | | | ... | | |
| | | 6 | | | | 5, call() | | |
| | | 7 | | | | ... | | |
| Process | | ... | | Local | +-----------+ | Process |
| Pool | +----------+ | Worker | | #1..n |
| Executor | | Thread | | |
| | +----------- + | | +-----------+ | |
| | <=> | Work Items | <=> | | <= | Result Q | <= | |
| | +------------+ | | +-----------+ | |
| | | 6: call() | | | | ... | | |
| | | future | | | | 4, result | | |
| | | ... | | | | 3, except | | |
+----------+ +------------+ +--------+ +-----------+ +---------+
Executor.submit() called:
- creates a uniquely numbered _WorkItem and adds it to the "Work Items" dict
- adds the id of the _WorkItem to the "Work Ids" queue
Local worker thread:
- reads work ids from the "Work Ids" queue and looks up the corresponding
WorkItem from the "Work Items" dict: if the work item has been cancelled then
it is simply removed from the dict, otherwise it is repackaged as a
_CallItem and put in the "Call Q". New _CallItems are put in the "Call Q"
until "Call Q" is full. NOTE: the size of the "Call Q" is kept small because
calls placed in the "Call Q" can no longer be cancelled with Future.cancel().
- reads _ResultItems from "Result Q", updates the future stored in the
"Work Items" dict and deletes the dict entry
Process #1..n:
- reads _CallItems from "Call Q", executes the calls, and puts the resulting
_ResultItems in "Request Q"
"""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
import atexit
from concurrent.futures import _base
import queue
import multiprocessing
import threading
import weakref
# Workers are created as daemon threads and processes. This is done to allow the
# interpreter to exit when there are still idle processes in a
# ProcessPoolExecutor's process pool (i.e. shutdown() was not called). However,
# allowing workers to die with the interpreter has two undesirable properties:
# - The workers would still be running during interpretor shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads/processes finish.
_thread_references = set()
_shutdown = False
def _python_exit():
global _shutdown
_shutdown = True
for thread_reference in _thread_references:
thread = thread_reference()
if thread is not None:
thread.join()
def _remove_dead_thread_references():
"""Remove inactive threads from _thread_references.
Should be called periodically to prevent memory leaks in scenarios such as:
>>> while True:
>>> ... t = ThreadPoolExecutor(max_workers=5)
>>> ... t.map(int, ['1', '2', '3', '4', '5'])
"""
for thread_reference in set(_thread_references):
if thread_reference() is None:
_thread_references.discard(thread_reference)
# Controls how many more calls than processes will be queued in the call queue.
# A smaller number will mean that processes spend more time idle waiting for
# work while a larger number will make Future.cancel() succeed less frequently
# (Futures in the call queue cannot be cancelled).
EXTRA_QUEUED_CALLS = 1
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
class _ResultItem(object):
def __init__(self, work_id, exception=None, result=None):
self.work_id = work_id
self.exception = exception
self.result = result
class _CallItem(object):
def __init__(self, work_id, fn, args, kwargs):
self.work_id = work_id
self.fn = fn
self.args = args
self.kwargs = kwargs
def _process_worker(call_queue, result_queue, shutdown):
"""Evaluates calls from call_queue and places the results in result_queue.
This worker is run in a seperate process.
Args:
call_queue: A multiprocessing.Queue of _CallItems that will be read and
evaluated by the worker.
result_queue: A multiprocessing.Queue of _ResultItems that will written
to by the worker.
shutdown: A multiprocessing.Event that will be set as a signal to the
worker that it should exit when call_queue is empty.
"""
while True:
try:
call_item = call_queue.get(block=True, timeout=0.1)
except queue.Empty:
if shutdown.is_set():
return
else:
try:
r = call_item.fn(*call_item.args, **call_item.kwargs)
except BaseException as e:
result_queue.put(_ResultItem(call_item.work_id,
exception=e))
else:
result_queue.put(_ResultItem(call_item.work_id,
result=r))
def _add_call_item_to_queue(pending_work_items,
work_ids,
call_queue):
"""Fills call_queue with _WorkItems from pending_work_items.
This function never blocks.
Args:
pending_work_items: A dict mapping work ids to _WorkItems e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids: A queue.Queue of work ids e.g. Queue([5, 6, ...]). Work ids
are consumed and the corresponding _WorkItems from
pending_work_items are transformed into _CallItems and put in
call_queue.
call_queue: A multiprocessing.Queue that will be filled with _CallItems
derived from _WorkItems.
"""
while True:
if call_queue.full():
return
try:
work_id = work_ids.get(block=False)
except queue.Empty:
return
else:
work_item = pending_work_items[work_id]
if work_item.future.set_running_or_notify_cancel():
call_queue.put(_CallItem(work_id,
work_item.fn,
work_item.args,
work_item.kwargs),
block=True)
else:
del pending_work_items[work_id]
continue
def _queue_manangement_worker(executor_reference,
processes,
pending_work_items,
work_ids_queue,
call_queue,
result_queue,
shutdown_process_event):
"""Manages the communication between this process and the worker processes.
This function is run in a local thread.
Args:
executor_reference: A weakref.ref to the ProcessPoolExecutor that owns
this thread. Used to determine if the ProcessPoolExecutor has been
garbage collected and that this function can exit.
process: A list of the multiprocessing.Process instances used as
workers.
pending_work_items: A dict mapping work ids to _WorkItems e.g.
{5: <_WorkItem...>, 6: <_WorkItem...>, ...}
work_ids_queue: A queue.Queue of work ids e.g. Queue([5, 6, ...]).
call_queue: A multiprocessing.Queue that will be filled with _CallItems
derived from _WorkItems for processing by the process workers.
result_queue: A multiprocessing.Queue of _ResultItems generated by the
process workers.
shutdown_process_event: A multiprocessing.Event used to signal the
process workers that they should exit when their work queue is
empty.
"""
while True:
_add_call_item_to_queue(pending_work_items,
work_ids_queue,
call_queue)
try:
result_item = result_queue.get(block=True, timeout=0.1)
except queue.Empty:
executor = executor_reference()
# No more work items can be added if:
# - The interpreter is shutting down OR
# - The executor that owns this worker has been collected OR
# - The executor that owns this worker has been shutdown.
if _shutdown or executor is None or executor._shutdown_thread:
# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
shutdown_process_event.set()
# If .join() is not called on the created processes then
# some multiprocessing.Queue methods may deadlock on Mac OS
# X.
for p in processes:
p.join()
return
del executor
else:
work_item = pending_work_items[result_item.work_id]
del pending_work_items[result_item.work_id]
if result_item.exception:
work_item.future.set_exception(result_item.exception)
else:
work_item.future.set_result(result_item.result)
class ProcessPoolExecutor(_base.Executor):
def __init__(self, max_workers=None):
"""Initializes a new ProcessPoolExecutor instance.
Args:
max_workers: The maximum number of processes that can be used to
execute the given calls. If None or not given then as many
worker processes will be created as the machine has processors.
"""
_remove_dead_thread_references()
if max_workers is None:
self._max_workers = multiprocessing.cpu_count()
else:
self._max_workers = max_workers
# Make the call queue slightly larger than the number of processes to
# prevent the worker processes from idling. But don't make it too big
# because futures in the call queue cannot be cancelled.
self._call_queue = multiprocessing.Queue(self._max_workers +
EXTRA_QUEUED_CALLS)
self._result_queue = multiprocessing.Queue()
self._work_ids = queue.Queue()
self._queue_management_thread = None
self._processes = set()
# Shutdown is a two-step process.
self._shutdown_thread = False
self._shutdown_process_event = multiprocessing.Event()
self._shutdown_lock = threading.Lock()
self._queue_count = 0
self._pending_work_items = {}
def _start_queue_management_thread(self):
if self._queue_management_thread is None:
self._queue_management_thread = threading.Thread(
target=_queue_manangement_worker,
args=(weakref.ref(self),
self._processes,
self._pending_work_items,
self._work_ids,
self._call_queue,
self._result_queue,
self._shutdown_process_event))
self._queue_management_thread.daemon = True
self._queue_management_thread.start()
_thread_references.add(weakref.ref(self._queue_management_thread))
def _adjust_process_count(self):
for _ in range(len(self._processes), self._max_workers):
p = multiprocessing.Process(
target=_process_worker,
args=(self._call_queue,
self._result_queue,
self._shutdown_process_event))
p.start()
self._processes.add(p)
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown_thread:
raise RuntimeError('cannot schedule new futures after shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._pending_work_items[self._queue_count] = w
self._work_ids.put(self._queue_count)
self._queue_count += 1
self._start_queue_management_thread()
self._adjust_process_count()
return f
submit.__doc__ = _base.Executor.submit.__doc__
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown_thread = True
if wait:
if self._queue_management_thread:
self._queue_management_thread.join()
# To reduce the risk of openning too many files, remove references to
# objects that use file descriptors.
self._queue_management_thread = None
self._call_queue = None
self._result_queue = None
self._shutdown_process_event = None
self._processes = None
shutdown.__doc__ = _base.Executor.shutdown.__doc__
atexit.register(_python_exit)

View File

@@ -0,0 +1,136 @@
# Copyright 2009 Brian Quinlan. All Rights Reserved.
# Licensed to PSF under a Contributor Agreement.
"""Implements ThreadPoolExecutor."""
__author__ = 'Brian Quinlan (brian@sweetapp.com)'
import atexit
from concurrent.futures import _base
import queue
import threading
import weakref
# Workers are created as daemon threads. This is done to allow the interpreter
# to exit when there are still idle threads in a ThreadPoolExecutor's thread
# pool (i.e. shutdown() was not called). However, allowing workers to die with
# the interpreter has two undesirable properties:
# - The workers would still be running during interpretor shutdown,
# meaning that they would fail in unpredictable ways.
# - The workers could be killed while evaluating a work item, which could
# be bad if the callable being evaluated has external side-effects e.g.
# writing to a file.
#
# To work around this problem, an exit handler is installed which tells the
# workers to exit when their work queues are empty and then waits until the
# threads finish.
_thread_references = set()
_shutdown = False
def _python_exit():
global _shutdown
_shutdown = True
for thread_reference in _thread_references:
thread = thread_reference()
if thread is not None:
thread.join()
def _remove_dead_thread_references():
"""Remove inactive threads from _thread_references.
Should be called periodically to prevent memory leaks in scenarios such as:
>>> while True:
... t = ThreadPoolExecutor(max_workers=5)
... t.map(int, ['1', '2', '3', '4', '5'])
"""
for thread_reference in set(_thread_references):
if thread_reference() is None:
_thread_references.discard(thread_reference)
atexit.register(_python_exit)
class _WorkItem(object):
def __init__(self, future, fn, args, kwargs):
self.future = future
self.fn = fn
self.args = args
self.kwargs = kwargs
def run(self):
if not self.future.set_running_or_notify_cancel():
return
try:
result = self.fn(*self.args, **self.kwargs)
except BaseException as e:
self.future.set_exception(e)
else:
self.future.set_result(result)
def _worker(executor_reference, work_queue):
try:
while True:
try:
work_item = work_queue.get(block=True, timeout=0.1)
except queue.Empty:
executor = executor_reference()
# Exit if:
# - The interpreter is shutting down OR
# - The executor that owns the worker has been collected OR
# - The executor that owns the worker has been shutdown.
if _shutdown or executor is None or executor._shutdown:
return
del executor
else:
work_item.run()
except BaseException as e:
_base.LOGGER.critical('Exception in worker', exc_info=True)
class ThreadPoolExecutor(_base.Executor):
def __init__(self, max_workers):
"""Initializes a new ThreadPoolExecutor instance.
Args:
max_workers: The maximum number of threads that can be used to
execute the given calls.
"""
_remove_dead_thread_references()
self._max_workers = max_workers
self._work_queue = queue.Queue()
self._threads = set()
self._shutdown = False
self._shutdown_lock = threading.Lock()
def submit(self, fn, *args, **kwargs):
with self._shutdown_lock:
if self._shutdown:
raise RuntimeError('cannot schedule new futures after shutdown')
f = _base.Future()
w = _WorkItem(f, fn, args, kwargs)
self._work_queue.put(w)
self._adjust_thread_count()
return f
submit.__doc__ = _base.Executor.submit.__doc__
def _adjust_thread_count(self):
# TODO(bquinlan): Should avoid creating new threads if there are more
# idle threads than items in the work queue.
if len(self._threads) < self._max_workers:
t = threading.Thread(target=_worker,
args=(weakref.ref(self), self._work_queue))
t.daemon = True
t.start()
self._threads.add(t)
_thread_references.add(weakref.ref(t))
def shutdown(self, wait=True):
with self._shutdown_lock:
self._shutdown = True
if wait:
for t in self._threads:
t.join()
shutdown.__doc__ = _base.Executor.shutdown.__doc__

View File

@@ -0,0 +1,817 @@
import test.support
# Skip tests if _multiprocessing wasn't built.
test.support.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
test.support.import_module('multiprocessing.synchronize')
# import threading after _multiprocessing to raise a more revelant error
# message: "No module named _multiprocessing". _multiprocessing is not compiled
# without thread support.
test.support.import_module('threading')
import multiprocessing
import sys
import threading
import time
import unittest
if sys.platform.startswith('win'):
import ctypes
import ctypes.wintypes
from concurrent import futures
from concurrent.futures._base import (
PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, wait)
import concurrent.futures.process
def create_future(state=PENDING, exception=None, result=None):
f = Future()
f._state = state
f._exception = exception
f._result = result
return f
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=IOError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
def mul(x, y):
return x * y
class Call(object):
"""A call that can be submitted to a future.Executor for testing.
The call signals when it is called and waits for an event before finishing.
"""
CALL_LOCKS = {}
def _create_event(self):
if sys.platform.startswith('win'):
class SECURITY_ATTRIBUTES(ctypes.Structure):
_fields_ = [("nLength", ctypes.wintypes.DWORD),
("lpSecurityDescriptor", ctypes.wintypes.LPVOID),
("bInheritHandle", ctypes.wintypes.BOOL)]
s = SECURITY_ATTRIBUTES()
s.nLength = ctypes.sizeof(s)
s.lpSecurityDescriptor = None
s.bInheritHandle = True
handle = ctypes.windll.kernel32.CreateEventA(ctypes.pointer(s),
True,
False,
None)
assert handle is not None
return handle
else:
event = multiprocessing.Event()
self.CALL_LOCKS[id(event)] = event
return id(event)
def _wait_on_event(self, handle):
if sys.platform.startswith('win'):
r = ctypes.windll.kernel32.WaitForSingleObject(handle, 5 * 1000)
assert r == 0
else:
self.CALL_LOCKS[handle].wait()
def _signal_event(self, handle):
if sys.platform.startswith('win'):
r = ctypes.windll.kernel32.SetEvent(handle)
assert r != 0
else:
self.CALL_LOCKS[handle].set()
def __init__(self, manual_finish=False, result=42):
self._called_event = self._create_event()
self._can_finish = self._create_event()
self._result = result
if not manual_finish:
self._signal_event(self._can_finish)
def wait_on_called(self):
self._wait_on_event(self._called_event)
def set_can(self):
self._signal_event(self._can_finish)
def __call__(self):
self._signal_event(self._called_event)
self._wait_on_event(self._can_finish)
return self._result
def close(self):
self.set_can()
if sys.platform.startswith('win'):
ctypes.windll.kernel32.CloseHandle(self._called_event)
ctypes.windll.kernel32.CloseHandle(self._can_finish)
else:
del self.CALL_LOCKS[self._called_event]
del self.CALL_LOCKS[self._can_finish]
class ExceptionCall(Call):
def __call__(self):
self._signal_event(self._called_event)
self._wait_on_event(self._can_finish)
raise ZeroDivisionError()
class MapCall(Call):
def __init__(self, result=42):
super().__init__(manual_finish=True, result=result)
def __call__(self, manual_finish):
if manual_finish:
super().__call__()
return self._result
class ExecutorShutdownTest(unittest.TestCase):
def test_run_after_shutdown(self):
self.executor.shutdown()
self.assertRaises(RuntimeError,
self.executor.submit,
pow, 2, 5)
def _start_some_futures(self):
call1 = Call(manual_finish=True)
call2 = Call(manual_finish=True)
call3 = Call(manual_finish=True)
try:
self.executor.submit(call1)
self.executor.submit(call2)
self.executor.submit(call3)
call1.wait_on_called()
call2.wait_on_called()
call3.wait_on_called()
call1.set_can()
call2.set_can()
call3.set_can()
finally:
call1.close()
call2.close()
call3.close()
class ThreadPoolShutdownTest(ExecutorShutdownTest):
def setUp(self):
self.executor = futures.ThreadPoolExecutor(max_workers=5)
def tearDown(self):
self.executor.shutdown(wait=True)
def test_threads_terminate(self):
self._start_some_futures()
self.assertEqual(len(self.executor._threads), 3)
self.executor.shutdown()
for t in self.executor._threads:
t.join()
def test_context_manager_shutdown(self):
with futures.ThreadPoolExecutor(max_workers=5) as e:
executor = e
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
for t in executor._threads:
t.join()
def test_del_shutdown(self):
executor = futures.ThreadPoolExecutor(max_workers=5)
executor.map(abs, range(-5, 5))
threads = executor._threads
del executor
for t in threads:
t.join()
class ProcessPoolShutdownTest(ExecutorShutdownTest):
def setUp(self):
self.executor = futures.ProcessPoolExecutor(max_workers=5)
def tearDown(self):
self.executor.shutdown(wait=True)
def test_processes_terminate(self):
self._start_some_futures()
self.assertEqual(len(self.executor._processes), 5)
processes = self.executor._processes
self.executor.shutdown()
for p in processes:
p.join()
def test_context_manager_shutdown(self):
with futures.ProcessPoolExecutor(max_workers=5) as e:
executor = e
self.assertEqual(list(e.map(abs, range(-5, 5))),
[5, 4, 3, 2, 1, 0, 1, 2, 3, 4])
for p in self.executor._processes:
p.join()
def test_del_shutdown(self):
executor = futures.ProcessPoolExecutor(max_workers=5)
list(executor.map(abs, range(-5, 5)))
queue_management_thread = executor._queue_management_thread
processes = executor._processes
del executor
queue_management_thread.join()
for p in processes:
p.join()
class WaitTests(unittest.TestCase):
def test_first_completed(self):
def wait_test():
while not future1._waiters:
pass
call1.set_can()
call1 = Call(manual_finish=True)
call2 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
t = threading.Thread(target=wait_test)
t.start()
done, not_done = futures.wait(
[CANCELLED_FUTURE, future1, future2],
return_when=futures.FIRST_COMPLETED)
self.assertEquals(set([future1]), done)
self.assertEquals(set([CANCELLED_FUTURE, future2]), not_done)
finally:
call1.close()
call2.close()
def test_first_completed_one_already_completed(self):
call1 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE, future1],
return_when=futures.FIRST_COMPLETED)
self.assertEquals(set([SUCCESSFUL_FUTURE]), finished)
self.assertEquals(set([future1]), pending)
finally:
call1.close()
def test_first_exception(self):
def wait_test():
while not future1._waiters:
pass
call1.set_can()
call2.set_can()
call1 = Call(manual_finish=True)
call2 = ExceptionCall(manual_finish=True)
call3 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
future3 = self.executor.submit(call3)
t = threading.Thread(target=wait_test)
t.start()
finished, pending = futures.wait(
[future1, future2, future3],
return_when=futures.FIRST_EXCEPTION)
self.assertEquals(set([future1, future2]), finished)
self.assertEquals(set([future3]), pending)
finally:
call1.close()
call2.close()
call3.close()
def test_first_exception_some_already_complete(self):
def wait_test():
while not future1._waiters:
pass
call1.set_can()
call1 = ExceptionCall(manual_finish=True)
call2 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
t = threading.Thread(target=wait_test)
t.start()
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1, future2],
return_when=futures.FIRST_EXCEPTION)
self.assertEquals(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1]), finished)
self.assertEquals(set([CANCELLED_FUTURE, future2]), pending)
finally:
call1.close()
call2.close()
def test_first_exception_one_already_failed(self):
call1 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
finished, pending = futures.wait(
[EXCEPTION_FUTURE, future1],
return_when=futures.FIRST_EXCEPTION)
self.assertEquals(set([EXCEPTION_FUTURE]), finished)
self.assertEquals(set([future1]), pending)
finally:
call1.close()
def test_all_completed(self):
def wait_test():
while not future1._waiters:
pass
call1.set_can()
call2.set_can()
call1 = Call(manual_finish=True)
call2 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
t = threading.Thread(target=wait_test)
t.start()
finished, pending = futures.wait(
[future1, future2],
return_when=futures.ALL_COMPLETED)
self.assertEquals(set([future1, future2]), finished)
self.assertEquals(set(), pending)
finally:
call1.close()
call2.close()
def test_all_completed_some_already_completed(self):
def wait_test():
while not future1._waiters:
pass
future4.cancel()
call1.set_can()
call2.set_can()
call3.set_can()
self.assertLessEqual(
futures.process.EXTRA_QUEUED_CALLS,
1,
'this test assumes that future4 will be cancelled before it is '
'queued to run - which might not be the case if '
'ProcessPoolExecutor is too aggresive in scheduling futures')
call1 = Call(manual_finish=True)
call2 = Call(manual_finish=True)
call3 = Call(manual_finish=True)
call4 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
future3 = self.executor.submit(call3)
future4 = self.executor.submit(call4)
t = threading.Thread(target=wait_test)
t.start()
finished, pending = futures.wait(
[SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1, future2, future3, future4],
return_when=futures.ALL_COMPLETED)
self.assertEquals(set([SUCCESSFUL_FUTURE,
CANCELLED_AND_NOTIFIED_FUTURE,
future1, future2, future3, future4]),
finished)
self.assertEquals(set(), pending)
finally:
call1.close()
call2.close()
call3.close()
call4.close()
def test_timeout(self):
def wait_test():
while not future1._waiters:
pass
call1.set_can()
call1 = Call(manual_finish=True)
call2 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
t = threading.Thread(target=wait_test)
t.start()
finished, pending = futures.wait(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2],
timeout=1,
return_when=futures.ALL_COMPLETED)
self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1]), finished)
self.assertEquals(set([future2]), pending)
finally:
call1.close()
call2.close()
class ThreadPoolWaitTests(WaitTests):
def setUp(self):
self.executor = futures.ThreadPoolExecutor(max_workers=1)
def tearDown(self):
self.executor.shutdown(wait=True)
class ProcessPoolWaitTests(WaitTests):
def setUp(self):
self.executor = futures.ProcessPoolExecutor(max_workers=1)
def tearDown(self):
self.executor.shutdown(wait=True)
class AsCompletedTests(unittest.TestCase):
# TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
def test_no_timeout(self):
def wait_test():
while not future1._waiters:
pass
call1.set_can()
call2.set_can()
call1 = Call(manual_finish=True)
call2 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
future2 = self.executor.submit(call2)
t = threading.Thread(target=wait_test)
t.start()
completed = set(futures.as_completed(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2]))
self.assertEquals(set(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1, future2]),
completed)
finally:
call1.close()
call2.close()
def test_zero_timeout(self):
call1 = Call(manual_finish=True)
try:
future1 = self.executor.submit(call1)
completed_futures = set()
try:
for future in futures.as_completed(
[CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE,
future1],
timeout=0):
completed_futures.add(future)
except futures.TimeoutError:
pass
self.assertEquals(set([CANCELLED_AND_NOTIFIED_FUTURE,
EXCEPTION_FUTURE,
SUCCESSFUL_FUTURE]),
completed_futures)
finally:
call1.close()
class ThreadPoolAsCompletedTests(AsCompletedTests):
def setUp(self):
self.executor = futures.ThreadPoolExecutor(max_workers=1)
def tearDown(self):
self.executor.shutdown(wait=True)
class ProcessPoolAsCompletedTests(AsCompletedTests):
def setUp(self):
self.executor = futures.ProcessPoolExecutor(max_workers=1)
def tearDown(self):
self.executor.shutdown(wait=True)
class ExecutorTest(unittest.TestCase):
# Executor.shutdown() and context manager usage is tested by
# ExecutorShutdownTest.
def test_submit(self):
future = self.executor.submit(pow, 2, 8)
self.assertEquals(256, future.result())
def test_submit_keyword(self):
future = self.executor.submit(mul, 2, y=8)
self.assertEquals(16, future.result())
def test_map(self):
self.assertEqual(
list(self.executor.map(pow, range(10), range(10))),
list(map(pow, range(10), range(10))))
def test_map_exception(self):
i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
self.assertEqual(i.__next__(), (0, 1))
self.assertEqual(i.__next__(), (0, 1))
self.assertRaises(ZeroDivisionError, i.__next__)
def test_map_timeout(self):
results = []
timeout_call = MapCall()
try:
try:
for i in self.executor.map(timeout_call,
[False, False, True],
timeout=1):
results.append(i)
except futures.TimeoutError:
pass
else:
self.fail('expected TimeoutError')
finally:
timeout_call.close()
self.assertEquals([42, 42], results)
class ThreadPoolExecutorTest(ExecutorTest):
def setUp(self):
self.executor = futures.ThreadPoolExecutor(max_workers=1)
def tearDown(self):
self.executor.shutdown(wait=True)
class ProcessPoolExecutorTest(ExecutorTest):
def setUp(self):
self.executor = futures.ProcessPoolExecutor(max_workers=1)
def tearDown(self):
self.executor.shutdown(wait=True)
class FutureTests(unittest.TestCase):
def test_done_callback_with_result(self):
callback_result = None
def fn(callback_future):
nonlocal callback_result
callback_result = callback_future.result()
f = Future()
f.add_done_callback(fn)
f.set_result(5)
self.assertEquals(5, callback_result)
def test_done_callback_with_exception(self):
callback_exception = None
def fn(callback_future):
nonlocal callback_exception
callback_exception = callback_future.exception()
f = Future()
f.add_done_callback(fn)
f.set_exception(Exception('test'))
self.assertEquals(('test',), callback_exception.args)
def test_done_callback_with_cancel(self):
was_cancelled = None
def fn(callback_future):
nonlocal was_cancelled
was_cancelled = callback_future.cancelled()
f = Future()
f.add_done_callback(fn)
self.assertTrue(f.cancel())
self.assertTrue(was_cancelled)
def test_done_callback_raises(self):
raising_was_called = False
fn_was_called = False
def raising_fn(callback_future):
nonlocal raising_was_called
raising_was_called = True
raise Exception('doh!')
def fn(callback_future):
nonlocal fn_was_called
fn_was_called = True
f = Future()
f.add_done_callback(raising_fn)
f.add_done_callback(fn)
f.set_result(5)
self.assertTrue(raising_was_called)
self.assertTrue(fn_was_called)
def test_done_callback_already_successful(self):
callback_result = None
def fn(callback_future):
nonlocal callback_result
callback_result = callback_future.result()
f = Future()
f.set_result(5)
f.add_done_callback(fn)
self.assertEquals(5, callback_result)
def test_done_callback_already_failed(self):
callback_exception = None
def fn(callback_future):
nonlocal callback_exception
callback_exception = callback_future.exception()
f = Future()
f.set_exception(Exception('test'))
f.add_done_callback(fn)
self.assertEquals(('test',), callback_exception.args)
def test_done_callback_already_cancelled(self):
was_cancelled = None
def fn(callback_future):
nonlocal was_cancelled
was_cancelled = callback_future.cancelled()
f = Future()
self.assertTrue(f.cancel())
f.add_done_callback(fn)
self.assertTrue(was_cancelled)
def test_repr(self):
self.assertRegexpMatches(repr(PENDING_FUTURE),
'<Future at 0x[0-9a-f]+ state=pending>')
self.assertRegexpMatches(repr(RUNNING_FUTURE),
'<Future at 0x[0-9a-f]+ state=running>')
self.assertRegexpMatches(repr(CANCELLED_FUTURE),
'<Future at 0x[0-9a-f]+ state=cancelled>')
self.assertRegexpMatches(repr(CANCELLED_AND_NOTIFIED_FUTURE),
'<Future at 0x[0-9a-f]+ state=cancelled>')
self.assertRegexpMatches(
repr(EXCEPTION_FUTURE),
'<Future at 0x[0-9a-f]+ state=finished raised IOError>')
self.assertRegexpMatches(
repr(SUCCESSFUL_FUTURE),
'<Future at 0x[0-9a-f]+ state=finished returned int>')
def test_cancel(self):
f1 = create_future(state=PENDING)
f2 = create_future(state=RUNNING)
f3 = create_future(state=CANCELLED)
f4 = create_future(state=CANCELLED_AND_NOTIFIED)
f5 = create_future(state=FINISHED, exception=IOError())
f6 = create_future(state=FINISHED, result=5)
self.assertTrue(f1.cancel())
self.assertEquals(f1._state, CANCELLED)
self.assertFalse(f2.cancel())
self.assertEquals(f2._state, RUNNING)
self.assertTrue(f3.cancel())
self.assertEquals(f3._state, CANCELLED)
self.assertTrue(f4.cancel())
self.assertEquals(f4._state, CANCELLED_AND_NOTIFIED)
self.assertFalse(f5.cancel())
self.assertEquals(f5._state, FINISHED)
self.assertFalse(f6.cancel())
self.assertEquals(f6._state, FINISHED)
def test_cancelled(self):
self.assertFalse(PENDING_FUTURE.cancelled())
self.assertFalse(RUNNING_FUTURE.cancelled())
self.assertTrue(CANCELLED_FUTURE.cancelled())
self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
self.assertFalse(EXCEPTION_FUTURE.cancelled())
self.assertFalse(SUCCESSFUL_FUTURE.cancelled())
def test_done(self):
self.assertFalse(PENDING_FUTURE.done())
self.assertFalse(RUNNING_FUTURE.done())
self.assertTrue(CANCELLED_FUTURE.done())
self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
self.assertTrue(EXCEPTION_FUTURE.done())
self.assertTrue(SUCCESSFUL_FUTURE.done())
def test_running(self):
self.assertFalse(PENDING_FUTURE.running())
self.assertTrue(RUNNING_FUTURE.running())
self.assertFalse(CANCELLED_FUTURE.running())
self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
self.assertFalse(EXCEPTION_FUTURE.running())
self.assertFalse(SUCCESSFUL_FUTURE.running())
def test_result_with_timeout(self):
self.assertRaises(futures.TimeoutError,
PENDING_FUTURE.result, timeout=0)
self.assertRaises(futures.TimeoutError,
RUNNING_FUTURE.result, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_FUTURE.result, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
self.assertRaises(IOError, EXCEPTION_FUTURE.result, timeout=0)
self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)
def test_result_with_success(self):
# TODO(brian@sweetapp.com): This test is timing dependant.
def notification():
# Wait until the main thread is waiting for the result.
time.sleep(1)
f1.set_result(42)
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
t.start()
self.assertEquals(f1.result(timeout=5), 42)
def test_result_with_cancel(self):
# TODO(brian@sweetapp.com): This test is timing dependant.
def notification():
# Wait until the main thread is waiting for the result.
time.sleep(1)
f1.cancel()
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
t.start()
self.assertRaises(futures.CancelledError, f1.result, timeout=5)
def test_exception_with_timeout(self):
self.assertRaises(futures.TimeoutError,
PENDING_FUTURE.exception, timeout=0)
self.assertRaises(futures.TimeoutError,
RUNNING_FUTURE.exception, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_FUTURE.exception, timeout=0)
self.assertRaises(futures.CancelledError,
CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
IOError))
self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)
def test_exception_with_success(self):
def notification():
# Wait until the main thread is waiting for the exception.
time.sleep(1)
with f1._condition:
f1._state = FINISHED
f1._exception = IOError()
f1._condition.notify_all()
f1 = create_future(state=PENDING)
t = threading.Thread(target=notification)
t.start()
self.assertTrue(isinstance(f1.exception(timeout=5), IOError))
def test_main():
test.support.run_unittest(ProcessPoolExecutorTest,
ThreadPoolExecutorTest,
ProcessPoolWaitTests,
ThreadPoolWaitTests,
ProcessPoolAsCompletedTests,
ThreadPoolAsCompletedTests,
FutureTests,
ProcessPoolShutdownTest,
ThreadPoolShutdownTest)
if __name__ == "__main__":
test_main()