[py-svn] r13661 - py/branch/execnet-refactoring
arigo at codespeak.net
arigo at codespeak.net
Tue Jun 21 18:15:17 CEST 2005
Author: arigo
Date: Tue Jun 21 18:15:15 2005
New Revision: 13661
Modified:
py/branch/execnet-refactoring/NOTES
py/branch/execnet-refactoring/channel.py
Log:
Updated the NOTES with precise states and invariants.
This helped clarify the order in which to do some
operations in channel.py.
Modified: py/branch/execnet-refactoring/NOTES
==============================================================================
--- py/branch/execnet-refactoring/NOTES (original)
+++ py/branch/execnet-refactoring/NOTES Tue Jun 21 18:15:15 2005
@@ -1,3 +1,7 @@
+=============================================================================
+ Channel implementation notes
+=============================================================================
+
The public API of channels makes them appear either opened or closed.
When a channel is closed, we can't send any more items, and it will not
@@ -27,69 +31,92 @@
---> {id: callback}
-Channel:
- __del__():
- if not closed:
- if has_callback:
- send a CHANNEL_LAST_MESSAGE
- else:
- send a CHANNEL_CLOSE
- if stickyerror:
- warning("unhandled:", stickyerror)
-
- close():
- lock
- if not closed:
- send a CHANNEL_CLOSE
- closed = True
- channelfactory.closed(id)
- receive_closed.set()
- unlock
-
- send():
- lock
- if closed:
- raise!
- send a CHANNEL_DATA
- unlock
-
- waitclose():
- wait for receive_closed
- if stickyerror:
- stickyerror = None
- raise stickyerror
-
- receive():
- x = queue.pop()
- if x is END_MARKER:
- queue.push(x)
- stickyerror = None
- raise stickyerror
-
-
-receive CHANNEL_DATA(id, data):
- if id in callbacks:
- callbacks[id](data)
- else:
- c = channels[id]
- if no KeyError:
- c.queue.push(data)
-
-receive CHANNEL_CLOSE(id, error=EOFError()):
- del callbacks[id]
- c = channels.pop(id)
- if not KeyError:
- c.stickyerror = error
- c.closed = True
- c.receive_closed.set()
- c.queue.push(END_MARKER)
- elif error:
- warning("unhandled:", error)
-
-receive CHANNEL_LAST_MESSAGE(id):
- del callbacks[id]
- c = channels.pop(id)
- if not KeyError:
- c.receive_closed.set()
- c.queue.push(END_MARKER)
+State and invariants of Channel objects
+---------------------------------------
+
+_channels and _callbacks are dictionaries on the ChannelFactory.
+Other attributes are on the Channel objects.
+
+All states are valid at any time (even with multithreading) unless
+marked with {E}, which means that they may be temporarily invalid.
+They are eventually restored.
+
+
+States ("sendonly" means opened but won't receive any more items):
+
+ opened sendonly closed deleted
+ ================= ============== ================== ===============
+ not _closed not _closed _closed <no ref left>
+ not _receiveclosed _receiveclosed {E} _receiveclosed
+
+In the presence of callbacks, "deleted" does not imply "closed" nor "sendonly".
+It only means that no more items can be sent. The (logical) channel can
+continue to receive data via the call-back even if the channel object no
+longer exists.
+
+
+The two kinds of channels, with or without callback:
+
+ items read by receive() has a callback
+ ============================= =======================================
+ _items is a Queue _items is None
+ id not in _callbacks
+ state==opened: id in _callbacks
+ {E} state==sendonly: there is {E} state!=opened: id not in _callbacks
+ an ENDMARKER in _items
+ {E} state==closed: there is
+ an ENDMARKER in _items
+
+Callback calls should be considered asynchronous. The channel can be in any
+state and change its state while the callback runs.
+
+
+The ChannelFactory's WeakValueDictionary _channels maps some ids to their
+channel object, depending on their state:
+
+ opened sendonly closed deleted
+ ================= ============== ================ ===============
+ id in _channels {E} not in {E} not in not in
+
+
+All received RemoteErrors are handled exactly once: they are normally
+re-raised once in waitclose() or receive(). If it is not possible, they are
+at the moment dumped to stderr. (XXX should use logging/tracing)
+Only channels in {E} "closed" state can hold RemoteErrors.
+
+
+Methods:
+
+ * close() returns with the channel in "closed" state
+ * send() either sends the data or raises if "closed"
+ * receive() waits for the next item. If no item is left and the state
+ changes to non-"opened", raises
+ * waitclose() waits for a non-"opened" state
+
+
+Assuming the channel is connected and the connection is alive, the local state
+eventually influences the state of the corresponding remote channel object:
+
+ local | opened sendonly closed deleted
+remote |
+=======================================================
+ |
+ opened | ok n/a (1) (2)
+ |
+ sendonly | n/a n/a n/a ok
+ |
+ closed | (1) n/a ok ok
+ |
+ deleted | (2) ok ok ok
+
+(1) The side with the closed channel object must send a CHANNEL_CLOSE message,
+ which will eventually put the other side's channel in "closed" state if
+ it is still "opened".
+
+(2) If the deleted channel has no callback, this is equivalent to (1).
+ Otherwise, the side with the deleted channel must send a
+ CHANNEL_LAST_MESSAGE, which will eventually put the other side's channel in
+ "sendonly" state if it is still "opened".
+
+n/a These configurations should never occur.
Modified: py/branch/execnet-refactoring/channel.py
==============================================================================
--- py/branch/execnet-refactoring/channel.py (original)
+++ py/branch/execnet-refactoring/channel.py Tue Jun 21 18:15:15 2005
@@ -32,9 +32,9 @@
self._items = None
else:
self._items = Queue.Queue()
- self._receiveclosed = threading.Event()
self._closed = False
- self._sendlock = threading.Lock()
+ self._receiveclosed = threading.Event()
+ self._remoteerrors = []
def __repr__(self):
flag = self.isclosed() and "closed" or "open"
@@ -44,22 +44,28 @@
if self.gateway is None: # can be None in tests
return
self.gateway.trace("Channel(%d).__del__" % self.id)
- if not self._closed:
+ # no multithreading issues here, because we have the last ref to 'self'
+ if self._closed:
+ # state transition "closed" --> "deleted"
+ for error in self._remoteerrors:
+ error.warn()
+ elif self._receiveclosed.isSet():
+ # state transition "sendonly" --> "deleted"
+ # the remote channel is already in "deleted" state, nothing to do
+ pass
+ else:
+ # state transition "opened" --> "deleted"
if self._items is None: # has_callback
Msg = Message.CHANNEL_LAST_MESSAGE
else:
Msg = Message.CHANNEL_CLOSE
self.gateway._outgoing.put(Msg(self.id))
- else:
- error = self._getstickyerror()
- if isinstance(error, RemoteError):
- error.warn()
- def _getstickyerror(self):
+ def _getremoteerror(self):
try:
- return self.__dict__.pop('_stickyerror')
- except KeyError:
- return EOFError()
+ return self._remoteerrors.pop(0)
+ except IndexError:
+ return None
#
# public API for channel objects
@@ -81,21 +87,22 @@
def close(self, error=None):
""" close down this channel on both sides. """
- self._sendlock.acquire()
- try:
- if not self._closed:
- put = self.gateway._outgoing.put
- if error is not None:
- put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error)))
- else:
- put(Message.CHANNEL_CLOSE(self.id))
- if isinstance(error, RemoteError):
- self._stickyerror = error
- self._closed = True
- self.gateway.channelfactory._closed(self.id)
- self._receiveclosed.set()
- finally:
- self._sendlock.release()
+ if not self._closed:
+ # state transition "opened/sendonly" --> "closed"
+ # threads warning: the channel might be closed under our feet,
+ # but it's never damaging to send too many CHANNEL_CLOSE messages
+ put = self.gateway._outgoing.put
+ if error is not None:
+ put(Message.CHANNEL_CLOSE_ERROR(self.id, str(error)))
+ else:
+ put(Message.CHANNEL_CLOSE(self.id))
+ if isinstance(error, RemoteError):
+ self._remoteerrors.append(error)
+ self._closed = True # --> "closed"
+ self._receiveclosed.set()
+ if self._items is not None:
+ self._items.put(ENDMARKER)
+ self.gateway.channelfactory._no_longer_opened(self.id)
def waitclose(self, timeout):
""" wait until this channel is closed (or the remote side
@@ -105,11 +112,11 @@
the other side as channel.RemoteErrors containing a a textual
representation of the remote traceback.
"""
- self._receiveclosed.wait(timeout=timeout)
+ self._receiveclosed.wait(timeout=timeout) # wait for non-"opened" state
if not self._receiveclosed.isSet():
raise IOError, "Timeout"
- error = self._getstickyerror()
- if isinstance(error, self.RemoteError):
+ error = self._getremoteerror()
+ if error:
raise error
def send(self, item):
@@ -117,17 +124,13 @@
possibly blocking if the sender queue is full.
Note that an item needs to be marshallable.
"""
- self._sendlock.acquire()
- try:
- if self.isclosed():
- raise IOError, "cannot send to %r" %(self,)
- if isinstance(item, Channel):
- data = Message.CHANNEL_NEW(self.id, item.id)
- else:
- data = Message.CHANNEL_DATA(self.id, item)
- self.gateway._outgoing.put(data)
- finally:
- self._sendlock.release()
+ if self.isclosed():
+ raise IOError, "cannot send to %r" %(self,)
+ if isinstance(item, Channel):
+ data = Message.CHANNEL_NEW(self.id, item.id)
+ else:
+ data = Message.CHANNEL_DATA(self.id, item)
+ self.gateway._outgoing.put(data)
def receive(self):
"""receives an item that was sent from the other side,
@@ -141,7 +144,7 @@
x = self._items.get()
if x is ENDMARKER:
self._items.put(x) # for other receivers
- raise self._getstickyerror()
+ raise self._getremoteerror() or EOFError()
else:
return x
@@ -192,55 +195,60 @@
#
# internal methods, called from the receiver thread
#
- def _closed(self, id):
+ def _no_longer_opened(self, id):
try:
- del self._callbacks[id]
+ del self._channels[id]
except KeyError:
pass
try:
- channel = self._channels.pop(id)
+ del self._callbacks[id]
except KeyError:
- channel = None
- return channel
+ pass
- def _local_close(self, id, stickyerror=None):
- channel = self._closed(id)
+ def _local_close(self, id, remoteerror=None):
+ channel = self._channels.get(id)
if channel is None:
- if isinstance(stickyerror, RemoteError):
- stickyerror.warn()
+ # channel already in "deleted" state
+ if remoteerror:
+ remoteerror.warn()
else:
- if isinstance(stickyerror, RemoteError):
- channel._stickyerror = stickyerror
- channel._closed = True
+ # state transition to "closed" state
+ if remoteerror:
+ channel._remoteerrors.append(remoteerror)
+ channel._closed = True # --> "closed"
channel._receiveclosed.set()
if channel._items is not None:
channel._items.put(ENDMARKER)
+ self._no_longer_opened(id)
def _local_last_message(self, id):
- channel = self._closed(id)
- if channel is not None:
+ channel = self._channels.get(id)
+ if channel is None:
+ # channel already in "deleted" state
+ pass
+ else:
+ # state transition: if "opened", change to "sendonly"
channel._receiveclosed.set()
if channel._items is not None:
channel._items.put(ENDMARKER)
+ self._no_longer_opened(id)
def _local_receive(self, id, data):
# executes in receiver thread
- try:
- callback = self._callbacks[id]
- except KeyError:
- try:
- channel = self._channels[id]
- except KeyError:
+ callback = self._callbacks.get(id)
+ if callback is not None:
+ callback(data) # even if channel may be already closed
+ else:
+ channel = self._channels.get(id)
+ if channel is None or channel._items is None:
pass # drop data
else:
channel._items.put(data)
- else:
- callback(data)
def _finished_receiving(self):
- self._callbacks.clear()
for id in self._channels.keys():
self._local_last_message(id)
+ self._callbacks.clear()
class ChannelFile:
More information about the pytest-commit
mailing list