[Python-checkins] cpython: Fixes issue #6766: Updated multiprocessing Proxy Objects to support nesting

davin.potts python-checkins at python.org
Wed Sep 7 19:53:25 EDT 2016


https://hg.python.org/cpython/rev/39e7307f9aee
changeset:   103261:39e7307f9aee
user:        Davin Potts <python at discontinuity.net>
date:        Wed Sep 07 18:48:01 2016 -0500
summary:
  Fixes issue #6766: Updated multiprocessing Proxy Objects to support nesting

files:
  Doc/library/multiprocessing.rst   |  85 +++++++++++------
  Lib/multiprocessing/managers.py   |  95 ++++++++++++++-----
  Lib/test/_test_multiprocessing.py |  70 +++++++++++++-
  3 files changed, 193 insertions(+), 57 deletions(-)


diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst
--- a/Doc/library/multiprocessing.rst
+++ b/Doc/library/multiprocessing.rst
@@ -1682,7 +1682,9 @@
    of processes.  Objects of this type are returned by
    :func:`multiprocessing.Manager`.
 
-   It also supports creation of shared lists and dictionaries.
+   Its methods create and return :ref:`multiprocessing-proxy_objects` for a
+   number of commonly used data types to be synchronized across processes.
+   This notably includes shared lists and dictionaries.
 
    .. method:: Barrier(parties[, action[, timeout]])
 
@@ -1745,31 +1747,17 @@
                dict(mapping)
                dict(sequence)
 
-      Create a shared ``dict`` object and return a proxy for it.
+      Create a shared :class:`dict` object and return a proxy for it.
 
    .. method:: list()
                list(sequence)
 
-      Create a shared ``list`` object and return a proxy for it.
-
-   .. note::
-
-      Modifications to mutable values or items in dict and list proxies will not
-      be propagated through the manager, because the proxy has no way of knowing
-      when its values or items are modified.  To modify such an item, you can
-      re-assign the modified object to the container proxy::
-
-         # create a list proxy and append a mutable object (a dictionary)
-         lproxy = manager.list()
-         lproxy.append({})
-         # now mutate the dictionary
-         d = lproxy[0]
-         d['a'] = 1
-         d['b'] = 2
-         # at this point, the changes to d are not yet synced, but by
-         # reassigning the dictionary, the proxy is notified of the change
-         lproxy[0] = d
-
+      Create a shared :class:`list` object and return a proxy for it.
+
+   .. versionchanged:: 3.6
+      Shared objects are capable of being nested.  For example, a shared
+      container object such as a shared list can contain other shared objects
+      which will all be managed and synchronized by the :class:`SyncManager`.
 
 .. class:: Namespace
 
@@ -1881,6 +1869,8 @@
     >>> s = m.get_server()
     >>> s.serve_forever()
 
+.. _multiprocessing-proxy_objects:
+
 Proxy Objects
 ~~~~~~~~~~~~~
 
@@ -1890,8 +1880,7 @@
 
 A proxy object has methods which invoke corresponding methods of its referent
 (although not every method of the referent will necessarily be available through
-the proxy).  A proxy can usually be used in most of the same ways that its
-referent can:
+the proxy).  In this way, a proxy can be used just like its referent can:
 
 .. doctest::
 
@@ -1912,9 +1901,9 @@
 the proxy.
 
 An important feature of proxy objects is that they are picklable so they can be
-passed between processes.  Note, however, that if a proxy is sent to the
-corresponding manager's process then unpickling it will produce the referent
-itself.  This means, for example, that one shared object can contain a second:
+passed between processes.  As such, a referent can contain
+:ref:`multiprocessing-proxy_objects`.  This permits nesting of these managed
+lists, dicts, and other :ref:`multiprocessing-proxy_objects`:
 
 .. doctest::
 
@@ -1922,10 +1911,46 @@
    >>> b = manager.list()
    >>> a.append(b)         # referent of a now contains referent of b
    >>> print(a, b)
