[py-svn] r13661 - py/branch/execnet-refactoring

arigo at codespeak.net arigo at codespeak.net
Tue Jun 21 18:15:17 CEST 2005


Author: arigo
Date: Tue Jun 21 18:15:15 2005
New Revision: 13661

Modified:
   py/branch/execnet-refactoring/NOTES
   py/branch/execnet-refactoring/channel.py
Log:
Updated the NOTES with precise states and invariants.
This helped clarify the order in which to do some
operations in channel.py.


Modified: py/branch/execnet-refactoring/NOTES
==============================================================================
--- py/branch/execnet-refactoring/NOTES	(original)
+++ py/branch/execnet-refactoring/NOTES	Tue Jun 21 18:15:15 2005
@@ -1,3 +1,7 @@
+=============================================================================
+                      Channel implementation notes
+=============================================================================
+
 
 The public API of channels make them appear either opened or closed.
 When a channel is closed, we can't send any more items, and it will not
@@ -27,69 +31,92 @@
                                   ---> {id: callback}
 
 
-Channel:
 
-    __del__():
-        if not closed:
-            if has_callback:
-                send a CHANNEL_LAST_MESSAGE
-            else:
-                send a CHANNEL_CLOSE
-        if stickyerror:
-            warning("unhandled:", stickyerror)
-
-    close():
-        lock
-        if not closed:
-            send a CHANNEL_CLOSE
-            closed = True
-            channelfactory.closed(id)
-            receive_closed.set()
-        unlock
-
-    send():
-        lock
-        if closed:
-            raise!
-        send a CHANNEL_DATA
-        unlock
-
-    waitclose():
-        wait for receive_closed
-        if stickyerror:
-            stickyerror = None
-            raise stickyerror
-
-    receive():
-        x = queue.pop()
-        if x is END_MARKER:
-            queue.push(x)
-            stickyerror = None
-            raise stickyerror
-
-
-receive CHANNEL_DATA(id, data):
-    if id in callbacks:
-        callbacks[id](data)
-    else:
-        c = channels[id]
-        if no KeyError:
-            c.queue.push(data)
-
-receive CHANNEL_CLOSE(id, error=EOFError()):
-    del callbacks[id]
-    c = channels.pop(id)
-    if not KeyError:
-        c.stickyerror = error
-        c.closed = True
-        c.receive_closed.set()
-        c.queue.push(END_MARKER)
-    elif error:
-        warning("unhandled:", error)
-
-receive CHANNEL_LAST_MESSAGE(id):
-    del callbacks[id]
-    c = channels.pop(id)
-    if not KeyError:
-        c.receive_closed.set()
-        c.queue.push(END_MARKER)
+State and invariants of Channel objects
+---------------------------------------
+
+_channels and _callbacks are dictionaries on the ChannelFactory.
+Other attributes are on the Channel objects.
+
+All states are valid at any time (even with multithreading) unless
+marked with {E}, which means that they may be temporary invalid.
+They are eventually restored.
+
+
+States ("sendonly" means opened but won't receive any more items):
+
+  opened               sendonly          closed                deleted
+ =================    ==============    ==================    ===============
+  not _closed          not _closed       _closed               <no ref left>
+  not _receiveclosed   _receiveclosed    {E} _receiveclosed
+
+In the presence of callbacks, "deleted" does not imply "closed" nor "sendonly".
+It only means that no more items can be sent.  The (logical) channel can
+continue to receive data via the call-back even if the channel object no
+longer exists.
+
+
+The two kinds of channels, with or without callback:
+
+   items read by receive()           has a callback
+  =============================     =======================================
+   _items is a Queue                 _items is None
+   id not in _callbacks
+                                     state==opened: id in _callbacks
+   {E} state==sendonly: there is     {E} state!=opened: id not in _callbacks
+         an ENDMARKER in _items
+   {E} state==closed: there is
+         an ENDMARKER in _items
+
+Callback calls should be considered asynchronuous.  The channel can be in any
+state and change its state while the callback runs.
+
+
+The ChannelFactory's WeakValueDictionary _channels maps some ids to their
+channel object, depending on their state:
+
+  opened               sendonly          closed              deleted
+ =================    ==============    ================    ===============
+  id in _channels      {E} not in        {E} not in          not in
+
+
+All received RemoteErrors are handled exactly once: they are normally
+re-raised once in waitclose() or receive().  If it is not possible, they are
+at the moment dumped to stderr.  (XXX should use logging/tracing)
+Only channels in {E} "closed" state can hold RemoteErrors.
+
+
+Methods:
+
+ * close()      returns with the channel in "closed" state
+ * send()       either send the data or raise if "closed"
+ * receive()    wait for the next item.  If no item left and the state
+                   changes to non-"opened", raise
+ * waitclose()  wait for a non-"opened" state
+
+
+Assuming the channel is connected and the connexion is alive, the local state
+eventually influences the state of the corresponding remote channel object:
+
+    local |   opened    sendonly    closed    deleted
+remote    |
+=======================================================
+          |
+   opened |     ok         n/a        (1)       (2)
+          |
+ sendonly |     n/a        n/a        n/a       ok
+          |
+   closed |     (1)        n/a        ok        ok
+          |
+  deleted |     (2)        ok         ok        ok
+
+(1)  The side with the closed channel object must send a CHANNEL_CLOSE message,
+     which will eventually put the other side's channel in "closed" state if
+     it is still "opened".
+
+(2)  If the deleted channel has no callback, this is equivalent to (1).
+     Otherwide, the side with the deleted channel must send a
+     CHANNEL_LAST_MESSAGE, which will eventually put the other side's channel in
+     "sendonly" state if it is still "opened".
+
+n/a  These configuration should never occur.

Modified: py/branch/execnet-refactoring/channel.py
==============================================================================
--- py/branch/execnet-refactoring/channel.py	(original)
+++ py/branch/execnet-refactoring/channel.py	Tue Jun 21 18:15:15 2005
@@ -32,9 +32,9 @@
             self._items = None
         else:
             self._items = Queue.Queue()
-        self._receiveclosed = threading.Event()
         self._closed = False
-        self._sendlock = threading.Lock()
+        self._receiveclosed = threading.Event()
+        self._remoteerrors = []
 
     def __repr__(self):
         flag = self.isclosed() and "closed" or "open"
@@ -44,22 +44,28 @@
         if self.gateway is None:   # can be None in tests
             return
         self.gateway.trace("Channel(%d).__del__" % self.id)
-        if not self._closed:
+        # no multithreading issues here, because we have the last ref to 'self'
+        if self._closed:
+            # state transition "closed" --> "deleted"
+            for error in self._remoteerrors:
+                error.warn()
+        elif self._receiveclosed.isSet():
+            # state transition "sendonly" --> "deleted"
+            # the remote channel is already in "deleted" state, nothing to do
+            pass
+        else:
+            # state transition "opened" --> "deleted"
             if self._items is None:    # has_callback
                 Msg = Message.CHANNEL_LAST_MESSAGE
             else:
                 Msg = Message.CHANNEL_CLOSE
             self.gateway._outgoing.put(Msg(self.id))
-        else:
-            error = self._getstickyerror()
-            if isinstance(error, RemoteError):
-                error.warn()
 
-    def _getstickyerror(self):
+    def _getremoteerror(self):
         try:
-            return self.__dict__.pop('_stickyerror')
-        except KeyError:
-            return EOFError()
+            return self._remoteerrors.pop(0)
+        except IndexError:
+            return None
 
     #
     # public API for channel objects 
@@ -81,21 +87,22 @@
 
     def close(self, error=None):
         """ close down this channel on both sides. """
-        self._sendlock.acquire()
-        try:
-            if not self._closed:
-                put = self.gateway._outgoing.put
-                if error is not None:
-                    put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error)))
-                else:
-                    put(Message.CHANNEL_CLOSE(self.id))
-                if isinstance(error, RemoteError):
-                    self._stickyerror = error
-                self._closed = True
-                self.gateway.channelfactory._closed(self.id)
-                self._receiveclosed.set()
-        finally:
-            self._sendlock.release()
+        if not self._closed:
+            # state transition "opened/sendonly" --> "closed"
+            # threads warning: the channel might be closed under our feet,
+            # but it's never damaging to send too many CHANNEL_CLOSE messages
+            put = self.gateway._outgoing.put
+            if error is not None:
+                put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error)))
+            else:
+                put(Message.CHANNEL_CLOSE(self.id))
+            if isinstance(error, RemoteError):
+                self._remoteerrors.append(error)
+            self._closed = True         # --> "closed"
+            self._receiveclosed.set()
+            if self._items is not None:
+                self._items.put(ENDMARKER)
+            self.gateway.channelfactory._no_longer_opened(self.id)
 
     def waitclose(self, timeout):
         """ wait until this channel is closed (or the remote side
@@ -105,11 +112,11 @@
         the other side as channel.RemoteErrors containing a a textual
         representation of the remote traceback.
         """
