[Jython-checkins] jython: Refresh multiprocessing from CPython-2.7.11
darjus.loktevic
jython-checkins at python.org
Sun Feb 28 21:52:59 EST 2016
https://hg.python.org/jython/rev/d5594657fdba
changeset: 7913:d5594657fdba
user: Darjus Loktevic <darjus at gmail.com>
date: Mon Feb 29 13:52:45 2016 +1100
summary:
Refresh multiprocessing from CPython-2.7.11
files:
lib-python/2.7/multiprocessing/connection.py | 32 +++++--
lib-python/2.7/multiprocessing/dummy/__init__.py | 2 +-
lib-python/2.7/multiprocessing/forking.py | 21 ++++-
lib-python/2.7/multiprocessing/managers.py | 1 +
lib-python/2.7/multiprocessing/pool.py | 43 ++++++---
lib-python/2.7/multiprocessing/process.py | 2 +-
lib-python/2.7/multiprocessing/queues.py | 20 ++--
lib-python/2.7/multiprocessing/sharedctypes.py | 7 +-
lib-python/2.7/multiprocessing/synchronize.py | 2 +-
lib-python/2.7/multiprocessing/util.py | 18 +++-
10 files changed, 104 insertions(+), 44 deletions(-)
diff --git a/lib-python/2.7/multiprocessing/connection.py b/lib-python/2.7/multiprocessing/connection.py
--- a/lib-python/2.7/multiprocessing/connection.py
+++ b/lib-python/2.7/multiprocessing/connection.py
@@ -90,7 +90,7 @@
return tempfile.mktemp(prefix='listener-', dir=get_temp_dir())
elif family == 'AF_PIPE':
return tempfile.mktemp(prefix=r'\\.\pipe\pyc-%d-%d-' %
- (os.getpid(), _mmap_counter.next()))
+ (os.getpid(), _mmap_counter.next()), dir="")
else:
raise ValueError('unrecognized family')
@@ -270,7 +270,14 @@
self._unlink = None
def accept(self):
- s, self._last_accepted = self._socket.accept()
+ while True:
+ try:
+ s, self._last_accepted = self._socket.accept()
+ except socket.error as e:
+ if e.args[0] != errno.EINTR:
+ raise
+ else:
+ break
s.setblocking(True)
fd = duplicate(s.fileno())
conn = _multiprocessing.Connection(fd)
@@ -278,24 +285,29 @@
return conn
def close(self):
- self._socket.close()
- if self._unlink is not None:
- self._unlink()
+ try:
+ self._socket.close()
+ finally:
+ unlink = self._unlink
+ if unlink is not None:
+ self._unlink = None
+ unlink()
def SocketClient(address):
'''
Return a connection object connected to the socket given by `address`
'''
- family = address_type(address)
- s = socket.socket( getattr(socket, family) )
- s.setblocking(True)
+ family = getattr(socket, address_type(address))
t = _init_timeout()
while 1:
+ s = socket.socket(family)
+ s.setblocking(True)
try:
s.connect(address)
except socket.error, e:
+ s.close()
if e.args[0] != errno.ECONNREFUSED or _check_timeout(t):
debug('failed to connect to address %s', address)
raise
@@ -446,10 +458,10 @@
return self._loads(s)
def _xml_dumps(obj):
- return xmlrpclib.dumps((obj,), None, None, None, 1).encode('utf8')
+ return xmlrpclib.dumps((obj,), None, None, None, 1)
def _xml_loads(s):
- (obj,), method = xmlrpclib.loads(s.decode('utf8'))
+ (obj,), method = xmlrpclib.loads(s)
return obj
class XmlListener(Listener):
diff --git a/lib-python/2.7/multiprocessing/dummy/__init__.py b/lib-python/2.7/multiprocessing/dummy/__init__.py
--- a/lib-python/2.7/multiprocessing/dummy/__init__.py
+++ b/lib-python/2.7/multiprocessing/dummy/__init__.py
@@ -138,7 +138,7 @@
self._value = value
value = property(_get, _set)
def __repr__(self):
- return '<%r(%r, %r)>'%(type(self).__name__,self._typecode,self._value)
+ return '<%s(%r, %r)>'%(type(self).__name__,self._typecode,self._value)
def Manager():
return sys.modules[__name__]
diff --git a/lib-python/2.7/multiprocessing/forking.py b/lib-python/2.7/multiprocessing/forking.py
--- a/lib-python/2.7/multiprocessing/forking.py
+++ b/lib-python/2.7/multiprocessing/forking.py
@@ -361,12 +361,13 @@
return [sys.executable, '--multiprocessing-fork']
else:
prog = 'from multiprocessing.forking import main; main()'
- return [_python_exe, '-c', prog, '--multiprocessing-fork']
+ opts = util._args_from_interpreter_flags()
+ return [_python_exe] + opts + ['-c', prog, '--multiprocessing-fork']
def main():
'''
- Run code specifed by data received over pipe
+ Run code specified by data received over pipe
'''
assert is_forking(sys.argv)
@@ -469,12 +470,26 @@
process.ORIGINAL_DIR = data['orig_dir']
if 'main_path' in data:
+ # XXX (ncoghlan): The following code makes several bogus
+ # assumptions regarding the relationship between __file__
+ # and a module's real name. See PEP 302 and issue #10845
+ # The problem is resolved properly in Python 3.4+, as
+ # described in issue #19946
+
main_path = data['main_path']
main_name = os.path.splitext(os.path.basename(main_path))[0]
if main_name == '__init__':
main_name = os.path.basename(os.path.dirname(main_path))
- if main_name != 'ipython':
+ if main_name == '__main__':
+ # For directory and zipfile execution, we assume an implicit
+ # "if __name__ == '__main__':" around the module, and don't
+ # rerun the main module code in spawned processes
+ main_module = sys.modules['__main__']
+ main_module.__file__ = main_path
+ elif main_name != 'ipython':
+ # Main modules not actually called __main__.py may
+ # contain additional code that should still be executed
import imp
if main_path is None:
diff --git a/lib-python/2.7/multiprocessing/managers.py b/lib-python/2.7/multiprocessing/managers.py
--- a/lib-python/2.7/multiprocessing/managers.py
+++ b/lib-python/2.7/multiprocessing/managers.py
@@ -763,6 +763,7 @@
elif kind == '#PROXY':
exposed, token = result
proxytype = self._manager._registry[token.typeid][-1]
+ token.address = self._token.address
proxy = proxytype(
token, self._serializer, manager=self._manager,
authkey=self._authkey, exposed=exposed
diff --git a/lib-python/2.7/multiprocessing/pool.py b/lib-python/2.7/multiprocessing/pool.py
--- a/lib-python/2.7/multiprocessing/pool.py
+++ b/lib-python/2.7/multiprocessing/pool.py
@@ -169,7 +169,8 @@
self._task_handler = threading.Thread(
target=Pool._handle_tasks,
- args=(self._taskqueue, self._quick_put, self._outqueue, self._pool)
+ args=(self._taskqueue, self._quick_put, self._outqueue,
+ self._pool, self._cache)
)
self._task_handler.daemon = True
self._task_handler._state = RUN
@@ -329,26 +330,38 @@
debug('worker handler exiting')
@staticmethod
- def _handle_tasks(taskqueue, put, outqueue, pool):
+ def _handle_tasks(taskqueue, put, outqueue, pool, cache):
thread = threading.current_thread()
for taskseq, set_length in iter(taskqueue.get, None):
+ task = None
i = -1
- for i, task in enumerate(taskseq):
- if thread._state:
- debug('task handler found thread._state != RUN')
- break
- try:
- put(task)
- except IOError:
- debug('could not put task on queue')
- break
- else:
+ try:
+ for i, task in enumerate(taskseq):
+ if thread._state:
+ debug('task handler found thread._state != RUN')
+ break
+ try:
+ put(task)
+ except Exception as e:
+ job, ind = task[:2]
+ try:
+ cache[job]._set(ind, (False, e))
+ except KeyError:
+ pass
+ else:
+ if set_length:
+ debug('doing set_length()')
+ set_length(i+1)
+ continue
+ break
+ except Exception as ex:
+ job, ind = task[:2] if task else (0, 0)
+ if job in cache:
+ cache[job]._set(ind + 1, (False, ex))
if set_length:
debug('doing set_length()')
set_length(i+1)
- continue
- break
else:
debug('task handler got sentinel')
@@ -565,6 +578,8 @@
self._cond.release()
del self._cache[self._job]
+AsyncResult = ApplyResult # create alias -- see #17805
+
#
# Class whose instances are returned by `Pool.map_async()`
#
diff --git a/lib-python/2.7/multiprocessing/process.py b/lib-python/2.7/multiprocessing/process.py
--- a/lib-python/2.7/multiprocessing/process.py
+++ b/lib-python/2.7/multiprocessing/process.py
@@ -267,7 +267,7 @@
else:
sys.stderr.write(str(e.args[0]) + '\n')
sys.stderr.flush()
- exitcode = 0 if isinstance(e.args[0], str) else 1
+ exitcode = 1
except:
exitcode = 1
import traceback
diff --git a/lib-python/2.7/multiprocessing/queues.py b/lib-python/2.7/multiprocessing/queues.py
--- a/lib-python/2.7/multiprocessing/queues.py
+++ b/lib-python/2.7/multiprocessing/queues.py
@@ -44,10 +44,10 @@
from Queue import Empty, Full
import _multiprocessing
-from multiprocessing import Pipe
-from multiprocessing.synchronize import Lock, BoundedSemaphore, Semaphore, Condition
-from multiprocessing.util import debug, info, Finalize, register_after_fork
-from multiprocessing.forking import assert_spawning
+from . import Pipe
+from .synchronize import Lock, BoundedSemaphore, Semaphore, Condition
+from .util import debug, info, Finalize, register_after_fork, is_exiting
+from .forking import assert_spawning
#
# Queue type using a pipe, buffer and thread
@@ -156,9 +156,13 @@
def close(self):
self._closed = True
- self._reader.close()
- if self._close:
- self._close()
+ try:
+ self._reader.close()
+ finally:
+ close = self._close
+ if close:
+ self._close = None
+ close()
def join_thread(self):
debug('Queue.join_thread()')
@@ -229,8 +233,6 @@
@staticmethod
def _feed(buffer, notempty, send, writelock, close):
debug('starting thread to feed data to pipe')
- from .util import is_exiting
-
nacquire = notempty.acquire
nrelease = notempty.release
nwait = notempty.wait
diff --git a/lib-python/2.7/multiprocessing/sharedctypes.py b/lib-python/2.7/multiprocessing/sharedctypes.py
--- a/lib-python/2.7/multiprocessing/sharedctypes.py
+++ b/lib-python/2.7/multiprocessing/sharedctypes.py
@@ -46,13 +46,18 @@
#
typecode_to_type = {
- 'c': ctypes.c_char, 'u': ctypes.c_wchar,
+ 'c': ctypes.c_char,
'b': ctypes.c_byte, 'B': ctypes.c_ubyte,
'h': ctypes.c_short, 'H': ctypes.c_ushort,
'i': ctypes.c_int, 'I': ctypes.c_uint,
'l': ctypes.c_long, 'L': ctypes.c_ulong,
'f': ctypes.c_float, 'd': ctypes.c_double
}
+try:
+ typecode_to_type['u'] = ctypes.c_wchar
+except AttributeError:
+ pass
+
#
#
diff --git a/lib-python/2.7/multiprocessing/synchronize.py b/lib-python/2.7/multiprocessing/synchronize.py
--- a/lib-python/2.7/multiprocessing/synchronize.py
+++ b/lib-python/2.7/multiprocessing/synchronize.py
@@ -226,7 +226,7 @@
num_waiters = (self._sleeping_count._semlock._get_value() -
self._woken_count._semlock._get_value())
except Exception:
- num_waiters = 'unkown'
+ num_waiters = 'unknown'
return '<Condition(%s, %s)>' % (self._lock, num_waiters)
def wait(self, timeout=None):
diff --git a/lib-python/2.7/multiprocessing/util.py b/lib-python/2.7/multiprocessing/util.py
--- a/lib-python/2.7/multiprocessing/util.py
+++ b/lib-python/2.7/multiprocessing/util.py
@@ -32,11 +32,13 @@
# SUCH DAMAGE.
#
+import os
import itertools
import weakref
import atexit
import threading # we want threading to install it's
# cleanup function before multiprocessing does
+from subprocess import _args_from_interpreter_flags
from multiprocessing.process import current_process, active_children
@@ -183,6 +185,7 @@
self._args = args
self._kwargs = kwargs or {}
self._key = (exitpriority, _finalizer_counter.next())
+ self._pid = os.getpid()
_finalizer_registry[self._key] = self
@@ -195,9 +198,13 @@
except KeyError:
sub_debug('finalizer no longer registered')
else:
- sub_debug('finalizer calling %s with args %s and kwargs %s',
- self._callback, self._args, self._kwargs)
- res = self._callback(*self._args, **self._kwargs)
+ if self._pid != os.getpid():
+ sub_debug('finalizer ignored because different process')
+ res = None
+ else:
+ sub_debug('finalizer calling %s with args %s and kwargs %s',
+ self._callback, self._args, self._kwargs)
+ res = self._callback(*self._args, **self._kwargs)
self._weakref = self._callback = self._args = \
self._kwargs = self._key = None
return res
@@ -328,10 +335,13 @@
class ForkAwareThreadLock(object):
def __init__(self):
+ self._reset()
+ register_after_fork(self, ForkAwareThreadLock._reset)
+
+ def _reset(self):
self._lock = threading.Lock()
self.acquire = self._lock.acquire
self.release = self._lock.release
- register_after_fork(self, ForkAwareThreadLock.__init__)
class ForkAwareLocal(threading.local):
def __init__(self):
--
Repository URL: https://hg.python.org/jython
More information about the Jython-checkins mailing list