bpo-28053: Complete and fix custom reducers in multiprocessing.#9959
bpo-28053: Complete and fix custom reducers in multiprocessing.#9959pablogsal wants to merge 13 commits intopython:mainfrom
Conversation
bc59cf2 to
199325d
Compare
11763cf to
0745094
Compare
|
Thanks @pablogsal for attacking this issue :-) |
6ee544d to
e27684c
Compare
|
@pitrou Thank you very much for the review! I have simplified the API. Now setting a custom reducer looks like this: import multiprocessing
from multiprocessing.reduction import AbstractReducer, ForkingPickler
class ForkingPicklerProtocol2(ForkingPickler):
@classmethod
def dumps(cls, obj, pickle_protocol=2):
return super().dumps(obj, protocol=pickle_protocol)
class PickleProtocol2Reducer(AbstractReducer):
def get_pickler_class(self):
return ForkingPicklerProtocol2
multiprocessing.set_reducer(PickleProtocol2Reducer)I am making the interface a bit more strict, so |
8a68606 to
5e2fe4f
Compare
|
@pablogsal Do you need another review on this? |
a16e157 to
856716c
Compare
|
@pitrou It took me a while but I have stabilized all tests and fixed some details on Windows. I have also added Listener and Client to the context so they also can benefit from custom reducers. Please, check my previous comment regarding some details. This patch is already very big and very very complex and when errors happen they are extremely obscure or platform dependent, so I apologize in advance if I miss something obvious, but I have too many spinning plates. Could you take another look? |
715733f to
7ee45e1
Compare
pitrou
left a comment
There was a problem hiding this comment.
Thanks for the update. It seems there are test failures on all 3 CI platforms...
|
|
||
| Defaults to :meth:`pickle.Pickle.dump` | ||
|
|
||
| .. classmethod:: loads(bytes_object, *, fix_imports=True, encoding="ASCII", errors="strict") |
There was a problem hiding this comment.
Are the optional arguments required? Does multiprocessing ever pass them explicitly?
There was a problem hiding this comment.
Also, the method / classmethod asymmetry is weird and doesn't help designing an implementation. Do you think that can be fixed (one way or the other)?
There was a problem hiding this comment.
Yes, I find this very weird as well. We can make the load() be a method but that would require instantiating a Pickler() object for no reason (AbstractPickler must inherit from Pickler to make the dump work correctly). It will help with the asymmetry, though.
What do you think?
There was a problem hiding this comment.
Notice that to instantiate the Pickler class we need to provide a dummy file-like object (probably a StringIO instance). I find that suboptimal as well.
There was a problem hiding this comment.
The other possibility is making dump a method. In that case, we would need to create a Pickler instance and copy and update the dispatch table over it every time is called.
|
|
||
| .. method:: get_pickler_class(): | ||
|
|
||
| This method must return an subclass of :class:`pickler.Pickler` to be used by |
There was a problem hiding this comment.
This does not make it very clear the relation ship between pickler.Pickler and AbstractPickler.
There was a problem hiding this comment.
That should be AbstractPickler
| impossible to be sure where the message boundaries lie. | ||
|
|
||
| Custom Reduction | ||
| ~~~~~~~~~~~~~~~~ |
There was a problem hiding this comment.
You'll need some versionadded directive at some point.
There was a problem hiding this comment.
Should we add this? Technically this PR is fixing the previous implementation, although as the old one was broken, one could argue that we are adding the feature.
|
|
||
| loads = pickle.loads | ||
| @classmethod | ||
| def loads(cls, bytes_object, *, fix_imports=True, |
There was a problem hiding this comment.
Uh... I hadn't noticed these were class methods...
There was a problem hiding this comment.
The problem is that loads do not need to instantiate a Pickler class so it was designed here as a class method.
Would you prefer it to be a regular method that does the same (defers the call to pickle.loads)?
|
|
||
|
|
||
| def loads(s, *, fix_imports=True, encoding="ASCII", errors="strict"): | ||
| return ForkingPickler.loads(s, fix_imports=fix_imports, |
There was a problem hiding this comment.
By the way, I see that sharedctypes is still using _ForkingPickler directly. Should it be fixed as well?
Lib/test/_test_multiprocessing.py
Outdated
|
|
||
| @classmethod | ||
| def _put_and_get_in_queue(cls, queue, parent_can_continue): | ||
| parent_can_continue.set() |
There was a problem hiding this comment.
If you want to re-use the event afterwards, you have to clear it at some point. But I'm not sure all this synchronization is necessary (the queue already synchronizes for you).
Lib/test/_test_multiprocessing.py
Outdated
| p = self.custom_ctx.Process(target=self._put_and_get_in_queue, | ||
| args=(queue, parent_can_continue)) | ||
| p.start() | ||
| parent_can_continue.wait() |
There was a problem hiding this comment.
You shouldn't need this wait, AFAICT.
Lib/test/_test_multiprocessing.py
Outdated
| parent_can_continue.set() | ||
| queue.put("Something") | ||
| queue.get(timeout=TIMEOUT2) | ||
| close_queue(queue) |
There was a problem hiding this comment.
Is it useful to close the queue explicitly?
There was a problem hiding this comment.
Yep, there were a bunch of race conditions in the CI related to this process not finishing. It seems is related to the queue thread doing something. I did not dig deeper but as soon as I added the extra synchronization, the failures went away.
I will increase the timeouts and try to remove this to see what happens.
Lib/test/_test_multiprocessing.py
Outdated
| element = queue.get(timeout=TIMEOUT3) | ||
| self.assertEqual(element, "Something") | ||
| queue.put("Other_Something") | ||
| parent_can_continue.wait() |
There was a problem hiding this comment.
Not sure you need this, either (and I don't think you have to close the queue, unless it helps test something?).
There was a problem hiding this comment.
Check the previous comment.
I will try to increase the timeouts and remove the event to see if the failures do not appear.
|
|
||
|
|
||
| @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") | ||
| def test_queue_custom_reducer_over_default_context(self): |
There was a problem hiding this comment.
Same comments as above about events and queues.
a6734a0 to
f20e142
Compare
|
@pitrou There is some failures in Windows that I am investigating but I found a problem. In multiprocessing/context.py there is no way of passing down the current context to the Process class. The This is the only exception as any other class is defined in I don't know how to solve this, but basically tests like Trying to do something like: fails because As you are more used to the architecture of the multiprocessing module, do you see a way of solving this? If you don't see a way, I'm afraid that custom reducers per context cannot be implemented because of the way multiprocessing is architected. |
|
@pablogsal thanks a lot for putting this PR together, this is great work. This feature is very promising. Regarding the issue concerning the See proof of concept below @property
def Process(self):
if not self._custom_reduction_enabled:
# Ensure backward compatibility by returning a class when no
# custom reducer was specified
return _Process # ForkProcess for ForkContext, Process for BaseContext etc.
else:
return self.process_factory
def process_factory(self, *args, **kwargs):
p = Process(*args, **kwargs)
p._ctx = self.get_context()
return pMore complete implementation here: Gist showing a usage example and its behavior: What do you think? EDIT: Another question is whether or not we consider that cpython/Doc/library/multiprocessing.rst Lines 166 to 168 in 5a7132f If we want to enable |
|
Another topic that appears many times in this PR is the strange A good way IMO to re-establish the symmetry here would be to use actual Thus, we could add an optional |
This PR tries to complete and fix the implementation of the custom reducer classes in
multiprocessing.Important
I have marked the PR as
DO-NOT-MERGEbecause I have still several doubts about the previous implemented API, regarding theAbstractReducerbase class and the methods that the user needs to implement and how the rest of the library interacts withmultiprocessing.reducer. For example:I am not sure
multiprocessing.reducer.dumpsandmultiprocessing.reducer.registerare needed outside theForklingPicklerclass and how that interacts with the ABC.I am not sure the
AbstractReduceris implemented completely (there is no abstract methods marked).This PR is a draft implementation of the complete API, tests and documentation so we can discuss how to implement these correctly in a better way.
https://bugs.python.org/issue28053