[Python-checkins] cpython: Issue #18999: Make multiprocessing use context objects.

richard.oudkerk python-checkins at python.org
Wed Oct 16 17:44:04 CEST 2013


http://hg.python.org/cpython/rev/72a5ac909c7a
changeset:   86387:72a5ac909c7a
user:        Richard Oudkerk <shibturn at gmail.com>
date:        Wed Oct 16 16:41:56 2013 +0100
summary:
  Issue #18999: Make multiprocessing use context objects.

This allows different parts of a program to use different methods for
starting processes without interfering with each other.

files:
  Doc/library/multiprocessing.rst          |   78 ++-
  Lib/multiprocessing/__init__.py          |  260 +--------
  Lib/multiprocessing/context.py           |  348 +++++++++++
  Lib/multiprocessing/forkserver.py        |  224 +++---
  Lib/multiprocessing/heap.py              |    4 +-
  Lib/multiprocessing/managers.py          |   11 +-
  Lib/multiprocessing/pool.py              |   12 +-
  Lib/multiprocessing/popen.py             |   78 --
  Lib/multiprocessing/popen_fork.py        |    4 -
  Lib/multiprocessing/popen_forkserver.py  |   12 +-
  Lib/multiprocessing/popen_spawn_posix.py |   13 +-
  Lib/multiprocessing/popen_spawn_win32.py |   12 +-
  Lib/multiprocessing/process.py           |   21 +-
  Lib/multiprocessing/queues.py            |   29 +-
  Lib/multiprocessing/reduction.py         |    4 +-
  Lib/multiprocessing/semaphore_tracker.py |  100 +-
  Lib/multiprocessing/sharedctypes.py      |   37 +-
  Lib/multiprocessing/spawn.py             |    8 +-
  Lib/multiprocessing/synchronize.py       |   53 +-
  Lib/test/_test_multiprocessing.py        |   42 +-
  20 files changed, 736 insertions(+), 614 deletions(-)


diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -98,8 +98,8 @@
 
 
 
-Start methods
-~~~~~~~~~~~~~
+Contexts and start methods
+~~~~~~~~~~~~~~~~~~~~~~~~~~
 
 Depending on the platform, :mod:`multiprocessing` supports three ways
 to start a process.  These *start methods* are
@@ -132,7 +132,7 @@
     unnecessary resources are inherited.
 
     Available on Unix platforms which support passing file descriptors
-    over unix pipes.
+    over Unix pipes.
 
 Before Python 3.4 *fork* was the only option available on Unix.  Also,
 prior to Python 3.4, child processes would inherit all the parents
@@ -153,18 +153,46 @@
 
        import multiprocessing as mp
 
-       def foo():
-           print('hello')
+       def foo(q):
+           q.put('hello')
 
        if __name__ == '__main__':
            mp.set_start_method('spawn')
-           p = mp.Process(target=foo)
+           q = mp.Queue()
+           p = mp.Process(target=foo, args=(q,))
            p.start()
+           print(q.get())
            p.join()
 
 :func:`set_start_method` should not be used more than once in the
 program.
 
+Alternatively, you can use :func:`get_context` to obtain a context
+object.  Context objects have the same API as the multiprocessing
+module, and allow one to use multiple start methods in the same
+program. ::
+
+       import multiprocessing as mp
+
+       def foo(q):
+           q.put('hello')
+
+       if __name__ == '__main__':
+           ctx = mp.get_context('spawn')
+           q = ctx.Queue()
+           p = ctx.Process(target=foo, args=(q,))
+           p.start()
+           print(q.get())
+           p.join()
+
+Note that objects related to one context may not be compatible with
+processes for a different context.  In particular, locks created using
+the *fork* context cannot be passed to a processes started using the
+*spawn* or *forkserver* start methods.
+
+A library which wants to use a particular start method should probably
+use :func:`get_context` to avoid interfering with the choice of the
+library user.
 
 
 Exchanging objects between processes
@@ -859,11 +887,30 @@
 
    .. versionadded:: 3.4
 
-.. function:: get_start_method()
-
-   Return the current start method.  This can be ``'fork'``,
-   ``'spawn'`` or ``'forkserver'``.  ``'fork'`` is the default on
-   Unix, while ``'spawn'`` is the default on Windows.
+.. function:: get_context(method=None)
+
+   Return a context object which has the same attributes as the
+   :mod:`multiprocessing` module.
+
+   If *method* is *None* then the default context is returned.
+   Otherwise *method* should be ``'fork'``, ``'spawn'``,
+   ``'forkserver'``.  :exc:`ValueError` is raised if the specified
+   start method is not available.
+
+   .. versionadded:: 3.4
+
+.. function:: get_start_method(allow_none=False)
+
+   Return the name of start method used for starting processes.
+
+   If the start method has not been fixed and *allow_none* is false,
+   then the start method is fixed to the default and the name is
+   returned.  If the start method has not been fixed and *allow_none*
+   is true then *None* is returned.
+
+   The return value can be ``'fork'``, ``'spawn'``, ``'forkserver'``
+   or *None*.  ``'fork'`` is the default on Unix, while ``'spawn'`` is
+   the default on Windows.
 
    .. versionadded:: 3.4
 
@@ -1785,7 +1832,7 @@
 One can create a pool of processes which will carry out tasks submitted to it
 with the :class:`Pool` class.
 
-.. class:: Pool([processes[, initializer[, initargs[, maxtasksperchild]]]])
+.. class:: Pool([processes[, initializer[, initargs[, maxtasksperchild [, context]]]]])
 
    A process pool object which controls a pool of worker processes to which jobs
    can be submitted.  It supports asynchronous results with timeouts and
@@ -1805,6 +1852,13 @@
       unused resources to be freed. The default *maxtasksperchild* is None, which
       means worker processes will live as long as the pool.
 
+   .. versionadded:: 3.4
+      *context* can be used to specify the context used for starting
+      the worker processes.  Usually a pool is created using the
+      function :func:`multiprocessing.Pool` or the :meth:`Pool` method
+      of a context object.  In both cases *context* is set
+      appropriately.
+
    .. note::
 
       Worker processes within a :class:`Pool` typically live for the complete
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -12,27 +12,16 @@
 # Licensed to PSF under a Contributor Agreement.
 #
 
-__version__ = '0.70a1'
-
-__all__ = [
-    'Process', 'current_process', 'active_children', 'freeze_support',
-    'Manager', 'Pipe', 'cpu_count', 'log_to_stderr', 'get_logger',
-    'allow_connection_pickling', 'BufferTooShort', 'TimeoutError',
-    'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
-    'Event', 'Barrier', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool',
-    'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
-    'set_executable', 'set_start_method', 'get_start_method',
-    'get_all_start_methods', 'set_forkserver_preload'
-    ]
+import sys
+from . import context
 
 #
-# Imports
+# Copy stuff from default context
 #
 
-import os
-import sys
-
-from .process import Process, current_process, active_children
+globals().update((name, getattr(context._default_context, name))
+                 for name in context._default_context.__all__)
+__all__ = context._default_context.__all__
 
 #
 # XXX These should not really be documented or public.
@@ -47,240 +36,3 @@
 
 if '__main__' in sys.modules:
     sys.modules['__mp_main__'] = sys.modules['__main__']