-        self._receiveclosed.wait(timeout=timeout)
+        self._receiveclosed.wait(timeout=timeout)  # wait for non-"opened" state
         if not self._receiveclosed.isSet():
             raise IOError, "Timeout"
-        error = self._getstickyerror()
-        if isinstance(error, self.RemoteError):
+        error = self._getremoteerror()
+        if error:
             raise error
 
     def send(self, item):
@@ -117,17 +124,13 @@
         possibly blocking if the sender queue is full.
         Note that an item needs to be marshallable.
         """
-        self._sendlock.acquire()
-        try:
-            if self.isclosed(): 
-                raise IOError, "cannot send to %r" %(self,) 
-            if isinstance(item, Channel):
-                data = Message.CHANNEL_NEW(self.id, item.id)
-            else:
-                data = Message.CHANNEL_DATA(self.id, item)
-            self.gateway._outgoing.put(data)
-        finally:
-            self._sendlock.release()
+        if self.isclosed(): 
+            raise IOError, "cannot send to %r" %(self,) 
+        if isinstance(item, Channel):
+            data = Message.CHANNEL_NEW(self.id, item.id)
+        else:
+            data = Message.CHANNEL_DATA(self.id, item)
+        self.gateway._outgoing.put(data)
 
     def receive(self):
         """receives an item that was sent from the other side,