-   [[]] []
+   [<ListProxy object, typeid 'list' at ...>] []
    >>> b.append('hello')
-   >>> print(a, b)
-   [['hello']] ['hello']
+   >>> print(a[0], b)
+   ['hello'] ['hello']
+
+Similarly, dict and list proxies may be nested inside one another::
+
+   >>> l_outer = manager.list([ manager.dict() for i in range(2) ])
+   >>> d_first_inner = l_outer[0]
+   >>> d_first_inner['a'] = 1
+   >>> d_first_inner['b'] = 2
+   >>> l_outer[1]['c'] = 3
+   >>> l_outer[1]['z'] = 26
+   >>> print(l_outer[0])
+   {'a': 1, 'b': 2}
+   >>> print(l_outer[1])
+   {'c': 3, 'z': 26}
+
+If standard (non-proxy) :class:`list` or :class:`dict` objects are contained
+in a referent, modifications to those mutable values will not be propagated
+through the manager because the proxy has no way of knowing when the values
+contained within are modified.  However, storing a value in a container proxy
+(which triggers a ``__setitem__`` on the proxy object) does propagate through
+the manager and so to effectively modify such an item, one could re-assign the
+modified value to the container proxy::
+
+   # create a list proxy and append a mutable object (a dictionary)
+   lproxy = manager.list()
+   lproxy.append({})
+   # now mutate the dictionary
+   d = lproxy[0]
+   d['a'] = 1
+   d['b'] = 2
+   # at this point, the changes to d are not yet synced, but by
+   # reassigning the dictionary to the list proxy, the proxy is notified
+   lproxy[0] = d
+
+This approach is perhaps less convenient than employing nested
+:ref:`multiprocessing-proxy_objects` for most use cases but also
+demonstrates a level of control over the synchronization.
 
 .. note::
 
diff --git a/Lib/multiprocessing/managers.py b/Lib/multiprocessing/managers.py
--- a/Lib/multiprocessing/managers.py
+++ b/Lib/multiprocessing/managers.py
@@ -142,7 +142,8 @@
 
         self.id_to_obj = {'0': (None, ())}
         self.id_to_refcount = {}
-        self.mutex = threading.RLock()
+        self.id_to_local_proxy_obj = {}
+        self.mutex = threading.Lock()
 
     def serve_forever(self):
         '''
@@ -227,7 +228,14 @@
                 methodname = obj = None
                 request = recv()
                 ident, methodname, args, kwds = request
-                obj, exposed, gettypeid = id_to_obj[ident]
+                try:
+                    obj, exposed, gettypeid = id_to_obj[ident]
+                except KeyError as ke:
+                    try:
+                        obj, exposed, gettypeid = \
+                            self.id_to_local_proxy_obj[ident]
+                    except KeyError as second_ke:
+                        raise ke
 
                 if methodname not in exposed:
                     raise AttributeError(
@@ -308,7 +316,7 @@
         '''
         with self.mutex:
             result = []
-            keys = list(self.id_to_obj.keys())
+            keys = list(self.id_to_refcount.keys())
             keys.sort()
             for ident in keys:
                 if ident != '0':
@@ -321,7 +329,8 @@
         '''
         Number of shared objects
         '''
-        return len(self.id_to_obj) - 1      # don't count ident='0'
+        # Doesn't use (len(self.id_to_obj) - 1) as we shouldn't count ident='0'
+        return len(self.id_to_refcount)
 
     def shutdown(self, c):
         '''
@@ -363,13 +372,9 @@
             self.id_to_obj[ident] = (obj, set(exposed), method_to_typeid)
             if ident not in self.id_to_refcount:
                 self.id_to_refcount[ident] = 0
-            # increment the reference count immediately, to avoid
-            # this object being garbage collected before a Proxy
-            # object for it can be created.  The caller of create()
-            # is responsible for doing a decref once the Proxy object
-            # has been created.
-            self.incref(c, ident)
-            return ident, tuple(exposed)
+
+        self.incref(c, ident)
+        return ident, tuple(exposed)
 
     def get_methods(self, c, token):
         '''