-
-#
-# Exceptions
-#
-
-class ProcessError(Exception):
-    pass
-
-class BufferTooShort(ProcessError):
-    pass
-
-class TimeoutError(ProcessError):
-    pass
-
-class AuthenticationError(ProcessError):
-    pass
-
-#
-# Definitions not depending on native semaphores
-#
-
-def Manager():
-    '''
-    Returns a manager associated with a running server process
-
-    The managers methods such as `Lock()`, `Condition()` and `Queue()`
-    can be used to create shared objects.
-    '''
-    from .managers import SyncManager
-    m = SyncManager()
-    m.start()
-    return m
-
-def Pipe(duplex=True):
-    '''
-    Returns two connection object connected by a pipe
-    '''
-    from .connection import Pipe
-    return Pipe(duplex)
-
-def cpu_count():
-    '''
-    Returns the number of CPUs in the system
-    '''
-    num = os.cpu_count()
-    if num is None:
-        raise NotImplementedError('cannot determine number of cpus')
-    else:
-        return num
-
-def freeze_support():
-    '''
-    Check whether this is a fake forked process in a frozen executable.
-    If so then run code specified by commandline and exit.
-    '''
-    if sys.platform == 'win32' and getattr(sys, 'frozen', False):
-        from .spawn import freeze_support
-        freeze_support()
-
-def get_logger():
-    '''
-    Return package logger -- if it does not already exist then it is created
-    '''
-    from .util import get_logger
-    return get_logger()
-
-def log_to_stderr(level=None):
-    '''
-    Turn on logging and add a handler which prints to stderr
-    '''
-    from .util import log_to_stderr
-    return log_to_stderr(level)
-
-def allow_connection_pickling():
-    '''
-    Install support for sending connections and sockets between processes
-    '''
-    # This is undocumented.  In previous versions of multiprocessing
-    # its only effect was to make socket objects inheritable on Windows.
-    from . import connection
-
-#
-# Definitions depending on native semaphores
-#
-
-def Lock():
-    '''
-    Returns a non-recursive lock object
-    '''
-    from .synchronize import Lock
-    return Lock()
-
-def RLock():
-    '''
-    Returns a recursive lock object
-    '''
-    from .synchronize import RLock
-    return RLock()
-
-def Condition(lock=None):
-    '''
-    Returns a condition object
-    '''
-    from .synchronize import Condition
-    return Condition(lock)
-
-def Semaphore(value=1):
-    '''
-    Returns a semaphore object
-    '''
-    from .synchronize import Semaphore
-    return Semaphore(value)
-
-def BoundedSemaphore(value=1):
-    '''
-    Returns a bounded semaphore object
-    '''
-    from .synchronize import BoundedSemaphore
-    return BoundedSemaphore(value)
-
-def Event():
-    '''
-    Returns an event object
-    '''
-    from .synchronize import Event
-    return Event()
-
-def Barrier(parties, action=None, timeout=None):
-    '''
-    Returns a barrier object
-    '''
-    from .synchronize import Barrier
-    return Barrier(parties, action, timeout)
-
-def Queue(maxsize=0):
-    '''
-    Returns a queue object
-    '''
-    from .queues import Queue
-    return Queue(maxsize)
-
-def JoinableQueue(maxsize=0):
-    '''
-    Returns a queue object
-    '''
-    from .queues import JoinableQueue
-    return JoinableQueue(maxsize)
-
-def SimpleQueue():
-    '''
-    Returns a queue object
-    '''
-    from .queues import SimpleQueue
-    return SimpleQueue()
-
-def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
-    '''
-    Returns a process pool object
-    '''
-    from .pool import Pool
-    return Pool(processes, initializer, initargs, maxtasksperchild)
-
-def RawValue(typecode_or_type, *args):
-    '''
-    Returns a shared object
-    '''
-    from .sharedctypes import RawValue
-    return RawValue(typecode_or_type, *args)
-
-def RawArray(typecode_or_type, size_or_initializer):
-    '''
-    Returns a shared array
-    '''
-    from .sharedctypes import RawArray
-    return RawArray(typecode_or_type, size_or_initializer)
-
-def Value(typecode_or_type, *args, lock=True):
-    '''
-    Returns a synchronized shared object
-    '''
-    from .sharedctypes import Value
-    return Value(typecode_or_type, *args, lock=lock)
-
-def Array(typecode_or_type, size_or_initializer, *, lock=True):
-    '''
-    Returns a synchronized shared array
-    '''
-    from .sharedctypes import Array
-    return Array(typecode_or_type, size_or_initializer, lock=lock)
-
-#
-#
-#
-
-def set_executable(executable):
-    '''
-    Sets the path to a python.exe or pythonw.exe binary used to run
-    child processes instead of sys.executable when using the 'spawn'
-    start method.  Useful for people embedding Python.
-    '''
-    from .spawn import set_executable
-    set_executable(executable)
-
-def set_start_method(method):
-    '''
-    Set method for starting processes: 'fork', 'spawn' or 'forkserver'.
-    '''
-    from .popen import set_start_method
-    set_start_method(method)
-
-def get_start_method():
-    '''
-    Get method for starting processes: 'fork', 'spawn' or 'forkserver'.
-    '''
-    from .popen import get_start_method
-    return get_start_method()
-
-def get_all_start_methods():
-    '''
-    Get list of availables start methods, default first.
-    '''
-    from .popen import get_all_start_methods
-    return get_all_start_methods()
-
-def set_forkserver_preload(module_names):
-    '''
-    Set list of module names to try to load in the forkserver process
-    when it is started.  Properly chosen this can significantly reduce
-    the cost of starting a new process using the forkserver method.
-    The default list is ['__main__'].
-    '''
-    try:
-        from .forkserver import set_forkserver_preload
-    except ImportError:
-        pass
-    else:
-        set_forkserver_preload(module_names)
diff --git a/Lib/multiprocessing/context.py b/Lib/multiprocessing/context.py
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/context.py
@@ -0,0 +1,348 @@
+import os
+import sys
+import threading
+
+from . import process
+
+__all__ = []            # things are copied from here to __init__.py
+
+#
+# Exceptions
+#
+
+class ProcessError(Exception):
+    pass
+
+class BufferTooShort(ProcessError):
+    pass
+
+class TimeoutError(ProcessError):
+    pass
+
+class AuthenticationError(ProcessError):
+    pass
+
+#
+# Base type for contexts
+#
+
+class BaseContext(object):
+
+    ProcessError = ProcessError
+    BufferTooShort = BufferTooShort
+    TimeoutError = TimeoutError
+    AuthenticationError = AuthenticationError
+
+    current_process = staticmethod(process.current_process)
+    active_children = staticmethod(process.active_children)
+
+    def cpu_count(self):
+        '''Returns the number of CPUs in the system'''
+        num = os.cpu_count()
+        if num is None:
+            raise NotImplementedError('cannot determine number of cpus')
+        else:
+            return num
+
+    def Manager(self):
+        '''Returns a manager associated with a running server process
+
+        The managers methods such as `Lock()`, `Condition()` and `Queue()`
+        can be used to create shared objects.
+        '''
+        from .managers import SyncManager
+        m = SyncManager(ctx=self.get_context())
+        m.start()
+        return m
+
+    def Pipe(self, duplex=True):
+        '''Returns two connection object connected by a pipe'''
+        from .connection import Pipe
+        return Pipe(duplex)
+
+    def Lock(self):
+        '''Returns a non-recursive lock object'''
+        from .synchronize import Lock
+        return Lock(ctx=self.get_context())
+
+    def RLock(self):
+        '''Returns a recursive lock object'''
+        from .synchronize import RLock
+        return RLock(ctx=self.get_context())
+
+    def Condition(self, lock=None):
+        '''Returns a condition object'''
+        from .synchronize import Condition
+        return Condition(lock, ctx=self.get_context())
+
+    def Semaphore(self, value=1):
+        '''Returns a semaphore object'''
+        from .synchronize import Semaphore
+        return Semaphore(value, ctx=self.get_context())
+
+    def BoundedSemaphore(self, value=1):
+        '''Returns a bounded semaphore object'''
+        from .synchronize import BoundedSemaphore
+        return BoundedSemaphore(value, ctx=self.get_context())
+
+    def Event(self):
+        '''Returns an event object'''
+        from .synchronize import Event
+        return Event(ctx=self.get_context())
+
+    def Barrier(self, parties, action=None, timeout=None):
+        '''Returns a barrier object'''
+        from .synchronize import Barrier
+        return Barrier(parties, action, timeout, ctx=self.get_context())
+
+    def Queue(self, maxsize=0):
+        '''Returns a queue object'''
+        from .queues import Queue
+        return Queue(maxsize, ctx=self.get_context())
+
+    def JoinableQueue(self, maxsize=0):
+        '''Returns a queue object'''
+        from .queues import JoinableQueue
+        return JoinableQueue(maxsize, ctx=self.get_context())
+
+    def SimpleQueue(self):
+        '''Returns a queue object'''
+        from .queues import SimpleQueue
+        return SimpleQueue(ctx=self.get_context())
+
+    def Pool(self, processes=None, initializer=None, initargs=(),
+             maxtasksperchild=None):
+        '''Returns a process pool object'''
+        from .pool import Pool
+        return Pool(processes, initializer, initargs, maxtasksperchild,
+                    context=self.get_context())
+
+    def RawValue(self, typecode_or_type, *args):
+        '''Returns a shared object'''
+        from .sharedctypes import RawValue
+        return RawValue(typecode_or_type, *args)
+
+    def RawArray(self, typecode_or_type, size_or_initializer):
+        '''Returns a shared array'''
+        from .sharedctypes import RawArray
+        return RawArray(typecode_or_type, size_or_initializer)
+
+    def Value(self, typecode_or_type, *args, lock=True):
+        '''Returns a synchronized shared object'''
+        from .sharedctypes import Value
+        return Value(typecode_or_type, *args, lock=lock,
+                     ctx=self.get_context())
+
+    def Array(self, typecode_or_type, size_or_initializer, *, lock=True):
+        '''Returns a synchronized shared array'''
+        from .sharedctypes import Array
+        return Array(typecode_or_type, size_or_initializer, lock=lock,
+                     ctx=self.get_context())
+
+    def freeze_support(self):
+        '''Check whether this is a fake forked process in a frozen executable.
+        If so then run code specified by commandline and exit.
+        '''
+        if sys.platform == 'win32' and getattr(sys, 'frozen', False):
+            from .spawn import freeze_support
+            freeze_support()
+
+    def get_logger(self):
+        '''Return package logger -- if it does not already exist then
+        it is created.
+        '''
+        from .util import get_logger
+        return get_logger()
+
+    def log_to_stderr(self, level=None):
+        '''Turn on logging and add a handler which prints to stderr'''
+        from .util import log_to_stderr
+        return log_to_stderr(level)
+
+    def allow_connection_pickling(self):
+        '''Install support for sending connections and sockets
+        between processes
+        '''
+        # This is undocumented.  In previous versions of multiprocessing
+        # its only effect was to make socket objects inheritable on Windows.
+        from . import connection
+
+    def set_executable(self, executable):
+        '''Sets the path to a python.exe or pythonw.exe binary used to run
+        child processes instead of sys.executable when using the 'spawn'
+        start method.  Useful for people embedding Python.
+        '''
+        from .spawn import set_executable
+        set_executable(executable)
+
+    def set_forkserver_preload(self, module_names):
+        '''Set list of module names to try to load in forkserver process.
+        This is really just a hint.
+        '''
+        from .forkserver import set_forkserver_preload
+        set_forkserver_preload(module_names)
+
+    def get_context(self, method=None):
+        if method is None:
+            return self
+        try:
+            ctx = _concrete_contexts[method]
+        except KeyError:
+            raise ValueError('cannot find context for %r' % method)
+        ctx._check_available()
+        return ctx
+
+    def get_start_method(self, allow_none=False):
+        return self._name
+
+    def set_start_method(self, method=None):
+        raise ValueError('cannot set start method of concrete context')
+
+    def _check_available(self):
+        pass
+
+#
+# Type of default context -- underlying context can be set at most once
+#
+
+class Process(process.BaseProcess):
+    _start_method = None
+    @staticmethod
+    def _Popen(process_obj):
+        return _default_context.get_context().Process._Popen(process_obj)
+
+class DefaultContext(BaseContext):
+    Process = Process
+
+    def __init__(self, context):
+        self._default_context = context
+        self._actual_context = None
+
+    def get_context(self, method=None):
+        if method is None:
+            if self._actual_context is None:
+                self._actual_context = self._default_context
+            return self._actual_context
+        else:
+            return super().get_context(method)
+
+    def set_start_method(self, method, force=False):
+        if self._actual_context is not None and not force:
+            raise RuntimeError('context has already been set')
+        if method is None and force:
+            self._actual_context = None
+            return
+        self._actual_context = self.get_context(method)
+
+    def get_start_method(self, allow_none=False):
+        if self._actual_context is None:
+            if allow_none:
+                return None
+            self._actual_context = self._default_context
+        return self._actual_context._name
+
+    def get_all_start_methods(self):
+        if sys.platform == 'win32':
+            return ['spawn']
+        else:
+            from . import reduction
+            if reduction.HAVE_SEND_HANDLE:
+                return ['fork', 'spawn', 'forkserver']
+            else:
+                return ['fork', 'spawn']
+
+DefaultContext.__all__ = list(x for x in dir(DefaultContext) if x[0] != '_')
+
+#
+# Context types for fixed start method
+#
+
+if sys.platform != 'win32':
+
+    class ForkProcess(process.BaseProcess):
+        _start_method = 'fork'
+        @staticmethod
+        def _Popen(process_obj):
+            from .popen_fork import Popen
+            return Popen(process_obj)
+
+    class SpawnProcess(process.BaseProcess):
+        _start_method = 'spawn'
+        @staticmethod
+        def _Popen(process_obj):
+            from .popen_spawn_posix import Popen
+            return Popen(process_obj)
+
+    class ForkServerProcess(process.BaseProcess):
+        _start_method = 'forkserver'
+        @staticmethod
+        def _Popen(process_obj):
+            from .popen_forkserver import Popen
+            return Popen(process_obj)
+
+    class ForkContext(BaseContext):
+        _name = 'fork'
+        Process = ForkProcess
+
+    class SpawnContext(BaseContext):
+        _name = 'spawn'
+        Process = SpawnProcess
+
+    class ForkServerContext(BaseContext):
+        _name = 'forkserver'
+        Process = ForkServerProcess
+        def _check_available(self):
+            from . import reduction
+            if not reduction.HAVE_SEND_HANDLE:
+                raise ValueError('forkserver start method not available')
+
+    _concrete_contexts = {
+        'fork': ForkContext(),
+        'spawn': SpawnContext(),
+        'forkserver': ForkServerContext(),
+    }
+    _default_context = DefaultContext(_concrete_contexts['fork'])
+
+else:
+
+    class SpawnProcess(process.BaseProcess):
+        _start_method = 'spawn'
+        @staticmethod
+        def _Popen(process_obj):
+            from .popen_spawn_win32 import Popen
+            return Popen(process_obj)
+
+    class SpawnContext(BaseContext):
+        _name = 'spawn'
+        Process = SpawnProcess
+
+    _concrete_contexts = {
+        'spawn': SpawnContext(),
+    }
+    _default_context = DefaultContext(_concrete_contexts['spawn'])
+
+#
+# Force the start method
+#
+
+def _force_start_method(method):
+    _default_context._actual_context = _concrete_contexts[method]
+
+#
+# Check that the current thread is spawning a child process
+#
+
+_tls = threading.local()
+
+def get_spawning_popen():
+    return getattr(_tls, 'spawning_popen', None)
+
+def set_spawning_popen(popen):
+    _tls.spawning_popen = popen
+
+def assert_spawning(obj):
+    if get_spawning_popen() is None:
+        raise RuntimeError(
+            '%s objects should only be shared between processes'
+            ' through inheritance' % type(obj).__name__
+            )
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
--- a/Lib/multiprocessing/forkserver.py
+++ b/Lib/multiprocessing/forkserver.py
@@ -24,105 +24,113 @@
 MAXFDS_TO_SEND = 256
 UNSIGNED_STRUCT = struct.Struct('Q')     # large enough for pid_t
 
