Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 77 additions & 11 deletions Lib/multiprocessing/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,13 @@
import time
import traceback
import warnings
from queue import Empty

# If threading is available then ThreadPool should be provided. Therefore
# we avoid top-level imports which are liable to fail on some systems.
from . import util
from . import get_context, TimeoutError
from .connection import wait

#
# Constants representing the state of a pool
Expand Down Expand Up @@ -145,6 +147,29 @@ def _helper_reraises_exception(ex):
# Class representing a process pool
#

class _PoolCache(dict):
"""
Class that implements a cache for the Pool class that will notify
the pool management threads every time the cache is emptied. The
notification is done by the use of a queue that is provided when
instantiating the cache.
"""
def __init__(self, *args, notifier=None, **kwds):
self.notifier = notifier
super().__init__(*args, **kwds)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PEP 8, please add an empty line between methods.


def __delitem__(self, item):
super().__delitem__(item)

# Notify that the cache is empty. This is important because the
# pool keeps maintaining workers until the cache gets drained. This
# eliminates a race condition in which a task is finished after the
# the pool's _handle_workers method has enter another iteration of the
# loop. In this situation, the only event that can wake up the pool
# is the cache to be emptied (no more tasks available).
if not self:
self.notifier.put(None)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you add a comment explaining why it's important to wake up when the cache is emptied?


