From 548d5fca3e1998d975ac128208829caec373888b Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Sun, 15 Mar 2020 05:43:27 +0300
Subject: [PATCH 1/4] bpo-39360: Ensure all workers exit when finalizing a
multiprocessing.Pool
---
Lib/multiprocessing/pool.py | 4 ++--
Lib/test/_test_multiprocessing.py | 15 +++++++++++++++
.../2020-03-15-05-41-05.bpo-39360.cmcU5p.rst | 2 ++
3 files changed, 19 insertions(+), 2 deletions(-)
create mode 100644 Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index b223d6aa724bb6..55cdb609a5558c 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -651,8 +651,6 @@ def close(self):
def terminate(self):
util.debug('terminating pool')
self._state = TERMINATE
- self._worker_handler._state = TERMINATE
- self._change_notifier.put(None)
self._terminate()
def join(self):
@@ -683,6 +681,8 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
util.debug('finalizing pool')
worker_handler._state = TERMINATE
+ change_notifier.put(None)
+
task_handler._state = TERMINATE
util.debug('helping task handler/workers to finish')
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index b985d51508cb75..e8c9227216c4d6 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -2780,6 +2780,21 @@ def test_pool_worker_lifetime_early_close(self):
for (j, res) in enumerate(results):
self.assertEqual(res.get(), sqr(j))
+ def test_pool_hang(self):
+ # tests cases against bpo-38744 and bpo-39360
+ cmd = '''if 1:
+ from multiprocessing import Pool
+ class A:
+ def init(self):
+ self.pool = Pool(processes=1)
+ def do_something(x):
+ return x1
+ problem = A()
+ problem.pool.map(do_something, [1,2,3])
+ '''
+ rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)
+ self.assertEqual(rc, 0)
+
#
# Test of creating a customized manager class
#
diff --git a/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst b/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst
new file mode 100644
index 00000000000000..85afe726dd20fe
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst
@@ -0,0 +1,2 @@
+Ensure all workers exit when finalizing a :class:`multiprocessing.Pool`.
+Patch by Batuhan Taskaya.
From 207d611c9f832956b7a990108849ef3dc04bbcc9 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Batuhan=20Ta=C5=9Fkaya?=
Date: Sun, 15 Mar 2020 06:02:12 +0300
Subject: [PATCH 2/4] Update
Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst
Co-Authored-By: Pablo Galindo
---
Lib/test/_test_multiprocessing.py | 4 ++--
.../next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst | 5 +++--
2 files changed, 5 insertions(+), 4 deletions(-)
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index e8c9227216c4d6..3c0d3c030bf3f4 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -2785,10 +2785,10 @@ def test_pool_hang(self):
cmd = '''if 1:
from multiprocessing import Pool
class A:
- def init(self):
+ def __init__(self):
self.pool = Pool(processes=1)
def do_something(x):
- return x1
+ return x + 1
problem = A()
problem.pool.map(do_something, [1,2,3])
'''
diff --git a/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst b/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst
index 85afe726dd20fe..86b96012498f6d 100644
--- a/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst
+++ b/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst
@@ -1,2 +1,3 @@
-Ensure all workers exit when finalizing a :class:`multiprocessing.Pool`.
-Patch by Batuhan Taskaya.
+Ensure all workers exit when finalizing a :class:`multiprocessing.Pool` implicitly via the module finalization
+handlers of multiprocessing. This fixes a deadlock situation that can be experienced when the Pool is not
+properly finalized via the context manager or a call to ``multiprocessing.Pool.terminate``. Patch by Batuhan Taskaya.
From 9f17cba72cb4e28aff254a7d44225f88ef873f2b Mon Sep 17 00:00:00 2001
From: Batuhan Taskaya
Date: Sun, 15 Mar 2020 14:09:48 +0300
Subject: [PATCH 3/4] some changes
---
Lib/multiprocessing/pool.py | 12 ++++++++++--
Lib/test/_test_multiprocessing.py | 2 +-
.../Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst | 3 ++-
3 files changed, 13 insertions(+), 4 deletions(-)
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 55cdb609a5558c..8bd9608b0e7dd7 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -651,6 +651,8 @@ def close(self):
def terminate(self):
util.debug('terminating pool')
self._state = TERMINATE
+ self._worker_handler._state = TERMINATE
+ self._change_notifier.put(None)
self._terminate()
def join(self):
@@ -680,8 +682,14 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
# this is guaranteed to only be called once
util.debug('finalizing pool')
- worker_handler._state = TERMINATE
- change_notifier.put(None)
+ # Explicitly do the cleanup here if it didn't come from terminate()
+ # (required for if the queue will block, if it is already closed)
+ if worker_handler._state != TERMINATE:
+ # Notify that the worker_handler state has been changed so the
+ # _handle_workers loop can be unblocked (and exited) in order to
+ # send the finalization sentinel all the workers.
+ worker_handler._state = TERMINATE
+ change_notifier.put(None)
task_handler._state = TERMINATE
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 3c0d3c030bf3f4..028d12112154ef 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -2780,7 +2780,7 @@ def test_pool_worker_lifetime_early_close(self):
for (j, res) in enumerate(results):
self.assertEqual(res.get(), sqr(j))
- def test_pool_hang(self):
+ def test_worker_finalization_via_atexit_handler_of_multiprocessing(self):
# tests cases against bpo-38744 and bpo-39360
cmd = '''if 1:
from multiprocessing import Pool
diff --git a/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst b/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst
index 86b96012498f6d..148878886e6ee5 100644
--- a/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst
+++ b/Misc/NEWS.d/next/Library/2020-03-15-05-41-05.bpo-39360.cmcU5p.rst
@@ -1,3 +1,4 @@
Ensure all workers exit when finalizing a :class:`multiprocessing.Pool` implicitly via the module finalization
handlers of multiprocessing. This fixes a deadlock situation that can be experienced when the Pool is not
-properly finalized via the context manager or a call to ``multiprocessing.Pool.terminate``. Patch by Batuhan Taskaya.
+properly finalized via the context manager or a call to ``multiprocessing.Pool.terminate``. Patch by Batuhan Taskaya
+and Pablo Galindo.
From 0d008712438f310ac001d320e2d884ee794115e4 Mon Sep 17 00:00:00 2001
From: Pablo Galindo
Date: Sun, 15 Mar 2020 18:01:41 +0000
Subject: [PATCH 4/4] Make it work with spawn
---
Lib/multiprocessing/pool.py | 15 +++++----------
Lib/test/_test_multiprocessing.py | 11 +++++++----
2 files changed, 12 insertions(+), 14 deletions(-)
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
index 8bd9608b0e7dd7..41dd923d4f9740 100644
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -651,8 +651,6 @@ def close(self):
def terminate(self):
util.debug('terminating pool')
self._state = TERMINATE
- self._worker_handler._state = TERMINATE
- self._change_notifier.put(None)
self._terminate()
def join(self):
@@ -682,14 +680,11 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier,
# this is guaranteed to only be called once
util.debug('finalizing pool')
- # Explicitly do the cleanup here if it didn't come from terminate()
- # (required for if the queue will block, if it is already closed)
- if worker_handler._state != TERMINATE:
- # Notify that the worker_handler state has been changed so the
- # _handle_workers loop can be unblocked (and exited) in order to
- # send the finalization sentinel all the workers.
- worker_handler._state = TERMINATE
- change_notifier.put(None)
+ # Notify that the worker_handler state has been changed so the
+ # _handle_workers loop can be unblocked (and exited) in order to
+ # send the finalization sentinel all the workers.
+ worker_handler._state = TERMINATE
+ change_notifier.put(None)
task_handler._state = TERMINATE
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
index 028d12112154ef..4a87b1761f9efe 100644
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -2784,13 +2784,16 @@ def test_worker_finalization_via_atexit_handler_of_multiprocessing(self):
# tests cases against bpo-38744 and bpo-39360
cmd = '''if 1:
from multiprocessing import Pool
+ problem = None
class A:
def __init__(self):
self.pool = Pool(processes=1)
- def do_something(x):
- return x + 1
- problem = A()
- problem.pool.map(do_something, [1,2,3])
+ def test():
+ global problem
+ problem = A()
+ problem.pool.map(float, tuple(range(10)))
+ if __name__ == "__main__":
+ test()
'''
rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)
self.assertEqual(rc, 0)