-_forkserver_address = None
-_forkserver_alive_fd = None
-_inherited_fds = None
-_lock = threading.Lock()
-_preload_modules = ['__main__']
+#
+# Forkserver class
+#
+
+class ForkServer(object):
+
+    def __init__(self):
+        self._forkserver_address = None
+        self._forkserver_alive_fd = None
+        self._inherited_fds = None
+        self._lock = threading.Lock()
+        self._preload_modules = ['__main__']
+
+    def set_forkserver_preload(self, modules_names):
+        '''Set list of module names to try to load in forkserver process.'''
+        if not all(type(mod) is str for mod in self._preload_modules):
+            raise TypeError('module_names must be a list of strings')
+        self._preload_modules = modules_names
+
+    def get_inherited_fds(self):
+        '''Return list of fds inherited from parent process.
+
+        This returns None if the current process was not started by fork
+        server.
+        '''
+        return self._inherited_fds
+
+    def connect_to_new_process(self, fds):
+        '''Request forkserver to create a child process.
+
+        Returns a pair of fds (status_r, data_w).  The calling process can read
+        the child process's pid and (eventually) its returncode from status_r.
+        The calling process should write to data_w the pickled preparation and
+        process data.
+        '''
+        self.ensure_running()
+        if len(fds) + 4 >= MAXFDS_TO_SEND:
+            raise ValueError('too many fds')
+        with socket.socket(socket.AF_UNIX) as client:
+            client.connect(self._forkserver_address)
+            parent_r, child_w = os.pipe()
+            child_r, parent_w = os.pipe()
+            allfds = [child_r, child_w, self._forkserver_alive_fd,
+                      semaphore_tracker.getfd()]
+            allfds += fds
+            try:
+                reduction.sendfds(client, allfds)
+                return parent_r, parent_w
+            except:
+                os.close(parent_r)
+                os.close(parent_w)
+                raise
+            finally:
+                os.close(child_r)
+                os.close(child_w)
+
+    def ensure_running(self):
+        '''Make sure that a fork server is running.
+
+        This can be called from any process.  Note that usually a child
+        process will just reuse the forkserver started by its parent, so
+        ensure_running() will do nothing.
+        '''
+        with self._lock:
+            semaphore_tracker.ensure_running()
+            if self._forkserver_alive_fd is not None:
+                return
+
+            cmd = ('from multiprocessing.forkserver import main; ' +
+                   'main(%d, %d, %r, **%r)')
+
+            if self._preload_modules:
+                desired_keys = {'main_path', 'sys_path'}
+                data = spawn.get_preparation_data('ignore')
+                data = dict((x,y) for (x,y) in data.items()
+                            if x in desired_keys)
+            else:
+                data = {}
+
+            with socket.socket(socket.AF_UNIX) as listener:
+                address = connection.arbitrary_address('AF_UNIX')
+                listener.bind(address)
+                os.chmod(address, 0o600)
+                listener.listen(100)
+
+                # all client processes own the write end of the "alive" pipe;
+                # when they all terminate the read end becomes ready.
+                alive_r, alive_w = os.pipe()
+                try:
+                    fds_to_pass = [listener.fileno(), alive_r]
+                    cmd %= (listener.fileno(), alive_r, self._preload_modules,
+                            data)
+                    exe = spawn.get_executable()
+                    args = [exe] + util._args_from_interpreter_flags()
+                    args += ['-c', cmd]
+                    pid = util.spawnv_passfds(exe, args, fds_to_pass)
+                except:
+                    os.close(alive_w)
+                    raise
+                finally:
+                    os.close(alive_r)
+                self._forkserver_address = address
+                self._forkserver_alive_fd = alive_w
 
 #
