[Python-checkins] cpython: Issue #8713: Support alternative start methods in multiprocessing on Unix.

richard.oudkerk python-checkins at python.org
Wed Aug 14 16:49:01 CEST 2013


http://hg.python.org/cpython/rev/3b82e0d83bf9
changeset:   85169:3b82e0d83bf9
parent:      85167:55c55bfe237b
user:        Richard Oudkerk <shibturn at gmail.com>
date:        Wed Aug 14 15:35:41 2013 +0100
summary:
  Issue #8713: Support alternative start methods in multiprocessing on Unix.

See http://hg.python.org/sandbox/sbt#spawn

files:
  Doc/includes/mp_benchmarks.py               |  239 ----
  Doc/includes/mp_newtype.py                  |   14 +-
  Doc/includes/mp_pool.py                     |  337 +-----
  Doc/includes/mp_synchronize.py              |  278 -----
  Doc/includes/mp_webserver.py                |   70 -
  Doc/includes/mp_workers.py                  |   13 -
  Doc/library/multiprocessing.rst             |  246 +++-
  Doc/whatsnew/3.4.rst                        |   13 +
  Lib/multiprocessing/__init__.py             |  107 +-
  Lib/multiprocessing/connection.py           |   61 +-
  Lib/multiprocessing/dummy/__init__.py       |    4 +-
  Lib/multiprocessing/forking.py              |  477 ---------
  Lib/multiprocessing/forkserver.py           |  238 ++++
  Lib/multiprocessing/heap.py                 |   56 +-
  Lib/multiprocessing/managers.py             |   44 +-
  Lib/multiprocessing/pool.py                 |   84 +-
  Lib/multiprocessing/popen.py                |   78 +
  Lib/multiprocessing/popen_fork.py           |   87 +
  Lib/multiprocessing/popen_forkserver.py     |   75 +
  Lib/multiprocessing/popen_spawn_posix.py    |   75 +
  Lib/multiprocessing/popen_spawn_win32.py    |  102 ++
  Lib/multiprocessing/process.py              |   60 +-
  Lib/multiprocessing/queues.py               |   36 +-
  Lib/multiprocessing/reduction.py            |  361 +++----
  Lib/multiprocessing/resource_sharer.py      |  158 +++
  Lib/multiprocessing/semaphore_tracker.py    |  135 ++
  Lib/multiprocessing/sharedctypes.py         |    7 +-
  Lib/multiprocessing/spawn.py                |  258 +++++
  Lib/multiprocessing/synchronize.py          |   73 +-
  Lib/multiprocessing/util.py                 |   70 +-
  Lib/test/test_multiprocessing.py            |  495 ++++++---
  Lib/test/mp_fork_bomb.py                    |    5 +
  Lib/test/regrtest.py                        |    2 +-
  Lib/test/test_multiprocessing_fork.py       |    7 +
  Lib/test/test_multiprocessing_forkserver.py |    7 +
  Lib/test/test_multiprocessing_spawn.py      |    7 +
  Makefile.pre.in                             |    4 +-
  Modules/_multiprocessing/multiprocessing.c  |    1 +
  Modules/_multiprocessing/multiprocessing.h  |    1 +
  Modules/_multiprocessing/semaphore.c        |   78 +-
  40 files changed, 2442 insertions(+), 2021 deletions(-)


diff --git a/Doc/includes/mp_benchmarks.py b/Doc/includes/mp_benchmarks.py
deleted file mode 100644
--- a/Doc/includes/mp_benchmarks.py
+++ /dev/null
@@ -1,239 +0,0 @@
-#
-# Simple benchmarks for the multiprocessing package
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-import time
-import multiprocessing
-import threading
-import queue
-import gc
-
-_timer = time.perf_counter
-
-delta = 1
-
-
-#### TEST_QUEUESPEED
-
-def queuespeed_func(q, c, iterations):
-    a = '0' * 256
-    c.acquire()
-    c.notify()
-    c.release()
-
-    for i in range(iterations):
-        q.put(a)
-
-    q.put('STOP')
-
-def test_queuespeed(Process, q, c):
-    elapsed = 0
-    iterations = 1
-
-    while elapsed < delta:
-        iterations *= 2
-
-        p = Process(target=queuespeed_func, args=(q, c, iterations))
-        c.acquire()
-        p.start()
-        c.wait()
-        c.release()
-
-        result = None
-        t = _timer()
-
-        while result != 'STOP':
-            result = q.get()
-
-        elapsed = _timer() - t
-
-        p.join()
-
-    print(iterations, 'objects passed through the queue in', elapsed, 'seconds')
-    print('average number/sec:', iterations/elapsed)
-
-
-#### TEST_PIPESPEED
-
-def pipe_func(c, cond, iterations):
-    a = '0' * 256
-    cond.acquire()
-    cond.notify()
-    cond.release()
-
-    for i in range(iterations):
-        c.send(a)
-
-    c.send('STOP')
-
-def test_pipespeed():
-    c, d = multiprocessing.Pipe()
-    cond = multiprocessing.Condition()
-    elapsed = 0
-    iterations = 1
-
-    while elapsed < delta:
-        iterations *= 2
-
-        p = multiprocessing.Process(target=pipe_func,
-                                    args=(d, cond, iterations))
-        cond.acquire()
-        p.start()
-        cond.wait()
-        cond.release()
-
-        result = None
-        t = _timer()
-
-        while result != 'STOP':
-            result = c.recv()
-
-        elapsed = _timer() - t
-        p.join()
-
-    print(iterations, 'objects passed through connection in',elapsed,'seconds')
-    print('average number/sec:', iterations/elapsed)
-
-
-#### TEST_SEQSPEED
-
-def test_seqspeed(seq):
-    elapsed = 0
-    iterations = 1
-
-    while elapsed < delta:
-        iterations *= 2
-
-        t = _timer()
-
-        for i in range(iterations):
-            a = seq[5]
-
-        elapsed = _timer() - t
-
-    print(iterations, 'iterations in', elapsed, 'seconds')
-    print('average number/sec:', iterations/elapsed)
-
-
-#### TEST_LOCK
-
-def test_lockspeed(l):
-    elapsed = 0
-    iterations = 1
-
-    while elapsed < delta:
-        iterations *= 2
-
-        t = _timer()
-
-        for i in range(iterations):
-            l.acquire()
-            l.release()
-
-        elapsed = _timer() - t
-
-    print(iterations, 'iterations in', elapsed, 'seconds')
-    print('average number/sec:', iterations/elapsed)
-
-
-#### TEST_CONDITION
-
-def conditionspeed_func(c, N):
-    c.acquire()
-    c.notify()
-
-    for i in range(N):
-        c.wait()
-        c.notify()
-
-    c.release()
-
-def test_conditionspeed(Process, c):
-    elapsed = 0
-    iterations = 1
-
-    while elapsed < delta:
-        iterations *= 2
-
-        c.acquire()
-        p = Process(target=conditionspeed_func, args=(c, iterations))
-        p.start()
-
-        c.wait()
-
-        t = _timer()
-
-        for i in range(iterations):
-            c.notify()
-            c.wait()
-
-        elapsed = _timer() - t
-
-        c.release()
-        p.join()
-
-    print(iterations * 2, 'waits in', elapsed, 'seconds')
-    print('average number/sec:', iterations * 2 / elapsed)
-
-####
-
-def test():
-    manager = multiprocessing.Manager()
-
-    gc.disable()
-
-    print('\n\t######## testing Queue.Queue\n')
-    test_queuespeed(threading.Thread, queue.Queue(),
-                    threading.Condition())
-    print('\n\t######## testing multiprocessing.Queue\n')
-    test_queuespeed(multiprocessing.Process, multiprocessing.Queue(),
-                    multiprocessing.Condition())
-    print('\n\t######## testing Queue managed by server process\n')
-    test_queuespeed(multiprocessing.Process, manager.Queue(),
-                    manager.Condition())
-    print('\n\t######## testing multiprocessing.Pipe\n')
-    test_pipespeed()
-
-    print()
-
-    print('\n\t######## testing list\n')
-    test_seqspeed(list(range(10)))
-    print('\n\t######## testing list managed by server process\n')
-    test_seqspeed(manager.list(list(range(10))))
-    print('\n\t######## testing Array("i", ..., lock=False)\n')
-    test_seqspeed(multiprocessing.Array('i', list(range(10)), lock=False))
-    print('\n\t######## testing Array("i", ..., lock=True)\n')
-    test_seqspeed(multiprocessing.Array('i', list(range(10)), lock=True))
-
-    print()
-
-    print('\n\t######## testing threading.Lock\n')
-    test_lockspeed(threading.Lock())
-    print('\n\t######## testing threading.RLock\n')
-    test_lockspeed(threading.RLock())
-    print('\n\t######## testing multiprocessing.Lock\n')
-    test_lockspeed(multiprocessing.Lock())
-    print('\n\t######## testing multiprocessing.RLock\n')
-    test_lockspeed(multiprocessing.RLock())
-    print('\n\t######## testing lock managed by server process\n')
-    test_lockspeed(manager.Lock())
-    print('\n\t######## testing rlock managed by server process\n')
-    test_lockspeed(manager.RLock())
-
-    print()
-
-    print('\n\t######## testing threading.Condition\n')
-    test_conditionspeed(threading.Thread, threading.Condition())
-    print('\n\t######## testing multiprocessing.Condition\n')
-    test_conditionspeed(multiprocessing.Process, multiprocessing.Condition())
-    print('\n\t######## testing condition managed by a server process\n')
-    test_conditionspeed(multiprocessing.Process, manager.Condition())
-
-    gc.enable()
-
-if __name__ == '__main__':
-    multiprocessing.freeze_support()
-    test()
diff --git a/Doc/includes/mp_newtype.py b/Doc/includes/mp_newtype.py
--- a/Doc/includes/mp_newtype.py
+++ b/Doc/includes/mp_newtype.py
@@ -1,11 +1,3 @@
-#
-# This module shows how to use arbitrary callables with a subclass of
-# `BaseManager`.
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
 from multiprocessing import freeze_support
 from multiprocessing.managers import BaseManager, BaseProxy
 import operator
@@ -27,12 +19,10 @@
 
 # Proxy type for generator objects
 class GeneratorProxy(BaseProxy):
-    _exposed_ = ('next', '__next__')
+    _exposed_ = ['__next__']
     def __iter__(self):
         return self
     def __next__(self):
-        return self._callmethod('next')
-    def __next__(self):
         return self._callmethod('__next__')
 
 # Function to return the operator module
@@ -90,8 +80,6 @@
     op = manager.operator()
     print('op.add(23, 45) =', op.add(23, 45))
     print('op.pow(2, 94) =', op.pow(2, 94))
-    print('op.getslice(range(10), 2, 6) =', op.getslice(list(range(10)), 2, 6))
-    print('op.repeat(range(5), 3) =', op.repeat(list(range(5)), 3))
     print('op._exposed_ =', op._exposed_)
 
 ##
diff --git a/Doc/includes/mp_pool.py b/Doc/includes/mp_pool.py
--- a/Doc/includes/mp_pool.py
+++ b/Doc/includes/mp_pool.py
@@ -1,10 +1,3 @@
-#
-# A test of `multiprocessing.Pool` class
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
 import multiprocessing
 import time
 import random
@@ -46,269 +39,115 @@
 #
 
 def test():
-    print('cpu_count() = %d\n' % multiprocessing.cpu_count())
-
-    #
-    # Create pool
-    #
-
     PROCESSES = 4
     print('Creating pool with %d processes\n' % PROCESSES)
-    pool = multiprocessing.Pool(PROCESSES)
-    print('pool = %s' % pool)
-    print()
 
-    #
-    # Tests
-    #
+    with multiprocessing.Pool(PROCESSES) as pool:
+        #
+        # Tests
+        #
 
-    TASKS = [(mul, (i, 7)) for i in range(10)] + \
-            [(plus, (i, 8)) for i in range(10)]
+        TASKS = [(mul, (i, 7)) for i in range(10)] + \
+                [(plus, (i, 8)) for i in range(10)]
 
-    results = [pool.apply_async(calculate, t) for t in TASKS]
-    imap_it = pool.imap(calculatestar, TASKS)
-    imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
+        results = [pool.apply_async(calculate, t) for t in TASKS]
+        imap_it = pool.imap(calculatestar, TASKS)
+        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
 
-    print('Ordered results using pool.apply_async():')
-    for r in results:
-        print('\t', r.get())
-    print()
+        print('Ordered results using pool.apply_async():')
+        for r in results:
+            print('\t', r.get())
+        print()
 
-    print('Ordered results using pool.imap():')
-    for x in imap_it:
-        print('\t', x)
-    print()
+        print('Ordered results using pool.imap():')
+        for x in imap_it:
+            print('\t', x)
+        print()
 
-    print('Unordered results using pool.imap_unordered():')
-    for x in imap_unordered_it:
-        print('\t', x)
-    print()
+        print('Unordered results using pool.imap_unordered():')
+        for x in imap_unordered_it:
+            print('\t', x)
+        print()
 
-    print('Ordered results using pool.map() --- will block till complete:')
-    for x in pool.map(calculatestar, TASKS):
-        print('\t', x)
-    print()
+        print('Ordered results using pool.map() --- will block till complete:')
+        for x in pool.map(calculatestar, TASKS):
+            print('\t', x)
+        print()
 
-    #
-    # Simple benchmarks
-    #
+        #
+        # Test error handling
+        #
 
-    N = 100000
-    print('def pow3(x): return x**3')
+        print('Testing error handling:')
 
-    t = time.time()
-    A = list(map(pow3, range(N)))
-    print('\tmap(pow3, range(%d)):\n\t\t%s seconds' % \
-          (N, time.time() - t))
+        try:
+            print(pool.apply(f, (5,)))
+        except ZeroDivisionError:
+            print('\tGot ZeroDivisionError as expected from pool.apply()')
+        else:
+            raise AssertionError('expected ZeroDivisionError')
 
-    t = time.time()
-    B = pool.map(pow3, range(N))
-    print('\tpool.map(pow3, range(%d)):\n\t\t%s seconds' % \
-          (N, time.time() - t))
+        try:
+            print(pool.map(f, list(range(10))))
+        except ZeroDivisionError:
+            print('\tGot ZeroDivisionError as expected from pool.map()')
+        else:
+            raise AssertionError('expected ZeroDivisionError')
 
