[Jython-checkins] jython: Refresh multiprocessing from CPython-2.7.11

darjus.loktevic jython-checkins at python.org
Sun Feb 28 21:52:59 EST 2016


https://hg.python.org/jython/rev/d5594657fdba
changeset:   7913:d5594657fdba
user:        Darjus Loktevic <darjus at gmail.com>
date:        Mon Feb 29 13:52:45 2016 +1100
summary:
  Refresh multiprocessing from CPython-2.7.11

files:
  lib-python/2.7/multiprocessing/connection.py     |  32 +++++--
  lib-python/2.7/multiprocessing/dummy/__init__.py |   2 +-
  lib-python/2.7/multiprocessing/forking.py        |  21 ++++-
  lib-python/2.7/multiprocessing/managers.py       |   1 +
  lib-python/2.7/multiprocessing/pool.py           |  43 ++++++---
  lib-python/2.7/multiprocessing/process.py        |   2 +-
  lib-python/2.7/multiprocessing/queues.py         |  20 ++--
  lib-python/2.7/multiprocessing/sharedctypes.py   |   7 +-
  lib-python/2.7/multiprocessing/synchronize.py    |   2 +-
  lib-python/2.7/multiprocessing/util.py           |  18 +++-
  10 files changed, 104 insertions(+), 44 deletions(-)


diff --git a/lib-python/2.7/multiprocessing/connection.py b/lib-python/2.7/multiprocessing/connection.py
--- a/lib-python/2.7/multiprocessing/connection.py
+++ b/lib-python/2.7/multiprocessing/connection.py
@@ -90,7 +90,7 @@
         return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
     elif family == 'AF_PIPE':
         return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
-                               (os.getpid(), _mmap_counter.next()))
+                               (os.getpid(), _mmap_counter.next()), dir="")
     else:
         raise ValueError('unrecognized family')
 
@@ -270,7 +270,14 @@
             self._unlink = None
 
     def accept(self):
-        s, self._last_accepted = self._socket.accept()
+        while True:
+            try:
+                s, self._last_accepted = self._socket.accept()
+            except socket.error as e:
+                if e.args[0] != errno.EINTR:
+                    raise
+            else:
+                break
         s.setblocking(True)
         fd = duplicate(s.fileno())
         conn = _multiprocessing.Connection(fd)
@@ -278,24 +285,29 @@
         return conn
 
     def close(self):
-        self._socket.close()
-        if self._unlink is not None:
-            self._unlink()
+        try:
+            self._socket.close()
+        finally:
+            unlink = self._unlink
+            if unlink is not None:
+                self._unlink = None
+                unlink()
 
 
 def SocketClient(address):
     '''
     Return a connection object connected to the socket given by `address`
     '''
-    family = address_type(address)
-    s = socket.socket( getattr(socket, family) )
-    s.setblocking(True)
+    family = getattr(socket, address_type(address))
     t = _init_timeout()
 
     while 1:
+        s = socket.socket(family)
+        s.setblocking(True)
         try:
             s.connect(address)
         except socket.error, e:
+            s.close()
             if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
                 debug('failed to connect to address %s', address)
                 raise
@@ -446,10 +458,10 @@
         return self._loads(s)
 
 def _xml_dumps(obj):
-    return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
+    return xmlrpclib.dumps((obj,), None, None, None, 1)
 
 def _xml_loads(s):
-    (obj,), method = xmlrpclib.loads(s.decode('utf8'))
+    (obj,), method = xmlrpclib.loads(s)
     return obj
 
 class XmlListener(Listener):
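
The accept() change above guards against signal delivery: a signal arriving
while the listener blocks in accept() raises socket.error with errno.EINTR,
and the old code let that propagate. The retry idiom in isolation, as a
minimal sketch (the function name is illustrative, not from the patch):

    import errno
    import socket

    def accept_retrying_on_eintr(listener_socket):
        # Retry accept() until it succeeds or fails with a real error;
        # EINTR just means a signal arrived mid-call.
        while True:
            try:
                conn, addr = listener_socket.accept()
            except socket.error as e:
                if e.args[0] != errno.EINTR:
                    raise
            else:
                return conn, addr

On Python 3.5+ PEP 475 retries interrupted system calls automatically, so
the explicit loop is only needed on the 2.x line.
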
diff --git a/lib-python/2.7/multiprocessing/dummy/__init__.py b/lib-python/2.7/multiprocessing/dummy/__init__.py
--- a/lib-python/2.7/multiprocessing/dummy/__init__.py
+++ b/lib-python/2.7/multiprocessing/dummy/__init__.py
@@ -138,7 +138,7 @@
         self._value = value
     value = property(_get, _set)
     def __repr__(self):
-        return '<%r(%r, %r)>'%(type(self).__name__,self._typecode,self._value)
+        return '<%s(%r, %r)>'%(type(self).__name__,self._typecode,self._value)
 
 def Manager():
     return sys.modules[__name__]
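
The one-character change in dummy/__init__.py fixes the repr:
type(self).__name__ is already a string, so %r wrapped it in quotes.
A quick illustration (the Value class here is hypothetical):

    class Value(object):
        pass

    name = Value.__name__
    print('<%r(%r, %r)>' % (name, 'i', 5))   # old: <'Value'('i', 5)>
    print('<%s(%r, %r)>' % (name, 'i', 5))   # new: <Value('i', 5)>
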
diff --git a/lib-python/2.7/multiprocessing/forking.py b/lib-python/2.7/multiprocessing/forking.py
--- a/lib-python/2.7/multiprocessing/forking.py
+++ b/lib-python/2.7/multiprocessing/forking.py
@@ -361,12 +361,13 @@
             return [sys.executable, '--multiprocessing-fork']
         else:
             prog = 'from multiprocessing.forking import main; main()'
-            return [_python_exe, '-c', prog, '--multiprocessing-fork']
+            opts = util._args_from_interpreter_flags()
+            return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
 
 
     def main():
         '''
-        Run code specifed by data received over pipe
+        Run code specified by data received over pipe
         '''
         assert is_forking(sys.argv)
 
@@ -469,12 +470,26 @@
         process.ORIGINAL_DIR = data['orig_dir']
 
     if 'main_path' in data:
+        # XXX (ncoghlan): The following code makes several bogus
+        # assumptions regarding the relationship between __file__
+        # and a module's real name. See PEP 302 and issue #10845
+        # The problem is resolved properly in Python 3.4+, as
+        # described in issue #19946
+
         main_path = data['main_path']
         main_name = os.path.splitext(os.path.basename(main_path))[0]
         if main_name == '__init__':
             main_name = os.path.basename(os.path.dirname(main_path))
 
-        if main_name != 'ipython':
+        if main_name == '__main__':
+            # For directory and zipfile execution, we assume an implicit
+            # "if __name__ == '__main__':" around the module, and don't
+            # rerun the main module code in spawned processes
+            main_module = sys.modules['__main__']
+            main_module.__file__ = main_path
+        elif main_name != 'ipython':
+            # Main modules not actually called __main__.py may
+            # contain additional code that should still be executed
             import imp
 
             if main_path is None:
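
get_command_line() now passes the parent's interpreter options to spawned
children via util._args_from_interpreter_flags (imported from subprocess;
see the util.py hunk below). The sketch here only illustrates the idea of
rebuilding options from sys.flags; it is not the real helper and covers
fewer flags:

    import sys

    def illustrative_interpreter_flags():
        # Rebuild a few command-line options from sys.flags; the real
        # helper handles more flags than shown here.
        args = []
        if sys.flags.optimize:
            args.append('-' + 'O' * sys.flags.optimize)   # -O or -OO
        if sys.flags.dont_write_bytecode:
            args.append('-B')
        return args

    prog = 'from multiprocessing.forking import main; main()'
    cmd = ([sys.executable] + illustrative_interpreter_flags()
           + ['-c', prog, '--multiprocessing-fork'])
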
diff --git a/lib-python/2.7/multiprocessing/managers.py b/lib-python/2.7/multiprocessing/managers.py
--- a/lib-python/2.7/multiprocessing/managers.py
+++ b/lib-python/2.7/multiprocessing/managers.py
@@ -763,6 +763,7 @@
         elif kind == '#PROXY':
             exposed, token = result
             proxytype = self._manager._registry[token.typeid][-1]
+            token.address = self._token.address
             proxy = proxytype(
                 token, self._serializer, manager=self._manager,
                 authkey=self._authkey, exposed=exposed
diff --git a/lib-python/2.7/multiprocessing/pool.py b/lib-python/2.7/multiprocessing/pool.py
--- a/lib-python/2.7/multiprocessing/pool.py
+++ b/lib-python/2.7/multiprocessing/pool.py
@@ -169,7 +169,8 @@
 
         self._task_handler = threading.Thread(
             target=Pool._handle_tasks,
-            args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
+            args=(self._taskqueue, self._quick_put, self._outqueue,
+                  self._pool, self._cache)
             )
         self._task_handler.daemon = True
         self._task_handler._state = RUN
@@ -329,26 +330,38 @@
         debug('worker handler exiting')
 
     @staticmethod
-    def _handle_tasks(taskqueue, put, outqueue, pool):
+    def _handle_tasks(taskqueue, put, outqueue, pool, cache):
         thread = threading.current_thread()
 
         for taskseq, set_length in iter(taskqueue.get, None):
+            task = None
             i = -1
-            for i, task in enumerate(taskseq):
-                if thread._state:
-                    debug('task handler found thread._state != RUN')
-                    break
-                try:
-                    put(task)
-                except IOError:
-                    debug('could not put task on queue')
-                    break
-            else:
+            try:
+                for i, task in enumerate(taskseq):
+                    if thread._state:
+                        debug('task handler found thread._state != RUN')
+                        break
+                    try:
+                        put(task)
+                    except Exception as e:
+                        job, ind = task[:2]
+                        try:
+                            cache[job]._set(ind, (False, e))
+                        except KeyError:
+                            pass
+                else:
+                    if set_length:
+                        debug('doing set_length()')
+                        set_length(i+1)
+                    continue
+                break
+            except Exception as ex:
+                job, ind = task[:2] if task else (0, 0)
+                if job in cache:
+                    cache[job]._set(ind + 1, (False, ex))
                 if set_length:
                     debug('doing set_length()')
                     set_length(i+1)
-                continue
-            break
         else:
             debug('task handler got sentinel')
 
@@ -565,6 +578,8 @@
             self._cond.release()
         del self._cache[self._job]
 
+AsyncResult = ApplyResult       # create alias -- see #17805
+
 #
 # Class whose instances are returned by `Pool.map_async()`
 #
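
The _handle_tasks() rewrite threads the result cache into the task handler,
so a failure while submitting a task, most commonly a pickling error on the
arguments, is recorded against the job instead of silently killing the
handler thread and leaving the caller blocked in get(). A hedged
demonstration of the visible effect (names and timeout are illustrative):

    from multiprocessing import Pool

    def call(fn):
        return fn()

    if __name__ == '__main__':
        pool = Pool(2)
        unpicklable = lambda: 42            # lambdas cannot be pickled
        result = pool.apply_async(call, (unpicklable,))
        try:
            result.get(timeout=10)          # raises the pickling error...
        except Exception as e:              # ...instead of hanging forever
            print('submit failed: %r' % (e,))
        pool.close()
        pool.join()

The AsyncResult = ApplyResult alias just gives the class the name the
documentation uses; see #17805.
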
diff --git a/lib-python/2.7/multiprocessing/process.py b/lib-python/2.7/multiprocessing/process.py
--- a/lib-python/2.7/multiprocessing/process.py
+++ b/lib-python/2.7/multiprocessing/process.py
@@ -267,7 +267,7 @@
             else:
                 sys.stderr.write(str(e.args[0]) + '\n')
                 sys.stderr.flush()
-                exitcode = 0 if isinstance(e.args[0], str) else 1
+                exitcode = 1
         except:
             exitcode = 1
             import traceback
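
The process.py change affects children that call sys.exit() with a string
argument: the message is still written to stderr, but the exit status is
now 1 instead of 0, so the failure is reported as a failure. An
illustrative check:

    import multiprocessing
    import sys

    def die():
        sys.exit('something went wrong')    # message, not a status code

    if __name__ == '__main__':
        p = multiprocessing.Process(target=die)
        p.start()
        p.join()
        print(p.exitcode)                   # 1 after this refresh, 0 before
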
diff --git a/lib-python/2.7/multiprocessing/queues.py b/lib-python/2.7/multiprocessing/queues.py
--- a/lib-python/2.7/multiprocessing/queues.py
+++ b/lib-python/2.7/multiprocessing/queues.py
@@ -44,10 +44,10 @@
 
 from Queue import Empty, Full
 import _multiprocessing
-from multiprocessing import Pipe
-from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
-from multiprocessing.util import debug, info, Finalize, register_after_fork
-from multiprocessing.forking import assert_spawning
+from . import Pipe
+from .synchronize import Lock, BoundedSemaphore, Semaphore, Condition
+from .util import debug, info, Finalize, register_after_fork, is_exiting
+from .forking import assert_spawning
 
 #
 # Queue type using a pipe, buffer and thread
@@ -156,9 +156,13 @@
 
     def close(self):
         self._closed = True
-        self._reader.close()
-        if self._close:
-            self._close()
+        try:
+            self._reader.close()
+        finally:
+            close = self._close
+            if close:
+                self._close = None
+                close()
 
     def join_thread(self):
         debug('Queue.join_thread()')
@@ -229,8 +233,6 @@
     @staticmethod
     def _feed(buffer, notempty, send, writelock, close):
         debug('starting thread to feed data to pipe')
-        from .util import is_exiting
-
         nacquire = notempty.acquire
         nrelease = notempty.release
         nwait = notempty.wait
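
Queue.close() gets the same hardening as Listener.close() in the
connection.py hunk: the reader is closed inside try, and the finalizer
callback is detached before it runs, so it fires at most once even if
closing the reader raises or close() is called again. The idiom in
isolation (class and names illustrative):

    class RunOnceCloser(object):
        def __init__(self, resource, on_close):
            self._resource = resource
            self._close = on_close

        def close(self):
            try:
                self._resource.close()
            finally:
                close = self._close
                if close:
                    self._close = None   # detach first: never runs twice
                    close()
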
diff --git a/lib-python/2.7/multiprocessing/sharedctypes.py b/lib-python/2.7/multiprocessing/sharedctypes.py
--- a/lib-python/2.7/multiprocessing/sharedctypes.py
+++ b/lib-python/2.7/multiprocessing/sharedctypes.py
@@ -46,13 +46,18 @@
 #
 
 typecode_to_type = {
-    'c': ctypes.c_char,  'u': ctypes.c_wchar,
+    'c': ctypes.c_char,
     'b': ctypes.c_byte,  'B': ctypes.c_ubyte,
     'h': ctypes.c_short, 'H': ctypes.c_ushort,
     'i': ctypes.c_int,   'I': ctypes.c_uint,
     'l': ctypes.c_long,  'L': ctypes.c_ulong,
     'f': ctypes.c_float, 'd': ctypes.c_double
     }
+try:
+    typecode_to_type['u'] = ctypes.c_wchar
+except AttributeError:
+    pass
+
 
 #
 #
diff --git a/lib-python/2.7/multiprocessing/synchronize.py b/lib-python/2.7/multiprocessing/synchronize.py
--- a/lib-python/2.7/multiprocessing/synchronize.py
+++ b/lib-python/2.7/multiprocessing/synchronize.py
@@ -226,7 +226,7 @@
             num_waiters = (self._sleeping_count._semlock._get_value() -
                            self._woken_count._semlock._get_value())
         except Exception:
-            num_waiters = 'unkown'
+            num_waiters = 'unknown'
         return '<Condition(%s, %s)>' % (self._lock, num_waiters)
 
     def wait(self, timeout=None):
diff --git a/lib-python/2.7/multiprocessing/util.py b/lib-python/2.7/multiprocessing/util.py
--- a/lib-python/2.7/multiprocessing/util.py
+++ b/lib-python/2.7/multiprocessing/util.py
@@ -32,11 +32,13 @@
 # SUCH DAMAGE.
 #
 
+import os
 import itertools
 import weakref
 import atexit
 import threading        # we want threading to install it's
                         # cleanup function before multiprocessing does
+from subprocess import _args_from_interpreter_flags
 
 from multiprocessing.process import current_process, active_children
 
@@ -183,6 +185,7 @@
         self._args = args
         self._kwargs = kwargs or {}
         self._key = (exitpriority, _finalizer_counter.next())
+        self._pid = os.getpid()
 
         _finalizer_registry[self._key] = self
 
@@ -195,9 +198,13 @@
         except KeyError:
             sub_debug('finalizer no longer registered')
         else:
-            sub_debug('finalizer calling %s with args %s and kwargs %s',
-                     self._callback, self._args, self._kwargs)
-            res = self._callback(*self._args, **self._kwargs)
+            if self._pid != os.getpid():
+                sub_debug('finalizer ignored because different process')
+                res = None
+            else:
+                sub_debug('finalizer calling %s with args %s and kwargs %s',
+                          self._callback, self._args, self._kwargs)
+                res = self._callback(*self._args, **self._kwargs)
             self._weakref = self._callback = self._args = \
                             self._kwargs = self._key = None
             return res
@@ -328,10 +335,13 @@
 
 class ForkAwareThreadLock(object):
     def __init__(self):
+        self._reset()
+        register_after_fork(self, ForkAwareThreadLock._reset)
+
+    def _reset(self):
         self._lock = threading.Lock()
         self.acquire = self._lock.acquire
         self.release = self._lock.release
-        register_after_fork(self, ForkAwareThreadLock.__init__)
 
 class ForkAwareLocal(threading.local):
     def __init__(self):
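
Two fork-related fixes in util.py: Finalize records the pid it was created
in and skips the callback when run in a different process, so finalizers
inherited across fork() do not fire in the child; and ForkAwareThreadLock
now registers its after-fork hook once in __init__ rather than from
_reset, which could otherwise re-register the hook after every fork. The
pid-guard idea as a standalone sketch (class name illustrative):

    import os

    class PidGuardedCallback(object):
        def __init__(self, callback):
            self._callback = callback
            self._pid = os.getpid()      # process that created us

        def __call__(self):
            if os.getpid() != self._pid:
                return None              # inherited via fork: do nothing
            return self._callback()
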

-- 
Repository URL: https://hg.python.org/jython

