[Python-checkins] cpython: Issue #28053: Applying refactorings, docs and other cleanup to follow.

davin.potts python-checkins at python.org
Fri Sep 9 19:03:27 EDT 2016


https://hg.python.org/cpython/rev/7381b1b50e00
changeset:   103497:7381b1b50e00
user:        Davin Potts <python at discontinuity.net>
date:        Fri Sep 09 18:03:10 2016 -0500
summary:
  Issue #28053: Applying refactorings, docs and other cleanup to follow.

files:
  Lib/multiprocessing/connection.py        |   8 +-
  Lib/multiprocessing/context.py           |  13 +++-
  Lib/multiprocessing/forkserver.py        |   2 +-
  Lib/multiprocessing/heap.py              |   5 +-
  Lib/multiprocessing/managers.py          |   5 +-
  Lib/multiprocessing/popen_forkserver.py  |   7 +-
  Lib/multiprocessing/popen_spawn_posix.py |   7 +-
  Lib/multiprocessing/popen_spawn_win32.py |   9 +-
  Lib/multiprocessing/queues.py            |  10 +-
  Lib/multiprocessing/reduction.py         |  34 ++++++++++++
  Lib/multiprocessing/resource_sharer.py   |   2 +-
  Lib/multiprocessing/sharedctypes.py      |   6 +-
  Lib/multiprocessing/spawn.py             |   9 +-
  13 files changed, 77 insertions(+), 40 deletions(-)


diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -20,11 +20,11 @@
 
 import _multiprocessing
 
-from . import reduction
 from . import util
 
 from . import AuthenticationError, BufferTooShort
-from .reduction import ForkingPickler
+from .context import reduction
+_ForkingPickler = reduction.ForkingPickler
 
 try:
     import _winapi
@@ -203,7 +203,7 @@
         """Send a (picklable) object"""
         self._check_closed()
         self._check_writable()