-    t = time.time()
-    C = list(pool.imap(pow3, range(N), chunksize=N//8))
-    print('\tlist(pool.imap(pow3, range(%d), chunksize=%d)):\n\t\t%s' \
-          ' seconds' % (N, N//8, time.time() - t))
+        try:
+            print(list(pool.imap(f, list(range(10)))))
+        except ZeroDivisionError:
+            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
+        else:
+            raise AssertionError('expected ZeroDivisionError')
 
-    assert A == B == C, (len(A), len(B), len(C))
-    print()
+        it = pool.imap(f, list(range(10)))
+        for i in range(10):
+            try:
+                x = next(it)
+            except ZeroDivisionError:
+                if i == 5:
+                    pass
+            except StopIteration:
+                break
+            else:
+                if i == 5:
+                    raise AssertionError('expected ZeroDivisionError')
 
-    L = [None] * 1000000
-    print('def noop(x): pass')
-    print('L = [None] * 1000000')
+        assert i == 9
+        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
+        print()
 
-    t = time.time()
-    A = list(map(noop, L))
-    print('\tmap(noop, L):\n\t\t%s seconds' % \
-          (time.time() - t))
+        #
+        # Testing timeouts
+        #
 
-    t = time.time()
-    B = pool.map(noop, L)
-    print('\tpool.map(noop, L):\n\t\t%s seconds' % \
-          (time.time() - t))
+        print('Testing ApplyResult.get() with timeout:', end=' ')
+        res = pool.apply_async(calculate, TASKS[0])
+        while 1:
+            sys.stdout.flush()
+            try:
+                sys.stdout.write('\n\t%s' % res.get(0.02))
+                break
+            except multiprocessing.TimeoutError:
+                sys.stdout.write('.')
+        print()
+        print()
 
-    t = time.time()
-    C = list(pool.imap(noop, L, chunksize=len(L)//8))
-    print('\tlist(pool.imap(noop, L, chunksize=%d)):\n\t\t%s seconds' % \
-          (len(L)//8, time.time() - t))
-
-    assert A == B == C, (len(A), len(B), len(C))
-    print()
-
-    del A, B, C, L
-
-    #
-    # Test error handling
-    #
-
-    print('Testing error handling:')
-
-    try:
-        print(pool.apply(f, (5,)))
-    except ZeroDivisionError:
-        print('\tGot ZeroDivisionError as expected from pool.apply()')
-    else:
-        raise AssertionError('expected ZeroDivisionError')
-
-    try:
-        print(pool.map(f, list(range(10))))
-    except ZeroDivisionError:
-        print('\tGot ZeroDivisionError as expected from pool.map()')
-    else:
-        raise AssertionError('expected ZeroDivisionError')
-
-    try:
-        print(list(pool.imap(f, list(range(10)))))
-    except ZeroDivisionError:
-        print('\tGot ZeroDivisionError as expected from list(pool.imap())')
-    else:
-        raise AssertionError('expected ZeroDivisionError')
-
-    it = pool.imap(f, list(range(10)))
-    for i in range(10):
-        try:
-            x = next(it)
-        except ZeroDivisionError:
-            if i == 5:
-                pass
-        except StopIteration:
-            break
-        else:
-            if i == 5:
-                raise AssertionError('expected ZeroDivisionError')
-
-    assert i == 9
-    print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
-    print()
-
-    #
-    # Testing timeouts
-    #
-
-    print('Testing ApplyResult.get() with timeout:', end=' ')
-    res = pool.apply_async(calculate, TASKS[0])
-    while 1:
-        sys.stdout.flush()
-        try:
-            sys.stdout.write('\n\t%s' % res.get(0.02))
-            break
-        except multiprocessing.TimeoutError:
-            sys.stdout.write('.')
-    print()
-    print()
-
-    print('Testing IMapIterator.next() with timeout:', end=' ')
-    it = pool.imap(calculatestar, TASKS)
-    while 1:
-        sys.stdout.flush()
-        try:
-            sys.stdout.write('\n\t%s' % it.next(0.02))
-        except StopIteration:
-            break
-        except multiprocessing.TimeoutError:
-            sys.stdout.write('.')
-    print()
-    print()
-
-    #
-    # Testing callback
-    #
-
-    print('Testing callback:')
-
-    A = []
-    B = [56, 0, 1, 8, 27, 64, 125, 216, 343, 512, 729]
-
-    r = pool.apply_async(mul, (7, 8), callback=A.append)
-    r.wait()
-
-    r = pool.map_async(pow3, list(range(10)), callback=A.extend)
-    r.wait()
-
-    if A == B:
-        print('\tcallbacks succeeded\n')
-    else:
-        print('\t*** callbacks failed\n\t\t%s != %s\n' % (A, B))
-
-    #
-    # Check there are no outstanding tasks
-    #
-
-    assert not pool._cache, 'cache = %r' % pool._cache
-
-    #
-    # Check close() methods
-    #
-
-    print('Testing close():')
-
-    for worker in pool._pool:
-        assert worker.is_alive()
-
-    result = pool.apply_async(time.sleep, [0.5])
-    pool.close()
-    pool.join()
-
-    assert result.get() is None
-
-    for worker in pool._pool:
-        assert not worker.is_alive()
-
-    print('\tclose() succeeded\n')
-
-    #
-    # Check terminate() method
-    #
-
-    print('Testing terminate():')
-
-    pool = multiprocessing.Pool(2)
-    DELTA = 0.1
-    ignore = pool.apply(pow3, [2])
-    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
-    pool.terminate()
-    pool.join()
-
-    for worker in pool._pool:
-        assert not worker.is_alive()
-
-    print('\tterminate() succeeded\n')
-
-    #
-    # Check garbage collection
-    #
-
-    print('Testing garbage collection:')
-
-    pool = multiprocessing.Pool(2)
-    DELTA = 0.1
-    processes = pool._pool
-    ignore = pool.apply(pow3, [2])
-    results = [pool.apply_async(time.sleep, [DELTA]) for i in range(100)]
-
-    results = pool = None
-
-    time.sleep(DELTA * 2)
-
-    for worker in processes:
-        assert not worker.is_alive()
-
-    print('\tgarbage collection succeeded\n')
+        print('Testing IMapIterator.next() with timeout:', end=' ')
+        it = pool.imap(calculatestar, TASKS)
+        while 1:
+            sys.stdout.flush()
+            try:
+                sys.stdout.write('\n\t%s' % it.next(0.02))
+            except StopIteration:
+                break
+            except multiprocessing.TimeoutError:
+                sys.stdout.write('.')
+        print()
+        print()
 
 
 if __name__ == '__main__':
     multiprocessing.freeze_support()
-
-    assert len(sys.argv) in (1, 2)
-
-    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
-        print(' Using processes '.center(79, '-'))
-    elif sys.argv[1] == 'threads':
-        print(' Using threads '.center(79, '-'))
-        import multiprocessing.dummy as multiprocessing
-    else:
-        print('Usage:\n\t%s [processes | threads]' % sys.argv[0])
-        raise SystemExit(2)
-
     test()
diff --git a/Doc/includes/mp_synchronize.py b/Doc/includes/mp_synchronize.py
deleted file mode 100644
--- a/Doc/includes/mp_synchronize.py
+++ /dev/null
@@ -1,278 +0,0 @@
-#
-# A test file for the `multiprocessing` package
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-import time
-import sys
-import random
-from queue import Empty
-
-import multiprocessing               # may get overwritten
-
-
-#### TEST_VALUE
-
-def value_func(running, mutex):
-    random.seed()
-    time.sleep(random.random()*4)
-
-    mutex.acquire()
-    print('\n\t\t\t' + str(multiprocessing.current_process()) + ' has finished')
-    running.value -= 1
-    mutex.release()
-
-def test_value():
-    TASKS = 10
-    running = multiprocessing.Value('i', TASKS)
-    mutex = multiprocessing.Lock()
-
-    for i in range(TASKS):
-        p = multiprocessing.Process(target=value_func, args=(running, mutex))
-        p.start()
-
-    while running.value > 0:
-        time.sleep(0.08)
-        mutex.acquire()
-        print(running.value, end=' ')
-        sys.stdout.flush()
-        mutex.release()
-
-    print()
-    print('No more running processes')
-
-
-#### TEST_QUEUE
-
-def queue_func(queue):
-    for i in range(30):
-        time.sleep(0.5 * random.random())
-        queue.put(i*i)
-    queue.put('STOP')
-
-def test_queue():
-    q = multiprocessing.Queue()
-
-    p = multiprocessing.Process(target=queue_func, args=(q,))
-    p.start()
-
-    o = None
-    while o != 'STOP':
-        try:
-            o = q.get(timeout=0.3)
-            print(o, end=' ')
-            sys.stdout.flush()
-        except Empty:
-            print('TIMEOUT')
-
-    print()
-
-
-#### TEST_CONDITION
-
-def condition_func(cond):
-    cond.acquire()
-    print('\t' + str(cond))
-    time.sleep(2)
-    print('\tchild is notifying')
-    print('\t' + str(cond))
-    cond.notify()
-    cond.release()
-
-def test_condition():
-    cond = multiprocessing.Condition()
-
-    p = multiprocessing.Process(target=condition_func, args=(cond,))
-    print(cond)
-
-    cond.acquire()
-    print(cond)
-    cond.acquire()
-    print(cond)
-
-    p.start()
-
-    print('main is waiting')
-    cond.wait()
-    print('main has woken up')
-
-    print(cond)
-    cond.release()
-    print(cond)
-    cond.release()
-
-    p.join()
-    print(cond)
-
-
-#### TEST_SEMAPHORE
-
-def semaphore_func(sema, mutex, running):
-    sema.acquire()
-
-    mutex.acquire()
-    running.value += 1
-    print(running.value, 'tasks are running')
-    mutex.release()
-
-    random.seed()
-    time.sleep(random.random()*2)
-
-    mutex.acquire()
-    running.value -= 1
-    print('%s has finished' % multiprocessing.current_process())
-    mutex.release()
-
-    sema.release()
-
-def test_semaphore():
-    sema = multiprocessing.Semaphore(3)
-    mutex = multiprocessing.RLock()
-    running = multiprocessing.Value('i', 0)
-
-    processes = [
-        multiprocessing.Process(target=semaphore_func,
-                                args=(sema, mutex, running))
-        for i in range(10)
-        ]
-
-    for p in processes:
-        p.start()
-
-    for p in processes:
-        p.join()
-
-
-#### TEST_JOIN_TIMEOUT
-
-def join_timeout_func():
-    print('\tchild sleeping')
-    time.sleep(5.5)
-    print('\n\tchild terminating')
-
-def test_join_timeout():
-    p = multiprocessing.Process(target=join_timeout_func)
-    p.start()
-
-    print('waiting for process to finish')
-
-    while 1:
-        p.join(timeout=1)
-        if not p.is_alive():
-            break
-        print('.', end=' ')
-        sys.stdout.flush()
-
-
-#### TEST_EVENT
-
-def event_func(event):
-    print('\t%r is waiting' % multiprocessing.current_process())
-    event.wait()
-    print('\t%r has woken up' % multiprocessing.current_process())
-
-def test_event():
-    event = multiprocessing.Event()
-
-    processes = [multiprocessing.Process(target=event_func, args=(event,))
-                 for i in range(5)]
-
-    for p in processes:
-        p.start()
-
-    print('main is sleeping')
-    time.sleep(2)
-
-    print('main is setting event')
-    event.set()
-
-    for p in processes:
-        p.join()
-
-
-#### TEST_SHAREDVALUES
-
-def sharedvalues_func(values, arrays, shared_values, shared_arrays):
-    for i in range(len(values)):
-        v = values[i][1]
-        sv = shared_values[i].value
-        assert v == sv
-
-    for i in range(len(values)):
-        a = arrays[i][1]
-        sa = list(shared_arrays[i][:])
-        assert a == sa
-
-    print('Tests passed')
-
-def test_sharedvalues():
-    values = [
-        ('i', 10),
-        ('h', -2),
-        ('d', 1.25)
-        ]
-    arrays = [
-        ('i', list(range(100))),
-        ('d', [0.25 * i for i in range(100)]),
-        ('H', list(range(1000)))
-        ]
-
-    shared_values = [multiprocessing.Value(id, v) for id, v in values]
-    shared_arrays = [multiprocessing.Array(id, a) for id, a in arrays]
-
-    p = multiprocessing.Process(
-        target=sharedvalues_func,
-        args=(values, arrays, shared_values, shared_arrays)
-        )
-    p.start()
-    p.join()
-
-    assert p.exitcode == 0
-
-
-####
-
-def test(namespace=multiprocessing):
-    global multiprocessing
-
-    multiprocessing = namespace
-
-    for func in [test_value, test_queue, test_condition,
-                 test_semaphore, test_join_timeout, test_event,
-                 test_sharedvalues]:
-
-        print('\n\t######## %s\n' % func.__name__)
-        func()
-
-    ignore = multiprocessing.active_children()      # cleanup any old processes
-    if hasattr(multiprocessing, '_debug_info'):
-        info = multiprocessing._debug_info()
-        if info:
-            print(info)
-            raise ValueError('there should be no positive refcounts left')
-
-
-if __name__ == '__main__':
-    multiprocessing.freeze_support()
-
-    assert len(sys.argv) in (1, 2)
-
-    if len(sys.argv) == 1 or sys.argv[1] == 'processes':
-        print(' Using processes '.center(79, '-'))
-        namespace = multiprocessing
-    elif sys.argv[1] == 'manager':
-        print(' Using processes and a manager '.center(79, '-'))
-        namespace = multiprocessing.Manager()
-        namespace.Process = multiprocessing.Process
-        namespace.current_process = multiprocessing.current_process
-        namespace.active_children = multiprocessing.active_children
-    elif sys.argv[1] == 'threads':
-        print(' Using threads '.center(79, '-'))
-        import multiprocessing.dummy as namespace
-    else:
-        print('Usage:\n\t%s [processes | manager | threads]' % sys.argv[0])
-        raise SystemExit(2)
-
-    test(namespace)
diff --git a/Doc/includes/mp_webserver.py b/Doc/includes/mp_webserver.py
deleted file mode 100644
--- a/Doc/includes/mp_webserver.py
+++ /dev/null
@@ -1,70 +0,0 @@
-#
-# Example where a pool of http servers share a single listening socket
-#
-# On Windows this module depends on the ability to pickle a socket
-# object so that the worker processes can inherit a copy of the server
-# object.  (We import `multiprocessing.reduction` to enable this pickling.)
-#
-# Not sure if we should synchronize access to `socket.accept()` method by
-# using a process-shared lock -- does not seem to be necessary.
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
-import os
-import sys
-
-from multiprocessing import Process, current_process, freeze_support
-from http.server import HTTPServer
-from http.server import SimpleHTTPRequestHandler
-
-if sys.platform == 'win32':
-    import multiprocessing.reduction    # make sockets pickable/inheritable
-
-
-def note(format, *args):
-    sys.stderr.write('[%s]\t%s\n' % (current_process().name, format % args))
-
-
-class RequestHandler(SimpleHTTPRequestHandler):
-    # we override log_message() to show which process is handling the request
-    def log_message(self, format, *args):
-        note(format, *args)
-
-def serve_forever(server):
-    note('starting server')
-    try:
-        server.serve_forever()
-    except KeyboardInterrupt:
-        pass
-
-
-def runpool(address, number_of_processes):
-    # create a single server object -- children will each inherit a copy
-    server = HTTPServer(address, RequestHandler)
-
-    # create child processes to act as workers
-    for i in range(number_of_processes - 1):
-        Process(target=serve_forever, args=(server,)).start()
-
-    # main process also acts as a worker
-    serve_forever(server)
-
-
-def test():
-    DIR = os.path.join(os.path.dirname(__file__), '..')
-    ADDRESS = ('localhost', 8000)
-    NUMBER_OF_PROCESSES = 4
-
-    print('Serving at http://%s:%d using %d worker processes' % \
-          (ADDRESS[0], ADDRESS[1], NUMBER_OF_PROCESSES))
-    print('To exit press Ctrl-' + ['C', 'Break'][sys.platform=='win32'])
-
-    os.chdir(DIR)
-    runpool(ADDRESS, NUMBER_OF_PROCESSES)
-
-
-if __name__ == '__main__':
-    freeze_support()
-    test()
diff --git a/Doc/includes/mp_workers.py b/Doc/includes/mp_workers.py
--- a/Doc/includes/mp_workers.py
+++ b/Doc/includes/mp_workers.py
@@ -1,16 +1,3 @@
-#
-# Simple example which uses a pool of workers to carry out some tasks.
-#
-# Notice that the results will probably not come out of the output
-# queue in the same in the same order as the corresponding tasks were
-# put on the input queue.  If it is important to get the results back
-# in the original order then consider using `Pool.map()` or
-# `Pool.imap()` (which will save on the amount of code needed anyway).
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# All rights reserved.
-#
-
 import time
 import random
 
diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -93,11 +93,80 @@
         p.start()
         p.join()
 
-For an explanation of why (on Windows) the ``if __name__ == '__main__'`` part is
+For an explanation of why the ``if __name__ == '__main__'`` part is
 necessary, see :ref:`multiprocessing-programming`.
 
 
 
+Start methods
+~~~~~~~~~~~~~
+
+Depending on the platform, :mod:`multiprocessing` supports three ways
+to start a process.  These *start methods* are
+
+  *spawn*
+    The parent process starts a fresh python interpreter process.  The
+    child process will only inherit those resources necessary to run
+    the process object's :meth:`~Process.run` method.  In particular,
+    unnecessary file descriptors and handles from the parent process
+    will not be inherited.  Starting a process using this method is
+    rather slow compared to using *fork* or *forkserver*.
+
+    Available on Unix and Windows.  The default on Windows.
+
+  *fork*
+    The parent process uses :func:`os.fork` to fork the Python
+    interpreter.  The child process, when it begins, is effectively
+    identical to the parent process.  All resources of the parent are
+    inherited by the child process.  Note that safely forking a
+    multithreaded process is problematic.
+
+    Available on Unix only.  The default on Unix.
+
+  *forkserver*
+    When the program starts and selects the *forkserver* start method,
+    a server process is started.  From then on, whenever a new process
+    is needed, the parent process connects to the server and requests
+    that it fork a new process.  The fork server process is single
+    threaded so it is safe for it to use :func:`os.fork`.  No
+    unnecessary resources are inherited.
+
+    Available on Unix platforms which support passing file descriptors
+    over unix pipes.
+
+Before Python 3.4 *fork* was the only option available on Unix.  Also,
+prior to Python 3.4, child processes would inherit all the parent's
+inheritable handles on Windows.
+
+On Unix using the *spawn* or *forkserver* start methods will also
+start a *semaphore tracker* process which tracks the unlinked named
+semaphores created by processes of the program.  When all processes
+have exited the semaphore tracker unlinks any remaining semaphores.
+Usually there should be none, but if a process was killed by a signal
+there may be some "leaked" semaphores.  (Unlinking the named semaphores
+is a serious matter since the system allows only a limited number, and
+they will not be automatically unlinked until the next reboot.)
+
+To select a start method you use the :func:`set_start_method` in
+the ``if __name__ == '__main__'`` clause of the main module.  For
+example::
+
+       import multiprocessing as mp
+
+       def foo():
+           print('hello')
+
+       if __name__ == '__main__':
+           mp.set_start_method('spawn')
+           p = mp.Process(target=foo)
+           p.start()
+           p.join()
+
+:func:`set_start_method` should not be used more than once in the
+program.
+
+
+
 Exchanging objects between processes
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 
@@ -274,15 +343,31 @@
 For example::
 
    from multiprocessing import Pool
+   from time import sleep
 
    def f(x):
        return x*x
 
    if __name__ == '__main__':
-       with Pool(processes=4) as pool:        # start 4 worker processes
-           result = pool.apply_async(f, [10]) # evaluate "f(10)" asynchronously
-           print(result.get(timeout=1))       # prints "100" unless your computer is *very* slow
-           print(pool.map(f, range(10)))      # prints "[0, 1, 4,..., 81]"
+       # start 4 worker processes
+       with Pool(processes=4) as pool:
+
+           # print "[0, 1, 4,..., 81]"
+           print(pool.map(f, range(10)))
+
+           # print same numbers in arbitrary order
+           for i in pool.imap_unordered(f, range(10)):
+               print(i)
+
+           # evaluate "f(10)" asynchronously
+           res = pool.apply_async(f, [10])
+           print(res.get(timeout=1))             # prints "100"
+
+           # make worker sleep for 10 secs
+           res = pool.apply_async(sleep, [10])
+           print(res.get(timeout=1))             # raises multiprocessing.TimeoutError
+
+       # exiting the 'with'-block has stopped the pool
 
 Note that the methods of a pool should only ever be used by the
 process which created it.
@@ -763,6 +848,24 @@
    If the module is being run normally by the Python interpreter then
    :func:`freeze_support` has no effect.
 
+.. function:: get_all_start_methods()
+
+   Returns a list of the supported start methods, the first of which
+   is the default.  The possible start methods are ``'fork'``,
+   ``'spawn'`` and ``'forkserver'``.  On Windows only ``'spawn'`` is
+   available.  On Unix ``'fork'`` and ``'spawn'`` are always
+   supported, with ``'fork'`` being the default.
+
+   .. versionadded:: 3.4
+
+.. function:: get_start_method()
+
+   Return the current start method.  This can be ``'fork'``,
+   ``'spawn'`` or ``'forkserver'``.  ``'fork'`` is the default on
+   Unix, while ``'spawn'`` is the default on Windows.
+
+   .. versionadded:: 3.4
+
 .. function:: set_executable()
 
    Sets the path of the Python interpreter to use when starting a child process.
@@ -771,8 +874,21 @@
 
       set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
 
-   before they can create child processes.  (Windows only)
-
+   before they can create child processes.
+
+   .. versionchanged:: 3.4
+      Now supported on Unix when the ``'spawn'`` start method is used.
+
+.. function:: set_start_method(method)
+
+   Set the method which should be used to start child processes.
+   *method* can be ``'fork'``, ``'spawn'`` or ``'forkserver'``.
+
+   Note that this should be called at most once, and it should be
+   protected inside the ``if __name__ == '__main__'`` clause of the
+   main module.
+
+   .. versionadded:: 3.4
 
 .. note::
 
@@ -2175,43 +2291,8 @@
     [INFO/MainProcess] sending shutdown message to manager
     [INFO/SyncManager-...] manager exiting with exitcode 0
 
-In addition to having these two logging functions, the multiprocessing also
-exposes two additional logging level attributes. These are  :const:`SUBWARNING`
-and :const:`SUBDEBUG`. The table below illustrates where theses fit in the
-normal level hierarchy.
-
-+----------------+----------------+
-| Level          | Numeric value  |
-+================+================+
-| ``SUBWARNING`` | 25             |
-+----------------+----------------+
-| ``SUBDEBUG``   | 5              |
-+----------------+----------------+
-
 For a full table of logging levels, see the :mod:`logging` module.
 
-These additional logging levels are used primarily for certain debug messages
-within the multiprocessing module. Below is the same example as above, except
-with :const:`SUBDEBUG` enabled::
-
-    >>> import multiprocessing, logging
-    >>> logger = multiprocessing.log_to_stderr()
-    >>> logger.setLevel(multiprocessing.SUBDEBUG)
-    >>> logger.warning('doomed')
-    [WARNING/MainProcess] doomed
-    >>> m = multiprocessing.Manager()
-    [INFO/SyncManager-...] child process calling self.run()
-    [INFO/SyncManager-...] created temp directory /.../pymp-...
-    [INFO/SyncManager-...] manager serving at '/.../pymp-djGBXN/listener-...'
-    >>> del m
-    [SUBDEBUG/MainProcess] finalizer calling ...
-    [INFO/MainProcess] sending shutdown message to manager
-    [DEBUG/SyncManager-...] manager received shutdown message
-    [SUBDEBUG/SyncManager-...] calling <Finalize object, callback=unlink, ...
-    [SUBDEBUG/SyncManager-...] finalizer calling <built-in function unlink> ...
-    [SUBDEBUG/SyncManager-...] calling <Finalize object, dead>
-    [SUBDEBUG/SyncManager-...] finalizer calling <function rmtree at 0x5aa730> ...
-    [INFO/SyncManager-...] manager exiting with exitcode 0
 
 The :mod:`multiprocessing.dummy` module
 ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
@@ -2232,8 +2313,10 @@
 :mod:`multiprocessing`.
 
 
-All platforms
-~~~~~~~~~~~~~
+All start methods
+~~~~~~~~~~~~~~~~~
+
+The following applies to all start methods.
 
 Avoid shared state
 
@@ -2266,11 +2349,13 @@
 
 Better to inherit than pickle/unpickle
 
-    On Windows many types from :mod:`multiprocessing` need to be picklable so
-    that child processes can use them.  However, one should generally avoid
-    sending shared objects to other processes using pipes or queues.  Instead
-    you should arrange the program so that a process which needs access to a
-    shared resource created elsewhere can inherit it from an ancestor process.
+    When using the *spawn* or *forkserver* start methods many types
+    from :mod:`multiprocessing` need to be picklable so that child
+    processes can use them.  However, one should generally avoid
+    sending shared objects to other processes using pipes or queues.
+    Instead you should arrange the program so that a process which
+    needs access to a shared resource created elsewhere can inherit it
+    from an ancestor process.
 
 Avoid terminating processes
 
@@ -2314,15 +2399,17 @@
 
 Explicitly pass resources to child processes
 
-    On Unix a child process can make use of a shared resource created in a
-    parent process using a global resource.  However, it is better to pass the
-    object as an argument to the constructor for the child process.
-
-    Apart from making the code (potentially) compatible with Windows this also
-    ensures that as long as the child process is still alive the object will not
-    be garbage collected in the parent process.  This might be important if some
-    resource is freed when the object is garbage collected in the parent
-    process.
+    On Unix using the *fork* start method, a child process can make
+    use of a shared resource created in a parent process using a
+    global resource.  However, it is better to pass the object as an
+    argument to the constructor for the child process.
+
+    Apart from making the code (potentially) compatible with Windows
+    and the other start methods this also ensures that as long as the
+    child process is still alive the object will not be garbage
+    collected in the parent process.  This might be important if some
+    resource is freed when the object is garbage collected in the
+    parent process.
 
     So for instance ::
 
@@ -2381,17 +2468,19 @@
 
     For more information, see :issue:`5155`, :issue:`5313` and :issue:`5331`
 
-Windows
-~~~~~~~
-
-Since Windows lacks :func:`os.fork` it has a few extra restrictions:
+The *spawn* and *forkserver* start methods
+~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+There are a few extra restrictions which don't apply to the *fork*
+start method.
 
 More picklability
 
-    Ensure that all arguments to :meth:`Process.__init__` are picklable.  This
-    means, in particular, that bound or unbound methods cannot be used directly
-    as the ``target`` argument on Windows --- just define a function and use
-    that instead.
+    Ensure that all arguments to :meth:`Process.__init__` are
+    picklable.  This means, in particular, that bound or unbound
+    methods cannot be used directly as the ``target`` (unless you use
+    the *fork* start method) --- just define a function and use that
+    instead.
 
     Also, if you subclass :class:`Process` then make sure that instances will be
     picklable when the :meth:`Process.start` method is called.
@@ -2411,7 +2500,8 @@
     interpreter without causing unintended side effects (such a starting a new
     process).
 
-    For example, under Windows running the following module would fail with a
+    For example, using the *spawn* or *forkserver* start method
+    running the following module would fail with a
     :exc:`RuntimeError`::
 
         from multiprocessing import Process
@@ -2425,13 +2515,14 @@
     Instead one should protect the "entry point" of the program by using ``if
     __name__ == '__main__':`` as follows::
 
-       from multiprocessing import Process, freeze_support
+       from multiprocessing import Process, freeze_support, set_start_method
 
        def foo():
            print('hello')
 
        if __name__ == '__main__':
            freeze_support()
+           set_start_method('spawn')
            p = Process(target=foo)
            p.start()
 
@@ -2462,26 +2553,7 @@
    :language: python3
 
 
-Synchronization types like locks, conditions and queues:
-
-.. literalinclude:: ../includes/mp_synchronize.py
-   :language: python3
-
-
 An example showing how to use queues to feed tasks to a collection of worker
 processes and collect the results:
 
 .. literalinclude:: ../includes/mp_workers.py
-
-
-An example of how a pool of worker processes can each run a
-:class:`~http.server.SimpleHTTPRequestHandler` instance while sharing a single
-listening socket.
-
-.. literalinclude:: ../includes/mp_webserver.py
-
-
-Some simple benchmarks comparing :mod:`multiprocessing` with :mod:`threading`:
-
-.. literalinclude:: ../includes/mp_benchmarks.py
-
diff --git a/Doc/whatsnew/3.4.rst b/Doc/whatsnew/3.4.rst
--- a/Doc/whatsnew/3.4.rst
+++ b/Doc/whatsnew/3.4.rst
@@ -108,6 +108,8 @@
 * Single-dispatch generic functions (:pep:`443`)
 * SHA-3 (Keccak) support for :mod:`hashlib`.
 * TLSv1.1 and TLSv1.2 support for :mod:`ssl`.
+* :mod:`multiprocessing` now has an option to avoid using :func:`os.fork`
+  on Unix (:issue:`8713`).
 
 Security improvements:
 
@@ -254,6 +256,17 @@
 (Contributed by Valerie Lambert in :issue:`4885`.)
 
 
+multiprocessing
+---------------
+
+On Unix two new *start methods* have been added for starting processes
+using :mod:`multiprocessing`.  These make the mixing of processes with
+threads more robust.  See :issue:`8713`.
+
+Also, except when using the old *fork* start method, child processes
+will no longer inherit unneeded handles/file descriptors from their parents.
+
+
 poplib
 ------
 
diff --git a/Lib/multiprocessing/__init__.py b/Lib/multiprocessing/__init__.py
--- a/Lib/multiprocessing/__init__.py
+++ b/Lib/multiprocessing/__init__.py
@@ -21,6 +21,8 @@
     'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition',
     'Event', 'Barrier', 'Queue', 'SimpleQueue', 'JoinableQueue', 'Pool',
     'Value', 'Array', 'RawValue', 'RawArray', 'SUBDEBUG', 'SUBWARNING',
+    'set_executable', 'set_start_method', 'get_start_method',
+    'get_all_start_methods', 'set_forkserver_preload'
     ]
 
 #
@@ -30,8 +32,14 @@
 import os
 import sys
 
-from multiprocessing.process import Process, current_process, active_children
-from multiprocessing.util import SUBDEBUG, SUBWARNING
+from .process import Process, current_process, active_children
+
+#
+# XXX These should not really be documented or public.
+#
+
+SUBDEBUG = 5
+SUBWARNING = 25
 
 #
 # Alias for main module -- will be reset by bootstrapping child processes
@@ -56,8 +64,6 @@
 class AuthenticationError(ProcessError):
     pass
 
-import _multiprocessing
-
 #
 # Definitions not depending on native semaphores
 #
@@ -69,7 +75,7 @@
     The managers methods such as `Lock()`, `Condition()` and `Queue()`
     can be used to create shared objects.
     '''
-    from multiprocessing.managers import SyncManager
+    from .managers import SyncManager
     m = SyncManager()
     m.start()
     return m
@@ -78,7 +84,7 @@
     '''
     Returns two connection object connected by a pipe
     '''
-    from multiprocessing.connection import Pipe
+    from .connection import Pipe
     return Pipe(duplex)
 
 def cpu_count():
@@ -97,21 +103,21 @@
     If so then run code specified by commandline and exit.
     '''
     if sys.platform == 'win32' and getattr(sys, 'frozen', False):
-        from multiprocessing.forking import freeze_support
+        from .spawn import freeze_support
         freeze_support()
 
 def get_logger():
     '''
     Return package logger -- if it does not already exist then it is created
     '''
-    from multiprocessing.util import get_logger
+    from .util import get_logger
     return get_logger()
 
 def log_to_stderr(level=None):
     '''
     Turn on logging and add a handler which prints to stderr
     '''
-    from multiprocessing.util import log_to_stderr
+    from .util import log_to_stderr
     return log_to_stderr(level)
 
 def allow_connection_pickling():
@@ -120,7 +126,7 @@
     '''
     # This is undocumented.  In previous versions of multiprocessing
     # its only effect was to make socket objects inheritable on Windows.
-    import multiprocessing.connection
+    from . import connection
 
 #
 # Definitions depending on native semaphores
@@ -130,120 +136,151 @@
     '''
     Returns a non-recursive lock object
     '''
-    from multiprocessing.synchronize import Lock
+    from .synchronize import Lock
     return Lock()
 
 def RLock():
     '''
     Returns a recursive lock object
     '''
-    from multiprocessing.synchronize import RLock
+    from .synchronize import RLock
     return RLock()
 
 def Condition(lock=None):
     '''
     Returns a condition object
     '''
-    from multiprocessing.synchronize import Condition
+    from .synchronize import Condition
     return Condition(lock)
 
 def Semaphore(value=1):
     '''
     Returns a semaphore object
     '''
-    from multiprocessing.synchronize import Semaphore
+    from .synchronize import Semaphore
     return Semaphore(value)
 
 def BoundedSemaphore(value=1):
     '''
     Returns a bounded semaphore object
     '''
-    from multiprocessing.synchronize import BoundedSemaphore
+    from .synchronize import BoundedSemaphore
     return BoundedSemaphore(value)
 
 def Event():
     '''
     Returns an event object
     '''
-    from multiprocessing.synchronize import Event
+    from .synchronize import Event
     return Event()
 
 def Barrier(parties, action=None, timeout=None):
     '''
     Returns a barrier object
     '''
-    from multiprocessing.synchronize import Barrier
+    from .synchronize import Barrier
     return Barrier(parties, action, timeout)
 
 def Queue(maxsize=0):
     '''
     Returns a queue object
     '''
-    from multiprocessing.queues import Queue
+    from .queues import Queue
     return Queue(maxsize)
 
 def JoinableQueue(maxsize=0):
     '''
     Returns a queue object
     '''
-    from multiprocessing.queues import JoinableQueue
+    from .queues import JoinableQueue
     return JoinableQueue(maxsize)
 
 def SimpleQueue():
     '''
     Returns a queue object
     '''
-    from multiprocessing.queues import SimpleQueue
+    from .queues import SimpleQueue
     return SimpleQueue()
 
 def Pool(processes=None, initializer=None, initargs=(), maxtasksperchild=None):
     '''
     Returns a process pool object
     '''
-    from multiprocessing.pool import Pool
+    from .pool import Pool
     return Pool(processes, initializer, initargs, maxtasksperchild)
 
 def RawValue(typecode_or_type, *args):
     '''
     Returns a shared object
     '''
-    from multiprocessing.sharedctypes import RawValue
+    from .sharedctypes import RawValue
     return RawValue(typecode_or_type, *args)
 
 def RawArray(typecode_or_type, size_or_initializer):
     '''
     Returns a shared array
     '''
-    from multiprocessing.sharedctypes import RawArray
+    from .sharedctypes import RawArray
     return RawArray(typecode_or_type, size_or_initializer)
 
 def Value(typecode_or_type, *args, lock=True):
     '''
     Returns a synchronized shared object
     '''
-    from multiprocessing.sharedctypes import Value
+    from .sharedctypes import Value
     return Value(typecode_or_type, *args, lock=lock)
 
 def Array(typecode_or_type, size_or_initializer, *, lock=True):
     '''
     Returns a synchronized shared array
     '''
-    from multiprocessing.sharedctypes import Array
+    from .sharedctypes import Array
     return Array(typecode_or_type, size_or_initializer, lock=lock)
 
 #
 #
 #
 
-if sys.platform == 'win32':
+def set_executable(executable):
+    '''
+    Sets the path to a python.exe or pythonw.exe binary used to run
+    child processes instead of sys.executable when using the 'spawn'
+    start method.  Useful for people embedding Python.
+    '''
+    from .spawn import set_executable
+    set_executable(executable)
 
-    def set_executable(executable):
-        '''
-        Sets the path to a python.exe or pythonw.exe binary used to run
-        child processes on Windows instead of sys.executable.
-        Useful for people embedding Python.
-        '''
-        from multiprocessing.forking import set_executable
-        set_executable(executable)
+def set_start_method(method):
+    '''
+    Set method for starting processes: 'fork', 'spawn' or 'forkserver'.
+    '''
+    from .popen import set_start_method
+    set_start_method(method)
 
-    __all__ += ['set_executable']
+def get_start_method():
+    '''
+    Get method for starting processes: 'fork', 'spawn' or 'forkserver'.
+    '''
+    from .popen import get_start_method
+    return get_start_method()
+
+def get_all_start_methods():
+    '''
+    Get list of available start methods, default first.
+    '''
+    from .popen import get_all_start_methods
+    return get_all_start_methods()
+
+def set_forkserver_preload(module_names):
+    '''
+    Set list of module names to try to load in the forkserver process
+    when it is started.  Properly chosen this can significantly reduce
+    the cost of starting a new process using the forkserver method.
+    The default list is ['__main__'].
+    '''
+    try:
+        from .forkserver import set_forkserver_preload
+    except ImportError:
+        pass
+    else:
+        set_forkserver_preload(module_names)
diff --git a/Lib/multiprocessing/connection.py b/Lib/multiprocessing/connection.py
--- a/Lib/multiprocessing/connection.py
+++ b/Lib/multiprocessing/connection.py
@@ -21,9 +21,13 @@
 import itertools
 
 import _multiprocessing
-from multiprocessing import current_process, AuthenticationError, BufferTooShort
-from multiprocessing.util import get_temp_dir, Finalize, sub_debug, debug
-from multiprocessing.forking import ForkingPickler
+
+from . import reduction
+from . import util
+
+from . import AuthenticationError, BufferTooShort
+from .reduction import ForkingPickler
+
 try:
     import _winapi
     from _winapi import WAIT_OBJECT_0, WAIT_TIMEOUT, INFINITE
@@ -71,7 +75,7 @@
     if family == 'AF_INET':
         return ('localhost', 0)
     elif family == 'AF_UNIX':
-        return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
+        return tempfile.mktemp(prefix='listener-', dir=util.get_temp_dir())
     elif family == 'AF_PIPE':
         return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
                                (os.getpid(), next(_mmap_counter)))
@@ -505,7 +509,7 @@
             c1 = Connection(s1.detach())
             c2 = Connection(s2.detach())
         else:
-            fd1, fd2 = os.pipe()
+            fd1, fd2 = util.pipe()
             c1 = Connection(fd1, writable=False)
             c2 = Connection(fd2, readable=False)
 
@@ -577,7 +581,7 @@
         self._last_accepted = None
 
         if family == 'AF_UNIX':
-            self._unlink = Finalize(
+            self._unlink = util.Finalize(
                 self, os.unlink, args=(address,), exitpriority=0
                 )
         else:
@@ -625,8 +629,8 @@
             self._handle_queue = [self._new_handle(first=True)]
 
             self._last_accepted = None
-            sub_debug('listener created with address=%r', self._address)
-            self.close = Finalize(
+            util.sub_debug('listener created with address=%r', self._address)
+            self.close = util.Finalize(
                 self, PipeListener._finalize_pipe_listener,
                 args=(self._handle_queue, self._address), exitpriority=0
                 )
@@ -668,7 +672,7 @@
 
         @staticmethod
         def _finalize_pipe_listener(queue, address):
-            sub_debug('closing listener with address=%r', address)
+            util.sub_debug('closing listener with address=%r', address)
             for handle in queue:
                 _winapi.CloseHandle(handle)
 
@@ -919,15 +923,32 @@
 #
 
 if sys.platform == 'win32':
-    from . import reduction
-    ForkingPickler.register(socket.socket, reduction.reduce_socket)
-    ForkingPickler.register(Connection, reduction.reduce_connection)
-    ForkingPickler.register(PipeConnection, reduction.reduce_pipe_connection)
+    def reduce_connection(conn):
+        handle = conn.fileno()
+        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
+            from . import resource_sharer
+            ds = resource_sharer.DupSocket(s)
+            return rebuild_connection, (ds, conn.readable, conn.writable)
+    def rebuild_connection(ds, readable, writable):
+        sock = ds.detach()
+        return Connection(sock.detach(), readable, writable)
+    reduction.register(Connection, reduce_connection)
+
+    def reduce_pipe_connection(conn):
+        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
+                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
+        dh = reduction.DupHandle(conn.fileno(), access)
+        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
+    def rebuild_pipe_connection(dh, readable, writable):
+        handle = dh.detach()
+        return PipeConnection(handle, readable, writable)
+    reduction.register(PipeConnection, reduce_pipe_connection)
+
 else:
-    try:
-        from . import reduction
-    except ImportError:
-        pass
-    else:
-        ForkingPickler.register(socket.socket, reduction.reduce_socket)
-        ForkingPickler.register(Connection, reduction.reduce_connection)
+    def reduce_connection(conn):
+        df = reduction.DupFd(conn.fileno())
+        return rebuild_connection, (df, conn.readable, conn.writable)
+    def rebuild_connection(df, readable, writable):
+        fd = df.detach()
+        return Connection(fd, readable, writable)
+    reduction.register(Connection, reduce_connection)
diff --git a/Lib/multiprocessing/dummy/__init__.py b/Lib/multiprocessing/dummy/__init__.py
--- a/Lib/multiprocessing/dummy/__init__.py
+++ b/Lib/multiprocessing/dummy/__init__.py
@@ -22,7 +22,7 @@
 import weakref
 import array
 
-from multiprocessing.dummy.connection import Pipe
+from .connection import Pipe
 from threading import Lock, RLock, Semaphore, BoundedSemaphore
 from threading import Event, Condition, Barrier
 from queue import Queue
@@ -113,7 +113,7 @@
     pass
 
 def Pool(processes=None, initializer=None, initargs=()):
-    from multiprocessing.pool import ThreadPool
+    from ..pool import ThreadPool
     return ThreadPool(processes, initializer, initargs)
 
 JoinableQueue = Queue
diff --git a/Lib/multiprocessing/forking.py b/Lib/multiprocessing/forking.py
deleted file mode 100644
--- a/Lib/multiprocessing/forking.py
+++ /dev/null
@@ -1,477 +0,0 @@
-#
-# Module for starting a process object using os.fork() or CreateProcess()
-#
-# multiprocessing/forking.py
-#
-# Copyright (c) 2006-2008, R Oudkerk
-# Licensed to PSF under a Contributor Agreement.
-#
-
-import io
-import os
-import pickle
-import sys
-import signal
-import errno
-
-from multiprocessing import util, process
-
-__all__ = ['Popen', 'assert_spawning', 'duplicate', 'close', 'ForkingPickler']
-
-#
-# Check that the current thread is spawning a child process
-#
-
-def assert_spawning(self):
-    if not Popen.thread_is_spawning():
-        raise RuntimeError(
-            '%s objects should only be shared between processes'
-            ' through inheritance' % type(self).__name__
-            )
-
-#
-# Try making some callable types picklable
-#
-
-from pickle import Pickler
-from copyreg import dispatch_table
-
-class ForkingPickler(Pickler):
-    _extra_reducers = {}
-    def __init__(self, *args):
-        Pickler.__init__(self, *args)
-        self.dispatch_table = dispatch_table.copy()
-        self.dispatch_table.update(self._extra_reducers)
-    @classmethod
-    def register(cls, type, reduce):
-        cls._extra_reducers[type] = reduce
-
-    @staticmethod
-    def dumps(obj):
-        buf = io.BytesIO()
-        ForkingPickler(buf, pickle.HIGHEST_PROTOCOL).dump(obj)
-        return buf.getbuffer()
-
-    loads = pickle.loads
-
-
-def _reduce_method(m):
-    if m.__self__ is None:
-        return getattr, (m.__class__, m.__func__.__name__)
-    else:
-        return getattr, (m.__self__, m.__func__.__name__)
-class _C:
-    def f(self):
-        pass
-ForkingPickler.register(type(_C().f), _reduce_method)
-
-
-def _reduce_method_descriptor(m):
-    return getattr, (m.__objclass__, m.__name__)
-ForkingPickler.register(type(list.append), _reduce_method_descriptor)
-ForkingPickler.register(type(int.__add__), _reduce_method_descriptor)
-
-try:
-    from functools import partial
-except ImportError:
-    pass
-else:
-    def _reduce_partial(p):
-        return _rebuild_partial, (p.func, p.args, p.keywords or {})
-    def _rebuild_partial(func, args, keywords):
-        return partial(func, *args, **keywords)
-    ForkingPickler.register(partial, _reduce_partial)
-
-#
-# Unix
-#
-
-if sys.platform != 'win32':
-    duplicate = os.dup
-    close = os.close
-
-    #
-    # We define a Popen class similar to the one from subprocess, but
-    # whose constructor takes a process object as its argument.
-    #
-
-    class Popen(object):
-
-        def __init__(self, process_obj):
-            sys.stdout.flush()
-            sys.stderr.flush()
-            self.returncode = None
-
-            r, w = os.pipe()
-            self.sentinel = r
-
-            self.pid = os.fork()
-            if self.pid == 0:
-                os.close(r)
-                if 'random' in sys.modules:
-                    import random
-                    random.seed()
-                code = process_obj._bootstrap()
-                os._exit(code)
-
-            # `w` will be closed when the child exits, at which point `r`
-            # will become ready for reading (using e.g. select()).
-            os.close(w)
-            util.Finalize(self, os.close, (r,))
-
-        def poll(self, flag=os.WNOHANG):
-            if self.returncode is None:
-                while True:
-                    try:
-                        pid, sts = os.waitpid(self.pid, flag)
-                    except OSError as e:
-                        if e.errno == errno.EINTR:
-                            continue
-                        # Child process not yet created. See #1731717
-                        # e.errno == errno.ECHILD == 10
-                        return None
-                    else:
-                        break
-                if pid == self.pid:
-                    if os.WIFSIGNALED(sts):
-                        self.returncode = -os.WTERMSIG(sts)
-                    else:
-                        assert os.WIFEXITED(sts)
-                        self.returncode = os.WEXITSTATUS(sts)
-            return self.returncode
-
-        def wait(self, timeout=None):
-            if self.returncode is None:
-                if timeout is not None:
-                    from .connection import wait
-                    if not wait([self.sentinel], timeout):
-                        return None
-                # This shouldn't block if wait() returned successfully.
-                return self.poll(os.WNOHANG if timeout == 0.0 else 0)
-            return self.returncode
-
-        def terminate(self):
-            if self.returncode is None:
-                try:
-                    os.kill(self.pid, signal.SIGTERM)
-                except OSError:
-                    if self.wait(timeout=0.1) is None:
-                        raise
-
-        @staticmethod
-        def thread_is_spawning():
-            return False
-
-#
-# Windows
-#
-
-else:
-    import _thread
-    import msvcrt
-    import _winapi
-
-    from pickle import load, HIGHEST_PROTOCOL
-
-    def dump(obj, file, protocol=None):
-        ForkingPickler(file, protocol).dump(obj)
-
-    #
-    #
-    #
-
-    TERMINATE = 0x10000
-    WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
-    WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
-
-    close = _winapi.CloseHandle
-
-    #
-    # _python_exe is the assumed path to the python executable.
-    # People embedding Python want to modify it.
-    #
-
-    if WINSERVICE:
-        _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
-    else:
-        _python_exe = sys.executable
-
-    def set_executable(exe):
-        global _python_exe
-        _python_exe = exe
-
-    #
-    #
-    #
-
-    def duplicate(handle, target_process=None, inheritable=False):
-        if target_process is None:
-            target_process = _winapi.GetCurrentProcess()
-        return _winapi.DuplicateHandle(
-            _winapi.GetCurrentProcess(), handle, target_process,
-            0, inheritable, _winapi.DUPLICATE_SAME_ACCESS
-            )
-
-    #
-    # We define a Popen class similar to the one from subprocess, but
-    # whose constructor takes a process object as its argument.
-    #
-
-    class Popen(object):
-        '''
-        Start a subprocess to run the code of a process object
-        '''
-        _tls = _thread._local()
-
-        def __init__(self, process_obj):
-            cmd = ' '.join('"%s"' % x for x in get_command_line())
-            prep_data = get_preparation_data(process_obj._name)
-
-            # create pipe for communication with child
-            rfd, wfd = os.pipe()
-
-            # get handle for read end of the pipe and make it inheritable
-            rhandle = duplicate(msvcrt.get_osfhandle(rfd), inheritable=True)
-            os.close(rfd)
-
-            with open(wfd, 'wb', closefd=True) as to_child:
-                # start process
-                try:
-                    hp, ht, pid, tid = _winapi.CreateProcess(
-                        _python_exe, cmd + (' %s' % rhandle),
-                        None, None, 1, 0, None, None, None
-                        )
-                    _winapi.CloseHandle(ht)
-                finally:
-                    close(rhandle)
-
-                # set attributes of self
-                self.pid = pid
-                self.returncode = None
-                self._handle = hp
-                self.sentinel = int(hp)
-                util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
-
-                # send information to child
-                Popen._tls.process_handle = int(hp)
-                try:
-                    dump(prep_data, to_child, HIGHEST_PROTOCOL)
-                    dump(process_obj, to_child, HIGHEST_PROTOCOL)
-                finally:
-                    del Popen._tls.process_handle
-
-        @staticmethod
-        def thread_is_spawning():
-            return getattr(Popen._tls, 'process_handle', None) is not None
-
-        @staticmethod
-        def duplicate_for_child(handle):
-            return duplicate(handle, Popen._tls.process_handle)
-
-        def wait(self, timeout=None):
-            if self.returncode is None:
-                if timeout is None:
-                    msecs = _winapi.INFINITE
-                else:
-                    msecs = max(0, int(timeout * 1000 + 0.5))
-
-                res = _winapi.WaitForSingleObject(int(self._handle), msecs)
-                if res == _winapi.WAIT_OBJECT_0:
-                    code = _winapi.GetExitCodeProcess(self._handle)
-                    if code == TERMINATE:
-                        code = -signal.SIGTERM
-                    self.returncode = code
-
-            return self.returncode
-
-        def poll(self):
-            return self.wait(timeout=0)
-
-        def terminate(self):
-            if self.returncode is None:
-                try:
-                    _winapi.TerminateProcess(int(self._handle), TERMINATE)
-                except OSError:
-                    if self.wait(timeout=1.0) is None:
-                        raise
-
-    #
-    #
-    #
-
-    def is_forking(argv):
-        '''
-        Return whether commandline indicates we are forking
-        '''
-        if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
-            assert len(argv) == 3
-            return True
-        else:
-            return False
-
-
-    def freeze_support():
-        '''
-        Run code for process object if this in not the main process
-        '''
-        if is_forking(sys.argv):
-            main()
-            sys.exit()
-
-
-    def get_command_line():
-        '''
-        Returns prefix of command line used for spawning a child process
-        '''
-        if getattr(process.current_process(), '_inheriting', False):
-            raise RuntimeError('''
-            Attempt to start a new process before the current process
-            has finished its bootstrapping phase.
-
-            This probably means that you are on Windows and you have
-            forgotten to use the proper idiom in the main module:
-
-                if __name__ == '__main__':
-                    freeze_support()
-                    ...
-
-            The "freeze_support()" line can be omitted if the program
-            is not going to be frozen to produce a Windows executable.''')
-
-        if getattr(sys, 'frozen', False):
-            return [sys.executable, '--multiprocessing-fork']
-        else:
-            prog = 'from multiprocessing.forking import main; main()'
-            opts = util._args_from_interpreter_flags()
-            return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
-
-
-    def main():
-        '''
-        Run code specifed by data received over pipe
-        '''
-        assert is_forking(sys.argv)
-
-        handle = int(sys.argv[-1])
-        fd = msvcrt.open_osfhandle(handle, os.O_RDONLY)
-        from_parent = os.fdopen(fd, 'rb')
-
-        process.current_process()._inheriting = True
-        preparation_data = load(from_parent)
-        prepare(preparation_data)
-        self = load(from_parent)
-        process.current_process()._inheriting = False
-
-        from_parent.close()
-
-        exitcode = self._bootstrap()
-        sys.exit(exitcode)
-
-
-    def get_preparation_data(name):
-        '''
-        Return info about parent needed by child to unpickle process object
-        '''
-        from .util import _logger, _log_to_stderr
-
-        d = dict(
-            name=name,
-            sys_path=sys.path,
-            sys_argv=sys.argv,
-            log_to_stderr=_log_to_stderr,
-            orig_dir=process.ORIGINAL_DIR,
-            authkey=process.current_process().authkey,
-            )
-
-        if _logger is not None:
-            d['log_level'] = _logger.getEffectiveLevel()
-
-        if not WINEXE and not WINSERVICE:
-            main_path = getattr(sys.modules['__main__'], '__file__', None)
-            if not main_path and sys.argv[0] not in ('', '-c'):
-                main_path = sys.argv[0]
-            if main_path is not None:
-                if not os.path.isabs(main_path) and \
-                                          process.ORIGINAL_DIR is not None:
-                    main_path = os.path.join(process.ORIGINAL_DIR, main_path)
-                d['main_path'] = os.path.normpath(main_path)
-
-        return d
-
-#
-# Prepare current process
-#
-
-old_main_modules = []
-
-def prepare(data):
-    '''
-    Try to get current process ready to unpickle process object
-    '''
-    old_main_modules.append(sys.modules['__main__'])
-
-    if 'name' in data:
-        process.current_process().name = data['name']
-
-    if 'authkey' in data:
-        process.current_process()._authkey = data['authkey']
-
-    if 'log_to_stderr' in data and data['log_to_stderr']:
-        util.log_to_stderr()
-
-    if 'log_level' in data:
-        util.get_logger().setLevel(data['log_level'])
-
-    if 'sys_path' in data:
-        sys.path = data['sys_path']
-
-    if 'sys_argv' in data:
-        sys.argv = data['sys_argv']
-
-    if 'dir' in data:
-        os.chdir(data['dir'])
-
-    if 'orig_dir' in data:
-        process.ORIGINAL_DIR = data['orig_dir']
-
-    if 'main_path' in data:
-        # XXX (ncoghlan): The following code makes several bogus
-        # assumptions regarding the relationship between __file__
-        # and a module's real name. See PEP 302 and issue #10845
-        main_path = data['main_path']
-        main_name = os.path.splitext(os.path.basename(main_path))[0]
-        if main_name == '__init__':
-            main_name = os.path.basename(os.path.dirname(main_path))
-
-        if main_name == '__main__':
-            main_module = sys.modules['__main__']
-            main_module.__file__ = main_path
-        elif main_name != 'ipython':
-            # Main modules not actually called __main__.py may
-            # contain additional code that should still be executed
-            import importlib
-            import types
-
-            if main_path is None:
-                dirs = None
-            elif os.path.basename(main_path).startswith('__init__.py'):
-                dirs = [os.path.dirname(os.path.dirname(main_path))]
-            else:
-                dirs = [os.path.dirname(main_path)]
-
-            assert main_name not in sys.modules, main_name
-            sys.modules.pop('__mp_main__', None)
-            # We should not try to load __main__
-            # since that would execute 'if __name__ == "__main__"'
-            # clauses, potentially causing a psuedo fork bomb.
-            loader = importlib.find_loader(main_name, path=dirs)
-            main_module = types.ModuleType(main_name)
-            try:
-                loader.init_module_attrs(main_module)
-            except AttributeError:  # init_module_attrs is optional
-                pass
-            main_module.__name__ = '__mp_main__'
-            code = loader.get_code(main_name)
-            exec(code, main_module.__dict__)
-
-            sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
diff --git a/Lib/multiprocessing/forkserver.py b/Lib/multiprocessing/forkserver.py
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/forkserver.py
@@ -0,0 +1,238 @@
+import errno
+import os
+import select
+import signal
+import socket
+import struct
+import sys
+import threading
+
+from . import connection
+from . import process
+from . import reduction
+from . import spawn
+from . import util
+
+__all__ = ['ensure_running', 'get_inherited_fds', 'connect_to_new_process',
+           'set_forkserver_preload']
+
+#
+#
+#
+
+MAXFDS_TO_SEND = 256
+UNSIGNED_STRUCT = struct.Struct('Q')     # large enough for pid_t
+
+_inherited_fds = None
+_lock = threading.Lock()
+_preload_modules = ['__main__']
+
+
+#
+# Public function
+#
+
+def set_forkserver_preload(modules_names):
+    '''Set list of module names to try to load in forkserver process.'''
+    global _preload_modules
+    _preload_modules = modules_names
+
+
+def get_inherited_fds():
+    '''Return list of fds inherited from parent process.
+
+    Returns None if the current process was not started by the fork server.
+    '''
+    return _inherited_fds
+
+
+def connect_to_new_process(fds):
+    '''Request forkserver to create a child process.
+
+    Returns a pair of fds (status_r, data_w).  The calling process can read
+    the child process's pid and (eventually) its returncode from status_r.
+    The calling process should write to data_w the pickled preparation and
+    process data.
+    '''
+    if len(fds) + 3 >= MAXFDS_TO_SEND:
+        raise ValueError('too many fds')
+    address, alive_w = process.current_process()._config['forkserver_info']
+    with socket.socket(socket.AF_UNIX) as client:
+        client.connect(address)
+        parent_r, child_w = util.pipe()
+        child_r, parent_w = util.pipe()
+        allfds = [child_r, child_w, alive_w]
+        allfds += fds
+        try:
+            reduction.sendfds(client, allfds)
+            return parent_r, parent_w
+        except:
+            os.close(parent_r)
+            os.close(parent_w)
+            raise
+        finally:
+            os.close(child_r)
+            os.close(child_w)
+
+
+def ensure_running():
+    '''Make sure that a fork server is running.
+
+    This can be called from any process.  Note that usually a child
+    process will just reuse the forkserver started by its parent, so
+    ensure_running() will do nothing.
+    '''
+    with _lock:
+        config = process.current_process()._config
+        if config.get('forkserver_info') is not None:
+            return
+
+        assert all(type(mod) is str for mod in _preload_modules)
+        semaphore_tracker_fd = config['semaphore_tracker_fd']
+        cmd = ('from multiprocessing.forkserver import main; ' +
+               'main(%d, %d, %r, **%r)')
+
+        if _preload_modules:
+            desired_keys = {'main_path', 'sys_path'}
+            data = spawn.get_preparation_data('ignore')
+            data = dict((x,y) for (x,y) in data.items() if x in desired_keys)
+        else:
+            data = {}
+
+        with socket.socket(socket.AF_UNIX) as listener:
+            address = connection.arbitrary_address('AF_UNIX')
+            listener.bind(address)
+            os.chmod(address, 0o600)
+            listener.listen(100)
+
+            # all client processes own the write end of the "alive" pipe;
+            # when they all terminate the read end becomes ready.
+            alive_r, alive_w = os.pipe()
+            config['forkserver_info'] = (address, alive_w)
+            fds_to_pass = [listener.fileno(), alive_r, semaphore_tracker_fd]
+            cmd %= (listener.fileno(), alive_r, _preload_modules, data)
+            exe = spawn.get_executable()
+            args = [exe] + util._args_from_interpreter_flags() + ['-c', cmd]
+            pid = util.spawnv_passfds(exe, args, fds_to_pass)
+
+
+def main(listener_fd, alive_r, preload, main_path=None, sys_path=None):
+    '''Run forkserver.'''
+    if preload:
+        if '__main__' in preload and main_path is not None:
+            process.current_process()._inheriting = True
+            try:
+                spawn.import_main_path(main_path)
+            finally:
+                del process.current_process()._inheriting
+        for modname in preload:
+            try:
+                __import__(modname)
+            except ImportError:
+                pass
+
+    # close sys.stdin
+    if sys.stdin is not None:
+        try:
+            sys.stdin.close()
+            sys.stdin = open(os.devnull)
+        except (OSError, ValueError):
+            pass
+
+    # ignoring SIGCHLD means no need to reap zombie processes
+    handler = signal.signal(signal.SIGCHLD, signal.SIG_IGN)
+    with socket.socket(socket.AF_UNIX, fileno=listener_fd) as listener:
+        readers = [listener, alive_r]
+
+        while True:
+            try:
+                rfds, wfds, xfds = select.select(readers, [], [])
+
+                if alive_r in rfds:
+                    # EOF because no more client processes left
+                    assert os.read(alive_r, 1) == b''
+                    raise SystemExit
+
+                assert listener in rfds
+                with listener.accept()[0] as s:
+                    code = 1
+                    if os.fork() == 0:
+                        try:
+                            _serve_one(s, listener, alive_r, handler)
+                        except Exception:
+                            sys.excepthook(*sys.exc_info())
+                            sys.stderr.flush()
+                        finally:
+                            os._exit(code)
+
+            except InterruptedError:
+                pass
+            except OSError as e:
+                if e.errno != errno.ECONNABORTED:
+                    raise
+
+#
+# Code to bootstrap new process
+#
+
+def _serve_one(s, listener, alive_r, handler):
+    global _inherited_fds
+
+    # close unnecessary stuff and reset SIGCHLD handler
+    listener.close()
+    os.close(alive_r)
+    signal.signal(signal.SIGCHLD, handler)
+
+    # receive fds from parent process
+    fds = reduction.recvfds(s, MAXFDS_TO_SEND + 1)
+    s.close()
+    assert len(fds) <= MAXFDS_TO_SEND
+    child_r, child_w, alive_w, *_inherited_fds = fds
+
+    # send pid to the client process
+    write_unsigned(child_w, os.getpid())
+
+    # reseed random number generator
+    if 'random' in sys.modules:
+        import random
+        random.seed()
+
+    # run process object received over pipe
+    code = spawn._main(child_r)
+
+    # write the exit code to the pipe
+    write_unsigned(child_w, code)
+
+#
+# Read and write unsigned numbers
+#
+
+def read_unsigned(fd):
+    data = b''
+    length = UNSIGNED_STRUCT.size
+    while len(data) < length:
+        while True:
+            try:
+                s = os.read(fd, length - len(data))
+            except InterruptedError:
+                pass
+            else:
+                break
+        if not s:
+            raise EOFError('unexpected EOF')
+        data += s
+    return UNSIGNED_STRUCT.unpack(data)[0]
+
+def write_unsigned(fd, n):
+    msg = UNSIGNED_STRUCT.pack(n)
+    while msg:
+        while True:
+            try:
+                nbytes = os.write(fd, msg)
+            except InterruptedError:
+                pass
+            else:
+                break
+        if nbytes == 0:
+            raise RuntimeError('should not get here')
+        msg = msg[nbytes:]
diff --git a/Lib/multiprocessing/heap.py b/Lib/multiprocessing/heap.py
--- a/Lib/multiprocessing/heap.py
+++ b/Lib/multiprocessing/heap.py
@@ -8,15 +8,17 @@
 #
 
 import bisect
+import itertools
 import mmap
 import os
 import sys
+import tempfile
 import threading
-import itertools
+import _multiprocessing
 
-import _multiprocessing
-from multiprocessing.util import Finalize, info
-from multiprocessing.forking import assert_spawning
+from . import popen
+from . import reduction
+from . import util
 
 __all__ = ['BufferWrapper']
 
@@ -30,17 +32,25 @@
 
     class Arena(object):
 
-        _counter = itertools.count()
+        _rand = tempfile._RandomNameSequence()
 
         def __init__(self, size):
             self.size = size
-            self.name = 'pym-%d-%d' % (os.getpid(), next(Arena._counter))
-            self.buffer = mmap.mmap(-1, self.size, tagname=self.name)
-            assert _winapi.GetLastError() == 0, 'tagname already in use'
+            for i in range(100):
+                name = 'pym-%d-%s' % (os.getpid(), next(self._rand))
+                buf = mmap.mmap(-1, size, tagname=name)
+                if _winapi.GetLastError() == 0:
+                    break
+                # We have reopened a preexisting mmap.
+                buf.close()
+            else:
+                raise FileExistsError('Cannot find name for new mmap')
+            self.name = name
+            self.buffer = buf
             self._state = (self.size, self.name)
 
         def __getstate__(self):
-            assert_spawning(self)
+            popen.assert_spawning(self)
             return self._state
 
         def __setstate__(self, state):
@@ -52,10 +62,28 @@
 
     class Arena(object):
 
-        def __init__(self, size):
-            self.buffer = mmap.mmap(-1, size)
+        def __init__(self, size, fd=-1):
             self.size = size
-            self.name = None
+            self.fd = fd
+            if fd == -1:
+                self.fd, name = tempfile.mkstemp(
+                     prefix='pym-%d-'%os.getpid(), dir=util.get_temp_dir())
+                os.unlink(name)
+                util.Finalize(self, os.close, (self.fd,))
+                with open(self.fd, 'wb', closefd=False) as f:
+                    f.write(b'\0'*size)
+            self.buffer = mmap.mmap(self.fd, self.size)
+
+    def reduce_arena(a):
+        if a.fd == -1:
+            raise ValueError('Arena is unpicklable because '
+                             'forking was enabled when it was created')
+        return rebuild_arena, (a.size, reduction.DupFd(a.fd))
+
+    def rebuild_arena(size, dupfd):
+        return Arena(size, dupfd.detach())
+
+    reduction.register(Arena, reduce_arena)
 
 #
 # Class allowing allocation of chunks of memory from arenas
@@ -90,7 +118,7 @@
         if i == len(self._lengths):
             length = self._roundup(max(self._size, size), mmap.PAGESIZE)
             self._size *= 2
-            info('allocating a new mmap of length %d', length)
+            util.info('allocating a new mmap of length %d', length)
             arena = Arena(length)
             self._arenas.append(arena)
             return (arena, 0, length)
@@ -216,7 +244,7 @@
         assert 0 <= size < sys.maxsize
         block = BufferWrapper._heap.malloc(size)
         self._state = (block, size)
-        Finalize(self, BufferWrapper._heap.free, args=(block,))
+        util.Finalize(self, BufferWrapper._heap.free, args=(block,))
 
     def create_memoryview(self):
         (arena, start, stop), size = self._state
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -19,11 +19,15 @@
 import array
 import queue
 
+from time import time as _time
 from traceback import format_exc
-from multiprocessing import Process, current_process, active_children, Pool, util, connection
-from multiprocessing.process import AuthenticationString
-from multiprocessing.forking import Popen, ForkingPickler
-from time import time as _time
+
+from . import connection
+from . import pool
+from . import process
+from . import popen
+from . import reduction
+from . import util
 
 #
 # Register some things for pickling
@@ -31,16 +35,14 @@
 
 def reduce_array(a):
     return array.array, (a.typecode, a.tobytes())
-ForkingPickler.register(array.array, reduce_array)
+reduction.register(array.array, reduce_array)
 
 view_types = [type(getattr({}, name)()) for name in ('items','keys','values')]
 if view_types[0] is not list:       # only needed in Py3.0
     def rebuild_as_list(obj):
         return list, (list(obj),)
     for view_type in view_types:
-        ForkingPickler.register(view_type, rebuild_as_list)
-        import copyreg
-        copyreg.pickle(view_type, rebuild_as_list)
+        reduction.register(view_type, rebuild_as_list)
 
 #
 # Type for identifying shared objects
@@ -130,7 +132,7 @@
     def __init__(self, registry, address, authkey, serializer):
         assert isinstance(authkey, bytes)
         self.registry = registry
-        self.authkey = AuthenticationString(authkey)
+        self.authkey = process.AuthenticationString(authkey)
         Listener, Client = listener_client[serializer]
 
         # do authentication later
@@ -146,7 +148,7 @@
         Run the server forever
         '''
         self.stop_event = threading.Event()
-        current_process()._manager_server = self
+        process.current_process()._manager_server = self
         try:
             accepter = threading.Thread(target=self.accepter)
             accepter.daemon = True
@@ -438,9 +440,9 @@
 
     def __init__(self, address=None, authkey=None, serializer='pickle'):
         if authkey is None:
-            authkey = current_process().authkey
+            authkey = process.current_process().authkey
         self._address = address     # XXX not final address if eg ('', 0)
-        self._authkey = AuthenticationString(authkey)
+        self._authkey = process.AuthenticationString(authkey)
         self._state = State()
         self._state.value = State.INITIAL
         self._serializer = serializer
@@ -476,7 +478,7 @@
         reader, writer = connection.Pipe(duplex=False)
 
         # spawn process which runs a server
-        self._process = Process(
+        self._process = process.Process(
             target=type(self)._run_server,
             args=(self._registry, self._address, self._authkey,
                   self._serializer, writer, initializer, initargs),
@@ -691,11 +693,11 @@
         self._Client = listener_client[serializer][1]
 
         if authkey is not None:
-            self._authkey = AuthenticationString(authkey)
+            self._authkey = process.AuthenticationString(authkey)
         elif self._manager is not None:
             self._authkey = self._manager._authkey
         else:
-            self._authkey = current_process().authkey
+            self._authkey = process.current_process().authkey
 
         if incref:
             self._incref()
@@ -704,7 +706,7 @@
 
     def _connect(self):
         util.debug('making connection to manager')
-        name = current_process().name
+        name = process.current_process().name
         if threading.current_thread().name != 'MainThread':
             name += '|' + threading.current_thread().name
         conn = self._Client(self._token.address, authkey=self._authkey)
@@ -798,7 +800,7 @@
 
     def __reduce__(self):
         kwds = {}
-        if Popen.thread_is_spawning():
+        if popen.get_spawning_popen() is not None:
             kwds['authkey'] = self._authkey
 
         if getattr(self, '_isauto', False):
@@ -835,14 +837,14 @@
 
     If possible the shared object is returned, or otherwise a proxy for it.
     '''
-    server = getattr(current_process(), '_manager_server', None)
+    server = getattr(process.current_process(), '_manager_server', None)
 
     if server and server.address == token.address:
         return server.id_to_obj[token.id][0]
     else:
         incref = (
             kwds.pop('incref', True) and
-            not getattr(current_process(), '_inheriting', False)
+            not getattr(process.current_process(), '_inheriting', False)
             )
         return func(token, serializer, incref=incref, **kwds)
 
@@ -889,7 +891,7 @@
     if authkey is None and manager is not None:
         authkey = manager._authkey
     if authkey is None:
-        authkey = current_process().authkey
+        authkey = process.current_process().authkey
 
     ProxyType = MakeProxyType('AutoProxy[%s]' % token.typeid, exposed)
     proxy = ProxyType(token, serializer, manager=manager, authkey=authkey,
@@ -1109,7 +1111,7 @@
                      AcquirerProxy)
 SyncManager.register('Condition', threading.Condition, ConditionProxy)
 SyncManager.register('Barrier', threading.Barrier, BarrierProxy)
-SyncManager.register('Pool', Pool, PoolProxy)
+SyncManager.register('Pool', pool.Pool, PoolProxy)
 SyncManager.register('list', list, ListProxy)
 SyncManager.register('dict', dict, DictProxy)
 SyncManager.register('Value', Value, ValueProxy)
diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py
--- a/Lib/multiprocessing/pool.py
+++ b/Lib/multiprocessing/pool.py
@@ -7,7 +7,7 @@
 # Licensed to PSF under a Contributor Agreement.
 #
 
-__all__ = ['Pool']
+__all__ = ['Pool', 'ThreadPool']
 
 #
 # Imports
@@ -21,8 +21,10 @@
 import time
 import traceback
 
-from multiprocessing import Process, TimeoutError
-from multiprocessing.util import Finalize, debug
+# If threading is available then ThreadPool should be provided.  Therefore
+# we avoid top-level imports which are liable to fail on some systems.
+from . import util
+from . import Process, cpu_count, TimeoutError, SimpleQueue
 
 #
 # Constants representing the state of a pool
@@ -104,11 +106,11 @@
         try:
             task = get()
         except (EOFError, OSError):
-            debug('worker got EOFError or OSError -- exiting')
+            util.debug('worker got EOFError or OSError -- exiting')
             break
 
         if task is None:
-            debug('worker got sentinel -- exiting')
+            util.debug('worker got sentinel -- exiting')
             break
 
         job, i, func, args, kwds = task
@@ -121,11 +123,11 @@
             put((job, i, result))
         except Exception as e:
             wrapped = MaybeEncodingError(e, result[1])
-            debug("Possible encoding error while sending result: %s" % (
+            util.debug("Possible encoding error while sending result: %s" % (
                 wrapped))
             put((job, i, (False, wrapped)))
         completed += 1
-    debug('worker exiting after %d tasks' % completed)
+    util.debug('worker exiting after %d tasks' % completed)
 
 #
 # Class representing a process pool
@@ -184,7 +186,7 @@
         self._result_handler._state = RUN
         self._result_handler.start()
 
-        self._terminate = Finalize(
+        self._terminate = util.Finalize(
             self, self._terminate_pool,
             args=(self._taskqueue, self._inqueue, self._outqueue, self._pool,
                   self._worker_handler, self._task_handler,
@@ -201,7 +203,7 @@
             worker = self._pool[i]
             if worker.exitcode is not None:
                 # worker exited
-                debug('cleaning up worker %d' % i)
+                util.debug('cleaning up worker %d' % i)
                 worker.join()
                 cleaned = True
                 del self._pool[i]
@@ -221,7 +223,7 @@
             w.name = w.name.replace('Process', 'PoolWorker')
             w.daemon = True
             w.start()
-            debug('added worker')
+            util.debug('added worker')
 
     def _maintain_pool(self):
         """Clean up any exited workers and start replacements for them.
@@ -230,7 +232,6 @@
             self._repopulate_pool()
 
     def _setup_queues(self):
-        from .queues import SimpleQueue
         self._inqueue = SimpleQueue()
         self._outqueue = SimpleQueue()
         self._quick_put = self._inqueue._writer.send
@@ -358,7 +359,7 @@
             time.sleep(0.1)
         # send sentinel to stop workers
         pool._taskqueue.put(None)
-        debug('worker handler exiting')
+        util.debug('worker handler exiting')
 
     @staticmethod
     def _handle_tasks(taskqueue, put, outqueue, pool):
@@ -368,36 +369,36 @@
             i = -1
             for i, task in enumerate(taskseq):
                 if thread._state:
-                    debug('task handler found thread._state != RUN')
+                    util.debug('task handler found thread._state != RUN')
                     break
                 try:
                     put(task)
                 except OSError:
-                    debug('could not put task on queue')
+                    util.debug('could not put task on queue')
                     break
             else:
                 if set_length:
-                    debug('doing set_length()')
+                    util.debug('doing set_length()')
                     set_length(i+1)
                 continue
             break
         else:
-            debug('task handler got sentinel')
+            util.debug('task handler got sentinel')
 
 
         try:
             # tell result handler to finish when cache is empty
-            debug('task handler sending sentinel to result handler')
+            util.debug('task handler sending sentinel to result handler')
             outqueue.put(None)
 
             # tell workers there is no more work
-            debug('task handler sending sentinel to workers')
+            util.debug('task handler sending sentinel to workers')
             for p in pool:
                 put(None)
         except OSError:
-            debug('task handler got OSError when sending sentinels')
+            util.debug('task handler got OSError when sending sentinels')
 
-        debug('task handler exiting')
+        util.debug('task handler exiting')
 
     @staticmethod
     def _handle_results(outqueue, get, cache):
@@ -407,16 +408,16 @@
             try:
                 task = get()
             except (OSError, EOFError):
-                debug('result handler got EOFError/OSError -- exiting')
+                util.debug('result handler got EOFError/OSError -- exiting')
                 return
 
             if thread._state:
                 assert thread._state == TERMINATE
-                debug('result handler found thread._state=TERMINATE')
+                util.debug('result handler found thread._state=TERMINATE')
                 break
 
             if task is None:
-                debug('result handler got sentinel')
+                util.debug('result handler got sentinel')
                 break
 
             job, i, obj = task
@@ -429,11 +430,11 @@
             try:
                 task = get()
             except (OSError, EOFError):
-                debug('result handler got EOFError/OSError -- exiting')
+                util.debug('result handler got EOFError/OSError -- exiting')
                 return
 
             if task is None:
-                debug('result handler ignoring extra sentinel')
+                util.debug('result handler ignoring extra sentinel')
                 continue
             job, i, obj = task
             try:
@@ -442,7 +443,7 @@
                 pass
 
         if hasattr(outqueue, '_reader'):
-            debug('ensuring that outqueue is not full')
+            util.debug('ensuring that outqueue is not full')
             # If we don't make room available in outqueue then
             # attempts to add the sentinel (None) to outqueue may
             # block.  There is guaranteed to be no more than 2 sentinels.
@@ -454,7 +455,7 @@
             except (OSError, EOFError):
                 pass
 
-        debug('result handler exiting: len(cache)=%s, thread._state=%s',
+        util.debug('result handler exiting: len(cache)=%s, thread._state=%s',
               len(cache), thread._state)
 
     @staticmethod
@@ -472,19 +473,19 @@
               )
 
     def close(self):
-        debug('closing pool')
+        util.debug('closing pool')
         if self._state == RUN:
             self._state = CLOSE
             self._worker_handler._state = CLOSE
 
     def terminate(self):
-        debug('terminating pool')
+        util.debug('terminating pool')
         self._state = TERMINATE
         self._worker_handler._state = TERMINATE
         self._terminate()
 
     def join(self):
-        debug('joining pool')
+        util.debug('joining pool')
         assert self._state in (CLOSE, TERMINATE)
         self._worker_handler.join()
         self._task_handler.join()
@@ -495,7 +496,7 @@
     @staticmethod
     def _help_stuff_finish(inqueue, task_handler, size):
         # task_handler may be blocked trying to put items on inqueue
-        debug('removing tasks from inqueue until task handler finished')
+        util.debug('removing tasks from inqueue until task handler finished')
         inqueue._rlock.acquire()
         while task_handler.is_alive() and inqueue._reader.poll():
             inqueue._reader.recv()
@@ -505,12 +506,12 @@
     def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool,
                         worker_handler, task_handler, result_handler, cache):
         # this is guaranteed to only be called once
-        debug('finalizing pool')
+        util.debug('finalizing pool')
 
         worker_handler._state = TERMINATE
         task_handler._state = TERMINATE
 
-        debug('helping task handler/workers to finish')
+        util.debug('helping task handler/workers to finish')
         cls._help_stuff_finish(inqueue, task_handler, len(pool))
 
         assert result_handler.is_alive() or len(cache) == 0
@@ -520,31 +521,31 @@
 
         # We must wait for the worker handler to exit before terminating
         # workers because we don't want workers to be restarted behind our back.
-        debug('joining worker handler')
+        util.debug('joining worker handler')
         if threading.current_thread() is not worker_handler:
             worker_handler.join()
 
         # Terminate workers which haven't already finished.
         if pool and hasattr(pool[0], 'terminate'):
-            debug('terminating workers')
+            util.debug('terminating workers')
             for p in pool:
                 if p.exitcode is None:
                     p.terminate()
 
-        debug('joining task handler')
+        util.debug('joining task handler')
         if threading.current_thread() is not task_handler:
             task_handler.join()
 
-        debug('joining result handler')
+        util.debug('joining result handler')
         if threading.current_thread() is not result_handler:
             result_handler.join()
 
         if pool and hasattr(pool[0], 'terminate'):
-            debug('joining pool workers')
+            util.debug('joining pool workers')
             for p in pool:
                 if p.is_alive():
                     # worker has not yet exited
-                    debug('cleaning up worker %d' % p.pid)
+                    util.debug('cleaning up worker %d' % p.pid)
                     p.join()
 
     def __enter__(self):
@@ -730,7 +731,10 @@
 
 class ThreadPool(Pool):
 
-    from .dummy import Process
+    @staticmethod
+    def Process(*args, **kwds):
+        from .dummy import Process
+        return Process(*args, **kwds)
 
     def __init__(self, processes=None, initializer=None, initargs=()):
         Pool.__init__(self, processes, initializer, initargs)
diff --git a/Lib/multiprocessing/popen.py b/Lib/multiprocessing/popen.py
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/popen.py
@@ -0,0 +1,78 @@
+import sys
+import threading
+
+__all__ = ['Popen', 'get_spawning_popen', 'set_spawning_popen',
+           'assert_spawning']
+
+#
+# Check that the current thread is spawning a child process
+#
+
+_tls = threading.local()
+
+def get_spawning_popen():
+    return getattr(_tls, 'spawning_popen', None)
+
+def set_spawning_popen(popen):
+    _tls.spawning_popen = popen
+
+def assert_spawning(obj):
+    if get_spawning_popen() is None:
+        raise RuntimeError(
+            '%s objects should only be shared between processes'
+            ' through inheritance' % type(obj).__name__
+            )
+
+#
+#
+#
+
+_Popen = None
+
+def Popen(process_obj):
+    if _Popen is None:
+        set_start_method()
+    return _Popen(process_obj)
+
+def get_start_method():
+    if _Popen is None:
+        set_start_method()
+    return _Popen.method
+
+def set_start_method(meth=None, *, start_helpers=True):
+    global _Popen
+    try:
+        modname = _method_to_module[meth]
+        __import__(modname)
+    except (KeyError, ImportError):
+        raise ValueError('could not use start method %r' % meth)
+    module = sys.modules[modname]
+    if start_helpers:
+        module.Popen.ensure_helpers_running()
+    _Popen = module.Popen
+
+
+if sys.platform == 'win32':
+
+    _method_to_module = {
+        None: 'multiprocessing.popen_spawn_win32',
+        'spawn': 'multiprocessing.popen_spawn_win32',
+        }
+
+    def get_all_start_methods():
+        return ['spawn']
+
+else:
+    _method_to_module = {
+        None: 'multiprocessing.popen_fork',
+        'fork': 'multiprocessing.popen_fork',
+        'spawn': 'multiprocessing.popen_spawn_posix',
+        'forkserver': 'multiprocessing.popen_forkserver',
+        }
+
+    def get_all_start_methods():
+        from . import reduction
+        if reduction.HAVE_SEND_HANDLE:
+            return ['fork', 'spawn', 'forkserver']
+        else:
+            return ['fork', 'spawn']
diff --git a/Lib/multiprocessing/popen_fork.py b/Lib/multiprocessing/popen_fork.py
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/popen_fork.py
@@ -0,0 +1,87 @@
+import os
+import sys
+import signal
+import errno
+
+from . import util
+
+__all__ = ['Popen']
+
+#
+# Start child process using fork
+#
+
+class Popen(object):
+    method = 'fork'
+
+    def __init__(self, process_obj):
+        sys.stdout.flush()
+        sys.stderr.flush()
+        self.returncode = None
+        self._launch(process_obj)
+
+    def duplicate_for_child(self, fd):
+        return fd
+
+    def poll(self, flag=os.WNOHANG):
+        if self.returncode is None:
+            while True:
+                try:
+                    pid, sts = os.waitpid(self.pid, flag)
+                except OSError as e:
+                    if e.errno == errno.EINTR:
+                        continue
+                    # Child process not yet created. See #1731717
+                    # e.errno == errno.ECHILD == 10
+                    return None
+                else:
+                    break
+            if pid == self.pid:
+                if os.WIFSIGNALED(sts):
+                    self.returncode = -os.WTERMSIG(sts)
+                else:
+                    assert os.WIFEXITED(sts)
+                    self.returncode = os.WEXITSTATUS(sts)
+        return self.returncode
+
+    def wait(self, timeout=None):
+        if self.returncode is None:
+            if timeout is not None:
+                from .connection import wait
+                if not wait([self.sentinel], timeout):
+                    return None
+            # This shouldn't block if wait() returned successfully.
+            return self.poll(os.WNOHANG if timeout == 0.0 else 0)
+        return self.returncode
+
+    def terminate(self):
+        if self.returncode is None:
+            try:
+                os.kill(self.pid, signal.SIGTERM)
+            except ProcessLookupError:
+                pass
+            except OSError:
+                if self.wait(timeout=0.1) is None:
+                    raise
+
+    def _launch(self, process_obj):
+        code = 1
+        parent_r, child_w = util.pipe()
+        self.pid = os.fork()
+        if self.pid == 0:
+            try:
+                os.close(parent_r)
+                if 'random' in sys.modules:
+                    import random
+                    random.seed()
+                code = process_obj._bootstrap()
+            finally:
+                os._exit(code)
+        else:
+            os.close(child_w)
+            util.Finalize(self, os.close, (parent_r,))
+            self.sentinel = parent_r
+
+    @staticmethod
+    def ensure_helpers_running():
+        pass
diff --git a/Lib/multiprocessing/popen_forkserver.py b/Lib/multiprocessing/popen_forkserver.py
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/popen_forkserver.py
@@ -0,0 +1,75 @@
+import io
+import os
+
+from . import reduction
+if not reduction.HAVE_SEND_HANDLE:
+    raise ImportError('No support for sending fds between processes')
+from . import forkserver
+from . import popen
+from . import popen_fork
+from . import spawn
+from . import util
+
+
+__all__ = ['Popen']
+
+#
+# Wrapper for an fd used while launching a process
+#
+
+class _DupFd(object):
+    def __init__(self, ind):
+        self.ind = ind
+    def detach(self):
+        return forkserver.get_inherited_fds()[self.ind]
+
+#
+# Start child process using a server process
+#
+
+class Popen(popen_fork.Popen):
+    method = 'forkserver'
+    DupFd = _DupFd
+
+    def __init__(self, process_obj):
+        self._fds = []
+        super().__init__(process_obj)
+
+    def duplicate_for_child(self, fd):
+        self._fds.append(fd)
+        return len(self._fds) - 1
+
+    def _launch(self, process_obj):
+        prep_data = spawn.get_preparation_data(process_obj._name)
+        buf = io.BytesIO()
+        popen.set_spawning_popen(self)
+        try:
+            reduction.dump(prep_data, buf)
+            reduction.dump(process_obj, buf)
+        finally:
+            popen.set_spawning_popen(None)
+
+        self.sentinel, w = forkserver.connect_to_new_process(self._fds)
+        util.Finalize(self, os.close, (self.sentinel,))
+        with open(w, 'wb', closefd=True) as f:
+            f.write(buf.getbuffer())
+        self.pid = forkserver.read_unsigned(self.sentinel)
+
+    def poll(self, flag=os.WNOHANG):
+        if self.returncode is None:
+            from .connection import wait
+            timeout = 0 if flag == os.WNOHANG else None
+            if not wait([self.sentinel], timeout):
+                return None
+            try:
+                self.returncode = forkserver.read_unsigned(self.sentinel)
+            except (OSError, EOFError):
+                # The process ended abnormally perhaps because of a signal
+                self.returncode = 255
+        return self.returncode
+
+    @staticmethod
+    def ensure_helpers_running():
+        from . import semaphore_tracker
+        semaphore_tracker.ensure_running()
+        forkserver.ensure_running()
diff --git a/Lib/multiprocessing/popen_spawn_posix.py b/Lib/multiprocessing/popen_spawn_posix.py
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/popen_spawn_posix.py
@@ -0,0 +1,75 @@
+import fcntl
+import io
+import os
+
+from . import popen
+from . import popen_fork
+from . import reduction
+from . import spawn
+from . import util
+
+from . import current_process
+
+__all__ = ['Popen']
+
+
+#
+# Wrapper for an fd used while launching a process
+#
+
+class _DupFd(object):
+    def __init__(self, fd):
+        self.fd = fd
+    def detach(self):
+        return self.fd
+
+#
+# Start child process using a fresh interpreter
+#
+
+class Popen(popen_fork.Popen):
+    method = 'spawn'
+    DupFd = _DupFd
+
+    def __init__(self, process_obj):
+        self._fds = []
+        super().__init__(process_obj)
+
+    def duplicate_for_child(self, fd):
+        self._fds.append(fd)
+        return fd
+
+    def _launch(self, process_obj):
+        tracker_fd = current_process()._config['semaphore_tracker_fd']
+        self._fds.append(tracker_fd)
+        prep_data = spawn.get_preparation_data(process_obj._name)
+        fp = io.BytesIO()
+        popen.set_spawning_popen(self)
+        try:
+            reduction.dump(prep_data, fp)
+            reduction.dump(process_obj, fp)
+        finally:
+            popen.set_spawning_popen(None)
+
+        parent_r = child_w = child_r = parent_w = None
+        try:
+            parent_r, child_w = util.pipe()
+            child_r, parent_w = util.pipe()
+            cmd = spawn.get_command_line() + [str(child_r)]
+            self._fds.extend([child_r, child_w])
+            self.pid = util.spawnv_passfds(spawn.get_executable(),
+                                           cmd, self._fds)
+            self.sentinel = parent_r
+            with open(parent_w, 'wb', closefd=False) as f:
+                f.write(fp.getbuffer())
+        finally:
+            if parent_r is not None:
+                util.Finalize(self, os.close, (parent_r,))
+            for fd in (child_r, child_w, parent_w):
+                if fd is not None:
+                    os.close(fd)
+
+    @staticmethod
+    def ensure_helpers_running():
+        from . import semaphore_tracker
+        semaphore_tracker.ensure_running()
diff --git a/Lib/multiprocessing/popen_spawn_win32.py b/Lib/multiprocessing/popen_spawn_win32.py
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/popen_spawn_win32.py
@@ -0,0 +1,102 @@
+import os
+import msvcrt
+import signal
+import sys
+import _winapi
+
+from . import spawn
+from . import popen
+from . import reduction
+from . import util
+
+__all__ = ['Popen']
+
+#
+#
+#
+
+TERMINATE = 0x10000
+WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
+WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
+
+#
+# We define a Popen class similar to the one from subprocess, but
+# whose constructor takes a process object as its argument.
+#
+
+class Popen(object):
+    '''
+    Start a subprocess to run the code of a process object
+    '''
+    method = 'spawn'
+
+    def __init__(self, process_obj):
+        prep_data = spawn.get_preparation_data(process_obj._name)
+        cmd = ' '.join('"%s"' % x for x in spawn.get_command_line())
+
+        # read end of pipe will be "stolen" by the child process
+        # -- see spawn_main() in spawn.py.
+        rhandle, whandle = _winapi.CreatePipe(None, 0)
+        wfd = msvcrt.open_osfhandle(whandle, 0)
+        cmd += ' {} {}'.format(os.getpid(), rhandle)
+
+        with open(wfd, 'wb', closefd=True) as to_child:
+            # start process
+            try:
+                hp, ht, pid, tid = _winapi.CreateProcess(
+                    spawn.get_executable(), cmd,
+                    None, None, False, 0, None, None, None)
+                _winapi.CloseHandle(ht)
+            except:
+                _winapi.CloseHandle(rhandle)
+                raise
+
+            # set attributes of self
+            self.pid = pid
+            self.returncode = None
+            self._handle = hp
+            self.sentinel = int(hp)
+            util.Finalize(self, _winapi.CloseHandle, (self.sentinel,))
+
+            # send information to child
+            popen.set_spawning_popen(self)
+            try:
+                reduction.dump(prep_data, to_child)
+                reduction.dump(process_obj, to_child)
+            finally:
+                popen.set_spawning_popen(None)
+
+    def duplicate_for_child(self, handle):
+        assert self is popen.get_spawning_popen()
+        return reduction.duplicate(handle, self.sentinel)
+
+    def wait(self, timeout=None):
+        if self.returncode is None:
+            if timeout is None:
+                msecs = _winapi.INFINITE
+            else:
+                msecs = max(0, int(timeout * 1000 + 0.5))
+
+            res = _winapi.WaitForSingleObject(int(self._handle), msecs)
+            if res == _winapi.WAIT_OBJECT_0:
+                code = _winapi.GetExitCodeProcess(self._handle)
+                if code == TERMINATE:
+                    code = -signal.SIGTERM
+                self.returncode = code
+
+        return self.returncode
+
+    def poll(self):
+        return self.wait(timeout=0)
+
+    def terminate(self):
+        if self.returncode is None:
+            try:
+                _winapi.TerminateProcess(int(self._handle), TERMINATE)
+            except OSError:
+                if self.wait(timeout=1.0) is None:
+                    raise
+
+    @staticmethod
+    def ensure_helpers_running():
+        pass
diff --git a/Lib/multiprocessing/process.py b/Lib/multiprocessing/process.py
--- a/Lib/multiprocessing/process.py
+++ b/Lib/multiprocessing/process.py
@@ -43,7 +43,7 @@
     Return list of process objects corresponding to live child processes
     '''
     _cleanup()
-    return list(_current_process._children)
+    return list(_children)
 
 #
 #
@@ -51,9 +51,9 @@
 
 def _cleanup():
     # check for processes which have finished
-    for p in list(_current_process._children):
+    for p in list(_children):
         if p._popen.poll() is not None:
-            _current_process._children.discard(p)
+            _children.discard(p)
 
 #
 # The `Process` class
@@ -63,21 +63,16 @@
     '''
     Process objects represent activity that is run in a separate process
 
-    The class is analagous to `threading.Thread`
+    The class is analogous to `threading.Thread`
     '''
     _Popen = None
 
     def __init__(self, group=None, target=None, name=None, args=(), kwargs={},
                  *, daemon=None):
         assert group is None, 'group argument must be None for now'
-        count = next(_current_process._counter)
+        count = next(_process_counter)
         self._identity = _current_process._identity + (count,)
-        self._authkey = _current_process._authkey
-        if daemon is not None:
-            self._daemonic = daemon
-        else:
-            self._daemonic = _current_process._daemonic
-        self._tempdir = _current_process._tempdir
+        self._config = _current_process._config.copy()
         self._parent_pid = os.getpid()
         self._popen = None
         self._target = target
@@ -85,6 +80,8 @@
         self._kwargs = dict(kwargs)
         self._name = name or type(self).__name__ + '-' + \
                      ':'.join(str(i) for i in self._identity)
+        if daemon is not None:
+            self.daemon = daemon
         _dangling.add(self)
 
     def run(self):
@@ -101,16 +98,16 @@
         assert self._popen is None, 'cannot start a process twice'
         assert self._parent_pid == os.getpid(), \
                'can only start a process object created by current process'
-        assert not _current_process._daemonic, \
+        assert not _current_process._config.get('daemon'), \
                'daemonic processes are not allowed to have children'
         _cleanup()
         if self._Popen is not None:
             Popen = self._Popen
         else:
-            from .forking import Popen
+            from .popen import Popen
         self._popen = Popen(self)
         self._sentinel = self._popen.sentinel
-        _current_process._children.add(self)
+        _children.add(self)
 
     def terminate(self):
         '''
@@ -126,7 +123,7 @@
         assert self._popen is not None, 'can only join a started process'
         res = self._popen.wait(timeout)
         if res is not None:
-            _current_process._children.discard(self)
+            _children.discard(self)
 
     def is_alive(self):
         '''
@@ -154,7 +151,7 @@
         '''
         Return whether process is a daemon
         '''
-        return self._daemonic
+        return self._config.get('daemon', False)
 
     @daemon.setter
     def daemon(self, daemonic):
@@ -162,18 +159,18 @@
         Set whether process is a daemon
         '''
         assert self._popen is None, 'process has already started'
-        self._daemonic = daemonic
+        self._config['daemon'] = daemonic
 
     @property
     def authkey(self):
-        return self._authkey
+        return self._config['authkey']
 
     @authkey.setter
     def authkey(self, authkey):
         '''
         Set authorization key of process
         '''
-        self._authkey = AuthenticationString(authkey)
+        self._config['authkey'] = AuthenticationString(authkey)
 
     @property
     def exitcode(self):
@@ -227,17 +224,17 @@
                 status = 'stopped[%s]' % _exitcode_to_name.get(status, status)
 
         return '<%s(%s, %s%s)>' % (type(self).__name__, self._name,
-                                   status, self._daemonic and ' daemon' or '')
+                                   status, self.daemon and ' daemon' or '')
 
     ##
 
     def _bootstrap(self):
         from . import util
-        global _current_process
+        global _current_process, _process_counter, _children
 
         try:
-            self._children = set()
-            self._counter = itertools.count(1)
+            _process_counter = itertools.count(1)
+            _children = set()
             if sys.stdin is not None:
                 try:
                     sys.stdin.close()
@@ -285,8 +282,8 @@
 
 class AuthenticationString(bytes):
     def __reduce__(self):
-        from .forking import Popen
-        if not Popen.thread_is_spawning():
+        from .popen import get_spawning_popen
+        if get_spawning_popen() is None:
             raise TypeError(
                 'Pickling an AuthenticationString object is '
                 'disallowed for security reasons'
@@ -301,16 +298,19 @@
 
     def __init__(self):
         self._identity = ()
-        self._daemonic = False
         self._name = 'MainProcess'
         self._parent_pid = None
         self._popen = None
-        self._counter = itertools.count(1)
-        self._children = set()
-        self._authkey = AuthenticationString(os.urandom(32))
-        self._tempdir = None
+        self._config = {'authkey': AuthenticationString(os.urandom(32)),
+                        'semprefix': 'mp'}
+        # Note that some versions of FreeBSD only allow named
+        # semaphores to have names of up to 14 characters.  Therefore
+        # we choose a short prefix.
+
 
 _current_process = _MainProcess()
+_process_counter = itertools.count(1)
+_children = set()
 del _MainProcess
 
 #
diff --git a/Lib/multiprocessing/queues.py b/Lib/multiprocessing/queues.py
--- a/Lib/multiprocessing/queues.py
+++ b/Lib/multiprocessing/queues.py
@@ -18,11 +18,15 @@
 import errno
 
 from queue import Empty, Full
+
 import _multiprocessing
-from multiprocessing.connection import Pipe
-from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
-from multiprocessing.util import debug, info, Finalize, register_after_fork
-from multiprocessing.forking import assert_spawning, ForkingPickler
+
+from . import connection
+from . import popen
+from . import synchronize
+
+from .util import debug, info, Finalize, register_after_fork, is_exiting
+from .reduction import ForkingPickler
 
 #
 # Queue type using a pipe, buffer and thread
@@ -34,14 +38,14 @@
         if maxsize <= 0:
             maxsize = _multiprocessing.SemLock.SEM_VALUE_MAX
         self._maxsize = maxsize
-        self._reader, self._writer = Pipe(duplex=False)
-        self._rlock = Lock()
+        self._reader, self._writer = connection.Pipe(duplex=False)
+        self._rlock = synchronize.Lock()
         self._opid = os.getpid()
         if sys.platform == 'win32':
             self._wlock = None
         else:
-            self._wlock = Lock()
-        self._sem = BoundedSemaphore(maxsize)
+            self._wlock = synchronize.Lock()
+        self._sem = synchronize.BoundedSemaphore(maxsize)
         # For use by concurrent.futures
         self._ignore_epipe = False
 
@@ -51,7 +55,7 @@
             register_after_fork(self, Queue._after_fork)
 
     def __getstate__(self):
-        assert_spawning(self)
+        popen.assert_spawning(self)
         return (self._ignore_epipe, self._maxsize, self._reader, self._writer,
                 self._rlock, self._wlock, self._sem, self._opid)
 
@@ -208,8 +212,6 @@
     @staticmethod
     def _feed(buffer, notempty, send_bytes, writelock, close, ignore_epipe):
         debug('starting thread to feed data to pipe')
-        from .util import is_exiting
-
         nacquire = notempty.acquire
         nrelease = notempty.release
         nwait = notempty.wait
@@ -279,8 +281,8 @@
 
     def __init__(self, maxsize=0):
         Queue.__init__(self, maxsize)
-        self._unfinished_tasks = Semaphore(0)
-        self._cond = Condition()
+        self._unfinished_tasks = synchronize.Semaphore(0)
+        self._cond = synchronize.Condition()
 
     def __getstate__(self):
         return Queue.__getstate__(self) + (self._cond, self._unfinished_tasks)
@@ -331,19 +333,19 @@
 class SimpleQueue(object):
 
     def __init__(self):
-        self._reader, self._writer = Pipe(duplex=False)
-        self._rlock = Lock()
+        self._reader, self._writer = connection.Pipe(duplex=False)
+        self._rlock = synchronize.Lock()
         self._poll = self._reader.poll
         if sys.platform == 'win32':
             self._wlock = None
         else:
-            self._wlock = Lock()
+            self._wlock = synchronize.Lock()
 
     def empty(self):
         return not self._poll()
 
     def __getstate__(self):
-        assert_spawning(self)
+        popen.assert_spawning(self)
         return (self._reader, self._writer, self._rlock, self._wlock)
 
     def __setstate__(self, state):
diff --git a/Lib/multiprocessing/reduction.py b/Lib/multiprocessing/reduction.py
--- a/Lib/multiprocessing/reduction.py
+++ b/Lib/multiprocessing/reduction.py
@@ -1,6 +1,5 @@
 #
-# Module to allow connection and socket objects to be transferred
-# between processes
+# Module which deals with pickling of objects.
 #
 # multiprocessing/reduction.py
 #
@@ -8,27 +7,57 @@
 # Licensed to PSF under a Contributor Agreement.
 #
 
-__all__ = ['reduce_socket', 'reduce_connection', 'send_handle', 'recv_handle']
+import copyreg
+import functools
+import io
+import os
+import pickle
+import socket
+import sys
 
-import os
-import sys
-import socket
-import threading
-import struct
-import signal
+from . import popen
+from . import util
 
-from multiprocessing import current_process
-from multiprocessing.util import register_after_fork, debug, sub_debug
-from multiprocessing.util import is_exiting, sub_warning
+__all__ = ['send_handle', 'recv_handle', 'ForkingPickler', 'register', 'dump']
 
 
+HAVE_SEND_HANDLE = (sys.platform == 'win32' or
+                    (hasattr(socket, 'CMSG_LEN') and
+                     hasattr(socket, 'SCM_RIGHTS') and
+                     hasattr(socket.socket, 'sendmsg')))
+
 #
-#
+# Pickler subclass
 #
 
-if not(sys.platform == 'win32' or (hasattr(socket, 'CMSG_LEN') and
-                                   hasattr(socket, 'SCM_RIGHTS'))):
-    raise ImportError('pickling of connections not supported')
+class ForkingPickler(pickle.Pickler):
+    '''Pickler subclass used by multiprocessing.'''
+    _extra_reducers = {}
+    _copyreg_dispatch_table = copyreg.dispatch_table
+
+    def __init__(self, *args):
+        super().__init__(*args)
+        self.dispatch_table = self._copyreg_dispatch_table.copy()
+        self.dispatch_table.update(self._extra_reducers)
+
+    @classmethod
+    def register(cls, type, reduce):
+        '''Register a reduce function for a type.'''
+        cls._extra_reducers[type] = reduce
+
+    @classmethod
+    def dumps(cls, obj, protocol=None):
+        buf = io.BytesIO()
+        cls(buf, protocol).dump(obj)
+        return buf.getbuffer()
+
+    loads = pickle.loads
+
+register = ForkingPickler.register
+
+def dump(obj, file, protocol=None):
+    '''Replacement for pickle.dump() using ForkingPickler.'''
+    ForkingPickler(file, protocol).dump(obj)
 
 #
 # Platform specific definitions
@@ -36,20 +65,44 @@
 
 if sys.platform == 'win32':
     # Windows
-    __all__ += ['reduce_pipe_connection']
+    __all__ += ['DupHandle', 'duplicate', 'steal_handle']
     import _winapi
 
+    def duplicate(handle, target_process=None, inheritable=False):
+        '''Duplicate a handle.  (target_process is a handle not a pid!)'''
+        if target_process is None:
+            target_process = _winapi.GetCurrentProcess()
+        return _winapi.DuplicateHandle(
+            _winapi.GetCurrentProcess(), handle, target_process,
+            0, inheritable, _winapi.DUPLICATE_SAME_ACCESS)
+
+    def steal_handle(source_pid, handle):
+        '''Steal a handle from process identified by source_pid.'''
+        source_process_handle = _winapi.OpenProcess(
+            _winapi.PROCESS_DUP_HANDLE, False, source_pid)
+        try:
+            return _winapi.DuplicateHandle(
+                source_process_handle, handle,
+                _winapi.GetCurrentProcess(), 0, False,
+                _winapi.DUPLICATE_SAME_ACCESS | _winapi.DUPLICATE_CLOSE_SOURCE)
+        finally:
+            _winapi.CloseHandle(source_process_handle)
+
     def send_handle(conn, handle, destination_pid):
+        '''Send a handle over a local connection.'''
         dh = DupHandle(handle, _winapi.DUPLICATE_SAME_ACCESS, destination_pid)
         conn.send(dh)
 
     def recv_handle(conn):
+        '''Receive a handle over a local connection.'''
         return conn.recv().detach()
 
     class DupHandle(object):
+        '''Picklable wrapper for a handle.'''
         def __init__(self, handle, access, pid=None):
-            # duplicate handle for process with given pid
             if pid is None:
+                # We just duplicate the handle in the current process and
+                # let the receiving process steal the handle.
                 pid = os.getpid()
             proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False, pid)
             try:
@@ -62,9 +115,12 @@
             self._pid = pid
 
         def detach(self):
+            '''Get the handle.  This should only be called once.'''
             # retrieve handle from process which currently owns it
             if self._pid == os.getpid():
+                # The handle has already been duplicated for this process.
                 return self._handle
+            # We must steal the handle from the process whose pid is self._pid.
             proc = _winapi.OpenProcess(_winapi.PROCESS_DUP_HANDLE, False,
                                        self._pid)
             try:
@@ -74,207 +130,112 @@
             finally:
                 _winapi.CloseHandle(proc)
 
-    class DupSocket(object):
-        def __init__(self, sock):
-            new_sock = sock.dup()
-            def send(conn, pid):
-                share = new_sock.share(pid)
-                conn.send_bytes(share)
-            self._id = resource_sharer.register(send, new_sock.close)
-
-        def detach(self):
-            conn = resource_sharer.get_connection(self._id)
-            try:
-                share = conn.recv_bytes()
-                return socket.fromshare(share)
-            finally:
-                conn.close()
-
-    def reduce_socket(s):
-        return rebuild_socket, (DupSocket(s),)
-
-    def rebuild_socket(ds):
-        return ds.detach()
-
-    def reduce_connection(conn):
-        handle = conn.fileno()
-        with socket.fromfd(handle, socket.AF_INET, socket.SOCK_STREAM) as s:
-            ds = DupSocket(s)
-            return rebuild_connection, (ds, conn.readable, conn.writable)
-
-    def rebuild_connection(ds, readable, writable):
-        from .connection import Connection
-        sock = ds.detach()
-        return Connection(sock.detach(), readable, writable)
-
-    def reduce_pipe_connection(conn):
-        access = ((_winapi.FILE_GENERIC_READ if conn.readable else 0) |
-                  (_winapi.FILE_GENERIC_WRITE if conn.writable else 0))
-        dh = DupHandle(conn.fileno(), access)
-        return rebuild_pipe_connection, (dh, conn.readable, conn.writable)
-
-    def rebuild_pipe_connection(dh, readable, writable):
-        from .connection import PipeConnection
-        handle = dh.detach()
-        return PipeConnection(handle, readable, writable)
-
 else:
     # Unix
+    __all__ += ['DupFd', 'sendfds', 'recvfds']
+    import array
 
     # On MacOSX we should acknowledge receipt of fds -- see Issue14669
     ACKNOWLEDGE = sys.platform == 'darwin'
 
-    def send_handle(conn, handle, destination_pid):
-        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
-            s.sendmsg([b'x'], [(socket.SOL_SOCKET, socket.SCM_RIGHTS,
-                                struct.pack("@i", handle))])
-        if ACKNOWLEDGE and conn.recv_bytes() != b'ACK':
+    def sendfds(sock, fds):
+        '''Send an array of fds over an AF_UNIX socket.'''
+        fds = array.array('i', fds)
+        msg = bytes([len(fds) % 256])
+        sock.sendmsg([msg], [(socket.SOL_SOCKET, socket.SCM_RIGHTS, fds)])
+        if ACKNOWLEDGE and sock.recv(1) != b'A':
             raise RuntimeError('did not receive acknowledgement of fd')
 
+    def recvfds(sock, size):
+        '''Receive an array of fds over an AF_UNIX socket.'''
+        a = array.array('i')
+        bytes_size = a.itemsize * size
+        msg, ancdata, flags, addr = sock.recvmsg(1, socket.CMSG_LEN(bytes_size))
+        if not msg and not ancdata:
+            raise EOFError
+        try:
+            if ACKNOWLEDGE:
+                sock.send(b'A')
+            if len(ancdata) != 1:
+                raise RuntimeError('received %d items of ancdata' %
+                                   len(ancdata))
+            cmsg_level, cmsg_type, cmsg_data = ancdata[0]
+            if (cmsg_level == socket.SOL_SOCKET and
+                cmsg_type == socket.SCM_RIGHTS):
+                if len(cmsg_data) % a.itemsize != 0:
+                    raise ValueError
+                a.frombytes(cmsg_data)
+                assert len(a) % 256 == msg[0]
+                return list(a)
+        except (ValueError, IndexError):
+            pass
+        raise RuntimeError('Invalid data received')
+
+    def send_handle(conn, handle, destination_pid):
+        '''Send a handle over a local connection.'''
+        with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
+            sendfds(s, [handle])
+
     def recv_handle(conn):
-        size = struct.calcsize("@i")
+        '''Receive a handle over a local connection.'''
         with socket.fromfd(conn.fileno(), socket.AF_UNIX, socket.SOCK_STREAM) as s:
-            msg, ancdata, flags, addr = s.recvmsg(1, socket.CMSG_LEN(size))
-            try:
-                if ACKNOWLEDGE:
-                    conn.send_bytes(b'ACK')
-                cmsg_level, cmsg_type, cmsg_data = ancdata[0]
-                if (cmsg_level == socket.SOL_SOCKET and
-                    cmsg_type == socket.SCM_RIGHTS):
-                    return struct.unpack("@i", cmsg_data[:size])[0]
-            except (ValueError, IndexError, struct.error):
-                pass
-            raise RuntimeError('Invalid data received')
+            return recvfds(s, 1)[0]
 
-    class DupFd(object):
-        def __init__(self, fd):
-            new_fd = os.dup(fd)
-            def send(conn, pid):
-                send_handle(conn, new_fd, pid)
-            def close():
-                os.close(new_fd)
-            self._id = resource_sharer.register(send, close)
-
-        def detach(self):
-            conn = resource_sharer.get_connection(self._id)
-            try:
-                return recv_handle(conn)
-            finally:
-                conn.close()
-
-    def reduce_socket(s):
-        df = DupFd(s.fileno())
-        return rebuild_socket, (df, s.family, s.type, s.proto)
-
-    def rebuild_socket(df, family, type, proto):
-        fd = df.detach()
-        s = socket.fromfd(fd, family, type, proto)
-        os.close(fd)
-        return s
-
-    def reduce_connection(conn):
-        df = DupFd(conn.fileno())
-        return rebuild_connection, (df, conn.readable, conn.writable)
-
-    def rebuild_connection(df, readable, writable):
-        from .connection import Connection
-        fd = df.detach()
-        return Connection(fd, readable, writable)
+    def DupFd(fd):
+        '''Return a wrapper for an fd.'''
+        popen_obj = popen.get_spawning_popen()
+        if popen_obj is not None:
+            return popen_obj.DupFd(popen_obj.duplicate_for_child(fd))
+        elif HAVE_SEND_HANDLE:
+            from . import resource_sharer
+            return resource_sharer.DupFd(fd)
+        else:
+            raise ValueError('SCM_RIGHTS appears not to be available')
 
 #
-# Server which shares registered resources with clients
+# Try making some callable types picklable
 #
 
-class ResourceSharer(object):
-    def __init__(self):
-        self._key = 0
-        self._cache = {}
-        self._old_locks = []
-        self._lock = threading.Lock()
-        self._listener = None
-        self._address = None
-        self._thread = None
-        register_after_fork(self, ResourceSharer._afterfork)
+def _reduce_method(m):
+    if m.__self__ is None:
+        return getattr, (m.__class__, m.__func__.__name__)
+    else:
+        return getattr, (m.__self__, m.__func__.__name__)
+class _C:
+    def f(self):
+        pass
+register(type(_C().f), _reduce_method)
 
-    def register(self, send, close):
-        with self._lock:
-            if self._address is None:
-                self._start()
-            self._key += 1
-            self._cache[self._key] = (send, close)
-            return (self._address, self._key)
 
-    @staticmethod
-    def get_connection(ident):
-        from .connection import Client
-        address, key = ident
-        c = Client(address, authkey=current_process().authkey)
-        c.send((key, os.getpid()))
-        return c
+def _reduce_method_descriptor(m):
+    return getattr, (m.__objclass__, m.__name__)
+register(type(list.append), _reduce_method_descriptor)
+register(type(int.__add__), _reduce_method_descriptor)
 
-    def stop(self, timeout=None):
-        from .connection import Client
-        with self._lock:
-            if self._address is not None:
-                c = Client(self._address, authkey=current_process().authkey)
-                c.send(None)
-                c.close()
-                self._thread.join(timeout)
-                if self._thread.is_alive():
-                    sub_warn('ResourceSharer thread did not stop when asked')
-                self._listener.close()
-                self._thread = None
-                self._address = None
-                self._listener = None
-                for key, (send, close) in self._cache.items():
-                    close()
-                self._cache.clear()
 
-    def _afterfork(self):
-        for key, (send, close) in self._cache.items():
-            close()
-        self._cache.clear()
-        # If self._lock was locked at the time of the fork, it may be broken
-        # -- see issue 6721.  Replace it without letting it be gc'ed.
-        self._old_locks.append(self._lock)
-        self._lock = threading.Lock()
-        if self._listener is not None:
-            self._listener.close()
-        self._listener = None
-        self._address = None
-        self._thread = None
+def _reduce_partial(p):
+    return _rebuild_partial, (p.func, p.args, p.keywords or {})
+def _rebuild_partial(func, args, keywords):
+    return functools.partial(func, *args, **keywords)
+register(functools.partial, _reduce_partial)
 
-    def _start(self):
-        from .connection import Listener
-        assert self._listener is None
-        debug('starting listener and thread for sending handles')
-        self._listener = Listener(authkey=current_process().authkey)
-        self._address = self._listener.address
-        t = threading.Thread(target=self._serve)
-        t.daemon = True
-        t.start()
-        self._thread = t
+#
+# Make sockets picklable
+#
 
-    def _serve(self):
-        if hasattr(signal, 'pthread_sigmask'):
-            signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
-        while 1:
-            try:
-                conn = self._listener.accept()
-                msg = conn.recv()
-                if msg is None:
-                    break
-                key, destination_pid = msg
-                send, close = self._cache.pop(key)
-                send(conn, destination_pid)
-                close()
-                conn.close()
-            except:
-                if not is_exiting():
-                    import traceback
-                    sub_warning(
-                        'thread for sharing handles raised exception :\n' +
-                        '-'*79 + '\n' + traceback.format_exc() + '-'*79
-                        )
+if sys.platform == 'win32':
+    def _reduce_socket(s):
+        from .resource_sharer import DupSocket
+        return _rebuild_socket, (DupSocket(s),)
+    def _rebuild_socket(ds):
+        return ds.detach()
+    register(socket.socket, _reduce_socket)
 
-resource_sharer = ResourceSharer()
+else:
+    def _reduce_socket(s):
+        df = DupFd(s.fileno())
+        return _rebuild_socket, (df, s.family, s.type, s.proto)
+    def _rebuild_socket(df, family, type, proto):
+        fd = df.detach()
+        return socket.socket(family, type, proto, fileno=fd)
+    register(socket.socket, _reduce_socket)
diff --git a/Lib/multiprocessing/resource_sharer.py b/Lib/multiprocessing/resource_sharer.py
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/resource_sharer.py
@@ -0,0 +1,158 @@
+#
+# We use a background thread for sharing fds on Unix, and for sharing sockets on
+# Windows.
+#
+# A client which wants to pickle a resource registers it with the resource
+# sharer and gets an identifier in return.  The unpickling process will connect
+# to the resource sharer, send the identifier and its pid, and then receive
+# the resource.
+#
+
+import os
+import signal
+import socket
+import sys
+import threading
+
+from . import process
+from . import reduction
+from . import util
+
+__all__ = ['stop']
+
+
+if sys.platform == 'win32':
+    __all__ += ['DupSocket']
+
+    class DupSocket(object):
+        '''Picklable wrapper for a socket.'''
+        def __init__(self, sock):
+            new_sock = sock.dup()
+            def send(conn, pid):
+                share = new_sock.share(pid)
+                conn.send_bytes(share)
+            self._id = _resource_sharer.register(send, new_sock.close)
+
+        def detach(self):
+            '''Get the socket.  This should only be called once.'''
+            with _resource_sharer.get_connection(self._id) as conn:
+                share = conn.recv_bytes()
+                return socket.fromshare(share)
+
+else:
+    __all__ += ['DupFd']
+
+    class DupFd(object):
+        '''Wrapper for fd which can be used at any time.'''
+        def __init__(self, fd):
+            new_fd = os.dup(fd)
+            def send(conn, pid):
+                reduction.send_handle(conn, new_fd, pid)
+            def close():
+                os.close(new_fd)
+            self._id = _resource_sharer.register(send, close)
+
+        def detach(self):
+            '''Get the fd.  This should only be called once.'''
+            with _resource_sharer.get_connection(self._id) as conn:
+                return reduction.recv_handle(conn)
+
+
+class _ResourceSharer(object):
+    '''Manager for resources using background thread.'''
+    def __init__(self):
+        self._key = 0
+        self._cache = {}
+        self._old_locks = []
+        self._lock = threading.Lock()
+        self._listener = None
+        self._address = None
+        self._thread = None
+        util.register_after_fork(self, _ResourceSharer._afterfork)
+
+    def register(self, send, close):
+        '''Register resource, returning an identifier.'''
+        with self._lock:
+            if self._address is None:
+                self._start()
+            self._key += 1
+            self._cache[self._key] = (send, close)
+            return (self._address, self._key)
+
+    @staticmethod
+    def get_connection(ident):
+        '''Return connection from which to receive identified resource.'''
+        from .connection import Client
+        address, key = ident
+        c = Client(address, authkey=process.current_process().authkey)
+        c.send((key, os.getpid()))
+        return c
+
+    def stop(self, timeout=None):
+        '''Stop the background thread and clear registered resources.'''
+        from .connection import Client
+        with self._lock:
+            if self._address is not None:
+                c = Client(self._address,
+                           authkey=process.current_process().authkey)
+                c.send(None)
+                c.close()
+                self._thread.join(timeout)
+                if self._thread.is_alive():
+                    util.sub_warning('_ResourceSharer thread did '
+                                     'not stop when asked')
+                self._listener.close()
+                self._thread = None
+                self._address = None
+                self._listener = None
+                for key, (send, close) in self._cache.items():
+                    close()
+                self._cache.clear()
+
+    def _afterfork(self):
+        for key, (send, close) in self._cache.items():
+            close()
+        self._cache.clear()
+        # If self._lock was locked at the time of the fork, it may be broken
+        # -- see issue 6721.  Replace it without letting it be gc'ed.
+        self._old_locks.append(self._lock)
+        self._lock = threading.Lock()
+        if self._listener is not None:
+            self._listener.close()
+        self._listener = None
+        self._address = None
+        self._thread = None
+
+    def _start(self):
+        from .connection import Listener
+        assert self._listener is None
+        util.debug('starting listener and thread for sending handles')
+        self._listener = Listener(authkey=process.current_process().authkey)
+        self._address = self._listener.address
+        t = threading.Thread(target=self._serve)
+        t.daemon = True
+        t.start()
+        self._thread = t
+
+    def _serve(self):
+        if hasattr(signal, 'pthread_sigmask'):
+            signal.pthread_sigmask(signal.SIG_BLOCK, range(1, signal.NSIG))
+        while 1:
+            try:
+                with self._listener.accept() as conn:
+                    msg = conn.recv()
+                    if msg is None:
+                        break
+                    key, destination_pid = msg
+                    send, close = self._cache.pop(key)
+                    try:
+                        send(conn, destination_pid)
+                    finally:
+                        close()
+            except:
+                if not util.is_exiting():
+                    sys.excepthook(*sys.exc_info())
+
+
+_resource_sharer = _ResourceSharer()
+stop = _resource_sharer.stop
diff --git a/Lib/multiprocessing/semaphore_tracker.py b/Lib/multiprocessing/semaphore_tracker.py
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/semaphore_tracker.py
@@ -0,0 +1,135 @@
+#
+# On Unix we run a server process which keeps track of unlinked
+# semaphores. The server ignores SIGINT and SIGTERM and reads from a
+# pipe.  Every other process of the program has a copy of the writable
+# end of the pipe, so we get EOF when all other processes have exited.
+# Then the server process unlinks any remaining semaphore names.
+#
+# This is important because the system only supports a limited number
+# of named semaphores, and they will not be automatically removed till
+# the next reboot.  Without this semaphore tracker process, "killall
+# python" would probably leave unlinked semaphores.
+#
+
+import errno
+import os
+import signal
+import sys
+import threading
+import warnings
+import _multiprocessing
+
+from . import spawn
+from . import util
+from . import current_process
+
+__all__ = ['ensure_running', 'register', 'unregister']
+
+
+_lock = threading.Lock()
+
+
+def ensure_running():
+    '''Make sure that semaphore tracker process is running.
+
+    This can be run from any process.  Usually a child process will use
+    the semaphore created by its parent.'''
+    with _lock:
+        config = current_process()._config
+        if config.get('semaphore_tracker_fd') is not None:
+            return
+        fds_to_pass = []
+        try:
+            fds_to_pass.append(sys.stderr.fileno())
+        except Exception:
+            pass
+        cmd = 'from multiprocessing.semaphore_tracker import main; main(%d)'
+        r, semaphore_tracker_fd = util.pipe()
+        try:
+            fds_to_pass.append(r)
+            # process will outlive us, so no need to wait on pid
+            exe = spawn.get_executable()
+            args = [exe] + util._args_from_interpreter_flags()
+            args += ['-c', cmd % r]
+            util.spawnv_passfds(exe, args, fds_to_pass)
+        except:
+            os.close(semaphore_tracker_fd)
+            raise
+        else:
+            config['semaphore_tracker_fd'] = semaphore_tracker_fd
+        finally:
+            os.close(r)
+
+
+def register(name):
+    '''Register name of semaphore with semaphore tracker.'''
+    _send('REGISTER', name)
+
+
+def unregister(name):
+    '''Unregister name of semaphore with semaphore tracker.'''
+    _send('UNREGISTER', name)
+
+
+def _send(cmd, name):
+    msg = '{0}:{1}\n'.format(cmd, name).encode('ascii')
+    if len(name) > 512:
+        # posix guarantees that writes to a pipe of less than PIPE_BUF
+        # bytes are atomic, and that PIPE_BUF >= 512
+        raise ValueError('name too long')
+    fd = current_process()._config['semaphore_tracker_fd']
+    nbytes = os.write(fd, msg)
+    assert nbytes == len(msg)
+
+
+def main(fd):
+    '''Run semaphore tracker.'''
+    # protect the process from ^C and "killall python" etc
+    signal.signal(signal.SIGINT, signal.SIG_IGN)
+    signal.signal(signal.SIGTERM, signal.SIG_IGN)
+
+    for f in (sys.stdin, sys.stdout):
+        try:
+            f.close()
+        except Exception:
+            pass
+
+    cache = set()
+    try:
+        # keep track of registered/unregistered semaphores
+        with open(fd, 'rb') as f:
+            for line in f:
+                try:
+                    cmd, name = line.strip().split(b':')
+                    if cmd == b'REGISTER':
+                        cache.add(name)
+                    elif cmd == b'UNREGISTER':
+                        cache.remove(name)
+                    else:
+                        raise RuntimeError('unrecognized command %r' % cmd)
+                except Exception:
+                    try:
+                        sys.excepthook(*sys.exc_info())
+                    except:
+                        pass
+    finally:
+        # all processes have terminated; cleanup any remaining semaphores
+        if cache:
+            try:
+                warnings.warn('semaphore_tracker: There appear to be %d '
+                              'leaked semaphores to clean up at shutdown' %
+                              len(cache))
+            except Exception:
+                pass
+        for name in cache:
+            # For some reason the process which created and registered this
+            # semaphore has failed to unregister it. Presumably it has died.
+            # We therefore unlink it.
+            try:
+                name = name.decode('ascii')
+                try:
+                    _multiprocessing.sem_unlink(name)
+                except Exception as e:
+                    warnings.warn('semaphore_tracker: %r: %s' % (name, e))
+            finally:
+                pass
diff --git a/Lib/multiprocessing/sharedctypes.py b/Lib/multiprocessing/sharedctypes.py
--- a/Lib/multiprocessing/sharedctypes.py
+++ b/Lib/multiprocessing/sharedctypes.py
@@ -10,8 +10,11 @@
 import ctypes
 import weakref
 
-from multiprocessing import heap, RLock
-from multiprocessing.forking import assert_spawning, ForkingPickler
+from . import heap
+
+from .synchronize import RLock
+from .reduction import ForkingPickler
+from .popen import assert_spawning
 
 __all__ = ['RawValue', 'RawArray', 'Value', 'Array', 'copy', 'synchronized']
 
diff --git a/Lib/multiprocessing/spawn.py b/Lib/multiprocessing/spawn.py
new file mode 100644
--- /dev/null
+++ b/Lib/multiprocessing/spawn.py
@@ -0,0 +1,258 @@
+#
+# Code used to start processes when using the spawn or forkserver
+# start methods.
+#
+# multiprocessing/spawn.py
+#
+# Copyright (c) 2006-2008, R Oudkerk
+# Licensed to PSF under a Contributor Agreement.
+#
+
+import os
+import pickle
+import sys
+
+from . import process
+from . import util
+from . import popen
+
+__all__ = ['_main', 'freeze_support', 'set_executable', 'get_executable',
+           'get_preparation_data', 'get_command_line', 'import_main_path']
+
+#
+# _python_exe is the assumed path to the python executable.
+# People embedding Python want to modify it.
+#
+
+if sys.platform != 'win32':
+    WINEXE = False
+    WINSERVICE = False
+else:
+    WINEXE = (sys.platform == 'win32' and getattr(sys, 'frozen', False))
+    WINSERVICE = sys.executable.lower().endswith("pythonservice.exe")
+
+if WINSERVICE:
+    _python_exe = os.path.join(sys.exec_prefix, 'python.exe')
+else:
+    _python_exe = sys.executable
+
+def set_executable(exe):
+    global _python_exe
+    _python_exe = exe
+
+def get_executable():
+    return _python_exe
+
+#
+#
+#
+
+def is_forking(argv):
+    '''
+    Return whether commandline indicates we are forking
+    '''
+    if len(argv) >= 2 and argv[1] == '--multiprocessing-fork':
+        return True
+    else:
+        return False
+
+
+def freeze_support():
+    '''
+    Run code for process object if this is not the main process
+    '''
+    if is_forking(sys.argv):
+        main()
+        sys.exit()
+
+
+def get_command_line():
+    '''
+    Returns prefix of command line used for spawning a child process
+    '''
+    if getattr(sys, 'frozen', False):
+        return [sys.executable, '--multiprocessing-fork']
+    else:
+        prog = 'from multiprocessing.spawn import spawn_main; spawn_main()'
+        opts = util._args_from_interpreter_flags()
+        return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
+
+
+def spawn_main():
+    '''
+    Run code specified by data received over pipe
+    '''
+    assert is_forking(sys.argv)
+    handle = int(sys.argv[-1])
+    if sys.platform == 'win32':
+        import msvcrt
+        from .reduction import steal_handle
+        pid = int(sys.argv[-2])
+        new_handle = steal_handle(pid, handle)
+        fd = msvcrt.open_osfhandle(new_handle, os.O_RDONLY)
+    else:
+        fd = handle
+    exitcode = _main(fd)
+    sys.exit(exitcode)
+
+
+def _main(fd):
+    with os.fdopen(fd, 'rb', closefd=True) as from_parent:
+        process.current_process()._inheriting = True
+        try:
+            preparation_data = pickle.load(from_parent)
+            prepare(preparation_data)
+            self = pickle.load(from_parent)
+        finally:
+            del process.current_process()._inheriting
+    return self._bootstrap()
+
+
+def _check_not_importing_main():
+    if getattr(process.current_process(), '_inheriting', False):
+        raise RuntimeError('''
+        An attempt has been made to start a new process before the
+        current process has finished its bootstrapping phase.
+
+        This probably means that you are not using fork to start your
+        child processes and you have forgotten to use the proper idiom
+        in the main module:
+
+            if __name__ == '__main__':
+                freeze_support()
+                ...
+
+        The "freeze_support()" line can be omitted if the program
+        is not going to be frozen to produce an executable.''')
+
+
+def get_preparation_data(name):
+    '''
+    Return info about parent needed by child to unpickle process object
+    '''
+    _check_not_importing_main()
+    d = dict(
+        log_to_stderr=util._log_to_stderr,
+        authkey=process.current_process().authkey,
+        )
+
+    if util._logger is not None:
+        d['log_level'] = util._logger.getEffectiveLevel()
+
+    sys_path=sys.path.copy()
+    try:
+        i = sys_path.index('')
+    except ValueError:
+        pass
+    else:
+        sys_path[i] = process.ORIGINAL_DIR
+
+    d.update(
+        name=name,
+        sys_path=sys_path,
+        sys_argv=sys.argv,
+        orig_dir=process.ORIGINAL_DIR,
+        dir=os.getcwd(),
+        start_method=popen.get_start_method(),
+        )
+
+    if sys.platform != 'win32' or (not WINEXE and not WINSERVICE):
+        main_path = getattr(sys.modules['__main__'], '__file__', None)
+        if not main_path and sys.argv[0] not in ('', '-c'):
+            main_path = sys.argv[0]
+        if main_path is not None:
+            if (not os.path.isabs(main_path) and
+                        process.ORIGINAL_DIR is not None):
+                main_path = os.path.join(process.ORIGINAL_DIR, main_path)
+            d['main_path'] = os.path.normpath(main_path)
+
+    return d
+
+#
+# Prepare current process
+#
+
+old_main_modules = []
+
+def prepare(data):
+    '''
+    Try to get current process ready to unpickle process object
+    '''
+    if 'name' in data:
+        process.current_process().name = data['name']
+
+    if 'authkey' in data:
+        process.current_process().authkey = data['authkey']
+
+    if 'log_to_stderr' in data and data['log_to_stderr']:
+        util.log_to_stderr()
+
+    if 'log_level' in data:
+        util.get_logger().setLevel(data['log_level'])
+
+    if 'sys_path' in data:
+        sys.path = data['sys_path']
+
+    if 'sys_argv' in data:
+        sys.argv = data['sys_argv']
+
+    if 'dir' in data:
+        os.chdir(data['dir'])
+
+    if 'orig_dir' in data:
+        process.ORIGINAL_DIR = data['orig_dir']
+
+    if 'start_method' in data:
+        popen.set_start_method(data['start_method'], start_helpers=False)
+
+    if 'main_path' in data:
+        import_main_path(data['main_path'])
+
+
+def import_main_path(main_path):
+    '''
+    Set sys.modules['__main__'] to module at main_path
+    '''
+    # XXX (ncoghlan): The following code makes several bogus
+    # assumptions regarding the relationship between __file__
+    # and a module's real name. See PEP 302 and issue #10845
+    if getattr(sys.modules['__main__'], '__file__', None) == main_path:
+        return
+
+    main_name = os.path.splitext(os.path.basename(main_path))[0]
+    if main_name == '__init__':
+        main_name = os.path.basename(os.path.dirname(main_path))
+
+    if main_name == '__main__':
+        main_module = sys.modules['__main__']
+        main_module.__file__ = main_path
+    elif main_name != 'ipython':
+        # Main modules not actually called __main__.py may
+        # contain additional code that should still be executed
+        import importlib
+        import types
+
+        if main_path is None:
+            dirs = None
+        elif os.path.basename(main_path).startswith('__init__.py'):
+            dirs = [os.path.dirname(os.path.dirname(main_path))]
+        else:
+            dirs = [os.path.dirname(main_path)]
+
+        assert main_name not in sys.modules, main_name
+        sys.modules.pop('__mp_main__', None)
+        # We should not try to load __main__
+        # since that would execute 'if __name__ == "__main__"'
+        # clauses, potentially causing a pseudo fork bomb.
+        loader = importlib.find_loader(main_name, path=dirs)
+        main_module = types.ModuleType(main_name)
+        try:
+            loader.init_module_attrs(main_module)
+        except AttributeError:  # init_module_attrs is optional
+            pass
+        main_module.__name__ = '__mp_main__'
+        code = loader.get_code(main_name)
+        exec(code, main_module.__dict__)
+
+        old_main_modules.append(sys.modules['__main__'])
+        sys.modules['__main__'] = sys.modules['__mp_main__'] = main_module
diff --git a/Lib/multiprocessing/synchronize.py b/Lib/multiprocessing/synchronize.py
--- a/Lib/multiprocessing/synchronize.py
+++ b/Lib/multiprocessing/synchronize.py
@@ -11,20 +11,24 @@
     'Lock', 'RLock', 'Semaphore', 'BoundedSemaphore', 'Condition', 'Event'
     ]
 
+import os
 import threading
 import sys
+import itertools
+import tempfile
+import _multiprocessing
 
-import _multiprocessing
-from multiprocessing.process import current_process
-from multiprocessing.util import register_after_fork, debug
-from multiprocessing.forking import assert_spawning, Popen
 from time import time as _time
 
+from . import popen
+from . import process
+from . import util
+
 # Try to import the mp.synchronize module cleanly, if it fails
 # raise ImportError for platforms lacking a working sem_open implementation.
 # See issue 3770
 try:
-    from _multiprocessing import SemLock
+    from _multiprocessing import SemLock, sem_unlink
 except (ImportError):
     raise ImportError("This platform lacks a functioning sem_open" +
                       " implementation, therefore, the required" +
@@ -44,15 +48,45 @@
 
 class SemLock(object):
 
+    _rand = tempfile._RandomNameSequence()
+
     def __init__(self, kind, value, maxvalue):
-        sl = self._semlock = _multiprocessing.SemLock(kind, value, maxvalue)
-        debug('created semlock with handle %s' % sl.handle)
+        unlink_immediately = (sys.platform == 'win32' or
+                              popen.get_start_method() == 'fork')
+        for i in range(100):
+            try:
+                sl = self._semlock = _multiprocessing.SemLock(
+                    kind, value, maxvalue, self._make_name(),
+                    unlink_immediately)
+            except FileExistsError:
+                pass
+            else:
+                break
+        else:
+            raise FileExistsError('cannot find name for semaphore')
+
+        util.debug('created semlock with handle %s' % sl.handle)
         self._make_methods()
 
         if sys.platform != 'win32':
             def _after_fork(obj):
                 obj._semlock._after_fork()
-            register_after_fork(self, _after_fork)
+            util.register_after_fork(self, _after_fork)
+
+        if self._semlock.name is not None:
+            # We only get here if we are on Unix with forking
+            # disabled.  When the object is garbage collected or the
+            # process shuts down we unlink the semaphore name
+            from .semaphore_tracker import register
+            register(self._semlock.name)
+            util.Finalize(self, SemLock._cleanup, (self._semlock.name,),
+                          exitpriority=0)
+
+    @staticmethod
+    def _cleanup(name):
+        from .semaphore_tracker import unregister
+        sem_unlink(name)
+        unregister(name)
 
     def _make_methods(self):
         self.acquire = self._semlock.acquire
@@ -65,15 +99,24 @@
         return self._semlock.__exit__(*args)
 
     def __getstate__(self):
-        assert_spawning(self)
+        popen.assert_spawning(self)
         sl = self._semlock
-        return (Popen.duplicate_for_child(sl.handle), sl.kind, sl.maxvalue)
+        if sys.platform == 'win32':
+            h = popen.get_spawning_popen().duplicate_for_child(sl.handle)
+        else:
+            h = sl.handle
+        return (h, sl.kind, sl.maxvalue, sl.name)
 
     def __setstate__(self, state):
         self._semlock = _multiprocessing.SemLock._rebuild(*state)
-        debug('recreated blocker with handle %r' % state[0])
+        util.debug('recreated blocker with handle %r' % state[0])
         self._make_methods()
 
+    @staticmethod
+    def _make_name():
+        return '/%s-%s' % (process.current_process()._config['semprefix'],
+                           next(SemLock._rand))
+
 #
 # Semaphore
 #
@@ -122,7 +165,7 @@
     def __repr__(self):
         try:
             if self._semlock._is_mine():
-                name = current_process().name
+                name = process.current_process().name
                 if threading.current_thread().name != 'MainThread':
                     name += '|' + threading.current_thread().name
             elif self._semlock._get_value() == 1:
@@ -147,7 +190,7 @@
     def __repr__(self):
         try:
             if self._semlock._is_mine():
-                name = current_process().name
+                name = process.current_process().name
                 if threading.current_thread().name != 'MainThread':
                     name += '|' + threading.current_thread().name
                 count = self._semlock._count()
@@ -175,7 +218,7 @@
         self._make_methods()
 
     def __getstate__(self):
-        assert_spawning(self)
+        popen.assert_spawning(self)
         return (self._lock, self._sleeping_count,
                 self._woken_count, self._wait_semaphore)
 
@@ -342,7 +385,7 @@
 
     def __init__(self, parties, action=None, timeout=None):
         import struct
-        from multiprocessing.heap import BufferWrapper
+        from .heap import BufferWrapper
         wrapper = BufferWrapper(struct.calcsize('i') * 2)
         cond = Condition()
         self.__setstate__((parties, action, timeout, cond, wrapper))
diff --git a/Lib/multiprocessing/util.py b/Lib/multiprocessing/util.py
--- a/Lib/multiprocessing/util.py
+++ b/Lib/multiprocessing/util.py
@@ -17,13 +17,13 @@
                         # cleanup function before multiprocessing does
 from subprocess import _args_from_interpreter_flags
 
-from multiprocessing.process import current_process, active_children
+from . import process
 
 __all__ = [
     'sub_debug', 'debug', 'info', 'sub_warning', 'get_logger',
     'log_to_stderr', 'get_temp_dir', 'register_after_fork',
     'is_exiting', 'Finalize', 'ForkAwareThreadLock', 'ForkAwareLocal',
-    'SUBDEBUG', 'SUBWARNING',
+    'close_all_fds_except', 'SUBDEBUG', 'SUBWARNING',
     ]
 
 #
@@ -71,8 +71,6 @@
 
             _logger = logging.getLogger(LOGGER_NAME)
             _logger.propagate = 0
-            logging.addLevelName(SUBDEBUG, 'SUBDEBUG')
-            logging.addLevelName(SUBWARNING, 'SUBWARNING')
 
             # XXX multiprocessing should cleanup before logging
             if hasattr(atexit, 'unregister'):
@@ -111,13 +109,14 @@
 
 def get_temp_dir():
     # get name of a temp directory which will be automatically cleaned up
-    if current_process()._tempdir is None:
+    tempdir = process.current_process()._config.get('tempdir')
+    if tempdir is None:
         import shutil, tempfile
         tempdir = tempfile.mkdtemp(prefix='pymp-')
         info('created temp directory %s', tempdir)
         Finalize(None, shutil.rmtree, args=[tempdir], exitpriority=-100)
-        current_process()._tempdir = tempdir
-    return current_process()._tempdir
+        process.current_process()._config['tempdir'] = tempdir
+    return tempdir
 
 #
 # Support for reinitialization of objects when bootstrapping a child process
@@ -273,8 +272,8 @@
 _exiting = False
 
 def _exit_function(info=info, debug=debug, _run_finalizers=_run_finalizers,
-                   active_children=active_children,
-                   current_process=current_process):
+                   active_children=process.active_children,
+                   current_process=process.current_process):
     # We hold on to references to functions in the arglist due to the
     # situation described below, where this function is called after this
     # module's globals are destroyed.
@@ -303,7 +302,7 @@
             # #9207.
 
             for p in active_children():
-                if p._daemonic:
+                if p.daemon:
                     info('calling terminate() for daemon %s', p.name)
                     p._popen.terminate()
 
@@ -335,3 +334,54 @@
         register_after_fork(self, lambda obj : obj.__dict__.clear())
     def __reduce__(self):
         return type(self), ()
+
+#
+# Close fds except those specified
+#
+
+try:
+    MAXFD = os.sysconf("SC_OPEN_MAX")
+except Exception:
+    MAXFD = 256
+
+def close_all_fds_except(fds):
+    fds = list(fds) + [-1, MAXFD]
+    fds.sort()
+    assert fds[-1] == MAXFD, 'fd too large'
+    for i in range(len(fds) - 1):
+        os.closerange(fds[i]+1, fds[i+1])
+
+#
+# Start a program with only specified fds kept open
+#
+
+def spawnv_passfds(path, args, passfds):
+    import _posixsubprocess, fcntl
+    passfds = sorted(passfds)
+    tmp = []
+    # temporarily unset CLOEXEC on passed fds
+    for fd in passfds:
+        flag = fcntl.fcntl(fd, fcntl.F_GETFD)
+        if flag & fcntl.FD_CLOEXEC:
+            fcntl.fcntl(fd, fcntl.F_SETFD, flag & ~fcntl.FD_CLOEXEC)
+            tmp.append((fd, flag))
+    errpipe_read, errpipe_write = _posixsubprocess.cloexec_pipe()
+    try:
+        return _posixsubprocess.fork_exec(
+            args, [os.fsencode(path)], True, passfds, None, None,
+            -1, -1, -1, -1, -1, -1, errpipe_read, errpipe_write,
+            False, False, None)
+    finally:
+        os.close(errpipe_read)
+        os.close(errpipe_write)
+        # reset CLOEXEC where necessary
+        for fd, flag in tmp:
+            fcntl.fcntl(fd, fcntl.F_SETFD, flag)
+
+#
+# Return pipe with CLOEXEC set on fds
+#
+
+def pipe():
+    import _posixsubprocess
+    return _posixsubprocess.cloexec_pipe()
diff --git a/Lib/test/test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
rename from Lib/test/test_multiprocessing.py
rename to Lib/test/_test_multiprocessing.py
--- a/Lib/test/test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -43,7 +43,7 @@
 
 try:
     from multiprocessing import reduction
-    HAS_REDUCTION = True
+    HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
 except ImportError:
     HAS_REDUCTION = False
 
@@ -99,6 +99,9 @@
 except:
     MAXFD = 256
 
+# To speed up tests when using the forkserver, we can preload these:
+PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
+
 #
 # Some tests require ctypes
 #
@@ -330,7 +333,6 @@
 
     @classmethod
     def _test_recursion(cls, wconn, id):
-        from multiprocessing import forking
         wconn.send(id)
         if len(id) < 2:
             for i in range(2):
@@ -378,7 +380,7 @@
         self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
         event.set()
         p.join()
-        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
+        self.assertTrue(wait_for_handle(sentinel, timeout=1))
 
 #
 #
@@ -2493,7 +2495,7 @@
 
     @classmethod
     def tearDownClass(cls):
-        from multiprocessing.reduction import resource_sharer
+        from multiprocessing import resource_sharer
         resource_sharer.stop(timeout=5)
 
     @classmethod
@@ -2807,30 +2809,40 @@
 # Test that from ... import * works for each module
 #
 
-class _TestImportStar(BaseTestCase):
-
-    ALLOWED_TYPES = ('processes',)
+class _TestImportStar(unittest.TestCase):
+
+    def get_module_names(self):
+        import glob
+        folder = os.path.dirname(multiprocessing.__file__)
+        pattern = os.path.join(folder, '*.py')
+        files = glob.glob(pattern)
+        modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
+        modules = ['multiprocessing.' + m for m in modules]
+        modules.remove('multiprocessing.__init__')
+        modules.append('multiprocessing')
+        return modules
 
     def test_import(self):
-        modules = [
-            'multiprocessing', 'multiprocessing.connection',
-            'multiprocessing.heap', 'multiprocessing.managers',
-            'multiprocessing.pool', 'multiprocessing.process',
-            'multiprocessing.synchronize', 'multiprocessing.util'
-            ]
-
-        if HAS_REDUCTION:
-            modules.append('multiprocessing.reduction')
-
-        if c_int is not None:
+        modules = self.get_module_names()
+        if sys.platform == 'win32':
+            modules.remove('multiprocessing.popen_fork')
+            modules.remove('multiprocessing.popen_forkserver')
+            modules.remove('multiprocessing.popen_spawn_posix')
+        else:
+            modules.remove('multiprocessing.popen_spawn_win32')
+            if not HAS_REDUCTION:
+                modules.remove('multiprocessing.popen_forkserver')
+
+        if c_int is None:
             # This module requires _ctypes
-            modules.append('multiprocessing.sharedctypes')
+            modules.remove('multiprocessing.sharedctypes')
 
         for name in modules:
             __import__(name)
             mod = sys.modules[name]
-
-            for attr in getattr(mod, '__all__', ()):
+            self.assertTrue(hasattr(mod, '__all__'), name)
+
+            for attr in mod.__all__:
                 self.assertTrue(
                     hasattr(mod, attr),
                     '%r does not have attribute %r' % (mod, attr)
@@ -2953,131 +2965,6 @@
         self.assertRaises((ValueError, OSError),
                           multiprocessing.connection.Connection, -1)
 
-#
-# Functions used to create test cases from the base ones in this module
-#
-
-def create_test_cases(Mixin, type):
-    result = {}
-    glob = globals()
-    Type = type.capitalize()
-    ALL_TYPES = {'processes', 'threads', 'manager'}
-
-    for name in list(glob.keys()):
-        if name.startswith('_Test'):
-            base = glob[name]
-            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
-            if type in base.ALLOWED_TYPES:
-                newname = 'With' + Type + name[1:]
-                class Temp(base, Mixin, unittest.TestCase):
-                    pass
-                result[newname] = Temp
-                Temp.__name__ = Temp.__qualname__ = newname
-                Temp.__module__ = Mixin.__module__
-    return result
-
-#
-# Create test cases
-#
-
-class ProcessesMixin(object):
-    TYPE = 'processes'
-    Process = multiprocessing.Process
-    connection = multiprocessing.connection
-    current_process = staticmethod(multiprocessing.current_process)
-    active_children = staticmethod(multiprocessing.active_children)
-    Pool = staticmethod(multiprocessing.Pool)
-    Pipe = staticmethod(multiprocessing.Pipe)
-    Queue = staticmethod(multiprocessing.Queue)
-    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
-    Lock = staticmethod(multiprocessing.Lock)
-    RLock = staticmethod(multiprocessing.RLock)
-    Semaphore = staticmethod(multiprocessing.Semaphore)
-    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
-    Condition = staticmethod(multiprocessing.Condition)
-    Event = staticmethod(multiprocessing.Event)
-    Barrier = staticmethod(multiprocessing.Barrier)
-    Value = staticmethod(multiprocessing.Value)
-    Array = staticmethod(multiprocessing.Array)
-    RawValue = staticmethod(multiprocessing.RawValue)
-    RawArray = staticmethod(multiprocessing.RawArray)
-
-testcases_processes = create_test_cases(ProcessesMixin, type='processes')
-globals().update(testcases_processes)
-
-
-class ManagerMixin(object):
-    TYPE = 'manager'
-    Process = multiprocessing.Process
-    Queue = property(operator.attrgetter('manager.Queue'))
-    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
-    Lock = property(operator.attrgetter('manager.Lock'))
-    RLock = property(operator.attrgetter('manager.RLock'))
-    Semaphore = property(operator.attrgetter('manager.Semaphore'))
-    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
-    Condition = property(operator.attrgetter('manager.Condition'))
-    Event = property(operator.attrgetter('manager.Event'))
-    Barrier = property(operator.attrgetter('manager.Barrier'))
-    Value = property(operator.attrgetter('manager.Value'))
-    Array = property(operator.attrgetter('manager.Array'))
-    list = property(operator.attrgetter('manager.list'))
-    dict = property(operator.attrgetter('manager.dict'))
-    Namespace = property(operator.attrgetter('manager.Namespace'))
-
-    @classmethod
-    def Pool(cls, *args, **kwds):
-        return cls.manager.Pool(*args, **kwds)
-
-    @classmethod
-    def setUpClass(cls):
-        cls.manager = multiprocessing.Manager()
-
-    @classmethod
-    def tearDownClass(cls):
-        # only the manager process should be returned by active_children()
-        # but this can take a bit on slow machines, so wait a few seconds
-        # if there are other children too (see #17395)
-        t = 0.01
-        while len(multiprocessing.active_children()) > 1 and t < 5:
-            time.sleep(t)
-            t *= 2
-        gc.collect()                       # do garbage collection
-        if cls.manager._number_of_objects() != 0:
-            # This is not really an error since some tests do not
-            # ensure that all processes which hold a reference to a
-            # managed object have been joined.
-            print('Shared objects which still exist at manager shutdown:')
-            print(cls.manager._debug_info())
-        cls.manager.shutdown()
-        cls.manager.join()
-        cls.manager = None
-
-testcases_manager = create_test_cases(ManagerMixin, type='manager')
-globals().update(testcases_manager)
-
-
-class ThreadsMixin(object):
-    TYPE = 'threads'
-    Process = multiprocessing.dummy.Process
-    connection = multiprocessing.dummy.connection
-    current_process = staticmethod(multiprocessing.dummy.current_process)
-    active_children = staticmethod(multiprocessing.dummy.active_children)
-    Pool = staticmethod(multiprocessing.Pool)
-    Pipe = staticmethod(multiprocessing.dummy.Pipe)
-    Queue = staticmethod(multiprocessing.dummy.Queue)
-    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
-    Lock = staticmethod(multiprocessing.dummy.Lock)
-    RLock = staticmethod(multiprocessing.dummy.RLock)
-    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
-    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
-    Condition = staticmethod(multiprocessing.dummy.Condition)
-    Event = staticmethod(multiprocessing.dummy.Event)
-    Barrier = staticmethod(multiprocessing.dummy.Barrier)
-    Value = staticmethod(multiprocessing.dummy.Value)
-    Array = staticmethod(multiprocessing.dummy.Array)
-
-testcases_threads = create_test_cases(ThreadsMixin, type='threads')
-globals().update(testcases_threads)
 
 
 class OtherTest(unittest.TestCase):
@@ -3427,7 +3314,7 @@
     def test_flags(self):
         import json, subprocess
         # start child process using unusual flags
-        prog = ('from test.test_multiprocessing import TestFlags; ' +
+        prog = ('from test._test_multiprocessing import TestFlags; ' +
                 'TestFlags.run_in_child()')
         data = subprocess.check_output(
             [sys.executable, '-E', '-S', '-O', '-c', prog])
@@ -3474,13 +3361,14 @@
 
 class TestNoForkBomb(unittest.TestCase):
     def test_noforkbomb(self):
+        sm = multiprocessing.get_start_method()
         name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
-        if WIN32:
-            rc, out, err = test.script_helper.assert_python_failure(name)
+        if sm != 'fork':
+            rc, out, err = test.script_helper.assert_python_failure(name, sm)
             self.assertEqual('', out.decode('ascii'))
             self.assertIn('RuntimeError', err.decode('ascii'))
         else:
-            rc, out, err = test.script_helper.assert_python_ok(name)
+            rc, out, err = test.script_helper.assert_python_ok(name, sm)
             self.assertEqual('123', out.decode('ascii').rstrip())
             self.assertEqual('', err.decode('ascii'))
 
@@ -3514,6 +3402,72 @@
         self.assertLessEqual(new_size, old_size)
 
 #
+# Check that non-forked child processes do not inherit unneeded fds/handles
+#
+
+class TestCloseFds(unittest.TestCase):
+
+    def get_high_socket_fd(self):
+        if WIN32:
+            # The child process will not have any socket handles, so
+            # calling socket.fromfd() should produce WSAENOTSOCK even
+            # if there is a handle of the same number.
+            return socket.socket().detach()
+        else:
+            # We want to produce a socket with an fd high enough that a
+            # freshly created child process will not have any fds as high.
+            fd = socket.socket().detach()
+            to_close = []
+            while fd < 50:
+                to_close.append(fd)
+                fd = os.dup(fd)
+            for x in to_close:
+                os.close(x)
+            return fd
+
+    def close(self, fd):
+        if WIN32:
+            socket.socket(fileno=fd).close()
+        else:
+            os.close(fd)
+
+    @classmethod
+    def _test_closefds(cls, conn, fd):
+        try:
+            s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
+        except Exception as e:
+            conn.send(e)
+        else:
+            s.close()
+            conn.send(None)
+
+    def test_closefd(self):
+        if not HAS_REDUCTION:
+            raise unittest.SkipTest('requires fd pickling')
+
+        reader, writer = multiprocessing.Pipe()
+        fd = self.get_high_socket_fd()
+        try:
+            p = multiprocessing.Process(target=self._test_closefds,
+                                        args=(writer, fd))
+            p.start()
+            writer.close()
+            e = reader.recv()
+            p.join(timeout=5)
+        finally:
+            self.close(fd)
+            writer.close()
+            reader.close()
+
+        if multiprocessing.get_start_method() == 'fork':
+            self.assertIs(e, None)
+        else:
+            WSAENOTSOCK = 10038
+            self.assertIsInstance(e, OSError)
+            self.assertTrue(e.errno == errno.EBADF or
+                            e.winerror == WSAENOTSOCK, e)
+
+#
 # Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
 #
 
@@ -3557,10 +3511,10 @@
         def handler(signum, frame):
             pass
         signal.signal(signal.SIGUSR1, handler)
-        l = multiprocessing.connection.Listener()
-        conn.send(l.address)
-        a = l.accept()
-        a.send('welcome')
+        with multiprocessing.connection.Listener() as l:
+            conn.send(l.address)
+            a = l.accept()
+            a.send('welcome')
 
     @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
     def test_ignore_listener(self):
@@ -3581,26 +3535,221 @@
         finally:
             conn.close()
 
+class TestStartMethod(unittest.TestCase):
+    def test_set_get(self):
+        multiprocessing.set_forkserver_preload(PRELOAD)
+        count = 0
+        old_method = multiprocessing.get_start_method()
+        try:
+            for method in ('fork', 'spawn', 'forkserver'):
+                try:
+                    multiprocessing.set_start_method(method)
+                except ValueError:
+                    continue
+                self.assertEqual(multiprocessing.get_start_method(), method)
+                count += 1
+        finally:
+            multiprocessing.set_start_method(old_method)
+        self.assertGreaterEqual(count, 1)
+
+    def test_get_all(self):
+        methods = multiprocessing.get_all_start_methods()
+        if sys.platform == 'win32':
+            self.assertEqual(methods, ['spawn'])
+        else:
+            self.assertTrue(methods == ['fork', 'spawn'] or
+                            methods == ['fork', 'spawn', 'forkserver'])
+
 #
+# Check that killing process does not leak named semaphores
 #
+
+@unittest.skipIf(sys.platform == "win32",
+                 "test semantics don't make sense on Windows")
+class TestSemaphoreTracker(unittest.TestCase):
+    def test_semaphore_tracker(self):
+        import subprocess
+        cmd = '''if 1:
+            import multiprocessing as mp, time, os
+            mp.set_start_method("spawn")
+            lock1 = mp.Lock()
+            lock2 = mp.Lock()
+            os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
+            os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
+            time.sleep(10)
+        '''
+        print("\nTestSemaphoreTracker will output warnings a bit like:\n"
+              "    ... There appear to be 2 leaked semaphores"
+                  " to clean up at shutdown\n"
+              "    ... '/mp-03jgqz': [Errno 2] No such file or directory",
+              file=sys.stderr)
+        r, w = os.pipe()
+        p = subprocess.Popen([sys.executable,
+                             #'-W', 'ignore:semaphore_tracker',
+                             '-c', cmd % (w, w)],
+                             pass_fds=[w])
+        os.close(w)
+        with open(r, 'rb', closefd=True) as f:
+            name1 = f.readline().rstrip().decode('ascii')
+            name2 = f.readline().rstrip().decode('ascii')
+        _multiprocessing.sem_unlink(name1)
+        p.terminate()
+        p.wait()
+        time.sleep(1.0)
+        with self.assertRaises(OSError) as ctx:
+            _multiprocessing.sem_unlink(name2)
+        # docs say it should be ENOENT, but OSX seems to give EINVAL
+        self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
+
 #
-
-def setUpModule():
-    if sys.platform.startswith("linux"):
+# Mixins
+#
+
+class ProcessesMixin(object):
+    TYPE = 'processes'
+    Process = multiprocessing.Process
+    connection = multiprocessing.connection
+    current_process = staticmethod(multiprocessing.current_process)
+    active_children = staticmethod(multiprocessing.active_children)
+    Pool = staticmethod(multiprocessing.Pool)
+    Pipe = staticmethod(multiprocessing.Pipe)
+    Queue = staticmethod(multiprocessing.Queue)
+    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
+    Lock = staticmethod(multiprocessing.Lock)
+    RLock = staticmethod(multiprocessing.RLock)
+    Semaphore = staticmethod(multiprocessing.Semaphore)
+    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
+    Condition = staticmethod(multiprocessing.Condition)
+    Event = staticmethod(multiprocessing.Event)
+    Barrier = staticmethod(multiprocessing.Barrier)
+    Value = staticmethod(multiprocessing.Value)
+    Array = staticmethod(multiprocessing.Array)
+    RawValue = staticmethod(multiprocessing.RawValue)
+    RawArray = staticmethod(multiprocessing.RawArray)
+
+
+class ManagerMixin(object):
+    TYPE = 'manager'
+    Process = multiprocessing.Process
+    Queue = property(operator.attrgetter('manager.Queue'))
+    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
+    Lock = property(operator.attrgetter('manager.Lock'))
+    RLock = property(operator.attrgetter('manager.RLock'))
+    Semaphore = property(operator.attrgetter('manager.Semaphore'))
+    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
+    Condition = property(operator.attrgetter('manager.Condition'))
+    Event = property(operator.attrgetter('manager.Event'))
+    Barrier = property(operator.attrgetter('manager.Barrier'))
+    Value = property(operator.attrgetter('manager.Value'))
+    Array = property(operator.attrgetter('manager.Array'))
+    list = property(operator.attrgetter('manager.list'))
+    dict = property(operator.attrgetter('manager.dict'))
+    Namespace = property(operator.attrgetter('manager.Namespace'))
+
+    @classmethod
+    def Pool(cls, *args, **kwds):
+        return cls.manager.Pool(*args, **kwds)
+
+    @classmethod
+    def setUpClass(cls):
+        cls.manager = multiprocessing.Manager()
+
+    @classmethod
+    def tearDownClass(cls):
+        # only the manager process should be returned by active_children()
+        # but this can take a bit on slow machines, so wait a few seconds
+        # if there are other children too (see #17395)
+        t = 0.01
+        while len(multiprocessing.active_children()) > 1 and t < 5:
+            time.sleep(t)
+            t *= 2
+        gc.collect()                       # do garbage collection
+        if cls.manager._number_of_objects() != 0:
+            # This is not really an error since some tests do not
+            # ensure that all processes which hold a reference to a
+            # managed object have been joined.
+            print('Shared objects which still exist at manager shutdown:')
+            print(cls.manager._debug_info())
+        cls.manager.shutdown()
+        cls.manager.join()
+        cls.manager = None
+
+
+class ThreadsMixin(object):
+    TYPE = 'threads'
+    Process = multiprocessing.dummy.Process
+    connection = multiprocessing.dummy.connection
+    current_process = staticmethod(multiprocessing.dummy.current_process)
+    active_children = staticmethod(multiprocessing.dummy.active_children)
+    Pool = staticmethod(multiprocessing.Pool)
+    Pipe = staticmethod(multiprocessing.dummy.Pipe)
+    Queue = staticmethod(multiprocessing.dummy.Queue)
+    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
+    Lock = staticmethod(multiprocessing.dummy.Lock)
+    RLock = staticmethod(multiprocessing.dummy.RLock)
+    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
+    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
+    Condition = staticmethod(multiprocessing.dummy.Condition)
+    Event = staticmethod(multiprocessing.dummy.Event)
+    Barrier = staticmethod(multiprocessing.dummy.Barrier)
+    Value = staticmethod(multiprocessing.dummy.Value)
+    Array = staticmethod(multiprocessing.dummy.Array)
+
+#
+# Functions used to create test cases from the base ones in this module
+#
+
+def install_tests_in_module_dict(remote_globs, start_method):
+    __module__ = remote_globs['__name__']
+    local_globs = globals()
+    ALL_TYPES = {'processes', 'threads', 'manager'}
+
+    for name, base in local_globs.items():
+        if not isinstance(base, type):
+            continue
+        if issubclass(base, BaseTestCase):
+            if base is BaseTestCase:
+                continue
+            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
+            for type_ in base.ALLOWED_TYPES:
+                newname = 'With' + type_.capitalize() + name[1:]
+                Mixin = local_globs[type_.capitalize() + 'Mixin']
+                class Temp(base, Mixin, unittest.TestCase):
+                    pass
+                Temp.__name__ = Temp.__qualname__ = newname
+                Temp.__module__ = __module__
+                remote_globs[newname] = Temp
+        elif issubclass(base, unittest.TestCase):
+            class Temp(base, object):
+                pass
+            Temp.__name__ = Temp.__qualname__ = name
+            Temp.__module__ = __module__
+            remote_globs[name] = Temp
+
+    def setUpModule():
+        multiprocessing.set_forkserver_preload(PRELOAD)
+        remote_globs['old_start_method'] = multiprocessing.get_start_method()
         try:
-            lock = multiprocessing.RLock()
-        except OSError:
-            raise unittest.SkipTest("OSError raises on RLock creation, "
-                                    "see issue 3111!")
-    check_enough_semaphores()
-    util.get_temp_dir()     # creates temp directory for use by all processes
-    multiprocessing.get_logger().setLevel(LOG_LEVEL)
-
-
-def tearDownModule():
-    # pause a bit so we don't get warning about dangling threads/processes
-    time.sleep(0.5)
-
-
-if __name__ == '__main__':
-    unittest.main()
+            multiprocessing.set_start_method(start_method)
+        except ValueError:
+            raise unittest.SkipTest(start_method +
+                                    ' start method not supported')
+        print('Using start method %r' % multiprocessing.get_start_method())
+
+        if sys.platform.startswith("linux"):
+            try:
+                lock = multiprocessing.RLock()
+            except OSError:
+                raise unittest.SkipTest("OSError raises on RLock creation, "
+                                        "see issue 3111!")
+        check_enough_semaphores()
+        util.get_temp_dir()     # creates temp directory
+        multiprocessing.get_logger().setLevel(LOG_LEVEL)
+
+    def tearDownModule():
+        multiprocessing.set_start_method(remote_globs['old_start_method'])
+        # pause a bit so we don't get warning about dangling threads/processes
+        time.sleep(0.5)
+
+    remote_globs['setUpModule'] = setUpModule
+    remote_globs['tearDownModule'] = tearDownModule
diff --git a/Lib/test/mp_fork_bomb.py b/Lib/test/mp_fork_bomb.py
--- a/Lib/test/mp_fork_bomb.py
+++ b/Lib/test/mp_fork_bomb.py
@@ -7,6 +7,11 @@
 # correctly on Windows.  However, we should get a RuntimeError rather
 # than the Windows equivalent of a fork bomb.
 
+if len(sys.argv) > 1:
+    multiprocessing.set_start_method(sys.argv[1])
+else:
+    multiprocessing.set_start_method('spawn')
+
 p = multiprocessing.Process(target=foo)
 p.start()
 p.join()
diff --git a/Lib/test/regrtest.py b/Lib/test/regrtest.py
--- a/Lib/test/regrtest.py
+++ b/Lib/test/regrtest.py
@@ -149,7 +149,7 @@
 except ImportError:
     threading = None
 try:
-    import multiprocessing.process
+    import _multiprocessing, multiprocessing.process
 except ImportError:
     multiprocessing = None
 
diff --git a/Lib/test/test_multiprocessing_fork.py b/Lib/test/test_multiprocessing_fork.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_multiprocessing_fork.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'fork')
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/Lib/test/test_multiprocessing_forkserver.py b/Lib/test/test_multiprocessing_forkserver.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_multiprocessing_forkserver.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'forkserver')
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/Lib/test/test_multiprocessing_spawn.py b/Lib/test/test_multiprocessing_spawn.py
new file mode 100644
--- /dev/null
+++ b/Lib/test/test_multiprocessing_spawn.py
@@ -0,0 +1,7 @@
+import unittest
+import test._test_multiprocessing
+
+test._test_multiprocessing.install_tests_in_module_dict(globals(), 'spawn')
+
+if __name__ == '__main__':
+    unittest.main()
diff --git a/Makefile.pre.in b/Makefile.pre.in
--- a/Makefile.pre.in
+++ b/Makefile.pre.in
@@ -938,7 +938,9 @@
 
 QUICKTESTOPTS=	$(TESTOPTS) -x test_subprocess test_io test_lib2to3 \
 		test_multibytecodec test_urllib2_localnet test_itertools \
-		test_multiprocessing test_mailbox test_socket test_poll \
+		test_multiprocessing_fork test_multiprocessing_spawn \
+		test_multiprocessing_forkserver \
+		test_mailbox test_socket test_poll \
 		test_select test_zipfile test_concurrent_futures
 quicktest:	all platform
 		$(TESTRUNNER) $(QUICKTESTOPTS)
diff --git a/Modules/_multiprocessing/multiprocessing.c b/Modules/_multiprocessing/multiprocessing.c
--- a/Modules/_multiprocessing/multiprocessing.c
+++ b/Modules/_multiprocessing/multiprocessing.c
@@ -126,6 +126,7 @@
     {"recv", multiprocessing_recv, METH_VARARGS, ""},
     {"send", multiprocessing_send, METH_VARARGS, ""},
 #endif
+    {"sem_unlink", _PyMp_sem_unlink, METH_VARARGS, ""},
     {NULL}
 };
 
diff --git a/Modules/_multiprocessing/multiprocessing.h b/Modules/_multiprocessing/multiprocessing.h
--- a/Modules/_multiprocessing/multiprocessing.h
+++ b/Modules/_multiprocessing/multiprocessing.h
@@ -98,5 +98,6 @@
  */
 
 extern PyTypeObject _PyMp_SemLockType;
+extern PyObject *_PyMp_sem_unlink(PyObject *ignore, PyObject *args);
 
 #endif /* MULTIPROCESSING_H */
diff --git a/Modules/_multiprocessing/semaphore.c b/Modules/_multiprocessing/semaphore.c
--- a/Modules/_multiprocessing/semaphore.c
+++ b/Modules/_multiprocessing/semaphore.c
@@ -18,6 +18,7 @@
     int count;
     int maxvalue;
     int kind;
+    char *name;
 } SemLockObject;
 
 #define ISMINE(o) (o->count > 0 && PyThread_get_thread_ident() == o->last_tid)
@@ -397,7 +398,8 @@
  */
 
 static PyObject *
-newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue)
+newsemlockobject(PyTypeObject *type, SEM_HANDLE handle, int kind, int maxvalue,
+                 char *name)
 {
     SemLockObject *self;
 
@@ -409,21 +411,22 @@
     self->count = 0;
     self->last_tid = 0;
     self->maxvalue = maxvalue;
+    self->name = name;
     return (PyObject*)self;
 }
 
 static PyObject *
 semlock_new(PyTypeObject *type, PyObject *args, PyObject *kwds)
 {
-    char buffer[256];
     SEM_HANDLE handle = SEM_FAILED;
-    int kind, maxvalue, value;
+    int kind, maxvalue, value, unlink;
     PyObject *result;
-    static char *kwlist[] = {"kind", "value", "maxvalue", NULL};
-    static int counter = 0;
+    char *name, *name_copy = NULL;
+    static char *kwlist[] = {"kind", "value", "maxvalue", "name", "unlink",
+                             NULL};
 
-    if (!PyArg_ParseTupleAndKeywords(args, kwds, "iii", kwlist,
-                                     &kind, &value, &maxvalue))
+    if (!PyArg_ParseTupleAndKeywords(args, kwds, "iiisi", kwlist,
+                                     &kind, &value, &maxvalue, &name, &unlink))
         return NULL;
 
     if (kind != RECURSIVE_MUTEX && kind != SEMAPHORE) {
@@ -431,18 +434,23 @@
         return NULL;
     }
 
-    PyOS_snprintf(buffer, sizeof(buffer), "/mp%ld-%d", (long)getpid(), counter++);
+    if (!unlink) {
+        name_copy = PyMem_Malloc(strlen(name) + 1);
+        if (name_copy == NULL)
+            goto failure;
+        strcpy(name_copy, name);
+    }
 
     SEM_CLEAR_ERROR();
-    handle = SEM_CREATE(buffer, value, maxvalue);
+    handle = SEM_CREATE(name, value, maxvalue);
     /* On Windows we should fail if GetLastError()==ERROR_ALREADY_EXISTS */
     if (handle == SEM_FAILED || SEM_GET_LAST_ERROR() != 0)
         goto failure;
 
-    if (SEM_UNLINK(buffer) < 0)
+    if (unlink && SEM_UNLINK(name) < 0)
         goto failure;
 
-    result = newsemlockobject(type, handle, kind, maxvalue);
+    result = newsemlockobject(type, handle, kind, maxvalue, name_copy);
     if (!result)
         goto failure;
 
@@ -451,6 +459,7 @@
   failure:
     if (handle != SEM_FAILED)
         SEM_CLOSE(handle);
+    PyMem_Free(name_copy);
     _PyMp_SetError(NULL, MP_STANDARD_ERROR);
     return NULL;
 }
@@ -460,12 +469,30 @@
 {
     SEM_HANDLE handle;
     int kind, maxvalue;
+    char *name, *name_copy = NULL;
 
-    if (!PyArg_ParseTuple(args, F_SEM_HANDLE "ii",
-                          &handle, &kind, &maxvalue))
+    if (!PyArg_ParseTuple(args, F_SEM_HANDLE "iiz",
+                          &handle, &kind, &maxvalue, &name))
         return NULL;
 
-    return newsemlockobject(type, handle, kind, maxvalue);
+    if (name != NULL) {
+        name_copy = PyMem_Malloc(strlen(name) + 1);
+        if (name_copy == NULL)
+            return PyErr_NoMemory();
+        strcpy(name_copy, name);
+    }
+
+#ifndef MS_WINDOWS
+    if (name != NULL) {
+        handle = sem_open(name, 0);
+        if (handle == SEM_FAILED) {
+            PyMem_Free(name_copy);
+            return PyErr_SetFromErrno(PyExc_OSError);
+        }
+    }
+#endif
+
+    return newsemlockobject(type, handle, kind, maxvalue, name_copy);
 }
 
 static void
@@ -473,6 +500,7 @@
 {
     if (self->handle != SEM_FAILED)
         SEM_CLOSE(self->handle);
+    PyMem_Free(self->name);
     PyObject_Del(self);
 }
 
@@ -574,6 +602,8 @@
      ""},
     {"maxvalue", T_INT, offsetof(SemLockObject, maxvalue), READONLY,
      ""},
+    {"name", T_STRING, offsetof(SemLockObject, name), READONLY,
+     ""},
     {NULL}
 };
 
@@ -621,3 +651,23 @@
     /* tp_alloc          */ 0,
     /* tp_new            */ semlock_new,
 };
+
+/*
+ * sem_unlink(name): unlink the named semaphore `name` via SEM_UNLINK().
+ */
+
+PyObject *
+_PyMp_sem_unlink(PyObject *ignore, PyObject *args)
+{
+    char *name;
+
+    if (!PyArg_ParseTuple(args, "s", &name))
+        return NULL;
+
+    if (SEM_UNLINK(name) < 0) {
+        _PyMp_SetError(NULL, MP_STANDARD_ERROR);  /* sets the exception */
+        return NULL;
+    }
+
+    Py_RETURN_NONE;  /* unlink succeeded */
+}

-- 
Repository URL: http://hg.python.org/cpython


More information about the Python-checkins mailing list