@@ -141,7 +144,7 @@
         x = self._items.get()
         if x is ENDMARKER: 
             self._items.put(x)  # for other receivers 
-            raise self._getstickyerror()
+            raise self._getremoteerror() or EOFError()
         else: 
             return x
     
@@ -192,55 +195,60 @@
     #
     # internal methods, called from the receiver thread 
     #
-    def _closed(self, id):
+    def _no_longer_opened(self, id):
         try:
-            del self._callbacks[id]
+            del self._channels[id]
         except KeyError:
             pass
         try:
-            channel = self._channels.pop(id)
+            del self._callbacks[id]
         except KeyError:
-            channel = None
-        return channel
+            pass
 
-    def _local_close(self, id, stickyerror=None):
-        channel = self._closed(id)
+    def _local_close(self, id, remoteerror=None):
+        channel = self._channels.get(id)
         if channel is None:
-            if isinstance(stickyerror, RemoteError):
-                stickyerror.warn()
+            # channel already in "deleted" state
+            if remoteerror:
+                remoteerror.warn()
         else:
-            if isinstance(stickyerror, RemoteError):
-                channel._stickyerror = stickyerror
-            channel._closed = True
+            # state transition to "closed" state
+            if remoteerror:
+                channel._remoteerrors.append(remoteerror)
+            channel._closed = True          # --> "closed"
             channel._receiveclosed.set()
             if channel._items is not None:
                 channel._items.put(ENDMARKER)
+        self._no_longer_opened(id)
 
     def _local_last_message(self, id):
-        channel = self._closed(id)
-        if channel is not None:
+        channel = self._channels.get(id)
+        if channel is None:
+            # channel already in "deleted" state
+            pass
+        else:
+            # state transition: if "opened", change to "sendonly"
             channel._receiveclosed.set()
             if channel._items is not None:
                 channel._items.put(ENDMARKER)
+        self._no_longer_opened(id)
 
     def _local_receive(self, id, data): 
         # executes in receiver thread
-        try:
-            callback = self._callbacks[id]
-        except KeyError:
-            try:
-                channel = self._channels[id]
-            except KeyError:
+        callback = self._callbacks.get(id)
+        if callback is not None:
+            callback(data)   # even if channel may be already closed
+        else:
+            channel = self._channels.get(id)
+            if channel is None or channel._items is None:
                 pass    # drop data
             else:
                 channel._items.put(data)
-        else:
-            callback(data)
 
     def _finished_receiving(self):
-        self._callbacks.clear()
         for id in self._channels.keys():
             self._local_last_message(id)
+        self._callbacks.clear()
 
 
 class ChannelFile:



More information about the pytest-commit mailing list