@@ -387,15 +392,45 @@
 
     def incref(self, c, ident):
         with self.mutex:
-            self.id_to_refcount[ident] += 1
+            try:
+                self.id_to_refcount[ident] += 1
+            except KeyError as ke:
+                # If no external references exist but an internal (to the
+                # manager) reference still does and a new external reference
+                # is created from it, restore the manager's tracking of it
+                # from the previously stashed internal ref.
+                if ident in self.id_to_local_proxy_obj:
+                    self.id_to_refcount[ident] = 1
+                    self.id_to_obj[ident] = \
+                        self.id_to_local_proxy_obj[ident]
+                    obj, exposed, gettypeid = self.id_to_obj[ident]
+                    util.debug('Server re-enabled tracking & INCREF %r', ident)
+                else:
+                    raise ke
 
     def decref(self, c, ident):
+        if ident not in self.id_to_refcount and \
+            ident in self.id_to_local_proxy_obj:
+            util.debug('Server DECREF skipping %r', ident)
+            return
+
         with self.mutex:
             assert self.id_to_refcount[ident] >= 1
             self.id_to_refcount[ident] -= 1
             if self.id_to_refcount[ident] == 0:
-                del self.id_to_obj[ident], self.id_to_refcount[ident]
-                util.debug('disposing of obj with id %r', ident)
+                del self.id_to_refcount[ident]
+
+        if ident not in self.id_to_refcount:
+            # Two-step process in case the object turns out to contain other
+            # proxy objects (e.g. a managed list of managed lists).
+            # Otherwise, deleting self.id_to_obj[ident] would trigger the
+            # deleting of the stored value (another managed object) which would
+            # in turn attempt to acquire the mutex that is already held here.
+            self.id_to_obj[ident] = (None, (), None)  # thread-safe
+            util.debug('disposing of obj with id %r', ident)
+            with self.mutex:
+                del self.id_to_obj[ident]
+
 
 #
 # Class to represent state of a manager
@@ -658,7 +693,7 @@
     _mutex = util.ForkAwareThreadLock()
 
     def __init__(self, token, serializer, manager=None,
-                 authkey=None, exposed=None, incref=True):
+                 authkey=None, exposed=None, incref=True, manager_owned=False):
         with BaseProxy._mutex:
             tls_idset = BaseProxy._address_to_local.get(token.address, None)
             if tls_idset is None:
@@ -680,6 +715,12 @@
         self._serializer = serializer
         self._Client = listener_client[serializer][1]
 
+        # Should be set to True only when a proxy object is being created
+        # on the manager server; primary use case: nested proxy objects.
+        # RebuildProxy detects when a proxy is being created on the manager
+        # and sets this value appropriately.
+        self._owned_by_manager = manager_owned
+
         if authkey is not None:
             self._authkey = process.AuthenticationString(authkey)
         elif self._manager is not None:
@@ -738,6 +779,10 @@
         return self._callmethod('#GETVALUE')
 
     def _incref(self):
+        if self._owned_by_manager:
+            util.debug('owned_by_manager skipped INCREF of %r', self._token.id)
+            return
+
         conn = self._Client(self._token.address, authkey=self._authkey)
         dispatch(conn, None, 'incref', (self._id,))
         util.debug('INCREF %r', self._token.id)
@@ -822,19 +867,19 @@
 def RebuildProxy(func, token, serializer, kwds):
     '''
     Function used for unpickling proxy objects.
-
-    If possible the shared object is returned, or otherwise a proxy for it.
     '''
     server = getattr(process.current_process(), '_manager_server', None)
-
     if server and server.address == token.address:
-        return server.id_to_obj[token.id][0]
-    else:
-        incref = (
-            kwds.pop('incref', True) and
-            not getattr(process.current_process(), '_inheriting', False)
-            )
-        return func(token, serializer, incref=incref, **kwds)
+        util.debug('Rebuild a proxy owned by manager, token=%r', token)
+        kwds['manager_owned'] = True
+        if token.id not in server.id_to_local_proxy_obj:
+            server.id_to_local_proxy_obj[token.id] = \
+                server.id_to_obj[token.id]
+    incref = (
+        kwds.pop('incref', True) and
+        not getattr(process.current_process(), '_inheriting', False)
+        )
+    return func(token, serializer, incref=incref, **kwds)
 
 #
 # Functions to create proxies and proxy types
diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py
--- a/Lib/test/_test_multiprocessing.py
+++ b/Lib/test/_test_multiprocessing.py
@@ -1628,13 +1628,33 @@
         d = [a, b]
         e = self.list(d)
         self.assertEqual(
-            e[:],
+            [element[:] for element in e],
             [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
             )
 
         f = self.list([a])
         a.append('hello')
-        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
+        self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])
+
+    def test_list_proxy_in_list(self):
+        a = self.list([self.list(range(3)) for _i in range(3)])
+        self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)
+
+        a[0][-1] = 55
+        self.assertEqual(a[0][:], [0, 1, 55])
+        for i in range(1, 3):
+            self.assertEqual(a[i][:], [0, 1, 2])
+
+        self.assertEqual(a[1].pop(), 2)
+        self.assertEqual(len(a[1]), 2)
+        for i in range(0, 3, 2):
+            self.assertEqual(len(a[i]), 3)
+
+        del a
+
+        b = self.list()
+        b.append(b)
+        del b
 
     def test_dict(self):
         d = self.dict()
@@ -1646,6 +1666,52 @@
         self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
         self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
 
+    def test_dict_proxy_nested(self):
+        pets = self.dict(ferrets=2, hamsters=4)
+        supplies = self.dict(water=10, feed=3)
+        d = self.dict(pets=pets, supplies=supplies)
+
+        self.assertEqual(supplies['water'], 10)
+        self.assertEqual(d['supplies']['water'], 10)
+
+        d['supplies']['blankets'] = 5
+        self.assertEqual(supplies['blankets'], 5)
+        self.assertEqual(d['supplies']['blankets'], 5)
+
+        d['supplies']['water'] = 7
+        self.assertEqual(supplies['water'], 7)
+        self.assertEqual(d['supplies']['water'], 7)
+
+        del pets
+        del supplies
+        self.assertEqual(d['pets']['ferrets'], 2)
+        d['supplies']['blankets'] = 11
+        self.assertEqual(d['supplies']['blankets'], 11)
+
+        pets = d['pets']
+        supplies = d['supplies']
+        supplies['water'] = 7
+        self.assertEqual(supplies['water'], 7)
+        self.assertEqual(d['supplies']['water'], 7)
+
+        d.clear()
+        self.assertEqual(len(d), 0)
+        self.assertEqual(supplies['water'], 7)
+        self.assertEqual(pets['hamsters'], 4)
+
+        l = self.list([pets, supplies])
+        l[0]['marmots'] = 1
+        self.assertEqual(pets['marmots'], 1)
+        self.assertEqual(l[0]['marmots'], 1)
+
+        del pets
+        del supplies
+        self.assertEqual(l[0]['marmots'], 1)
+
+        outer = self.list([[88, 99], l])
+        self.assertIsInstance(outer[0], list)  # Not a ListProxy
+        self.assertEqual(outer[-1][-1]['feed'], 3)
+
     def test_namespace(self):
         n = self.Namespace()
         n.name = 'Bob'

-- 
Repository URL: https://hg.python.org/cpython


More information about the Python-checkins mailing list