-
-
Notifications
You must be signed in to change notification settings - Fork 34.1k
bpo-35493: Use Process.sentinel instead of sleeping for polling worker status in multiprocessing.Pool #11488
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
b5c7d83
839f2e2
46d9625
9224c33
e1ee023
8d72dc8
ab44556
41cf470
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,11 +21,13 @@ | |
| import time | ||
| import traceback | ||
| import warnings | ||
| from queue import Empty | ||
|
|
||
| # If threading is available then ThreadPool should be provided. Therefore | ||
| # we avoid top-level imports which are liable to fail on some systems. | ||
| from . import util | ||
| from . import get_context, TimeoutError | ||
| from .connection import wait | ||
|
|
||
| # | ||
| # Constants representing the state of a pool | ||
|
|
@@ -145,6 +147,29 @@ def _helper_reraises_exception(ex): | |
| # Class representing a process pool | ||
| # | ||
|
|
||
| class _PoolCache(dict): | ||
| """ | ||
| Class that implements a cache for the Pool class that will notify | ||
| the pool management threads every time the cache is emptied. The | ||
| notification is done by the use of a queue that is provided when | ||
| instantiating the cache. | ||
| """ | ||
| def __init__(self, *args, notifier=None, **kwds): | ||
| self.notifier = notifier | ||
| super().__init__(*args, **kwds) | ||
|
|
||
| def __delitem__(self, item): | ||
| super().__delitem__(item) | ||
|
|
||
| # Notify that the cache is empty. This is important because the | ||
| # pool keeps maintaining workers until the cache gets drained. This | ||
| # eliminates a race condition in which a task is finished after | ||
| # the pool's _handle_workers method has entered another iteration of the | ||
| # loop. In this situation, the only event that can wake up the pool | ||
| # is the cache being emptied (no more tasks available). | ||
| if not self: | ||
| self.notifier.put(None) | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you add a comment explaining why it's important to wake up when the cache is emptied? |
||
|
|
||
| class Pool(object): | ||
| ''' | ||
| Class which supports an async version of applying functions to arguments. | ||
|
|
@@ -165,7 +190,11 @@ def __init__(self, processes=None, initializer=None, initargs=(), | |
| self._ctx = context or get_context() | ||
| self._setup_queues() | ||
| self._taskqueue = queue.SimpleQueue() | ||
| self._cache = {} | ||
| # The _change_notifier queue exists to wake up self._handle_workers() | ||
| # when the cache (self._cache) is empty or when there is a change in | ||
| # the _state variable of the thread that runs _handle_workers. | ||
| self._change_notifier = self._ctx.SimpleQueue() | ||
vstinner marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| self._cache = _PoolCache(notifier=self._change_notifier) | ||
| self._maxtasksperchild = maxtasksperchild | ||
| self._initializer = initializer | ||
| self._initargs = initargs | ||
|
|
@@ -189,12 +218,14 @@ def __init__(self, processes=None, initializer=None, initargs=(), | |
| p.join() | ||
| raise | ||
|
|
||
| sentinels = self._get_sentinels() | ||
|
|
||
| self._worker_handler = threading.Thread( | ||
| target=Pool._handle_workers, | ||
| args=(self._cache, self._taskqueue, self._ctx, self.Process, | ||
| self._processes, self._pool, self._inqueue, self._outqueue, | ||
| self._initializer, self._initargs, self._maxtasksperchild, | ||
| self._wrap_exception) | ||
| self._wrap_exception, sentinels, self._change_notifier) | ||
| ) | ||
| self._worker_handler.daemon = True | ||
| self._worker_handler._state = RUN | ||
|
|
@@ -221,7 +252,7 @@ def __init__(self, processes=None, initializer=None, initargs=(), | |
| self._terminate = util.Finalize( | ||
| self, self._terminate_pool, | ||
| args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, | ||
| self._worker_handler, self._task_handler, | ||
| self._change_notifier, self._worker_handler, self._task_handler, | ||
| self._result_handler, self._cache), | ||
| exitpriority=15 | ||
| ) | ||
|
|
@@ -233,13 +264,25 @@ def __del__(self, _warn=warnings.warn, RUN=RUN): | |
| if self._state == RUN: | ||
| _warn(f"unclosed running multiprocessing pool {self!r}", | ||
| ResourceWarning, source=self) | ||
| if getattr(self, '_change_notifier', None) is not None: | ||
| self._change_notifier.put(None) | ||
|
|
||
| def __repr__(self): | ||
| cls = self.__class__ | ||
| return (f'<{cls.__module__}.{cls.__qualname__} ' | ||
| f'state={self._state} ' | ||
| f'pool_size={len(self._pool)}>') | ||
|
|
||
| def _get_sentinels(self): | ||
| task_queue_sentinels = [self._outqueue._reader] | ||
| self_notifier_sentinels = [self._change_notifier._reader] | ||
| return [*task_queue_sentinels, *self_notifier_sentinels] | ||
|
|
||
| @staticmethod | ||
| def _get_worker_sentinels(workers): | ||
| return [worker.sentinel for worker in | ||
| workers if hasattr(worker, "sentinel")] | ||
pitrou marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| @staticmethod | ||
| def _join_exited_workers(pool): | ||
| """Cleanup after any worker processes which have exited due to reaching | ||
|
|
@@ -452,18 +495,28 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, | |
| return result | ||
|
|
||
| @staticmethod | ||
| def _handle_workers(cache, taskqueue, ctx, Process, processes, pool, | ||
| inqueue, outqueue, initializer, initargs, | ||
| maxtasksperchild, wrap_exception): | ||
| def _wait_for_updates(sentinels, change_notifier, timeout=None): | ||
| wait(sentinels, timeout=timeout) | ||
pablogsal marked this conversation as resolved.
Show resolved
Hide resolved
vstinner marked this conversation as resolved.
Outdated
Show resolved
Hide resolved
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you please add a comment on wait() to explain that it completes when at least one sentinel is set, and that it's important not to wait until all sentinels have completed, but to exit frequently to refresh the pool? This point is non-trivial and it surprised me when I wrote PR #11136, my comment #11136 (comment):
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Member
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. I'm not asking to document the behavior of wait(), but to make it more explicit that we stop as soon as the first event completes, on purpose.
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. It looks obvious to me, especially as the function is named |
||
| while not change_notifier.empty(): | ||
| change_notifier.get() | ||
|
|
||
| @classmethod | ||
| def _handle_workers(cls, cache, taskqueue, ctx, Process, processes, | ||
| pool, inqueue, outqueue, initializer, initargs, | ||
| maxtasksperchild, wrap_exception, sentinels, | ||
| change_notifier): | ||
| thread = threading.current_thread() | ||
|
|
||
| # Keep maintaining workers until the cache gets drained, unless the pool | ||
| # is terminated. | ||
| while thread._state == RUN or (cache and thread._state != TERMINATE): | ||
| Pool._maintain_pool(ctx, Process, processes, pool, inqueue, | ||
| outqueue, initializer, initargs, | ||
| maxtasksperchild, wrap_exception) | ||
| time.sleep(0.1) | ||
| cls._maintain_pool(ctx, Process, processes, pool, inqueue, | ||
| outqueue, initializer, initargs, | ||
| maxtasksperchild, wrap_exception) | ||
|
|
||
| current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels] | ||
|
|
||
| cls._wait_for_updates(current_sentinels, change_notifier) | ||
| # send sentinel to stop workers | ||
| taskqueue.put(None) | ||
| util.debug('worker handler exiting') | ||
|
|
@@ -593,11 +646,13 @@ def close(self): | |
| if self._state == RUN: | ||
| self._state = CLOSE | ||
| self._worker_handler._state = CLOSE | ||
| self._change_notifier.put(None) | ||
|
|
||
| def terminate(self): | ||
| util.debug('terminating pool') | ||
| self._state = TERMINATE | ||
| self._worker_handler._state = TERMINATE | ||
| self._change_notifier.put(None) | ||
| self._terminate() | ||
|
|
||
| def join(self): | ||
|
|
@@ -622,7 +677,7 @@ def _help_stuff_finish(inqueue, task_handler, size): | |
| time.sleep(0) | ||
|
|
||
| @classmethod | ||
| def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, | ||
| def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, | ||
| worker_handler, task_handler, result_handler, cache): | ||
| # this is guaranteed to only be called once | ||
| util.debug('finalizing pool') | ||
|
|
@@ -638,6 +693,7 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, | |
| "Cannot have cache with result_hander not alive") | ||
|
|
||
| result_handler._state = TERMINATE | ||
| change_notifier.put(None) | ||
| outqueue.put(None) # sentinel | ||
|
|
||
| # We must wait for the worker handler to exit before terminating | ||
|
|
@@ -871,6 +927,13 @@ def _setup_queues(self): | |
| self._quick_put = self._inqueue.put | ||
| self._quick_get = self._outqueue.get | ||
|
|
||
| def _get_sentinels(self): | ||
| return [self._change_notifier._reader] | ||
|
|
||
| @staticmethod | ||
| def _get_worker_sentinels(workers): | ||
| return [] | ||
|
|
||
| @staticmethod | ||
| def _help_stuff_finish(inqueue, task_handler, size): | ||
| # drain inqueue, and put sentinels at its head to make workers finish | ||
|
|
@@ -881,3 +944,6 @@ def _help_stuff_finish(inqueue, task_handler, size): | |
| pass | ||
| for i in range(size): | ||
| inqueue.put(None) | ||
|
|
||
| def _wait_for_updates(self, sentinels, change_notifier, timeout): | ||
| time.sleep(timeout) | ||
pablogsal marked this conversation as resolved.
Show resolved
Hide resolved
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,3 @@ | ||
| Use :func:`multiprocessing.connection.wait` instead of polling each 0.2 | ||
vstinner marked this conversation as resolved.
Show resolved
Hide resolved
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Oops, I clicked on the wrong button :-( Your NEWS entry still doesn't explain that only process pools are affected. Another issue: "polling each 0.2 seconds": currently the code uses 0.1 seconds.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Sorry, I completely forgot about that while fighting Windows issues :/ |
||
| seconds for worker updates in :class:`multiprocessing.Pool`. Patch by Pablo | ||
| Galindo. | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
PEP 8, please add an empty line between methods.