-        self._send_bytes(ForkingPickler.dumps(obj))
+        self._send_bytes(_ForkingPickler.dumps(obj))
 
     def recv_bytes(self, maxlength=None):
         """
@@ -248,7 +248,7 @@
         self._check_closed()
         self._check_readable()
         buf = self._recv_bytes()
-        return ForkingPickler.loads(buf.getbuffer())
+        return _ForkingPickler.loads(buf.getbuffer())
 
     def poll(self, timeout=0.0):
         """Whether there is any input available to be read"""
diff --git a/Lib/multiprocessing/context.py b/Lib/multiprocessing/context.py
--- a/Lib/multiprocessing/context.py
+++ b/Lib/multiprocessing/context.py
@@ -3,6 +3,7 @@
 import threading
 
 from . import process
+from . import reduction
 
 __all__ = []            # things are copied from here to __init__.py
 
@@ -198,6 +199,16 @@
     def set_start_method(self, method=None):
         raise ValueError('cannot set start method of concrete context')
 
+    @property
+    def reducer(self):
+        '''Controls how objects will be reduced to a form that can be
+        shared with other processes.'''
+        return globals().get('reduction')
+
+    @reducer.setter
+    def reducer(self, reduction):
+        globals()['reduction'] = reduction
+
     def _check_available(self):
         pass
 
@@ -245,7 +256,6 @@
         if sys.platform == 'win32':
             return ['spawn']
         else:
-            from . import reduction
             if reduction.HAVE_SEND_HANDLE:
                 return ['fork', 'spawn', 'forkserver']
             else:
@@ -292,7 +302,6 @@
         _name = 'forkserver'
         Process = ForkServerProcess
         def _check_available(self):
-            from . import reduction
             if not reduction.HAVE_SEND_HANDLE:
                 raise ValueError('forkserver start method not available')
 
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
--- a/Lib/multiprocessing/forkserver.py
+++ b/Lib/multiprocessing/forkserver.py
@@ -9,7 +9,7 @@
 
 from . import connection
 from . import process
-from . import reduction
+from .context import reduction
 from . import semaphore_tracker
 from . import spawn
 from . import util
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -14,8 +14,7 @@
 import tempfile
 import threading
 
-from . import context
-from . import reduction
+from .context import reduction, assert_spawning
 from . import util
 
 __all__ = ['BufferWrapper']
@@ -48,7 +47,7 @@
             self._state = (self.size, self.name)
 
         def __getstate__(self):
-            context.assert_spawning(self)
+            assert_spawning(self)
             return self._state
 
         def __setstate__(self, state):
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -23,10 +23,9 @@
 from traceback import format_exc
 
 from . import connection
-from . import context
+from .context import reduction, get_spawning_popen
 from . import pool
 from . import process
-from . import reduction
 from . import util
 from . import get_context
 
@@ -833,7 +832,7 @@
 
     def __reduce__(self):
         kwds = {}
-        if context.get_spawning_popen() is not None:
+        if get_spawning_popen() is not None:
             kwds['authkey'] = self._authkey
 
         if getattr(self, '_isauto', False):
diff --git a/Lib/multiprocessing/popen_forkserver.py b/Lib/multiprocessing/popen_forkserver.py
--- a/Lib/multiprocessing/popen_forkserver.py
+++ b/Lib/multiprocessing/popen_forkserver.py
@@ -1,10 +1,9 @@
 import io
 import os
 
-from . import reduction
+from .context import reduction, set_spawning_popen
 if not reduction.HAVE_SEND_HANDLE:
     raise ImportError('No support for sending fds between processes')
-from . import context
 from . import forkserver
 from . import popen_fork
 from . import spawn
@@ -42,12 +41,12 @@
     def _launch(self, process_obj):
         prep_data = spawn.get_preparation_data(process_obj._name)
         buf = io.BytesIO()
-        context.set_spawning_popen(self)
+        set_spawning_popen(self)
         try:
             reduction.dump(prep_data, buf)
             reduction.dump(process_obj, buf)
         finally:
-            context.set_spawning_popen(None)
+            set_spawning_popen(None)
 
         self.sentinel, w = forkserver.connect_to_new_process(self._fds)
         util.Finalize(self, os.close, (self.sentinel,))
diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py
--- a/Lib/multiprocessing/popen_spawn_posix.py
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -1,9 +1,8 @@
 import io
 import os
 
-from . import context
+from .context import reduction, set_spawning_popen
 from . import popen_fork
-from . import reduction
 from . import spawn
 from . import util
 
@@ -42,12 +41,12 @@
         self._fds.append(tracker_fd)
         prep_data = spawn.get_preparation_data(process_obj._name)
         fp = io.BytesIO()
-        context.set_spawning_popen(self)
+        set_spawning_popen(self)
         try:
             reduction.dump(prep_data, fp)
             reduction.dump(process_obj, fp)
         finally:
-            context.set_spawning_popen(None)
+            set_spawning_popen(None)
 
         parent_r = child_w = child_r = parent_w = None
         try:
diff --git a/Lib/multiprocessing/popen_spawn_win32.py b/Lib/multiprocessing/popen_spawn_win32.py
--- a/Lib/multiprocessing/popen_spawn_win32.py
+++ b/Lib/multiprocessing/popen_spawn_win32.py
@@ -4,9 +4,8 @@
 import sys
 import _winapi
 
-from . import context
+from .context import reduction, get_spawning_popen, set_spawning_popen
 from . import spawn
-from . import reduction
 from . import util
 
 __all__ = ['Popen']
@@ -60,15 +59,15 @@
             util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
 
             # send information to child
-            context.set_spawning_popen(self)
+            set_spawning_popen(self)
             try:
                 reduction.dump(prep_data, to_child)
                 reduction.dump(process_obj, to_child)
             finally:
-                context.set_spawning_popen(None)
+                set_spawning_popen(None)
 
     def duplicate_for_child(self, handle):
-        assert self is context.get_spawning_popen()
+        assert self is get_spawning_popen()
         return reduction.duplicate(handle, self.sentinel)
 
     def wait(self, timeout=None):
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -23,9 +23,9 @@
 
 from . import connection
 from . import context
+_ForkingPickler = context.reduction.ForkingPickler
 
 from .util import debug, info, Finalize, register_after_fork, is_exiting
-from .reduction import ForkingPickler
 
 #
 # Queue type using a pipe, buffer and thread
@@ -110,7 +110,7 @@
             finally:
                 self._rlock.release()
         # unserialize the data after having released the lock
-        return ForkingPickler.loads(res)
+        return _ForkingPickler.loads(res)
 
     def qsize(self):
         # Raises NotImplementedError on Mac OSX because of broken sem_getvalue()
@@ -238,7 +238,7 @@
                             return
 
                         # serialize the data before acquiring the lock
-                        obj = ForkingPickler.dumps(obj)
+                        obj = _ForkingPickler.dumps(obj)
                         if wacquire is None:
                             send_bytes(obj)
                         else:
@@ -342,11 +342,11 @@
         with self._rlock:
             res = self._reader.recv_bytes()
         # unserialize the data after having released the lock
-        return ForkingPickler.loads(res)
+        return _ForkingPickler.loads(res)
 
     def put(self, obj):
         # serialize the data before acquiring the lock
-        obj = ForkingPickler.dumps(obj)
+        obj = _ForkingPickler.dumps(obj)
         if self._wlock is None:
             # writes to a message oriented win32 pipe are atomic
             self._writer.send_bytes(obj)
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -7,6 +7,7 @@
 # Licensed to PSF under a Contributor Agreement.
 #
 
+from abc import ABCMeta, abstractmethod
 import copyreg
 import functools
 import io
@@ -238,3 +239,36 @@
         fd = df.detach()
         return socket.socket(family, type, proto, fileno=fd)
     register(socket.socket, _reduce_socket)
+
+
+class AbstractReducer(metaclass=ABCMeta):
+    '''Abstract base class for use in implementing a Reduction class
+    suitable for use in replacing the standard reduction mechanism
+    used in multiprocessing.'''
+    ForkingPickler = ForkingPickler
+    register = register
+    dump = dump
+    send_handle = send_handle
+    recv_handle = recv_handle
+
+    if sys.platform == 'win32':
+        steal_handle = steal_handle
+        duplicate = duplicate
+        DupHandle = DupHandle
+    else:
+        sendfds = sendfds
+        recvfds = recvfds
+        DupFd = DupFd
+
+    _reduce_method = _reduce_method
+    _reduce_method_descriptor = _reduce_method_descriptor
+    _rebuild_partial = _rebuild_partial
+    _reduce_socket = _reduce_socket
+    _rebuild_socket = _rebuild_socket
+
+    def __init__(self, *args):
+        register(type(_C().f), _reduce_method)
+        register(type(list.append), _reduce_method_descriptor)
+        register(type(int.__add__), _reduce_method_descriptor)
+        register(functools.partial, _reduce_partial)
+        register(socket.socket, _reduce_socket)
diff --git a/Lib/multiprocessing/resource_sharer.py b/Lib/multiprocessing/resource_sharer.py
--- a/Lib/multiprocessing/resource_sharer.py
+++ b/Lib/multiprocessing/resource_sharer.py
@@ -15,7 +15,7 @@
 import threading
 
 from . import process
-from . import reduction
+from .context import reduction
 from . import util
 
 __all__ = ['stop']
diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -13,8 +13,8 @@
 from . import heap
 from . import get_context
 
-from .context import assert_spawning
-from .reduction import ForkingPickler
+from .context import reduction, assert_spawning
+_ForkingPickler = reduction.ForkingPickler
 
 __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
 
@@ -134,7 +134,7 @@
 def rebuild_ctype(type_, wrapper, length):
     if length is not None:
         type_ = type_ * length
-    ForkingPickler.register(type_, reduce_ctype)
+    _ForkingPickler.register(type_, reduce_ctype)
     buf = wrapper.create_memoryview()
     obj = type_.from_buffer(buf)
     obj._wrapper = wrapper
diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py
--- a/Lib/multiprocessing/spawn.py
+++ b/Lib/multiprocessing/spawn.py
@@ -9,13 +9,13 @@
 #
 
 import os
-import pickle
 import sys
 import runpy
 import types
 
 from . import get_start_method, set_start_method
 from . import process
+from .context import reduction
 from . import util
 
 __all__ = ['_main', 'freeze_support', 'set_executable', 'get_executable',
@@ -96,8 +96,7 @@
     assert is_forking(sys.argv)
     if sys.platform == 'win32':
         import msvcrt
-        from .reduction import steal_handle
-        new_handle = steal_handle(parent_pid, pipe_handle)
+        new_handle = reduction.steal_handle(parent_pid, pipe_handle)
         fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
     else:
         from . import semaphore_tracker
@@ -111,9 +110,9 @@
     with os.fdopen(fd, 'rb', closefd=True) as from_parent:
         process.current_process()._inheriting = True
         try:
-            preparation_data = pickle.load(from_parent)
+            preparation_data = reduction.pickle.load(from_parent)
             prepare(preparation_data)
-            self = pickle.load(from_parent)
+            self = reduction.pickle.load(from_parent)
         finally:
             del process.current_process()._inheriting
     return self._bootstrap()

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list