-# Public function
 #
-
-def set_forkserver_preload(modules_names):
-    '''Set list of module names to try to load in forkserver process.'''
-    global _preload_modules
-    _preload_modules = modules_names
-
-
-def get_inherited_fds():
-    '''Return list of fds inherited from parent process.
-
-    This returns None if the current process was not started by fork server.
-    '''
-    return _inherited_fds
-
-
-def connect_to_new_process(fds):
-    '''Request forkserver to create a child process.
-
-    Returns a pair of fds (status_r, data_w).  The calling process can read
-    the child process's pid and (eventually) its returncode from status_r.
-    The calling process should write to data_w the pickled preparation and
-    process data.
-    '''
-    if len(fds) + 4 >= MAXFDS_TO_SEND:
-        raise ValueError('too many fds')
-    with socket.socket(socket.AF_UNIX) as client:
-        client.connect(_forkserver_address)
-        parent_r, child_w = os.pipe()
-        child_r, parent_w = os.pipe()
-        allfds = [child_r, child_w, _forkserver_alive_fd,
-                  semaphore_tracker._semaphore_tracker_fd]
-        allfds += fds
-        try:
-            reduction.sendfds(client, allfds)
-            return parent_r, parent_w
-        except:
-            os.close(parent_r)
-            os.close(parent_w)
-            raise
-        finally:
-            os.close(child_r)
-            os.close(child_w)
-
-
-def ensure_running():
-    '''Make sure that a fork server is running.
-
-    This can be called from any process.  Note that usually a child
-    process will just reuse the forkserver started by its parent, so
-    ensure_running() will do nothing.
-    '''
-    global _forkserver_address, _forkserver_alive_fd
-    with _lock:
-        if _forkserver_alive_fd is not None:
-            return
-
-        assert all(type(mod) is str for mod in _preload_modules)
-        cmd = ('from multiprocessing.forkserver import main; ' +
-               'main(%d, %d, %r, **%r)')
-
-        if _preload_modules:
-            desired_keys = {'main_path', 'sys_path'}
-            data = spawn.get_preparation_data('ignore')
-            data = dict((x,y) for (x,y) in data.items() if x in desired_keys)
-        else:
-            data = {}
-
-        with socket.socket(socket.AF_UNIX) as listener:
-            address = connection.arbitrary_address('AF_UNIX')
-            listener.bind(address)
-            os.chmod(address, 0o600)
-            listener.listen(100)
-
-            # all client processes own the write end of the "alive" pipe;
-            # when they all terminate the read end becomes ready.
-            alive_r, alive_w = os.pipe()
-            try:
-                fds_to_pass = [listener.fileno(), alive_r]
-                cmd %= (listener.fileno(), alive_r, _preload_modules, data)
-                exe = spawn.get_executable()
-                args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd]
-                pid = util.spawnv_passfds(exe, args, fds_to_pass)
-            except:
-                os.close(alive_w)
-                raise
-            finally:
-                os.close(alive_r)
-            _forkserver_address = address
-            _forkserver_alive_fd = alive_w
-
+#
 
 def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
     '''Run forkserver.'''
@@ -151,8 +159,7 @@
     handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN)
     with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener, \
          selectors.DefaultSelector() as selector:
-        global _forkserver_address
-        _forkserver_address = listener.getsockname()
+        _forkserver._forkserver_address = listener.getsockname()
 
         selector.register(listener, selectors.EVENT_READ)
         selector.register(alive_r, selectors.EVENT_READ)
@@ -187,13 +194,7 @@
                 if e.errno != errno.ECONNABORTED:
                     raise
 
-#
-# Code to bootstrap new process
-#
-
 def _serve_one(s, listener, alive_r, handler):
-    global _inherited_fds, _forkserver_alive_fd
-
     # close unnecessary stuff and reset SIGCHLD handler
     listener.close()
     os.close(alive_r)
@@ -203,8 +204,9 @@
     fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
     s.close()
     assert len(fds) <= MAXFDS_TO_SEND
