75 changes: 75 additions & 0 deletions Doc/library/multiprocessing.rst
@@ -1187,6 +1187,81 @@ For example:
the data in the pipe is likely to become corrupted, because it may become
impossible to be sure where the message boundaries lie.

Custom Reduction
~~~~~~~~~~~~~~~~
Member

You'll need some versionadded directive at some point.

Member Author

Should we add this? Technically this PR is fixing the previous implementation, although as the old one was broken, one could argue that we are adding the feature.


.. currentmodule:: multiprocessing

Several primitives of the :mod:`multiprocessing` module such as
:class:`multiprocessing.Queue`, :class:`multiprocessing.connection.Listener` or
:class:`multiprocessing.connection.Server` need to serialize and deserialize Python
objects to communicate between processes. Sometimes it is useful to control what
serialization is to be used for the transport of data in order to support communication
with different versions of Python, use more performant third party tools or custom
strategies.
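
A minimal sketch of why the choice of serialization matters, using only the standard :mod:`pickle` module and none of the hooks proposed in this PR. The ``payload`` value is a made-up example. The same object is serialized once with protocol 2 (the newest protocol Python 2.x can read) and once with the interpreter's default protocol; the second byte of each stream records the protocol that was used.

```python
import pickle

# Hypothetical payload, chosen only for illustration.
payload = {"id": 7, "tags": ["a", "b"]}

# Protocol 2 is the highest pickle protocol that Python 2.x understands.
legacy = pickle.dumps(payload, protocol=2)
# Without an explicit protocol, the interpreter's default is used.
modern = pickle.dumps(payload)

# For protocol 2 and later, a pickle starts with the PROTO opcode (0x80)
# followed by one byte holding the protocol number.
legacy_version = legacy[1]
modern_version = modern[1]

# Both streams decode to the same object in this interpreter.
roundtrip_ok = pickle.loads(legacy) == payload and pickle.loads(modern) == payload
```

A peer that only understands protocol 2 can read ``legacy`` but may not be able to read ``modern``, which is the situation a custom reducer is meant to address.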

For this purpose a set of hooks is available to provide alternate implementations of
the reduction mechanism:

.. currentmodule:: multiprocessing.reduction

.. class:: AbstractPickler()
Abstract base class that can be implemented in order to override specific
methods of the reduction machinery used by multiprocessing.

.. method:: dump(obj)
Write a pickled representation of obj to the open file.

Defaults to :meth:`pickle.Pickler.dump`.

.. classmethod:: loads(bytes_object, *, fix_imports=True, encoding="ASCII", errors="strict")
Member

Are the optional arguments required? Does multiprocessing ever pass them explicitly?

Member

Also, the method / classmethod asymmetry is weird and doesn't help designing an implementation. Do you think that can be fixed (one way or the other)?

Member Author
@pablogsal, May 27, 2019

Yes, I find this very weird as well. We could make loads() a method, but that would require instantiating a Pickler() object for no reason (AbstractPickler must inherit from Pickler for dump to work correctly). It would remove the asymmetry, though.

What do you think?

Member Author

Notice that to instantiate the Pickler class we need to provide a dummy file-like object (probably a StringIO instance). I find that suboptimal as well.

Member Author
@pablogsal, May 27, 2019

The other possibility is making dump a method. In that case, we would need to create a Pickler instance and copy and update the dispatch table over it every time it is called.

Read a pickled object hierarchy from a bytes object and return the reconstituted
object hierarchy specified therein.

Defaults to :func:`pickle.loads`
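
The trade-off discussed in the review thread above can be sketched with the standard library alone. The example below is a hypothetical stand-in (``SketchPickler`` and ``_reduce_memoryview`` are invented names, not the ``AbstractPickler`` from this PR): a :class:`pickle.Pickler` subclass whose ``dumps`` classmethod creates a throwaway instance writing into an in-memory buffer, and whose ``loads`` simply reuses :func:`pickle.loads`. Note that the buffer has to be a binary file object such as :class:`io.BytesIO`.

```python
import copyreg
import io
import pickle


def _reduce_memoryview(view):
    # Assumption for illustration: ship the contents as plain bytes.
    return bytes, (view.tobytes(),)


class SketchPickler(pickle.Pickler):
    """Hypothetical pickler with its own reducers (not the PR's class)."""

    # Extra reducers layered on top of the global copyreg table.
    _extra_reducers = {memoryview: _reduce_memoryview}

    def __init__(self, file, protocol=None):
        super().__init__(file, protocol)
        # Per-instance table, so the global copyreg table is left untouched.
        self.dispatch_table = copyreg.dispatch_table.copy()
        self.dispatch_table.update(self._extra_reducers)

    @classmethod
    def dumps(cls, obj, protocol=None):
        # A Pickler needs a binary file object to write to.
        buf = io.BytesIO()
        cls(buf, protocol).dump(obj)
        return buf.getvalue()

    # Deserialization needs no instance, hence the method asymmetry.
    loads = staticmethod(pickle.loads)


data = SketchPickler.dumps({"view": memoryview(b"abc")})
restored = SketchPickler.loads(data)
```

Here ``dump`` is the only operation that needs instance state (the dispatch table), which is one way to read the method / classmethod asymmetry raised by the reviewer.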

.. class:: AbstractReducer()

Abstract base class that can be implemented in order to replace the standard
reduction mechanism used in multiprocessing.

.. method:: get_pickler_class()

This method must return a subclass of :class:`pickler.Pickler` to be used by
Member

This does not make the relationship between pickler.Pickler and AbstractPickler very clear.

Member Author

That should be AbstractPickler

the multiprocessing reducer mechanism.

.. currentmodule:: multiprocessing

.. method:: set_reducer(reduction)

Set the reduction instance to be used for serialization and deserialization
by the module's primitive internals. *reduction* must be an instance of a
subclass of :class:`multiprocessing.reduction.AbstractReducer`.

.. method:: get_reducer()

Gets the current reduction class in use by the module's primitive internals.

For example, the following replaces the reducer so that the :mod:`pickle` protocol
version 2 is used, which makes it possible to communicate with Python 2.x programs::

   import multiprocessing
   from multiprocessing.reduction import AbstractReducer, AbstractPickler

   class PicklerProtocol2(AbstractPickler):
       @classmethod
       def dumps(cls, obj, protocol=2):
           return super().dumps(obj, protocol=protocol)

   class PickleProtocol2Reducer(AbstractReducer):
       def get_pickler_class(self):
           return PicklerProtocol2

   multiprocessing.set_reducer(PickleProtocol2Reducer())

Note that :meth:`multiprocessing.set_reducer` changes the reducer globally. If
a global change is undesirable, call :meth:`context.set_reducer` instead,
where *context* is a context object obtained by calling :func:`get_context`.

Synchronization primitives
~~~~~~~~~~~~~~~~~~~~~~~~~~
51 changes: 27 additions & 24 deletions Lib/multiprocessing/connection.py
@@ -23,8 +23,8 @@
from . import util

from . import AuthenticationError, BufferTooShort
from .context import reduction
_ForkingPickler = reduction.ForkingPickler
from . import context
from . import get_context

try:
import _winapi
@@ -114,7 +114,8 @@ def address_type(address):
class _ConnectionBase:
_handle = None

def __init__(self, handle, readable=True, writable=True):
def __init__(self, handle, readable=True, writable=True, ctx=None):
self._ctx = ctx or get_context()
handle = handle.__index__()
if handle < 0:
raise ValueError("invalid handle")
@@ -203,7 +204,7 @@ def send(self, obj):
"""Send a (picklable) object"""
self._check_closed()
self._check_writable()
self._send_bytes(_ForkingPickler.dumps(obj))
self._send_bytes(self._ctx.get_reducer().dumps(obj))

def recv_bytes(self, maxlength=None):
"""
@@ -248,7 +249,7 @@ def recv(self):
self._check_closed()
self._check_readable()
buf = self._recv_bytes()
return _ForkingPickler.loads(buf.getbuffer())
return self._ctx.get_reducer().loads(buf.getbuffer())

def poll(self, timeout=0.0):
"""Whether there is any input available to be read"""
@@ -436,16 +437,16 @@ class Listener(object):
This is a wrapper for a bound socket which is 'listening' for
connections, or for a Windows named pipe.
'''
def __init__(self, address=None, family=None, backlog=1, authkey=None):
def __init__(self, address=None, family=None, backlog=1, authkey=None, *, ctx=None):
family = family or (address and address_type(address)) \
or default_family
address = address or arbitrary_address(family)

_validate_family(family)
if family == 'AF_PIPE':
self._listener = PipeListener(address, backlog)
self._listener = PipeListener(address, backlog, ctx=ctx)
else:
self._listener = SocketListener(address, family, backlog)
self._listener = SocketListener(address, family, backlog, ctx=ctx)

if authkey is not None and not isinstance(authkey, bytes):
raise TypeError('authkey should be a byte string')
@@ -490,16 +491,16 @@ def __exit__(self, exc_type, exc_value, exc_tb):
self.close()


def Client(address, family=None, authkey=None):
def Client(address, family=None, authkey=None, *, ctx=None):
'''
Returns a connection to the address of a `Listener`
'''
family = family or address_type(address)
_validate_family(family)
if family == 'AF_PIPE':
c = PipeClient(address)
c = PipeClient(address, ctx=ctx)
else:
c = SocketClient(address)
c = SocketClient(address, ctx=ctx)

if authkey is not None and not isinstance(authkey, bytes):
raise TypeError('authkey should be a byte string')
@@ -580,7 +581,7 @@ class SocketListener(object):
'''
Representation of a socket which is bound to an address and listening
'''
def __init__(self, address, family, backlog=1):
def __init__(self, address, family, backlog=1, *, ctx=None):
self._socket = socket.socket(getattr(socket, family))
try:
# SO_REUSEADDR has different semantics on Windows (issue #2550).
@@ -603,11 +604,12 @@ def __init__(self, address, family, backlog=1):
)
else:
self._unlink = None
self._ctx = ctx

def accept(self):
s, self._last_accepted = self._socket.accept()
s.setblocking(True)
return Connection(s.detach())
return Connection(s.detach(), ctx=self._ctx)

def close(self):
try:
@@ -619,15 +621,15 @@ def close(self):
unlink()


def SocketClient(address):
def SocketClient(address, *, ctx=None):
'''
Return a connection object connected to the socket given by `address`
'''
family = address_type(address)
with socket.socket( getattr(socket, family) ) as s:
s.setblocking(True)
s.connect(address)
return Connection(s.detach())
return Connection(s.detach(), ctx=ctx)

#
# Definitions for connections based on named pipes
@@ -639,7 +641,7 @@ class PipeListener(object):
'''
Representation of a named pipe
'''
def __init__(self, address, backlog=None):
def __init__(self, address, backlog=None, *, ctx=None):
self._address = address
self._handle_queue = [self._new_handle(first=True)]

@@ -649,6 +651,7 @@ def __init__(self, address, backlog=None):
self, PipeListener._finalize_pipe_listener,
args=(self._handle_queue, self._address), exitpriority=0
)
self._ctx = ctx

def _new_handle(self, first=False):
flags = _winapi.PIPE_ACCESS_DUPLEX | _winapi.FILE_FLAG_OVERLAPPED
@@ -683,15 +686,15 @@ def accept(self):
finally:
_, err = ov.GetOverlappedResult(True)
assert err == 0
return PipeConnection(handle)
return PipeConnection(handle, ctx=self._ctx)

@staticmethod
def _finalize_pipe_listener(queue, address):
util.sub_debug('closing listener with address=%r', address)
for handle in queue:
_winapi.CloseHandle(handle)

def PipeClient(address):
def PipeClient(address, *, ctx=None):
'''
Return a connection object connected to the pipe given by `address`
'''
@@ -716,7 +719,7 @@ def PipeClient(address):
_winapi.SetNamedPipeHandleState(
h, _winapi.PIPE_READMODE_MESSAGE, None, None
)
return PipeConnection(h)
return PipeConnection(h, ctx=ctx)

#
# Authentication stuff
@@ -950,23 +953,23 @@ def reduce_connection(conn):
def rebuild_connection(ds, readable, writable):
sock = ds.detach()
return Connection(sock.detach(), readable, writable)
reduction.register(Connection, reduce_connection)
context.reduction.register(Connection, reduce_connection)

def reduce_pipe_connection(conn):
access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
(_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
dh = reduction.DupHandle(conn.fileno(), access)
dh = context.reduction.DupHandle(conn.fileno(), access)
return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
def rebuild_pipe_connection(dh, readable, writable):
handle = dh.detach()
return PipeConnection(handle, readable, writable)
reduction.register(PipeConnection, reduce_pipe_connection)
context.reduction.register(PipeConnection, reduce_pipe_connection)

else:
def reduce_connection(conn):
df = reduction.DupFd(conn.fileno())
df = context.reduction.DupFd(conn.fileno())
return rebuild_connection, (df, conn.readable, conn.writable)
def rebuild_connection(df, readable, writable):
fd = df.detach()
return Connection(fd, readable, writable)
reduction.register(Connection, reduce_connection)
context.reduction.register(Connection, reduce_connection)
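
The ``send`` and ``recv`` changes in this diff route serialization through the context's reducer. A rough equivalent that works with the existing connection API, assuming only the documented ``send_bytes`` and ``recv_bytes`` methods, is to pickle by hand on one end and unpickle on the other. The ``message`` value is a made-up example.

```python
import pickle
from multiprocessing import Pipe

# One-way pipe: the first end only receives, the second only sends.
receiver, sender = Pipe(duplex=False)
try:
    # Hypothetical message, chosen only for illustration.
    message = {"job": 1, "args": [1, 2, 3]}

    # Serialize explicitly with protocol 2 instead of relying on send().
    sender.send_bytes(pickle.dumps(message, protocol=2))

    # The receiving end gets the raw bytes and decodes them itself.
    raw = receiver.recv_bytes()
    received = pickle.loads(raw)
finally:
    sender.close()
    receiver.close()
```

This keeps the wire format under the caller's control at the cost of bypassing ``send`` and ``recv``, which is the manual step a configurable reducer would make unnecessary.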