[Python-ideas] Tulip / PEP 3156 - subprocess events

Nick Coghlan ncoghlan at gmail.com
Fri Jan 18 12:55:19 CET 2013


On Fri, Jan 18, 2013 at 7:33 PM, Paul Moore <p.f.moore at gmail.com> wrote:
> On 18 January 2013 08:38, Nick Coghlan <ncoghlan at gmail.com> wrote:
> I have now (finally!) got Guido's point that implementing a process
> protocol will give me a good insight into how this stuff is meant to
> work. I'm still struggling to understand why he thinks it needs a
> dedicated method on the event loop, rather than being a higher-level
> layer like you're suggesting, but I'm at least starting to understand
> what questions to ask.

The creation of the pipe transport needs to be on the event loop,
precisely because of cross-platform differences when it comes to
Windows. On *nix, on the other hand, the pipe transport should look an
awful lot like the socket transport and thus be able to use the
existing file descriptor based interfaces on the event loop.
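
To make the *nix half of that concrete, here is a minimal sketch using asyncio (the standard library module that Tulip later became) rather than the Tulip API of the time. It assumes a Unix selector-based event loop, where add_reader() accepts any file descriptor; the names read_from_pipe and on_readable are purely illustrative.

```python
import asyncio
import os


async def read_from_pipe():
    loop = asyncio.get_running_loop()
    read_fd, write_fd = os.pipe()
    os.set_blocking(read_fd, False)
    received = loop.create_future()

    def on_readable():
        # Called by the event loop once the descriptor has data to read
        loop.remove_reader(read_fd)
        received.set_result(os.read(read_fd, 1024))

    # The same fd-based interface that is used for sockets
    loop.add_reader(read_fd, on_readable)
    os.write(write_fd, b"hello from the pipe\n")
    try:
        return await received
    finally:
        os.close(read_fd)
        os.close(write_fd)


print(asyncio.run(read_from_pipe()))
```

Nothing here is pipe-specific beyond os.pipe() itself, which is the sense in which the pipe transport can reuse the socket machinery on *nix.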

The protocol part is then about adapting the transport API to a
coroutine-friendly readlines/writelines API (the part that Guido
points out needs more detail in
http://www.python.org/dev/peps/pep-3156/#coroutines-and-protocols).

As a rough untested sketch (the buffering here could likely be a lot smarter):

    # Remember we're not using preemptive threading, so we don't need
    # locking for thread safety
    # Note that the protocol isn't designed to support reconnection -
    # a new connection means a new protocol instance. The create_* APIs
    # on the event loop accept a protocol factory specifically in order
    # to encourage this approach
    class SimpleStreamingProtocol:
        def __init__(self):
            self._transport = None
            self._data = bytearray()
            self._pending = None

        def connection_made(self, transport):
            self._transport = transport
        def connection_lost(self, exc):
            self._transport = None
            # Could also store the exc directly on the protocol and raise
            # it in subsequent write calls
            if self._pending is not None:
                # Clear the pending future first so a later read() can
                # create a new one
                pending, self._pending = self._pending, None
                pending.set_exception(exc)
        def received_eof(self):
            self._transport = None
            if self._pending is not None:
                pending, self._pending = self._pending, None
                pending.set_result(False)
        def received_data(self, data):
            self._data.extend(data)
            if self._pending is not None:
                pending, self._pending = self._pending, None
                pending.set_result(True)

        # The writing side is fairly easy, as we just pass it through
        # to the transport
        # These are all defined by PEP 3156 as non-blocking calls
        def write(self, data):
            if self._transport is None:
                raise RuntimeError("Connection not open")
            self._transport.write(data)
        def writelines(self, iterable):
            if self._transport is None:
                raise RuntimeError("Connection not open")
            self._transport.writelines(iterable)
        def close(self):
            if self._transport is not None:
                self._transport.close()
                self._transport = None

        def _read_from_buffer(self):
            data = bytes(self._data)
            self._data.clear()
            return data

        # The reading side has to adapt between coroutines and callbacks
        @coroutine
        def read(self):
            if self._transport is None:
                raise RuntimeError("Connection not open")
            if self._pending is not None:
                raise RuntimeError("Concurrent reads not permitted")
            # First check if we already have data waiting
            data = self._read_from_buffer()
            if data:
                return data
            # Otherwise wait for data
            # This method can easily be updated to use a loop and multiple
            # futures in order to support a "minimum read" parameter
            f = self._pending = tulip.Future()
            # The future's result is True if data arrived, False on EOF
            got_data = yield from f
            data = self._read_from_buffer() if got_data else b''
            return data

        # This uses async iteration as described at [1]
        # We yield coroutines, which must then be invoked with yield from
        def readlines(self):
            finished = False
            cached_lines = self._data.split(b'\n')
            self._data.clear()
            # The last element is an incomplete line (or empty), so leave
            # it in the buffer for the next read() to pick up
            self._data.extend(cached_lines.pop())
            # When the cache is empty, we hand control to the event loop
            @coroutine
            def wait_for_line():
                nonlocal finished
                data = bytearray()
                # read() also returns any buffered partial line, so keep
                # accumulating until a complete line has arrived
                while b'\n' not in data:
                    chunk = yield from self.read()
                    if not chunk:
                        finished = True
                        return bytes(data)  # Trailing partial line, if any
                    data.extend(chunk)
                lines = data.split(b'\n')
                self._data.extend(lines.pop())  # Incomplete line, or empty
                cached_lines.extend(lines[1:])
                return bytes(lines[0])
            while not finished:
                # When we already have the data, a simple future will do
                while cached_lines:
                    f = tulip.Future()
                    f.set_result(bytes(cached_lines.pop(0)))
                    yield f
                yield wait_for_line()

    # Used as:
    pipe, stream = event_loop.create_pipe(SimpleStreamingProtocol)
    # Or even as:
    conn, stream = event_loop.create_connection(SimpleStreamingProtocol,
                                                ...)  # connection details

    # Reading from the stream in a coroutine
    for f in stream.readlines():
        line = yield from f
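
For comparison, the same callback-to-future adaptation can be written as a small runnable sketch against asyncio (the standard library module that Tulip later became), using async/await instead of yield from. The LineBuffer class and its feed_data/feed_eof/readline methods are hypothetical names chosen for illustration, not part of PEP 3156, and call_soon() stands in for a real transport delivering chunks.

```python
import asyncio


class LineBuffer:
    """Hypothetical helper: callbacks feed bytes in, a coroutine reads lines."""

    def __init__(self):
        self._data = bytearray()
        self._eof = False
        self._pending = None  # Future awaited by the single reader

    def _wake_reader(self):
        # Clear the pending future before resolving it, so that the next
        # read is free to create a new one
        pending, self._pending = self._pending, None
        if pending is not None and not pending.done():
            pending.set_result(None)

    # Callback side: called by whatever owns the transport
    def feed_data(self, data):
        self._data.extend(data)
        self._wake_reader()

    def feed_eof(self):
        self._eof = True
        self._wake_reader()

    # Coroutine side
    async def readline(self):
        """Return the next line including its newline, or b'' at EOF."""
        while b"\n" not in self._data and not self._eof:
            if self._pending is not None:
                raise RuntimeError("Concurrent reads not permitted")
            self._pending = asyncio.get_running_loop().create_future()
            await self._pending
        line, sep, rest = bytes(self._data).partition(b"\n")
        self._data[:] = rest
        return line + sep


async def demo():
    buf = LineBuffer()
    loop = asyncio.get_running_loop()
    # Simulate a transport delivering data in awkwardly split chunks
    loop.call_soon(buf.feed_data, b"first li")
    loop.call_soon(buf.feed_data, b"ne\nsecond line\npar")
    loop.call_soon(buf.feed_data, b"tial")
    loop.call_soon(buf.feed_eof)
    lines = []
    while True:
        line = await buf.readline()
        if not line:
            break
        lines.append(line)
    return lines


print(asyncio.run(demo()))
```

Returning the newline with each line keeps an empty line (b"\n") distinguishable from end of stream (b""), which the sketch above leaves ambiguous.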

[1] http://python-notes.boredomandlaziness.org/en/latest/pep_ideas/async_programming.html#asynchronous-iterators

Cheers,
Nick.

--
Nick Coghlan   |   ncoghlan at gmail.com   |   Brisbane, Australia