-    child_r, child_w, _forkserver_alive_fd, stfd, *_inherited_fds = fds
-    semaphore_tracker._semaphore_tracker_fd = stfd
+    (child_r, child_w, _forkserver._forkserver_alive_fd,
+     stfd, *_forkserver._inherited_fds) = fds
+    semaphore_tracker._semaphore_tracker._fd = stfd
 
     # send pid to client processes
     write_unsigned(child_w, os.getpid())
@@ -253,3 +255,13 @@
         if nbytes == 0:
             raise RuntimeError('should not get here')
         msg = msg[nbytes:]
+
+#
+#
+#
+
+_forkserver = ForkServer()
+ensure_running = _forkserver.ensure_running
+get_inherited_fds = _forkserver.get_inherited_fds
+connect_to_new_process = _forkserver.connect_to_new_process
+set_forkserver_preload = _forkserver.set_forkserver_preload
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -16,7 +16,7 @@
 import threading
 import _multiprocessing
 
-from . import popen
+from . import context
 from . import reduction
 from . import util
 
@@ -50,7 +50,7 @@
             self._state = (self.size, self.name)
 
         def __getstate__(self):
-            popen.assert_spawning(self)
+            context.assert_spawning(self)
             return self._state
 
         def __setstate__(self, state):
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -23,11 +23,12 @@
 from traceback import format_exc
 
 from . import connection
+from . import context
 from . import pool
 from . import process
-from . import popen
 from . import reduction
 from . import util
+from . import get_context
 
 #
 # Register some things for pickling
@@ -438,7 +439,8 @@
     _registry = {}
     _Server = Server
 
-    def __init__(self, address=None, authkey=None, serializer='pickle'):
+    def __init__(self, address=None, authkey=None, serializer='pickle',
+                 ctx=None):
         if authkey is None:
             authkey = process.current_process().authkey
         self._address = address     # XXX not final address if eg ('', 0)
@@ -447,6 +449,7 @@
         self._state.value = State.INITIAL
         self._serializer = serializer
         self._Listener, self._Client = listener_client[serializer]