class Pool(object):
'''
Class which supports an async version of applying functions to arguments.
Expand All @@ -165,7 +190,11 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self._ctx = context or get_context()
self._setup_queues()
self._taskqueue = queue.SimpleQueue()
self._cache = {}
# The _change_notifier queue exists to wake up self._handle_workers()
# when the cache (self._cache) is empty or when there is a change in
# the _state variable of the thread that runs _handle_workers.
self._change_notifier = self._ctx.SimpleQueue()
self._cache = _PoolCache(notifier=self._change_notifier)
self._maxtasksperchild = maxtasksperchild
self._initializer = initializer
self._initargs = initargs
Expand All @@ -189,12 +218,14 @@ def __init__(self, processes=None, initializer=None, initargs=(),
p.join()
raise

sentinels = self._get_sentinels()

self._worker_handler = threading.Thread(
target=Pool._handle_workers,
args=(self._cache, self._taskqueue, self._ctx, self.Process,
self._processes, self._pool, self._inqueue, self._outqueue,
self._initializer, self._initargs, self._maxtasksperchild,
self._wrap_exception)
self._wrap_exception, sentinels, self._change_notifier)
)
self._worker_handler.daemon = True
self._worker_handler._state = RUN
Expand All @@ -221,7 +252,7 @@ def __init__(self, processes=None, initializer=None, initargs=(),
self._terminate = util.Finalize(
self, self._terminate_pool,
args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
self._worker_handler, self._task_handler,
self._change_notifier, self._worker_handler, self._task_handler,
self._result_handler, self._cache),
exitpriority=15
)
def __del__(self, _warn=warnings.warn, RUN=RUN):
    """Warn about an unclosed running pool and wake the worker handler.

    *_warn* and *RUN* are bound as default arguments so they remain
    reachable during interpreter shutdown, when module globals may
    already have been cleared.
    """
    if self._state == RUN:
        _warn(f"unclosed running multiprocessing pool {self!r}",
              ResourceWarning, source=self)
    # Wake up _handle_workers() so the handler thread can notice the
    # pool is going away; guard with getattr in case __init__ failed
    # before _change_notifier was assigned.
    if getattr(self, '_change_notifier', None) is not None:
        self._change_notifier.put(None)

def __repr__(self):
cls = self.__class__
return (f'<{cls.__module__}.{cls.__qualname__} '
f'state={self._state} '
f'pool_size={len(self._pool)}>')

def _get_sentinels(self):
task_queue_sentinels = [self._outqueue._reader]
self_notifier_sentinels = [self._change_notifier._reader]
return [*task_queue_sentinels, *self_notifier_sentinels]

@staticmethod
def _get_worker_sentinels(workers):
return [worker.sentinel for worker in
workers if hasattr(worker, "sentinel")]

@staticmethod
def _join_exited_workers(pool):
"""Cleanup after any worker processes which have exited due to reaching
Expand Down Expand Up @@ -452,18 +495,28 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None,
return result

@staticmethod
def _handle_workers(cache, taskqueue, ctx, Process, processes, pool,
inqueue, outqueue, initializer, initargs,
maxtasksperchild, wrap_exception):
def _wait_for_updates(sentinels, change_notifier, timeout=None):
wait(sentinels, timeout=timeout)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you please add a comment on wait() to explain that it completes when at least one sentinel is set and that it's important to not wait until all sentinels completed, but exit frequently to refresh the pool.

This point is non-trivial and it surprised me when I wrote PR #11136, my comment #11136 (comment):

My change doesn't work: self._worker_state_event isn't set when a worker completes, whereas _maintain_pool() should be called frequently to check when a worker completed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't ask to documen the behavior of wait, but more explicit that we stop as soon as the first event complete on purpose.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks obvious to me, especially as the function is named wait_for_updates, but I guess it doesn't hurt to add a comment.

while not change_notifier.empty():
change_notifier.get()

@classmethod
def _handle_workers(cls, cache, taskqueue, ctx, Process, processes,
                    pool, inqueue, outqueue, initializer, initargs,
                    maxtasksperchild, wrap_exception, sentinels,
                    change_notifier):
    """Thread target that keeps the pool of workers replenished.

    Runs while the handler is in the RUN state and, unless terminated,
    until the task cache has been drained, then tells the task handler
    to stop the workers.
    """
    thread = threading.current_thread()

    # Keep maintaining workers until the cache gets drained, unless the pool
    # is terminated.
    while thread._state == RUN or (cache and thread._state != TERMINATE):
        cls._maintain_pool(ctx, Process, processes, pool, inqueue,
                           outqueue, initializer, initargs,
                           maxtasksperchild, wrap_exception)

        # Worker sentinels change as processes exit and are replaced,
        # so they must be recomputed on every iteration.
        current_sentinels = [*cls._get_worker_sentinels(pool), *sentinels]

        cls._wait_for_updates(current_sentinels, change_notifier)
    # send sentinel to stop workers
    taskqueue.put(None)
    util.debug('worker handler exiting')
def close(self):
    """Prevent any more tasks from being submitted to the pool.

    Pending tasks still complete; the worker processes exit once all
    outstanding work has been done.
    """
    util.debug('closing pool')
    if self._state == RUN:
        self._state = CLOSE
        self._worker_handler._state = CLOSE
        # Wake up _handle_workers() so it notices the state change
        # without having to wait for another event.
        self._change_notifier.put(None)

def terminate(self):
    """Stop the pool immediately without waiting for outstanding work."""
    util.debug('terminating pool')
    self._state = TERMINATE
    self._worker_handler._state = TERMINATE
    # Wake up _handle_workers(); the TERMINATE state must be set before
    # the notification is sent so the handler observes it on wake-up.
    self._change_notifier.put(None)
    self._terminate()

def join(self):
Expand All @@ -622,7 +677,7 @@ def _help_stuff_finish(inqueue, task_handler, size):
time.sleep(0)

@classmethod
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
worker_handler, task_handler, result_handler, cache):
# this is guaranteed to only be called once
util.debug('finalizing pool')
Expand All @@ -638,6 +693,7 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
"Cannot have cache with result_hander not alive")

result_handler._state = TERMINATE
change_notifier.put(None)
outqueue.put(None) # sentinel

# We must wait for the worker handler to exit before terminating
Expand Down Expand Up @@ -871,6 +927,13 @@ def _setup_queues(self):
self._quick_put = self._inqueue.put
self._quick_get = self._outqueue.get

def _get_sentinels(self):
return [self._change_notifier._reader]

@staticmethod
def _get_worker_sentinels(workers):
return []

@staticmethod
def _help_stuff_finish(inqueue, task_handler, size):
# drain inqueue, and put sentinels at its head to make workers finish
Expand All @@ -881,3 +944,6 @@ def _help_stuff_finish(inqueue, task_handler, size):
pass
for i in range(size):
inqueue.put(None)

def _wait_for_updates(self, sentinels, change_notifier, timeout):
time.sleep(timeout)
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
Use :func:`multiprocessing.connection.wait` instead of polling every 0.1
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oops, I clicked on the wrong button :-( Your NEWS entry still doesn't explain that only process pools are affected.

Another issue: "polling each 0.2 seconds": currently the code uses 0.1 seconds.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry, I completely forgot about that while fighting Windows issues :/

seconds for worker updates in :class:`multiprocessing.Pool`. Patch by Pablo
Galindo.