+        self._ctx = ctx or get_context()
 
     def get_server(self):
         '''
@@ -478,7 +481,7 @@
         reader, writer = connection.Pipe(duplex=False)
 
         # spawn process which runs a server
-        self._process = process.Process(
+        self._process = self._ctx.Process(
             target=type(self)._run_server,
             args=(self._registry, self._address, self._authkey,
                   self._serializer, writer, initializer, initargs),
@@ -800,7 +803,7 @@
 
     def __reduce__(self):
         kwds = {}
-        if popen.get_spawning_popen() is not None:
+        if context.get_spawning_popen() is not None:
             kwds['authkey'] = self._authkey
 
         if getattr(self, '_isauto', False):
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -24,7 +24,7 @@
 # If threading is available then ThreadPool should be provided.  Therefore
 # we avoid top-level imports which are liable to fail on some systems.
 from . import util
-from . import Process, cpu_count, TimeoutError, SimpleQueue
+from . import get_context, cpu_count, TimeoutError
 
 #
 # Constants representing the state of a pool
@@ -137,10 +137,12 @@
     '''
     Class which supports an async version of applying functions to arguments.
     '''
-    Process = Process
+    def Process(self, *args, **kwds):
+        return self._ctx.Process(*args, **kwds)
 
     def __init__(self, processes=None, initializer=None, initargs=(),
-                 maxtasksperchild=None):
+                 maxtasksperchild=None, context=None):
+        self._ctx = context or get_context()
         self._setup_queues()
         self._taskqueue = queue.Queue()
         self._cache = {}
@@ -232,8 +234,8 @@
             self._repopulate_pool()
 
     def _setup_queues(self):
-        self._inqueue = SimpleQueue()
-        self._outqueue = SimpleQueue()
+        self._inqueue = self._ctx.SimpleQueue()
+        self._outqueue = self._ctx.SimpleQueue()
         self._quick_put = self._inqueue._writer.send
         self._quick_get = self._outqueue._reader.recv
 
diff --git a/Lib/multiprocessing/popen.py b/Lib/multiprocessing/popen.py
deleted file mode 100644
--- a/Lib/multiprocessing/popen.py
+++ /dev/null
@@ -1,78 +0,0 @@
-import sys
-import threading
-
-__all__ = ['Popen', 'get_spawning_popen', 'set_spawning_popen',
-           'assert_spawning']
-
-#
-# Check that the current thread is spawning a child process
-#
-
-_tls = threading.local()
-
-def get_spawning_popen():
-    return getattr(_tls, 'spawning_popen', None)
-
-def set_spawning_popen(popen):
-    _tls.spawning_popen = popen
-
-def assert_spawning(obj):
-    if get_spawning_popen() is None:
-        raise RuntimeError(
-            '%s objects should only be shared between processes'
-            ' through inheritance' % type(obj).__name__
-            )
-
-#
-#
-#
-
-_Popen = None
-
-def Popen(process_obj):
-    if _Popen is None:
-        set_start_method()
-    return _Popen(process_obj)
-
-def get_start_method():
-    if _Popen is None:
-        set_start_method()
-    return _Popen.method
-
-def set_start_method(meth=None, *, start_helpers=True):
-    global _Popen
-    try:
-        modname = _method_to_module[meth]
-        __import__(modname)
-    except (KeyError, ImportError):
-        raise ValueError('could not use start method %r' % meth)
-    module = sys.modules[modname]
-    if start_helpers:
-        module.Popen.ensure_helpers_running()
-    _Popen = module.Popen
-
-
-if sys.platform == 'win32':
-
-    _method_to_module = {
-        None: 'multiprocessing.popen_spawn_win32',
-        'spawn': 'multiprocessing.popen_spawn_win32',
-        }
-
-    def get_all_start_methods():
-        return ['spawn']
-
-else:
-    _method_to_module = {
-        None: 'multiprocessing.popen_fork',
-        'fork': 'multiprocessing.popen_fork',
-        'spawn': 'multiprocessing.popen_spawn_posix',
-        'forkserver': 'multiprocessing.popen_forkserver',
-        }
-
-    def get_all_start_methods():
-        from . import reduction
-        if reduction.HAVE_SEND_HANDLE:
-            return ['fork', 'spawn', 'forkserver']
-        else:
-            return ['fork', 'spawn']
diff --git a/Lib/multiprocessing/popen_fork.py b/Lib/multiprocessing/popen_fork.py
--- a/Lib/multiprocessing/popen_fork.py
+++ b/Lib/multiprocessing/popen_fork.py
@@ -81,7 +81,3 @@
             os.close(child_w)
             util.Finalize(self, os.close, (parent_r,))
             self.sentinel = parent_r
-
-    @staticmethod
-    def ensure_helpers_running():
-        pass
diff --git a/Lib/multiprocessing/popen_forkserver.py b/Lib/multiprocessing/popen_forkserver.py
--- a/Lib/multiprocessing/popen_forkserver.py
+++ b/Lib/multiprocessing/popen_forkserver.py
@@ -4,8 +4,8 @@
 from . import reduction
 if not reduction.HAVE_SEND_HANDLE:
     raise ImportError('No support for sending fds between processes')
+from . import context
 from . import forkserver
-from . import popen
 from . import popen_fork
 from . import spawn
 from . import util
@@ -42,12 +42,12 @@
     def _launch(self, process_obj):
         prep_data = spawn.get_preparation_data(process_obj._name)
         buf = io.BytesIO()
-        popen.set_spawning_popen(self)
+        context.set_spawning_popen(self)
         try:
             reduction.dump(prep_data, buf)
             reduction.dump(process_obj, buf)
         finally:
-            popen.set_spawning_popen(None)
+            context.set_spawning_popen(None)
 
         self.sentinel, w = forkserver.connect_to_new_process(self._fds)
         util.Finalize(self, os.close, (self.sentinel,))
@@ -67,9 +67,3 @@
                 # The process ended abnormally perhaps because of a signal
                 self.returncode = 255
         return self.returncode
-
-    @staticmethod
-    def ensure_helpers_running():
-        from . import semaphore_tracker
-        semaphore_tracker.ensure_running()
-        forkserver.ensure_running()
diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py
--- a/Lib/multiprocessing/popen_spawn_posix.py
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -2,7 +2,7 @@
 import io
 import os
 
-from . import popen
+from . import context
 from . import popen_fork
 from . import reduction
 from . import spawn
@@ -41,16 +41,16 @@
 
     def _launch(self, process_obj):
         from . import semaphore_tracker
-        tracker_fd = semaphore_tracker._semaphore_tracker_fd
+        tracker_fd = semaphore_tracker.getfd()
         self._fds.append(tracker_fd)
         prep_data = spawn.get_preparation_data(process_obj._name)
         fp = io.BytesIO()
-        popen.set_spawning_popen(self)
+        context.set_spawning_popen(self)
         try:
             reduction.dump(prep_data, fp)
             reduction.dump(process_obj, fp)
         finally:
-            popen.set_spawning_popen(None)
+            context.set_spawning_popen(None)
 
         parent_r = child_w = child_r = parent_w = None
         try:
@@ -70,8 +70,3 @@
             for fd in (child_r, child_w, parent_w):
                 if fd is not None:
                     os.close(fd)
-
-    @staticmethod
-    def ensure_helpers_running():
-        from . import semaphore_tracker
-        semaphore_tracker.ensure_running()
diff --git a/Lib/multiprocessing/popen_spawn_win32.py b/Lib/multiprocessing/popen_spawn_win32.py
--- a/Lib/multiprocessing/popen_spawn_win32.py
+++ b/Lib/multiprocessing/popen_spawn_win32.py
@@ -4,8 +4,8 @@
 import sys
 import _winapi
 
+from . import context
 from . import spawn
-from . import popen
 from . import reduction
 from . import util
 
@@ -60,15 +60,15 @@
             util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
 
             # send information to child
-            popen.set_spawning_popen(self)
+            context.set_spawning_popen(self)
             try:
                 reduction.dump(prep_data, to_child)
                 reduction.dump(process_obj, to_child)
             finally:
-                popen.set_spawning_popen(None)
+                context.set_spawning_popen(None)
 
     def duplicate_for_child(self, handle):
-        assert self is popen.get_spawning_popen()
+        assert self is context.get_spawning_popen()
         return reduction.duplicate(handle, self.sentinel)
 
     def wait(self, timeout=None):
@@ -97,7 +97,3 @@
             except OSError:
                 if self.wait(timeout=1.0) is None:
                     raise
-
-    @staticmethod
-    def ensure_helpers_running():
-        pass
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -7,7 +7,7 @@
 # Licensed to PSF under a Contributor Agreement.
 #
 
-__all__ = ['Process', 'current_process', 'active_children']
+__all__ = ['BaseProcess', 'current_process', 'active_children']
 
 #
 # Imports
@@ -59,13 +59,14 @@
 # The `Process` class
 #
 
-class Process(object):
+class BaseProcess(object):
     '''
     Process objects represent activity that is run in a separate process
 
     The class is analogous to `threading.Thread`
     '''
-    _Popen = None
+    def _Popen(self):
+        raise NotImplementedError
 
     def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
                  *, daemon=None):
@@ -101,11 +102,7 @@
         assert not _current_process._config.get('daemon'), \
                'daemonic processes are not allowed to have children'
         _cleanup()
-        if self._Popen is not None:
-            Popen = self._Popen
-        else:
-            from .popen import Popen
-        self._popen = Popen(self)
+        self._popen = self._Popen(self)
         self._sentinel = self._popen.sentinel
         _children.add(self)
 
@@ -229,10 +226,12 @@
     ##
 
     def _bootstrap(self):
-        from . import util
+        from . import util, context
         global _current_process, _process_counter, _children
 
         try:
+            if self._start_method is not None:
+                context._force_start_method(self._start_method)
             _process_counter = itertools.count(1)
             _children = set()
             if sys.stdin is not None:
@@ -282,7 +281,7 @@
 
 class AuthenticationString(bytes):
     def __reduce__(self):
-        from .popen import get_spawning_popen
+        from .context import get_spawning_popen
         if get_spawning_popen() is None:
             raise TypeError(
                 'Pickling an AuthenticationString object is '
@@ -294,7 +293,7 @@
 # Create object representing the main process
 #
 
-class _MainProcess(Process):
+class _MainProcess(BaseProcess):
 
     def __init__(self):
         self._identity = ()
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -22,8 +22,7 @@
 import _multiprocessing
 
 from . import connection
-from . import popen
-from . import synchronize
+from . import context
 
 from .util import debug, info, Finalize, register_after_fork, is_exiting
 from .reduction import ForkingPickler
@@ -34,18 +33,18 @@
 
 class Queue(object):
 
-    def __init__(self, maxsize=0):
+    def __init__(self, maxsize=0, *, ctx):
         if maxsize <= 0:
             maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
         self._maxsize = maxsize
         self._reader, self._writer = connection.Pipe(duplex=False)
-        self._rlock = synchronize.Lock()
+        self._rlock = ctx.Lock()
         self._opid = os.getpid()
         if sys.platform == 'win32':
             self._wlock = None
         else:
-            self._wlock = synchronize.Lock()
-        self._sem = synchronize.BoundedSemaphore(maxsize)
+            self._wlock = ctx.Lock()
+        self._sem = ctx.BoundedSemaphore(maxsize)
         # For use by concurrent.futures
         self._ignore_epipe = False
 
@@ -55,7 +54,7 @@
             register_after_fork(self, Queue._after_fork)
 
     def __getstate__(self):
-        popen.assert_spawning(self)
+        context.assert_spawning(self)
         return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                 self._rlock, self._wlock, self._sem, self._opid)
 
@@ -279,10 +278,10 @@
 
 class JoinableQueue(Queue):
 
-    def __init__(self, maxsize=0):
-        Queue.__init__(self, maxsize)
-        self._unfinished_tasks = synchronize.Semaphore(0)
-        self._cond = synchronize.Condition()
+    def __init__(self, maxsize=0, *, ctx):
+        Queue.__init__(self, maxsize, ctx=ctx)
+        self._unfinished_tasks = ctx.Semaphore(0)
+        self._cond = ctx.Condition()
 
     def __getstate__(self):
         return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
@@ -332,20 +331,20 @@
 
 class SimpleQueue(object):
 
-    def __init__(self):
+    def __init__(self, *, ctx):
         self._reader, self._writer = connection.Pipe(duplex=False)
-        self._rlock = synchronize.Lock()
+        self._rlock = ctx.Lock()
         self._poll = self._reader.poll
         if sys.platform == 'win32':
             self._wlock = None
         else:
-            self._wlock = synchronize.Lock()
+            self._wlock = ctx.Lock()
 
     def empty(self):
         return not self._poll()
 
     def __getstate__(self):
-        popen.assert_spawning(self)
+        context.assert_spawning(self)
         return (self._reader, self._writer, self._rlock, self._wlock)
 
     def __setstate__(self, state):
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -15,7 +15,7 @@
 import socket
 import sys
 
-from . import popen
+from . import context
 from . import util
 
 __all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
@@ -183,7 +183,7 @@
 
     def DupFd(fd):
         '''Return a wrapper for an fd.'''
-        popen_obj = popen.get_spawning_popen()
+        popen_obj = context.get_spawning_popen()
         if popen_obj is not None:
             return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
         elif HAVE_SEND_HANDLE:
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py
--- a/Lib/multiprocessing/semaphore_tracker.py
+++ b/Lib/multiprocessing/semaphore_tracker.py
@@ -26,60 +26,70 @@
 __all__ = ['ensure_running', 'register', 'unregister']
 
 
-_semaphore_tracker_fd = None
-_lock = threading.Lock()
+class SemaphoreTracker(object):
 
+    def __init__(self):
+        self._lock = threading.Lock()
+        self._fd = None
 
-def ensure_running():
-    '''Make sure that semaphore tracker process is running.
+    def getfd(self):
+        self.ensure_running()
+        return self._fd
 
-    This can be run from any process.  Usually a child process will use
-    the semaphore created by its parent.'''
-    global _semaphore_tracker_fd
-    with _lock:
-        if _semaphore_tracker_fd is not None:
-            return
-        fds_to_pass = []
-        try:
-            fds_to_pass.append(sys.stderr.fileno())
-        except Exception:
-            pass
-        cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)'
-        r, w = os.pipe()
-        try:
-            fds_to_pass.append(r)
-            # process will out live us, so no need to wait on pid
-            exe = spawn.get_executable()
-            args = [exe] + util._args_from_interpreter_flags()
-            args += ['-c', cmd % r]
-            util.spawnv_passfds(exe, args, fds_to_pass)
-        except:
-            os.close(w)
-            raise
-        else:
-            _semaphore_tracker_fd = w
-        finally:
-            os.close(r)
+    def ensure_running(self):
+        '''Make sure that semaphore tracker process is running.
 
+        This can be run from any process.  Usually a child process will use
+        the semaphore created by its parent.'''
+        with self._lock:
+            if self._fd is not None:
+                return
+            fds_to_pass = []
+            try:
+                fds_to_pass.append(sys.stderr.fileno())
+            except Exception:
+                pass
+            cmd = 'from multiprocessing.semaphore_tracker import main;main(%d)'
+            r, w = os.pipe()
+            try:
+                fds_to_pass.append(r)
+                # process will out live us, so no need to wait on pid
+                exe = spawn.get_executable()
+                args = [exe] + util._args_from_interpreter_flags()
+                args += ['-c', cmd % r]
+                util.spawnv_passfds(exe, args, fds_to_pass)
+            except:
+                os.close(w)
+                raise
+            else:
+                self._fd = w
+            finally:
+                os.close(r)
 
-def register(name):
-    '''Register name of semaphore with semaphore tracker.'''
-    _send('REGISTER', name)
+    def register(self, name):
+        '''Register name of semaphore with semaphore tracker.'''
+        self._send('REGISTER', name)
 
+    def unregister(self, name):
+        '''Unregister name of semaphore with semaphore tracker.'''
+        self._send('UNREGISTER', name)
 
-def unregister(name):
-    '''Unregister name of semaphore with semaphore tracker.'''
-    _send('UNREGISTER', name)
+    def _send(self, cmd, name):
+        self.ensure_running()
+        msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
+        if len(name) > 512:
+            # posix guarantees that writes to a pipe of less than PIPE_BUF
+            # bytes are atomic, and that PIPE_BUF >= 512
+            raise ValueError('name too long')
+        nbytes = os.write(self._fd, msg)
+        assert nbytes == len(msg)
 
 
-def _send(cmd, name):
-    msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
-    if len(name) > 512:
-        # posix guarantees that writes to a pipe of less than PIPE_BUF
-        # bytes are atomic, and that PIPE_BUF >= 512
-        raise ValueError('name too long')
-    nbytes = os.write(_semaphore_tracker_fd, msg)
-    assert nbytes == len(msg)
+_semaphore_tracker = SemaphoreTracker()
+ensure_running = _semaphore_tracker.ensure_running
+register = _semaphore_tracker.register
+unregister = _semaphore_tracker.unregister
+getfd = _semaphore_tracker.getfd
 
 
 def main(fd):
diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -11,10 +11,10 @@
 import weakref
 
 from . import heap
+from . import get_context
 
-from .synchronize import RLock
+from .context import assert_spawning
 from .reduction import ForkingPickler
-from .popen import assert_spawning
 
 __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
 
@@ -66,7 +66,7 @@
         result.__init__(*size_or_initializer)
         return result
 
-def Value(typecode_or_type, *args, lock=True):
+def Value(typecode_or_type, *args, lock=True, ctx=None):
     '''
     Return a synchronization wrapper for a Value
     '''
@@ -74,12 +74,13 @@
     if lock is False:
         return obj
     if lock in (True, None):
-        lock = RLock()
+        ctx = ctx or get_context()
+        lock = ctx.RLock()
     if not hasattr(lock, 'acquire'):
         raise AttributeError("'%r' has no method 'acquire'" % lock)
-    return synchronized(obj, lock)
+    return synchronized(obj, lock, ctx=ctx)
 
-def Array(typecode_or_type, size_or_initializer, *, lock=True):
+def Array(typecode_or_type, size_or_initializer, *, lock=True, ctx=None):
     '''
     Return a synchronization wrapper for a RawArray
     '''
@@ -87,25 +88,27 @@
     if lock is False:
         return obj
     if lock in (True, None):
-        lock = RLock()
+        ctx = ctx or get_context()
+        lock = ctx.RLock()
     if not hasattr(lock, 'acquire'):
         raise AttributeError("'%r' has no method 'acquire'" % lock)
-    return synchronized(obj, lock)
+    return synchronized(obj, lock, ctx=ctx)
 
 def copy(obj):
     new_obj = _new_value(type(obj))
     ctypes.pointer(new_obj)[0] = obj
     return new_obj
 
-def synchronized(obj, lock=None):
+def synchronized(obj, lock=None, ctx=None):
     assert not isinstance(obj, SynchronizedBase), 'object already synchronized'
+    ctx = ctx or get_context()
 
     if isinstance(obj, ctypes._SimpleCData):
-        return Synchronized(obj, lock)
+        return Synchronized(obj, lock, ctx)
     elif isinstance(obj, ctypes.Array):
         if obj._type_ is ctypes.c_char:
-            return SynchronizedString(obj, lock)
-        return SynchronizedArray(obj, lock)
+            return SynchronizedString(obj, lock, ctx)
+        return SynchronizedArray(obj, lock, ctx)
     else:
         cls = type(obj)
         try:
@@ -115,7 +118,7 @@
             d = dict((name, make_property(name)) for name in names)
             classname = 'Synchronized' + cls.__name__
             scls = class_cache[cls] = type(classname, (SynchronizedBase,), d)
-        return scls(obj, lock)
+        return scls(obj, lock, ctx)
 
 #
 # Functions for pickling/unpickling
@@ -175,9 +178,13 @@
 
 class SynchronizedBase(object):
 
-    def __init__(self, obj, lock=None):
+    def __init__(self, obj, lock=None, ctx=None):
         self._obj = obj
-        self._lock = lock or RLock()
+        if lock:
+            self._lock = lock
+        else:
+            ctx = ctx or get_context(force=True)
+            self._lock = ctx.RLock()
         self.acquire = self._lock.acquire
         self.release = self._lock.release
 
diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py
--- a/Lib/multiprocessing/spawn.py
+++ b/Lib/multiprocessing/spawn.py
@@ -12,9 +12,9 @@
 import pickle
 import sys
 
+from . import get_start_method, set_start_method
 from . import process
 from . import util
-from . import popen
 
 __all__ = ['_main', 'freeze_support', 'set_executable', 'get_executable',
            'get_preparation_data', 'get_command_line', 'import_main_path']
@@ -91,7 +91,7 @@
         fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
     else:
         from . import semaphore_tracker
-        semaphore_tracker._semaphore_tracker_fd = tracker_fd
+        semaphore_tracker._semaphore_tracker._fd = tracker_fd
         fd = pipe_handle
     exitcode = _main(fd)
     sys.exit(exitcode)
@@ -154,7 +154,7 @@
         sys_argv=sys.argv,
         orig_dir=process.ORIGINAL_DIR,
         dir=os.getcwd(),
-        start_method=popen.get_start_method(),
+        start_method=get_start_method(),
         )
 
     if sys.platform != 'win32' or (not WINEXE and not WINSERVICE):
@@ -204,7 +204,7 @@
         process.ORIGINAL_DIR = data['orig_dir']
 
     if 'start_method' in data:
-        popen.set_start_method(data['start_method'], start_helpers=False)
+        set_start_method(data['start_method'])
 
     if 'main_path' in data:
         import_main_path(data['main_path'])
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -20,7 +20,7 @@
 
 from time import time as _time
 
-from . import popen
+from . import context
 from . import process
 from . import util
 
@@ -50,14 +50,15 @@
 
     _rand = tempfile._RandomNameSequence()
 
-    def __init__(self, kind, value, maxvalue):
-        unlink_immediately = (sys.platform == 'win32' or
-                              popen.get_start_method() == 'fork')
+    def __init__(self, kind, value, maxvalue, *, ctx):
+        ctx = ctx or get_context()
+        ctx = ctx.get_context()
+        unlink_now = sys.platform == 'win32' or ctx._name == 'fork'
         for i in range(100):
             try:
                 sl = self._semlock = _multiprocessing.SemLock(
                     kind, value, maxvalue, self._make_name(),
-                    unlink_immediately)
+                    unlink_now)
             except FileExistsError:
                 pass
             else:
@@ -99,10 +100,10 @@
         return self._semlock.__exit__(*args)
 
     def __getstate__(self):
-        popen.assert_spawning(self)
+        context.assert_spawning(self)
         sl = self._semlock
         if sys.platform == 'win32':
-            h = popen.get_spawning_popen().duplicate_for_child(sl.handle)
+            h = context.get_spawning_popen().duplicate_for_child(sl.handle)
         else:
             h = sl.handle
         return (h, sl.kind, sl.maxvalue, sl.name)
@@ -123,8 +124,8 @@
 
 class Semaphore(SemLock):
 
-    def __init__(self, value=1):
-        SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX)
+    def __init__(self, value=1, *, ctx):
+        SemLock.__init__(self, SEMAPHORE, value, SEM_VALUE_MAX, ctx=ctx)
 
     def get_value(self):
         return self._semlock._get_value()
@@ -142,8 +143,8 @@
 
 class BoundedSemaphore(Semaphore):
 
-    def __init__(self, value=1):
-        SemLock.__init__(self, SEMAPHORE, value, value)
+    def __init__(self, value=1, *, ctx):
+        SemLock.__init__(self, SEMAPHORE, value, value, ctx=ctx)
 
     def __repr__(self):
         try:
@@ -159,8 +160,8 @@
 
 class Lock(SemLock):
 
-    def __init__(self):
-        SemLock.__init__(self, SEMAPHORE, 1, 1)
+    def __init__(self, *, ctx):
+        SemLock.__init__(self, SEMAPHORE, 1, 1, ctx=ctx)
 
     def __repr__(self):
         try:
@@ -184,8 +185,8 @@
 
 class RLock(SemLock):
 
-    def __init__(self):
-        SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1)
+    def __init__(self, *, ctx):
+        SemLock.__init__(self, RECURSIVE_MUTEX, 1, 1, ctx=ctx)
 
     def __repr__(self):
         try:
@@ -210,15 +211,15 @@
 
 class Condition(object):
 
-    def __init__(self, lock=None):
-        self._lock = lock or RLock()
-        self._sleeping_count = Semaphore(0)
-        self._woken_count = Semaphore(0)
-        self._wait_semaphore = Semaphore(0)
+    def __init__(self, lock=None, *, ctx):
+        self._lock = lock or ctx.RLock()
+        self._sleeping_count = ctx.Semaphore(0)
+        self._woken_count = ctx.Semaphore(0)
+        self._wait_semaphore = ctx.Semaphore(0)
         self._make_methods()
 
     def __getstate__(self):
-        popen.assert_spawning(self)
+        context.assert_spawning(self)
         return (self._lock, self._sleeping_count,
                 self._woken_count, self._wait_semaphore)
 
@@ -332,9 +333,9 @@
 
 class Event(object):
 
-    def __init__(self):
-        self._cond = Condition(Lock())
-        self._flag = Semaphore(0)
+    def __init__(self, *, ctx):
+        self._cond = ctx.Condition(ctx.Lock())
+        self._flag = ctx.Semaphore(0)
 
     def is_set(self):
         self._cond.acquire()
@@ -383,11 +384,11 @@
 
 class Barrier(threading.Barrier):
 
-    def __init__(self, parties, action=None, timeout=None):
+    def __init__(self, parties, action=None, timeout=None, *, ctx):
         import struct
         from .heap import BufferWrapper
         wrapper = BufferWrapper(struct.calcsize('i') * 2)
-        cond = Condition()
+        cond = ctx.Condition()
         self.__setstate__((parties, action, timeout, cond, wrapper))
         self._state = 0
         self._count = 0
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -3555,6 +3555,32 @@
             conn.close()
 
 class TestStartMethod(unittest.TestCase):
+    @classmethod
+    def _check_context(cls, conn):
+        conn.send(multiprocessing.get_start_method())
+
+    def check_context(self, ctx):
+        r, w = ctx.Pipe(duplex=False)
+        p = ctx.Process(target=self._check_context, args=(w,))
+        p.start()
+        w.close()
+        child_method = r.recv()
+        r.close()
+        p.join()
+        self.assertEqual(child_method, ctx.get_start_method())
+
+    def test_context(self):
+        for method in ('fork', 'spawn', 'forkserver'):
+            try:
+                ctx = multiprocessing.get_context(method)
+            except ValueError:
+                continue
+            self.assertEqual(ctx.get_start_method(), method)
+            self.assertIs(ctx.get_context(), ctx)
+            self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
+            self.assertRaises(ValueError, ctx.set_start_method, None)
+            self.check_context(ctx)
+
     def test_set_get(self):
         multiprocessing.set_forkserver_preload(PRELOAD)
         count = 0
@@ -3562,13 +3588,19 @@
         try:
             for method in ('fork', 'spawn', 'forkserver'):
                 try:
-                    multiprocessing.set_start_method(method)
+                    multiprocessing.set_start_method(method, force=True)
                 except ValueError:
                     continue
                 self.assertEqual(multiprocessing.get_start_method(), method)
+                ctx = multiprocessing.get_context()
+                self.assertEqual(ctx.get_start_method(), method)
+                self.assertTrue(type(ctx).__name__.lower().startswith(method))
+                self.assertTrue(
+                    ctx.Process.__name__.lower().startswith(method))
+                self.check_context(multiprocessing)
                 count += 1
         finally:
-            multiprocessing.set_start_method(old_method)
+            multiprocessing.set_start_method(old_method, force=True)
         self.assertGreaterEqual(count, 1)
 
     def test_get_all(self):
@@ -3753,9 +3785,9 @@
         multiprocessing.process._cleanup()
         dangling[0] = multiprocessing.process._dangling.copy()
         dangling[1] = threading._dangling.copy()
-        old_start_method[0] = multiprocessing.get_start_method()
+        old_start_method[0] = multiprocessing.get_start_method(allow_none=True)
         try:
-            multiprocessing.set_start_method(start_method)
+            multiprocessing.set_start_method(start_method, force=True)
         except ValueError:
             raise unittest.SkipTest(start_method +
                                     ' start method not supported')
@@ -3771,7 +3803,7 @@
         multiprocessing.get_logger().setLevel(LOG_LEVEL)
 
     def tearDownModule():
-        multiprocessing.set_start_method(old_start_method[0])
+        multiprocessing.set_start_method(old_start_method[0], force=True)
         # pause a bit so we don't get warning about dangling threads/processes
         time.sleep(0.5)
         multiprocessing.process._cleanup()

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list