Title: Asynchronous IO Support Rebooted
Last-Modified: 2013-05-22 13:22:52 -0700 (Wed, 22 May 2013)
Author: Guido van Rossum <guido at python.org>
- Event Loop Interface Specification
- Module Namespace
- Event Loop Policy: Getting and Setting the Current Event Loop
- Event Loop Classes
- Event Loop Methods Overview
- Specifying Times
- Required Event Loop Methods
- Optional Event Loop Methods
- Callback Sequencing
- Callback Style
- Coroutines and the Scheduler
- Open Issues
This is a proposal for asynchronous I/O in Python 3, starting with Python 3.3. Consider this the concrete proposal that is missing from PEP 3153. The proposal includes a pluggable event loop API, transport and protocol abstractions similar to those in Twisted, and a higher-level scheduler based on yield from (PEP 380). A reference implementation is in the works under the code name Tulip. The Tulip repo is linked from the References section at the end.
The event loop is the place where most interoperability occurs. It should be easy for (Python 3.3 ports of) frameworks like Twisted, Tornado, or even gevent to either adapt the default event loop implementation to their needs using a lightweight wrapper or proxy, or to replace the default event loop implementation with an adaptation of their own event loop implementation. (Some frameworks, like Twisted, have multiple event loop implementations. This should not be a problem since these all have the same interface.)
In most cases it should be possible for two different third-party frameworks to interoperate, either by sharing the default event loop implementation (each using its own adapter), or by sharing the event loop implementation of either framework. In the latter case two levels of adaptation would occur (from framework A's event loop to the standard event loop interface, and from there to framework B's event loop). Which event loop implementation is used should be under the control of the main program (though a default policy for event loop selection is provided).
For this interoperability to be effective, the preferred direction of adaptation in third-party frameworks is to keep the default event loop and adapt it to the framework's API. Ideally all third-party frameworks would give up their own event loop implementation in favor of the standard implementation. But some frameworks may not be satisfied with the functionality provided by the standard implementation.
In order to support both directions of adaptation, two separate APIs are defined:
- getting and setting the current event loop object
- the interface of a conforming event loop and its minimum guarantees
An event loop implementation may provide additional methods and guarantees.
The event loop interface does not depend on yield from. Rather, it uses a combination of callbacks, additional interfaces (transports and protocols), and Futures. The latter are similar to those defined in PEP 3148, but have a different implementation and are not tied to threads. In particular, they have no wait() method; the user is expected to use callbacks.
For users (like myself) who don't like using callbacks, a scheduler is provided for writing asynchronous I/O code as coroutines using the PEP 380 yield from expressions. The scheduler is not pluggable; pluggability occurs at the event loop level, and the scheduler should work with any conforming event loop implementation.
For interoperability between code written using coroutines and other async frameworks, the scheduler has a Task class that behaves like a Future. A framework that interoperates at the event loop level can wait for a Future to complete by adding a callback to the Future. Likewise, the scheduler offers an operation to suspend a coroutine until a callback is called.
Limited interoperability with threads is provided by the event loop interface; there is an API to submit a function to an executor (see PEP 3148) which returns a Future that is compatible with the event loop.
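Because these Futures are consumed through callbacks rather than a blocking wait(), the basic pattern looks like the following minimal sketch. It assumes modern Python's asyncio module, the standard-library descendant of Tulip, whose names differ from this draft in places; the callback runs on the event loop once some other callback sets the result.

```python
import asyncio

loop = asyncio.new_event_loop()
fut = loop.create_future()
seen = []

def on_done(f):
    # Invoked by the event loop after the Future completes; nothing blocks.
    seen.append(f.result())
    loop.stop()

fut.add_done_callback(on_done)
# Another callback completes the Future; on_done is then scheduled by the loop.
loop.call_soon(fut.set_result, 42)
loop.run_forever()
loop.close()
print(seen)  # [42]
```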
For those not familiar with Twisted, a quick explanation of the difference between transports and protocols is in order. At the highest level, the transport is concerned with how bytes are transmitted, while the protocol determines which bytes to transmit (and to some extent when).
The most common type of transport is a bidirectional stream transport. It represents a pair of streams (one in each direction) that each transmit a sequence of bytes. The most common example of a bidirectional stream transport is probably a TCP connection. Another common example is an SSL connection. But there are some other things that can be viewed this way, for example an SSH session or a pair of UNIX pipes. Typically there aren't many different transport implementations, and most of them come with the event loop implementation. Note that transports don't need to use sockets, not even if they use TCP -- sockets are a platform-specific implementation detail.
A bidirectional stream transport has two "sides": one side talks to the network (or another process, or whatever low-level interface it wraps), and the other side talks to the protocol. The former uses whatever API is necessary to implement the transport; but the interface between transport and protocol is standardized by this PEP.
A protocol represents some kind of "application-level" protocol such as HTTP or SMTP. Its primary interface is with the transport. While some popular protocols will probably have a standard implementation, often applications implement custom protocols. It also makes sense to have libraries of useful 3rd party protocol implementations that can be downloaded and installed from pypi.python.org.
The general notion of transport and protocol includes other interfaces, where the transport wraps some other communication abstraction. Examples include interfaces for sending and receiving datagrams (e.g. UDP), or a subprocess manager. The separation of concerns is the same as for bidirectional stream transports and protocols, but the specific interface between transport and protocol is different in each case.
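To make the division of labor concrete, here is a toy sketch in plain Python. The class names RecordingTransport and EchoProtocol are hypothetical, and the method names connection_made(), data_received() and write() are modeled on the stream interface described later; no real I/O happens. The transport decides how bytes move, while the protocol decides which bytes to send.

```python
class EchoProtocol:
    """Application-level protocol: decides *which* bytes to send."""

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        # Echo every chunk back, upper-cased so the effect is visible.
        self.transport.write(data.upper())


class RecordingTransport:
    """Hypothetical transport: decides *how* bytes move. Instead of a
    socket, it records outgoing bytes in a list."""

    def __init__(self, protocol):
        self.sent = []
        self.protocol = protocol
        protocol.connection_made(self)

    def write(self, data):
        self.sent.append(data)

    def feed(self, data):
        # Simulate bytes arriving from the "network" side.
        self.protocol.data_received(data)


transport = RecordingTransport(EchoProtocol())
transport.feed(b"hello")
print(transport.sent)  # [b'HELLO']
```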
Details of the interfaces defined by the various standard types of transports and protocols are given later.
Python 3.3 is required for many of the proposed features. The reference implementation (Tulip) requires no new language or standard library features beyond Python 3.3, no third-party modules or packages, and no C code, except for the proactor-based event loop on Windows.
The specification here will live in a new toplevel package. Different components will live in separate submodules of that package. The package will import common APIs from their respective submodules and make them available as package attributes (similar to the way the email package works).
The name of the toplevel package is currently unspecified. The reference implementation uses the name 'tulip', but the name will change to something more boring if and when the implementation is moved into the standard library (hopefully for Python 3.4).
Until the boring name is chosen, this PEP will use 'tulip' as the toplevel package name. Classes and functions given without a module name are assumed to be accessed via the toplevel package.
Event loop management is controlled by an event loop policy, which is a global (per-process) state. There is a default policy, and an API to change the policy. The policy defines the notion of context; the default policy's notion of context is defined as the current thread.
Certain platforms or programming frameworks may change the default policy to something more suitable to the expectations of the users of that platform or framework. Such platforms or frameworks must document their policy and at what point during their initialization sequence the policy is set, in order to avoid undefined behavior when multiple active frameworks want to override the default policy.
An event loop policy may but does not have to enforce that there is only one event loop in existence. The default event loop policy does not enforce this, but it does enforce that there is only one event loop per thread (as far as get_event_loop() is concerned).
To get the current event loop, use get_event_loop(). This returns an event loop object implementing the interface specified below, or None in case no current event loop has been set and the current policy does not specify how to create one for the current context. It is expected that get_event_loop() returns a different object depending on the context, and the default policy will only create a default event loop in the main thread; in other threads an event loop must be explicitly set (but other policies may behave differently). Event loop creation is lazy; i.e. the first call to get_event_loop() creates an event loop instance if necessary and specified by the current policy.
To set the current event loop, use set_event_loop(event_loop), where event_loop is an event loop object. It is allowed to set the current event loop to None (although under the default policy, if the main thread's current event loop is set to None, and get_event_loop() is called subsequently, it will create a new event loop instance).
For the benefit of unit tests and other special cases there's a third policy function: new_event_loop(), which creates and returns a new event loop object according to the policy's default rules. To make this the current event loop, you must call set_event_loop().
To change the event loop policy, call set_event_loop_policy(policy), where policy is an event loop policy object or None. The policy object must be an object that has methods get_event_loop(), set_event_loop(loop) and new_event_loop(), all behaving like the functions described above. Passing a policy value of None restores the default event loop policy (overriding the alternate default set by the platform or framework). The default event loop policy is an instance of the class DefaultEventLoopPolicy. The current event loop policy object can be retrieved by calling get_event_loop_policy(). (TBD: Require inheriting from AbstractEventLoopPolicy?)
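The default policy's rules (one loop per thread, created lazily and only in the main thread) can be sketched in a few lines of plain Python. ThreadLocalPolicy and its loop_factory parameter are hypothetical names for illustration; a real policy would create actual event loop objects rather than the placeholder object() used here.

```python
import threading


class ThreadLocalPolicy:
    """Hypothetical policy: one loop per thread; a loop is created lazily
    only in the main thread, other threads must call set_event_loop()."""

    def __init__(self, loop_factory):
        self._loop_factory = loop_factory  # callable returning a new loop
        self._local = threading.local()    # per-thread "current loop" slot

    def get_event_loop(self):
        loop = getattr(self._local, "loop", None)
        if loop is None and threading.current_thread() is threading.main_thread():
            loop = self._local.loop = self.new_event_loop()
        return loop  # None in other threads until a loop is set

    def set_event_loop(self, loop):
        self._local.loop = loop

    def new_event_loop(self):
        return self._loop_factory()


policy = ThreadLocalPolicy(loop_factory=object)  # object() stands in for a loop
first = policy.get_event_loop()
assert first is policy.get_event_loop()  # created once, then cached

results = []
worker = threading.Thread(target=lambda: results.append(policy.get_event_loop()))
worker.start()
worker.join()
print(results)  # [None]: no implicit loop outside the main thread
```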
There is no actual class named EventLoop. There is an AbstractEventLoop class which defines all the methods without implementations, and serves primarily as documentation. The following concrete classes are defined:
- SelectorEventLoop is a concrete implementation of the full API based on the selectors module. (This module is part of Tulip, but not specified by this PEP. It is separately proposed for inclusion in the standard library.) The constructor takes one optional argument, a selectors.Selector object. By default an instance of selectors.DefaultSelector is created and used.
- ProactorEventLoop is a concrete implementation of the API except for the I/O event handling and signal handling methods. It is only defined on Windows (or on other platforms which support a similar API for "overlapped I/O"). The constructor takes one optional argument, a Proactor object. By default an instance of IocpProactor is created and used. (The IocpProactor class is not specified by this PEP. Its inclusion in the standard library is not currently under consideration; it is just an implementation detail of the ProactorEventLoop class.)
The methods of a conforming event loop are grouped into several categories. Here is a brief overview of the categories. The first set of categories must be supported by all conforming event loop implementations. (However, in some cases a partially-conforming implementation may choose not to implement the internet/socket methods, and still conform to the other methods.)
- Miscellaneous: close(), time().
- Starting and stopping: run_forever(), run_until_complete(), stop(), is_running().
- Basic callbacks: call_soon(), call_later(), call_at().
- Thread interaction: call_soon_threadsafe(), run_in_executor(), set_default_executor().
- Internet name lookups: getaddrinfo(), getnameinfo().
- Internet connections: create_connection(), start_serving(), stop_serving(), create_datagram_endpoint().
- Wrapped socket methods: sock_recv(), sock_sendall(), sock_connect(), sock_accept().
The second set of categories may be supported by conforming event loop implementations. If not supported, they will raise NotImplementedError. (In the current state of Tulip, SelectorEventLoop on UNIX systems supports all of these; SelectorEventLoop on Windows supports the I/O event handling category; ProactorEventLoop on Windows supports none of them. The intention is to add support for pipes and subprocesses on Windows as well, using the subprocess module in the standard library.)
- I/O callbacks: add_reader(), remove_reader(), add_writer(), remove_writer().
- Pipes and subprocesses: connect_read_pipe(), connect_write_pipe(), spawn_subprocess().
- Signal callbacks: add_signal_handler(), remove_signal_handler().
As usual in Python, all timeouts, intervals and delays are measured in seconds, and may be ints or floats. The accuracy and precision of the clock are up to the implementation; the default implementation uses time.monotonic(). Books could be written about the implications of this choice; it is best to read the documentation for the standard library time module.
- close(). Closes the event loop, releasing any resources it may hold, such as the file descriptor used by epoll() or kqueue(). This should not be called while the event loop is running. After it has been called the event loop may not be used again. It may be called multiple times; subsequent calls are no-ops.
- time(). Returns the current time according to the event loop's clock. This may be time.time() or time.monotonic() or some other system-specific clock, but it must return a float expressing the time in units of approximately one second since some epoch. (No clock is perfect -- see PEP 418.)
An (unclosed) event loop can be in one of two states: running or stopped. These methods deal with starting and stopping an event loop:
- run_forever(). Runs the event loop until stop() is called. This cannot be called when the event loop is already running. (This has a long name in part to avoid confusion with earlier versions of this PEP, where run() had different behavior, in part because there are already too many APIs that have a method named run(), and in part because there shouldn't be many places where this is called anyway.)
- run_until_complete(future, timeout=None). Runs the event loop until the Future is done. If a timeout is given, it waits at most that long. If the Future is done, its result is returned, or its exception is raised; if the timeout expires before the Future is done, or if stop() is called, TimeoutError is raised (but the Future is not cancelled). This cannot be called when the event loop is already running.
- stop(). Stops the event loop as soon as it is convenient. It is fine to restart the loop with run_forever() or run_until_complete() subsequently; no scheduled callbacks will be lost if this is done. Note: stop() returns normally and the current callback is allowed to continue. How soon after this point the event loop stops is up to the implementation, but the intention is to stop short of polling for I/O, and not to run any callbacks scheduled in the future; the major freedom an implementation has is how much of the "ready queue" (callbacks already scheduled with call_soon()) it processes before stopping.
- is_running(). Returns True if the event loop is currently running, False if it is stopped. (TBD: Do we need another inquiry method to tell whether the loop is in the process of stopping?)
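The start/stop cycle can be illustrated with a short sketch, assuming modern Python's asyncio (Tulip's standard-library descendant). Note two differences from this draft: asyncio's run_until_complete() takes no timeout argument (asyncio.wait_for() covers that role), and it raises RuntimeError rather than TimeoutError if the loop is stopped before the Future is done.

```python
import asyncio

loop = asyncio.new_event_loop()
log = []

def tick(n):
    log.append(n)
    if n == 3:
        loop.stop()               # returns normally; the loop halts soon after
    else:
        loop.call_soon(tick, n + 1)

loop.call_soon(tick, 1)
loop.run_forever()                # returns once stop() takes effect
print(log, loop.is_running())    # [1, 2, 3] False

# A stopped loop can be restarted, here until a Future is done.
fut = loop.create_future()
loop.call_later(0.01, fut.set_result, "done")
result = loop.run_until_complete(fut)
loop.close()
```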
- call_soon(callback, *args). This schedules a callback to be called as soon as possible. Returns a Handle representing the callback, whose cancel() method can be used to cancel the callback. It guarantees that callbacks are called in the order in which they were scheduled.
- call_later(delay, callback, *args). Arrange for callback(*args) to be called approximately delay seconds in the future, once, unless cancelled. Returns a Handle representing the callback, whose cancel() method can be used to cancel the callback. Callbacks scheduled in the past or at exactly the same time will be called in an undefined order.
- call_at(when, callback, *args). This is like call_later(), but the time is expressed as an absolute time. Returns a similar Handle. There is a simple equivalency: loop.call_later(delay, callback, *args) is the same as loop.call_at(loop.time() + delay, callback, *args).
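The relative and absolute scheduling calls, and cancellation through the returned handle, look like this in a sketch that assumes modern Python's asyncio, where these methods keep the same names. The delays are chosen far enough apart that the firing order is deterministic.

```python
import asyncio

loop = asyncio.new_event_loop()
fired = []

# Relative delay versus absolute deadline: call_at(loop.time() + d, ...)
# is equivalent to call_later(d, ...).
loop.call_later(0.02, fired.append, "later")
loop.call_at(loop.time() + 0.04, fired.append, "at")

# Every registration returns a handle whose cancel() withdraws it.
handle = loop.call_later(0.03, fired.append, "cancelled")
handle.cancel()

loop.call_later(0.06, loop.stop)
loop.run_forever()
loop.close()
print(fired)  # ['later', 'at']
```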
Note: A previous version of this PEP defined a method named call_repeatedly(), which promised to call a callback at regular intervals. This has been withdrawn because the design of such a function is overspecified. On the one hand, a simple timer loop can easily be emulated using a callback that reschedules itself using call_later(); it is also easy to write a coroutine containing a loop and a sleep() call (a toplevel function in the module, see below). On the other hand, due to the complexities of accurate timekeeping there are many traps and pitfalls here for the unaware (see PEP 418), and different use cases require different behavior in edge cases. It is impossible to offer an API for this purpose that is bullet-proof in all cases, so it is deemed better to let application designers decide for themselves what kind of timer loop to implement.
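A self-rescheduling callback of the kind suggested above might look like the following sketch. The helper every() is hypothetical, and the sketch runs on modern Python's asyncio loop rather than a Tulip loop.

```python
import asyncio

def every(loop, interval, callback, count):
    """Hypothetical helper: call callback() `count` times, roughly
    `interval` seconds apart, by rescheduling with call_later()."""
    def tick(remaining):
        callback()
        if remaining > 1:
            loop.call_later(interval, tick, remaining - 1)
    loop.call_soon(tick, count)

loop = asyncio.new_event_loop()
stamps = []
every(loop, 0.01, lambda: stamps.append(loop.time()), count=3)
loop.call_later(0.1, loop.stop)
loop.run_forever()
loop.close()
```

One of the edge cases mentioned above is visible here: because each tick is scheduled relative to when the previous one ran, small delays accumulate as drift. A variant that computes absolute deadlines with call_at(start + k * interval, ...) avoids drift but has to decide what to do when it falls behind.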
- call_soon_threadsafe(callback, *args). Like call_soon(callback, *args), but when called from another thread while the event loop is blocked waiting for I/O, unblocks the event loop. This is the only method that is safe to call from another thread. (To schedule a callback for a later time in a threadsafe manner, you can use loop.call_soon_threadsafe(loop.call_later, delay, callback, *args).) Note: this is not safe to call from a signal handler (since it may use locks). In fact, no API is signal-safe; if you want to handle signals, use add_signal_handler() described below.
- run_in_executor(executor, callback, *args). Arrange to call callback(*args) in an executor (see PEP 3148). Returns a Future whose result on success is the return value of that call. This is equivalent to wrap_future(executor.submit(callback, *args)). If executor is None, the default executor set by set_default_executor() is used. If no default executor has been set yet, a ThreadPoolExecutor with 5 threads is created and set as the default executor.
- set_default_executor(executor). Set the default executor used by run_in_executor(). The argument must be a PEP 3148 Executor instance or None, in order to reset the default executor.
See also the wrap_future() function described in the section about Futures.
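Both directions of thread interaction are shown in this sketch, assuming modern Python's asyncio. asyncio also falls back to a ThreadPoolExecutor when the executor argument is None, though its default pool size differs from the 5 threads specified here. The function blocking_work() is a hypothetical stand-in for blocking I/O.

```python
import asyncio
import threading
import time

loop = asyncio.new_event_loop()

def blocking_work(x):
    time.sleep(0.01)  # hypothetical stand-in for blocking I/O or CPU work
    return x * 2

# Loop -> thread: run a blocking call in the default executor, get a Future.
doubled = loop.run_until_complete(loop.run_in_executor(None, blocking_work, 21))

# Thread -> loop: call_soon_threadsafe() is the safe way to schedule work.
received = []

def from_thread():
    loop.call_soon_threadsafe(received.append, threading.current_thread().name)
    loop.call_soon_threadsafe(loop.stop)

worker = threading.Thread(target=from_thread, name="worker")
worker.start()
loop.run_forever()
worker.join()
loop.close()
print(doubled, received)  # 42 ['worker']
```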
These methods are useful if you want to connect or bind a socket to an address without the risk of blocking for the name lookup. They are usually called implicitly by create_connection(), start_serving() or create_datagram_endpoint().
getaddrinfo(host, port, family=0, type=0, proto=0, flags=0). Similar to the socket.getaddrinfo() function but returns a Future. The Future's result on success will be a list of the same format as returned by socket.getaddrinfo(), i.e. a list of (address_family, socket_type, socket_protocol, canonical_name, address) where address is a 2-tuple (ipv4_address, port) for IPv4 addresses and a 4-tuple (ipv6_address, port, flow_info, scope_id) for IPv6 addresses. If the family argument is zero or unspecified, the list returned may contain a mixture of IPv4 and IPv6 addresses; otherwise the addresses returned are constrained by the family value (similar for proto and flags). The default implementation calls socket.getaddrinfo() using run_in_executor(), but other implementations may choose to implement their own DNS lookup. The optional arguments must be specified as keyword arguments.
Note: implementations are allowed to implement a subset of the full socket.getaddrinfo() interface; e.g. they may not support symbolic port names, or they may ignore or incompletely implement the type, proto and flags arguments. However, if type and proto are ignored, the argument values passed in should be copied unchanged into the return tuples' socket_type and socket_protocol elements. (You can't ignore family, since IPv4 and IPv6 addresses must be looked up differently. The only permissible values for family are socket.AF_UNSPEC (0), socket.AF_INET and socket.AF_INET6, and the latter only if it is defined by the platform.)
getnameinfo(sockaddr, flags=0). Similar to socket.getnameinfo() but returns a Future. The Future's result on success will be a tuple (host, port). Same implementation remarks as for getaddrinfo().
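The default strategy described above, running the blocking socket.getaddrinfo() through run_in_executor(), can be sketched as follows. The helper name async_getaddrinfo is hypothetical, an asyncio loop stands in for a Tulip loop, and the bare `*` makes the optional arguments keyword-only as required. A numeric host is used so that no DNS query is needed.

```python
import asyncio
import functools
import socket

def async_getaddrinfo(loop, host, port, *, family=0, type=0, proto=0, flags=0):
    """Hypothetical sketch: run socket.getaddrinfo() in the loop's
    default executor and return a Future for its result list."""
    lookup = functools.partial(socket.getaddrinfo, host, port,
                               family, type, proto, flags)
    return loop.run_in_executor(None, lookup)

loop = asyncio.new_event_loop()
infos = loop.run_until_complete(
    async_getaddrinfo(loop, "127.0.0.1", 80,
                      family=socket.AF_INET, type=socket.SOCK_STREAM))
loop.close()

for address_family, socket_type, proto, canonical_name, address in infos:
    print(address)  # ('127.0.0.1', 80)
```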
These are the high-level interfaces for managing internet connections. Their use is recommended over the corresponding lower-level interfaces because they abstract away the differences between selector-based and proactor-based event loops.
Note that the client and server side of stream connections use the same transport and protocol interface. However, datagram endpoints use a different transport and protocol interface.
create_connection(protocol_factory, host, port, **kwargs). Creates a stream connection to a given internet host and port. This is a coroutine, typically called from the client side of the connection. It creates an implementation-dependent (bidirectional stream) Transport to represent the connection, then calls protocol_factory() to instantiate (or retrieve) the user's Protocol implementation, and finally ties the two together. (See below for the definitions of Transport and Protocol.) The user's Protocol implementation is created or retrieved by calling protocol_factory() without arguments(*). The coroutine's result on success is the (transport, protocol) pair; if a failure prevents the creation of a successful connection, an appropriate exception will be raised. Note that when the coroutine completes, the protocol's connection_made() method has not yet been called; that will happen when the connection handshake is complete.
(*) There is no requirement that protocol_factory is a class. If your protocol class needs to have specific arguments passed to its constructor, you can use lambda or functools.partial(). You can also pass a trivial lambda that returns a previously constructed Protocol instance.
Optional keyword arguments:
- ssl: Pass True to create an SSL transport (by default a plain TCP transport is created). Or pass an ssl.SSLContext object to override the default SSL context object to be used.
- family, proto, flags: Address family, protocol and flags to be passed through to getaddrinfo(). These all default to 0, which means "not specified". (The socket type is always SOCK_STREAM.) If any of these values are not specified, the getaddrinfo() method will choose appropriate values. Note: proto has nothing to do with the high-level Protocol concept or the protocol_factory argument.
- sock: An optional socket to be used instead of using the host, port, family, proto, and flags arguments. If this is given, host and port must be omitted; otherwise, host and port are required.
- local_addr: If given, a (host, port) tuple used to bind the socket to locally. This is rarely needed but on multi-homed servers you occasionally need to force a connection to come from a specific address. This is how you would do that. The host and port are looked up using getaddrinfo().
start_serving(protocol_factory, host, port, **kwds). Enters a serving loop that accepts connections. This is a Task that completes once the serving loop is set up to serve. The return value is a list of one or more sockets in listening mode. (Multiple sockets may be returned if the specified address allows both IPv4 and IPv6 connections.) You can use stop_serving() to stop the serving loop. Each time a connection is accepted, protocol_factory is called without arguments(*) to create a Protocol, a (bidirectional stream) Transport is created to represent the network side of the connection, and the two are tied together by calling protocol.connection_made(transport).
(*) See footnote above for create_connection(). However, since protocol_factory() is called once for each new incoming connection, it should return a new Protocol object each time it is called.
Optional keyword arguments:
- ssl: Pass an ssl.SSLContext object to override the default SSL context object to be used. (Unlike create_connection(), passing True does not make sense here: the SSLContext object is required to specify the certificate and key.)
- backlog: Backlog value to be passed to the listen() call. Defaults to 100.
- reuse_address: Whether to set the SO_REUSEADDR option on the socket. The default is True on UNIX, False on Windows.
- family, flags: Address family and flags to be passed through to getaddrinfo(). The family defaults to AF_UNSPEC; the flags default to AI_PASSIVE. (The socket type is always SOCK_STREAM; the socket protocol is always set to 0, to let getaddrinfo() choose.)
- sock: An optional socket to be used instead of using the host, port, family, and flags arguments. If this is given, host and port must be omitted; otherwise, host and port are required. The return value will be the one-element list [sock].
stop_serving(sock). The argument should be a socket from the list returned by start_serving(). The serving loop associated with that socket will be stopped. Connections that have already been accepted will not be affected. (TBD: What if start_serving() doesn't use sockets? Then it should probably return a list of opaque objects that can be passed to stop_serving().)
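A client and server talking over loopback illustrate both calls, including the functools.partial() trick from the footnote. This sketch assumes modern Python's asyncio, where the counterpart of start_serving() is loop.create_server(); it returns a Server object rather than a list of sockets, and Server.close() plays roughly the role of stop_serving(). The protocol class names Echo and Client are hypothetical.

```python
import asyncio
import functools


class Echo(asyncio.Protocol):
    """Server side: echo each chunk back with a configurable prefix."""
    def __init__(self, prefix):
        self.prefix = prefix

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(self.prefix + data)


class Client(asyncio.Protocol):
    """Client side: send one message, resolve a Future with the reply."""
    def __init__(self, reply):
        self.reply = reply

    def connection_made(self, transport):
        transport.write(b"ping")

    def data_received(self, data):
        if not self.reply.done():
            self.reply.set_result(data)


async def main():
    loop = asyncio.get_running_loop()
    # A new Echo instance per accepted connection; port 0 picks a free port.
    server = await loop.create_server(functools.partial(Echo, b"echo: "),
                                      "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    reply = loop.create_future()
    transport, _ = await loop.create_connection(lambda: Client(reply),
                                                "127.0.0.1", port)
    data = await asyncio.wait_for(reply, timeout=5)
    transport.close()
    server.close()  # stop accepting; comparable to stop_serving()
    await server.wait_closed()
    return data


data = asyncio.run(main())
print(data)  # b'echo: ping'
```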
create_datagram_endpoint(protocol_factory, local_addr, remote_addr, **kwds). Creates an endpoint for sending and receiving datagrams (typically UDP packets). Because of the nature of datagram traffic, there are no separate calls to set up client and server side, since usually a single endpoint acts as both client and server. This is a coroutine that returns a (transport, protocol) pair on success, or raises an exception on failure. If the coroutine returns successfully, the transport will call callbacks on the protocol whenever a datagram is received or the socket is closed; it is up to the protocol to call methods on the transport to send datagrams. Note that the transport and protocol interfaces used here are different from those for stream connections.
- protocol_factory: A class or factory function that will be called, without arguments, to construct the protocol object to be returned. The interface between datagram transport and protocol is described below.
- local_addr: An optional tuple indicating the address to which the socket will be bound. If given this must be a (host, port) pair. It will be passed to getaddrinfo() to be resolved and the result will be passed to the bind() method of the socket created. If getaddrinfo() returns more than one address, they will be tried in turn. If omitted, no bind() call will be made.
- remote_addr: An optional tuple indicating the address to which the socket will be "connected". (Since there is no such thing as a datagram connection, this just specifies a default value for the destination address of outgoing datagrams.) If given this must be a (host, port) pair. It will be passed to getaddrinfo() to be resolved and the result will be passed to sock_connect() together with the socket created. If getaddrinfo() returns more than one address, they will be tried in turn. If omitted, no sock_connect() call will be made.
- family, proto, flags: Address family, protocol and flags to be passed through to getaddrinfo(). These all default to 0, which means "not specified". (The socket type is always SOCK_DGRAM.) If any of these values are not specified, the getaddrinfo() method will choose appropriate values.
Note that if both local_addr and remote_addr are present, all combinations of local and remote addresses with matching address family will be tried.
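One endpoint bound with local_addr and another given only remote_addr can exchange a datagram, as in this sketch assuming modern Python's asyncio (whose create_datagram_endpoint() accepts local_addr and remote_addr as keyword arguments). The Receiver class is hypothetical.

```python
import asyncio


class Receiver(asyncio.DatagramProtocol):
    """Resolves a Future with the first datagram it receives."""
    def __init__(self, got):
        self.got = got

    def datagram_received(self, data, addr):
        if not self.got.done():
            self.got.set_result(data)


async def main():
    loop = asyncio.get_running_loop()
    got = loop.create_future()
    # "Server-like" endpoint: bound locally, no default destination.
    recv_transport, _ = await loop.create_datagram_endpoint(
        lambda: Receiver(got), local_addr=("127.0.0.1", 0))
    addr = recv_transport.get_extra_info("sockname")
    # "Client-like" endpoint: remote_addr only sets the default destination.
    send_transport, _ = await loop.create_datagram_endpoint(
        asyncio.DatagramProtocol, remote_addr=addr)
    send_transport.sendto(b"hello")  # no address needed thanks to remote_addr
    data = await asyncio.wait_for(got, timeout=5)
    send_transport.close()
    recv_transport.close()
    return data


data = asyncio.run(main())
```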
The following methods for doing async I/O on sockets are not for general use. They are primarily meant for transport implementations working with IOCP through the ProactorEventLoop class. However, they are easily implementable for other event loop types, so there is no reason not to require them. The socket argument has to be a non-blocking socket.
- sock_recv(sock, n). Receive up to n bytes from socket sock. Returns a Future whose result on success will be a bytes object.
- sock_sendall(sock, data). Send bytes data to socket sock. Returns a Future whose result on success will be None. Note: the name uses sendall instead of send, to reflect that the semantics and signature of this method echo those of the standard library socket method sendall() rather than send(). (TBD: but maybe it would be better to emulate send() after all? That would be better for datagram sockets.)
- sock_connect(sock, address). Connect to the given address. Returns a Future whose result on success will be None.
- sock_accept(sock). Accept a connection from a socket. The socket must be in listening mode and bound to an address. Returns a Future whose result on success will be a tuple (conn, peer) where conn is a connected non-blocking socket and peer is the peer address.
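The wrapped socket methods can be exercised on a connected socket pair, as in this sketch assuming modern Python's asyncio, which keeps the names sock_sendall() and sock_recv(). Both sockets are switched to non-blocking mode first, as required above.

```python
import asyncio
import socket


async def main():
    loop = asyncio.get_running_loop()
    # A connected pair of sockets stands in for a real network connection.
    a, b = socket.socketpair()
    a.setblocking(False)  # these methods require non-blocking sockets
    b.setblocking(False)
    try:
        await loop.sock_sendall(a, b"hi there")  # result is None on success
        data = await loop.sock_recv(b, 1024)     # up to 1024 bytes
    finally:
        a.close()
        b.close()
    return data


data = asyncio.run(main())
```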
These methods are primarily meant for transport implementations working with a selector. They are implemented by SelectorEventLoop but not by ProactorEventLoop. Custom event loop implementations may or may not implement them.
The fd arguments below may be integer file descriptors, or "file-like" objects with a fileno() method that wrap integer file descriptors. Not all file-like objects or file descriptors are acceptable. Sockets (and socket file descriptors) are always accepted. On Windows no other types are supported. On UNIX, pipes and possibly tty devices are also supported, but disk files are not. Exactly which special file types are supported may vary by platform and per selector implementation. (Experimentally, there is at least one kind of pseudo-tty on OSX that is supported by select and poll but not by kqueue: it is used by Emacs shell windows.)
- add_reader(fd, callback, *args). Arrange for callback(*args) to be called whenever file descriptor fd is deemed ready for reading. Calling add_reader() again for the same file descriptor implies a call to remove_reader() for the same file descriptor.
- add_writer(fd, callback, *args). Like add_reader(), but registers the callback for writing instead of for reading.
- remove_reader(fd). Cancels the current read callback for file descriptor fd, if one is set. If no callback is currently set for the file descriptor, this is a no-op and returns False. Otherwise, it removes the callback arrangement and returns True.
- remove_writer(fd). This is to add_writer() as remove_reader() is to add_reader().
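A readiness callback on a socket works as follows in a sketch assuming modern Python's asyncio. A SelectorEventLoop is created explicitly, because, as noted above, a proactor-based loop does not implement these methods. The socket object is passed directly, relying on its fileno() method.

```python
import asyncio
import socket

loop = asyncio.SelectorEventLoop()  # add_reader() needs a selector-based loop
r, w = socket.socketpair()
r.setblocking(False)
chunks = []

def on_readable():
    # Called when r is ready for reading; recv() will not block now.
    chunks.append(r.recv(1024))
    loop.remove_reader(r)  # one-shot for this demo
    loop.stop()

loop.add_reader(r, on_readable)
w.send(b"ready")
loop.run_forever()
loop.close()
r.close()
w.close()
```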
TBD: Should these really take stream objects? The stream objects are not useful for reading or writing because they would cause blocking I/O. This section of the API is clearly not yet ready for review.
- connect_read_pipe(protocol_factory, pipe): Create a unidirectional stream connection from a file-like object wrapping the read end of a UNIX pipe. The protocol/transport interface is the read half of the bidirectional stream interface.
- connect_write_pipe(protocol_factory, pipe): Create a unidirectional stream connection from a file-like object wrapping the write end of a UNIX pipe. The protocol/transport interface is the write half of the bidirectional stream interface.
- TBD: A way to run a subprocess with stdin, stdout and stderr connected to pipe transports. (This is being designed but not yet ready.)
TBD: offer the same interface on Windows for e.g. named pipes. (This should be possible given that the standard library subprocess module is supported on Windows.)
- add_signal_handler(sig, callback, *args). Whenever signal sig is received, arrange for callback(*args) to be called. Specifying another callback for the same signal replaces the previous handler (only one handler can be active per signal). The sig must be a valid signal number defined in the signal module. If the signal cannot be handled this raises an exception: ValueError if it is not a valid signal or if it is an uncatchable signal (e.g. SIGKILL), RuntimeError if this particular event loop instance cannot handle signals (since signals are global per process, only an event loop associated with the main thread can handle signals).
- remove_signal_handler(sig). Removes the handler for signal sig, if one is set. Raises the same exceptions as add_signal_handler() (except that it may return False instead of raising RuntimeError for uncatchable signals). Returns True if a handler was removed successfully, False if no handler was set.
Note: If these methods are statically known to be unsupported, they may raise NotImplementedError instead of RuntimeError.
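A minimal sketch of the signal-handler methods, assuming a Unix platform and that the code runs in the main thread. It uses the modern asyncio module (Tulip's descendant), which kept these method names, although its exact error reporting differs in detail from the exceptions listed above. SIGUSR1 and demo_signal() are illustrative choices.

```python
import asyncio
import os
import signal


def demo_signal():
    """Install a SIGUSR1 handler, send the signal to ourselves, remove the handler."""
    loop = asyncio.new_event_loop()
    got = []

    def on_signal(name):
        # Runs as an ordinary loop callback, not inside the C-level signal handler.
        got.append(name)
        loop.stop()

    try:
        loop.add_signal_handler(signal.SIGUSR1, on_signal, "SIGUSR1")
        loop.call_soon(os.kill, os.getpid(), signal.SIGUSR1)
        loop.run_forever()
        removed = loop.remove_signal_handler(signal.SIGUSR1)        # a handler was set
        removed_again = loop.remove_signal_handler(signal.SIGUSR1)  # nothing left to remove
    finally:
        loop.close()
    return got, removed, removed_again
```

Because signals are process-wide, only one loop (the one in the main thread) should install handlers like this.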
When two callbacks are scheduled for the same time, they are run in the order in which they are registered. For example:
loop.call_soon(foo)
loop.call_soon(bar)
guarantees that foo() is called before bar().
If call_soon() is used, this guarantee holds even if the system clock were to run backwards. This is also the case for call_later(0, callback, *args). However, if call_later() is used with a nonzero delay, all bets are off if the system clock were to run backwards. (A good event loop implementation should use time.monotonic() to avoid problems when the clock runs backwards. See PEP 418.)
All event loops have a notion of context. For the default event loop implementation, the context is a thread. An event loop implementation should run all callbacks in the same context. An event loop implementation should run only one callback at a time, so callbacks can assume automatic mutual exclusion with other callbacks scheduled in the same event loop.
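The sequencing and mutual-exclusion guarantees can be sketched with the modern asyncio module, which grew out of Tulip. foo, bar, bump and demo_ordering() are illustrative names. The read-modify-write in bump() would be a race under threads, but is safe here because the loop runs one callback at a time.

```python
import asyncio


def demo_ordering():
    loop = asyncio.new_event_loop()
    order = []
    counter = {"value": 0}

    def foo():
        order.append("foo")

    def bar():
        order.append("bar")

    def bump():
        # No lock needed: callbacks on one loop never run concurrently.
        current = counter["value"]
        counter["value"] = current + 1

    try:
        loop.call_soon(foo)        # registered first, so runs first
        loop.call_soon(bar)
        for _ in range(1000):
            loop.call_soon(bump)
        loop.call_soon(loop.stop)
        loop.run_forever()
    finally:
        loop.close()
    return order, counter["value"]
```

demo_ordering() should return (['foo', 'bar'], 1000).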
There are two categories of exceptions in Python: those that derive from the Exception class and those that derive from BaseException. Exceptions deriving from Exception will generally be caught and handled appropriately; for example, they will be passed through by Futures, and they will be logged and ignored when they occur in a callback.
However, exceptions deriving only from BaseException are never caught, and will usually cause the program to terminate with a traceback. (Examples of this category include KeyboardInterrupt and SystemExit; it is usually unwise to treat these the same as most other exceptions.)
The various methods for registering one-off callbacks (call_soon(), call_later(), call_at() and call_soon_threadsafe()) all return an object representing the registration that can be used to cancel the callback. This object is called a Handle (although its class name is not necessarily Handle). Handles are opaque and have only one public method:
- cancel(). Cancel the callback.
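A minimal sketch of cancelling a registration through its Handle, using the modern asyncio module (Tulip's descendant). Note that asyncio's handles expose a little more than the single cancel() method described here; the cancelled() query used below is an asyncio extra, and demo_handle() is an illustrative name.

```python
import asyncio


def demo_handle():
    loop = asyncio.new_event_loop()
    fired = []
    try:
        keep = loop.call_later(0.01, fired.append, "kept")
        drop = loop.call_later(0.01, fired.append, "dropped")
        drop.cancel()                    # this callback will never run
        loop.call_later(0.05, loop.stop)
        loop.run_forever()
    finally:
        loop.close()
    return fired, keep.cancelled(), drop.cancelled()
```

demo_handle() should return (['kept'], False, True).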
The tulip.Future class here is intentionally similar to the concurrent.futures.Future class specified by PEP 3148, but there are slight differences. Whenever this PEP talks about Futures or futures this should be understood to refer to tulip.Future unless concurrent.futures.Future is explicitly mentioned. The supported public API is as follows, indicating the differences with PEP 3148:
- cancel(). If the Future is already done (or cancelled), return False. Otherwise, change the Future's state to cancelled (this implies done), schedule the callbacks, and return True.
- cancelled(). Returns True if the Future was cancelled.
- running(). Always returns False. Difference with PEP 3148: there is no "running" state.
- done(). Returns True if the Future is done. Note that a cancelled Future is considered done too (here and everywhere).
- result(). Returns the result set with set_result(), or raises the exception set with set_exception(). Raises CancelledError if cancelled. Difference with PEP 3148: This has no timeout argument and does not wait; if the future is not yet done, it raises an exception.
- exception(). Returns the exception if set with set_exception(), or None if a result was set with set_result(). Raises CancelledError if cancelled. Difference with PEP 3148: This has no timeout argument and does not wait; if the future is not yet done, it raises an exception.
- add_done_callback(fn). Add a callback to be run when the Future becomes done (or is cancelled). If the Future is already done (or cancelled), schedules the callback using call_soon(). Difference with PEP 3148: The callback is never called immediately, and always in the context of the caller. (Typically, a context is a thread.) You can think of this as calling the callback through call_soon(). Note that in order to match PEP 3148, the callback (unlike all other callbacks defined in this PEP, and ignoring the convention from the section "Callback Style" below) is always called with a single argument, the Future object.
- set_result(result). The Future must not be done (nor cancelled) already. This makes the Future done and schedules the callbacks. Difference with PEP 3148: This is a public API.
- set_exception(exception). The Future must not be done (nor cancelled) already. This makes the Future done and schedules the callbacks. Difference with PEP 3148: This is a public API.
The internal method set_running_or_notify_cancel() is not supported; there is no way to set the running state.
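The non-blocking result() and the "never called immediately" rule for done callbacks can be sketched with asyncio.Future from the modern asyncio module, which descends from tulip.Future. demo_future() is an illustrative name.

```python
import asyncio


def demo_future():
    loop = asyncio.new_event_loop()
    log = []
    try:
        fut = loop.create_future()
        try:
            fut.result()                 # does not wait; raises because not done
        except asyncio.InvalidStateError:
            log.append("not done yet")
        fut.add_done_callback(lambda f: log.append(("callback", f.result())))
        fut.set_result(42)               # schedules the callback via call_soon()
        log.append("after set_result")   # the callback has not run yet
        loop.call_soon(loop.stop)
        loop.run_forever()
    finally:
        loop.close()
    return log
```

The returned log should be ['not done yet', 'after set_result', ('callback', 42)], showing that set_result() only schedules the callback and the loop runs it later with the Future as its single argument.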
The following exceptions are defined:
- InvalidStateError. Raised whenever the Future is not in a state acceptable to the method being called (e.g. calling set_result() on a Future that is already done, or calling result() on a Future that is not yet done).
- InvalidTimeoutError. Raised by result() and exception() when a nonzero timeout argument is given.
- CancelledError. An alias for concurrent.futures.CancelledError. Raised when result() or exception() is called on a Future that is cancelled.
- TimeoutError. An alias for concurrent.futures.TimeoutError. May be raised by run_until_complete().
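A small sketch of when the state-related exceptions arise, using the modern asyncio module. One caveat: in current asyncio, CancelledError is its own class derived from BaseException rather than an alias for concurrent.futures.CancelledError, so this sketch catches asyncio.CancelledError. demo_exceptions() is an illustrative name.

```python
import asyncio


def demo_exceptions():
    loop = asyncio.new_event_loop()
    outcomes = []
    try:
        f = loop.create_future()
        f.set_result("first")
        try:
            f.set_result("second")       # already done
        except asyncio.InvalidStateError:
            outcomes.append("InvalidStateError")

        c = loop.create_future()
        outcomes.append(c.cancel())      # True: the Future was pending
        outcomes.append(c.cancel())      # False: already done (cancelled)
        try:
            c.result()
        except asyncio.CancelledError:
            outcomes.append("CancelledError")
        outcomes.append(c.done())        # a cancelled Future counts as done
    finally:
        loop.close()
    return outcomes
```

demo_exceptions() should return ['InvalidStateError', True, False, 'CancelledError', True].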
A Future is associated with the default event loop when it is created. (TBD: Optionally pass in an alternative event loop instance?)
A tulip.Future object is not acceptable to the wait() and as_completed() functions in the concurrent.futures package. However, there are similar APIs tulip.wait() and tulip.as_completed(), described below.
A tulip.Future object is acceptable to a yield from expression when used in a coroutine. This is implemented through the __iter__() interface on the Future. See the section "Coroutines and the Scheduler" below.
When a Future is garbage-collected, if it has an associated exception but neither result() nor exception() nor __iter__() has ever been called (or the latter hasn't raised the exception yet -- details TBD), the exception should be logged. TBD: At what level?
In the future (pun intended) we may unify tulip.Future and concurrent.futures.Future, e.g. by adding an __iter__() method to the latter that works with yield from. To prevent accidentally blocking the event loop by calling e.g. result() on a Future that's not done yet, the blocking operation may detect that an event loop is active in the current thread and raise an exception instead. However, the current PEP strives to have no dependencies beyond Python 3.3, so changes to concurrent.futures.Future are off the table for now.
There are some public functions related to Futures:
- async(arg). This takes an argument that is either a coroutine object or a Future (i.e., anything you can use with yield from) and returns a Future. If the argument is a Future, it is returned unchanged; if it is a coroutine object, it wraps it in a Task (remember that Task is a subclass of Future).
- wrap_future(future). This takes a PEP 3148 Future (i.e., an instance of concurrent.futures.Future) and returns a Future compatible with the event loop (i.e., a tulip.Future instance).
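A sketch of both helpers using the modern asyncio module. There, async() was renamed asyncio.ensure_future() because async became a keyword; asyncio.wrap_future() kept its name. answer() and main() are illustrative, and async def/await stand in for the @tulip.coroutine/yield from style of this draft.

```python
import asyncio
import concurrent.futures


async def answer():
    return 42


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    passes_through = asyncio.ensure_future(fut) is fut   # a Future is returned unchanged
    task = asyncio.ensure_future(answer())                # a coroutine is wrapped in a Task
    wrapped_in_task = isinstance(task, asyncio.Task)

    with concurrent.futures.ThreadPoolExecutor(max_workers=1) as pool:
        cf = pool.submit(pow, 2, 10)                      # a PEP 3148 Future, run in a thread
        threaded = await asyncio.wrap_future(cf)          # now awaitable on the loop

    fut.set_result(None)
    return passes_through, wrapped_in_task, await task, threaded
```

asyncio.run(main()) should return (True, True, 42, 1024).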
Transports and protocols are strongly influenced by Twisted and PEP 3153. Users rarely implement or instantiate transports -- rather, event loops offer utility methods to set up transports.
Transports work in conjunction with protocols. Protocols are typically written without knowing or caring about the exact type of transport used, and transports can be used with a wide variety of protocols. For example, an HTTP client protocol implementation may be used with either a plain socket transport or an SSL transport. The plain socket transport can be used with many different protocols besides HTTP (e.g. SMTP, IMAP, POP, FTP, IRC, SPDY).
The most common type of transport is a bidirectional stream transport. There are also unidirectional stream transports (used for pipes) and datagram transports (used by the create_datagram_endpoint() method).
- get_extra_info(name, default=None). This is a catch-all method that returns implementation-specific information about a transport. The first argument is the name of the extra field to be retrieved. The optional second argument is a default value to be returned. Consult the implementation documentation to find out the supported extra field names. For an unsupported name, the default is always returned.
A bidirectional stream transport is an abstraction on top of a socket or something similar (for example, a pair of UNIX pipes or an SSL connection).
Most connections have an asymmetric nature: the client and server usually have very different roles and behaviors. Hence, the interface between transport and protocol is also asymmetric. From the protocol's point of view, writing data is done by calling the write() method on the transport object; this buffers the data and returns immediately. However, the transport takes a more active role in reading data: whenever some data is read from the socket (or other data source), the transport calls the protocol's data_received() method.
Nevertheless, the interface between transport and protocol used by bidirectional streams is the same for clients as it is for servers, since the connection between a client and a server is essentially a pair of streams, one in each direction.
Bidirectional stream transports have the following public methods:
write(data). Write some bytes. The argument must be a bytes object. Returns None. The transport is free to buffer the bytes, but it must eventually cause the bytes to be transferred to the entity at the other end, and it must maintain stream behavior. That is, t.write(b'abc'); t.write(b'def') is equivalent to t.write(b'abcdef'), as well as to:
t.write(b'a')
t.write(b'b')
t.write(b'c')
t.write(b'd')
t.write(b'e')
t.write(b'f')
writelines(iterable). Equivalent to:
for data in iterable:
    self.write(data)
write_eof(). Close the writing end of the connection. Subsequent calls to write() are not allowed. Once all buffered data is transferred, the transport signals to the other end that no more data will be received. Some protocols don't support this operation; in that case, calling write_eof() will raise an exception. (Note: This used to be called half_close(), but unless you already know what it is for, that name doesn't indicate which end is closed.)
can_write_eof(). Return True if the protocol supports write_eof(), False if it does not. (This method typically returns a fixed value that depends only on the specific Transport class, not on the state of the Transport object. It is needed because some protocols need to change their behavior when write_eof() is unavailable. For example, in HTTP, to send data whose size is not known ahead of time, the end of the data is typically indicated using write_eof(); however, SSL does not support this, and an HTTP protocol implementation would have to use the "chunked" transfer encoding in this case. But if the data size is known ahead of time, the best approach in both cases is to use the Content-Length header.)
pause(). Suspend delivery of data to the protocol until a subsequent resume() call. Between pause() and resume(), the protocol's data_received() method will not be called. This has no effect on write().
resume(). Restart delivery of data to the protocol via data_received().
pause_writing(). Suspend sending data to the network until a subsequent resume_writing() call. Between pause_writing() and resume_writing() the transport's write() method will just be accumulating data in an internal buffer.
resume_writing(). Restart sending data to the network.
discard_output(). Discard all data buffered by write() but not yet sent to the network.
close(). Sever the connection with the entity at the other end. Any data buffered by write() will (eventually) be transferred before the connection is actually closed. The protocol's data_received() method will not be called again. Once all buffered data has been flushed, the protocol's connection_lost() method will be called with None as the argument. Note that this method does not wait for all that to happen.
abort(). Immediately sever the connection. Any data still buffered by the transport is thrown away. Soon, the protocol's connection_lost() method will be called with None as argument. (TBD: Distinguish in the connection_lost() argument between close(), abort() or a close initiated by the other end? Or add a transport method to inquire about this? Glyph's proposal was to pass different exceptions for this purpose.)
TBD: Provide flow control the other way -- the transport may need to suspend the protocol if the amount of data buffered becomes a burden. Proposal: let the transport call protocol.pause() and protocol.resume() if they exist; if they don't exist, the protocol doesn't support flow control. (Perhaps different names to avoid confusion between protocols and transports?)
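The stream semantics of write() and writelines(), and the HTTP framing decision that motivates can_write_eof(), can be sketched with a hypothetical in-memory transport. ToyStreamTransport and send_body() are illustrative names invented here, not part of this spec; a real transport would flush its buffer to a socket asynchronously instead of keeping it in memory.

```python
class ToyStreamTransport:
    """Hypothetical in-memory stand-in for a bidirectional stream transport."""

    def __init__(self, supports_eof=True):
        self._buffer = bytearray()
        self._supports_eof = supports_eof
        self._eof = False

    def write(self, data):
        if not isinstance(data, bytes):
            raise TypeError("data must be a bytes object")
        if self._eof:
            raise RuntimeError("write() after write_eof()")
        self._buffer += data           # buffer only; order is preserved

    def writelines(self, iterable):
        for data in iterable:
            self.write(data)

    def can_write_eof(self):
        return self._supports_eof     # fixed per transport class, not per state

    def write_eof(self):
        if not self._supports_eof:
            raise NotImplementedError("this transport cannot half-close")
        self._eof = True

    def sent(self):
        return bytes(self._buffer)


def send_body(transport, chunks):
    """Hypothetical sender for an HTTP body whose size is not known ahead of time."""
    if transport.can_write_eof():
        transport.writelines(chunks)
        transport.write_eof()          # the end of the data is signalled by EOF
    else:
        for chunk in chunks:           # fall back to "chunked" transfer encoding
            if chunk:
                transport.write(b"%x\r\n" % len(chunk) + chunk + b"\r\n")
        transport.write(b"0\r\n\r\n")
```

Writing b'abc' and b'def' separately, or writing the single bytes through writelines(), yields the same byte stream b'abcdef'. On a transport without write_eof() support, send_body() switches to chunked framing.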
A writing stream transport supports the write(), writelines(), write_eof(), can_write_eof(), close() and abort() methods described for bidirectional stream transports.
A reading stream transport supports the pause(), resume() and close() methods described for bidirectional stream transports.
A writing stream transport calls only connection_made() and connection_lost() on its associated protocol.
A reading stream transport can call all protocol methods specified in the Protocols section below (i.e., the previous two plus data_received() and eof_received()).
Datagram transports have these methods:
- sendto(data, addr=None). Sends a datagram (a bytes object). The optional second argument is the destination address. If omitted, remote_addr must have been specified in the create_datagram_endpoint() call that created this transport. If present, and remote_addr was specified, they must match. The (data, addr) pair may be sent immediately or buffered. The return value is None.
- abort(). Immediately close the transport. Buffered data will be discarded.
- close(). Close the transport. Buffered data will be transmitted asynchronously.
Datagram transports call the following methods on the associated protocol object: connection_made(), connection_lost(), connection_refused(), and datagram_received(). ("Connection" in these method names is a slight misnomer, but the concepts still exist: connection_made() means the transport representing the endpoint has been created, and connection_lost() means the transport is closed.) The connection_refused() method is called before connection_lost() when remote_addr was given and an explicit negative acknowledgement was received (this is a UDP feature). (TBD: Do we need the latter? It seems easy enough to implement this in the protocol if it needs to make the distinction.)
Protocols are always used in conjunction with transports. While a few common protocols are provided (e.g. decent though not necessarily excellent HTTP client and server implementations), most protocols will be implemented by user code or third-party libraries.
Like for transports, we distinguish between stream protocols, datagram protocols, and perhaps other custom protocols. The most common type of protocol is a bidirectional stream protocol. (There are no unidirectional protocols.)
(TBD: should protocol callbacks be allowed to be coroutines?)
A (bidirectional) stream protocol must implement the following methods, which will be called by the transport. Think of these as callbacks that are always called by the event loop in the right context. (See the "Context" section way above.)
connection_made(transport). Indicates that the transport is ready and connected to the entity at the other end. The protocol should probably save the transport reference as an instance variable (so it can call its write() and other methods later), and may write an initial greeting or request at this point.
data_received(data). The transport has read some bytes from the connection. The argument is always a non-empty bytes object. There are no guarantees about the minimum or maximum size of the data passed along this way. p.data_received(b'abcdef') should be treated exactly equivalent to:
p.data_received(b'abc')
p.data_received(b'def')
eof_received(). This is called when the other end called write_eof() (or something equivalent). The default implementation calls close() on the transport, which causes connection_lost() to be called (eventually) on the protocol.
connection_lost(exc). The transport has been closed or aborted, has detected that the other end has closed the connection cleanly, or has encountered an unexpected error. In the first three cases the argument is None; for an unexpected error, the argument is the exception that caused the transport to give up. (TBD: Do we need to distinguish between the first three cases?)
Here is a chart indicating the order and multiplicity of calls:
- connection_made() -- exactly once
- data_received() -- zero or more times
- eof_received() -- at most once
- connection_lost() -- exactly once
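The call order above can be observed with the modern asyncio module, whose Protocol class kept these four method names. In this sketch, which assumes a loopback TCP connection, the client half-closes after sending, the hypothetical UpperEcho server replies and closes, and RecordingClient logs each protocol call. Both class names are illustrative.

```python
import asyncio


class RecordingClient(asyncio.Protocol):
    def __init__(self, done):
        self.done = done
        self.calls = []
        self.data = []

    def connection_made(self, transport):
        self.calls.append("connection_made")
        transport.write(b"hello")
        transport.write_eof()            # half-close: we send nothing more

    def data_received(self, data):
        self.calls.append("data_received")
        self.data.append(data)           # chunk sizes are not guaranteed

    def eof_received(self):
        self.calls.append("eof_received")
        # Returning None lets the transport close itself.

    def connection_lost(self, exc):
        self.calls.append("connection_lost")
        self.done.set_result(exc)


class UpperEcho(asyncio.Protocol):
    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        self.transport.write(data.upper())
    # The inherited eof_received() returns None, so the server closes
    # once the client has half-closed.


async def main():
    loop = asyncio.get_running_loop()
    server = await loop.create_server(UpperEcho, "127.0.0.1", 0)
    port = server.sockets[0].getsockname()[1]
    done = loop.create_future()
    _, client = await loop.create_connection(
        lambda: RecordingClient(done), "127.0.0.1", port)
    exc = await done
    server.close()
    await server.wait_closed()
    return client.calls, b"".join(client.data), exc
```

The recorded calls should start with connection_made, contain one or more data_received entries and exactly one eof_received, and end with connection_lost (called with None for this clean shutdown).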
TBD: Discuss whether user code needs to do anything to make sure that protocol and transport aren't garbage-collected prematurely.
Datagram protocols have connection_made() and connection_lost() methods with the same signatures as stream protocols. (As explained in the section about datagram transports, we prefer the slightly odd nomenclature over defining different method names to indicate the opening and closing of the socket.)
In addition, they have the following methods:
- datagram_received(data, addr). Indicates that a datagram data (a bytes object) was received from remote address addr (an IPv4 2-tuple or an IPv6 4-tuple).
- connection_refused(exc). Indicates that a send or receive operation raised a ConnectionRefusedError exception. This typically indicates that a negative acknowledgment was received for a previously sent datagram (not for the datagram that was being sent, if the exception was raised by a send operation). Immediately after this the socket will be closed and connection_lost() will be called with the same exception argument.
Here is a chart indicating the order and multiplicity of calls:
- connection_made() -- exactly once
- datagram_received() -- zero or more times
- connection_refused() -- at most once
- connection_lost() -- exactly once
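A datagram round trip can be sketched with the modern asyncio module, assuming UDP on the loopback interface. ReverseServer and Client are illustrative names. Note one difference from this draft: asyncio's DatagramProtocol reports errors through error_received() instead of connection_refused(), and that call does not close the transport.

```python
import asyncio


class ReverseServer(asyncio.DatagramProtocol):
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        self.transport.sendto(data[::-1], addr)   # reply to the sender


class Client(asyncio.DatagramProtocol):
    def __init__(self, reply, lost):
        self.reply, self.lost, self.calls = reply, lost, []

    def connection_made(self, transport):
        self.calls.append("connection_made")
        transport.sendto(b"tulip")     # remote_addr was given, so addr is omitted

    def datagram_received(self, data, addr):
        self.calls.append("datagram_received")
        if not self.reply.done():
            self.reply.set_result(data)

    def error_received(self, exc):     # asyncio's analogue of connection_refused()
        self.calls.append("error_received")
        if not self.reply.done():
            self.reply.set_exception(exc)

    def connection_lost(self, exc):
        self.calls.append("connection_lost")
        self.lost.set_result(exc)


async def main():
    loop = asyncio.get_running_loop()
    server, _ = await loop.create_datagram_endpoint(
        ReverseServer, local_addr=("127.0.0.1", 0))
    reply, lost = loop.create_future(), loop.create_future()
    client, proto = await loop.create_datagram_endpoint(
        lambda: Client(reply, lost),
        remote_addr=server.get_extra_info("sockname"))
    try:
        data = await asyncio.wait_for(reply, timeout=5)
    finally:
        client.close()
        server.close()
    await lost                         # wait for connection_lost() to run
    return data, proto.calls
```

asyncio.run(main()) should return (b'pilut', ['connection_made', 'datagram_received', 'connection_lost']).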
Most interfaces taking a callback also take positional arguments. For instance, to arrange for foo("abc", 42) to be called soon, you call loop.call_soon(foo, "abc", 42). To schedule the call foo(), use loop.call_soon(foo). This convention greatly reduces the number of small lambdas required in typical callback programming.
This convention specifically does not support keyword arguments. Keyword arguments are used to pass optional extra information about the callback. This allows graceful evolution of the API without having to worry about whether a keyword might be significant to a callee somewhere. If you have a callback that must be called with a keyword argument, you can use a lambda or functools.partial. For example:
loop.call_soon(functools.partial(foo, "abc", repeat=42))
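Both forms of the convention can be run with the modern asyncio module. Its call_soon() follows the same rule: positional arguments are forwarded to the callback, and the only keyword it accepts (context=) carries extra information about the callback itself. foo() and demo_callback_style() are illustrative names.

```python
import asyncio
import functools


def demo_callback_style():
    loop = asyncio.new_event_loop()
    calls = []

    def foo(text, repeat=1):
        calls.append(text * repeat)

    try:
        loop.call_soon(foo, "abc", 2)                          # positional args pass through
        loop.call_soon(functools.partial(foo, "x", repeat=3))  # keywords need partial()
        loop.call_soon(loop.stop)
        loop.run_forever()
    finally:
        loop.close()
    return calls
```

demo_callback_style() should return ['abcabc', 'xxx'].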
This is a separate toplevel section because its status is different from the event loop interface. Usage of coroutines is optional, and it is perfectly fine to write code using callbacks only. On the other hand, there is only one implementation of the scheduler/coroutine API, and if you're using coroutines, that's the one you're using.
A coroutine is a generator that follows certain conventions. For documentation purposes, all coroutines should be decorated with @tulip.coroutine, but this cannot be strictly enforced.
Coroutines use the yield from syntax introduced in PEP 380, instead of the original yield syntax.
The word "coroutine", like the word "generator", is used for two different (though related) concepts:
- The function that defines a coroutine (a function definition decorated with tulip.coroutine). If disambiguation is needed we will call this a coroutine function.
- The object obtained by calling a coroutine function. This object represents a computation or an I/O operation (usually a combination) that will complete eventually. If disambiguation is needed we will call it a coroutine object.
Things a coroutine can do:
- result = yield from future -- suspends the coroutine until the future is done, then returns the future's result, or raises an exception, which will be propagated. (If the future is cancelled, it will raise a CancelledError exception.) Note that tasks are futures, and everything said about futures also applies to tasks.
- result = yield from coroutine -- wait for another coroutine to produce a result (or raise an exception, which will be propagated). The coroutine expression must be a call to another coroutine.
- return expression -- produce a result to the coroutine that is waiting for this one using yield from.
- raise exception -- raise an exception in the coroutine that is waiting for this one using yield from.
Calling a coroutine does not start its code running -- it is just a generator, and the coroutine object returned by the call is really a generator object, which doesn't do anything until you iterate over it. In the case of a coroutine object, there are two basic ways to start it running: call yield from coroutine from another coroutine (assuming the other coroutine is already running!), or convert it to a Task (see below).
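The point that calling a coroutine function runs nothing can be checked with plain generators, no event loop needed. In this sketch child(), parent() and drive() are illustrative names, and drive() handles only coroutines that never suspend. A real scheduler would resume suspended ones.

```python
log = []


def child():
    log.append("child running")
    return 42
    yield  # never reached, but makes child() a generator function


def parent():
    log.append("parent running")
    value = yield from child()   # delegates; receives child's return value
    return value + 1


def drive(coro):
    """Run a generator-based coroutine that never suspends; return its result."""
    try:
        next(coro)
    except StopIteration as stop:
        return stop.value
    raise RuntimeError("coroutine suspended; a real scheduler would resume it")
```

After coro = parent(), log is still empty. drive(coro) then returns 43 and leaves log as ['parent running', 'child running'].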
Coroutines (and tasks) can only run when the event loop is running.
To wait for multiple coroutines or Futures, two APIs similar to the wait() and as_completed() APIs in the concurrent.futures package are provided:
tulip.wait(fs, timeout=None, return_when=ALL_COMPLETED). This is a coroutine that waits for the Futures or coroutines given by fs to complete. Coroutine arguments will be wrapped in Tasks (see below). This returns a Future whose result on success is a tuple of two sets of Futures, (done, pending), where done is the set of original Futures (or wrapped coroutines) that are done (or cancelled), and pending is the rest, i.e. those that are still not done (nor cancelled). Note that with the defaults for timeout and return_when, pending will always be an empty set. Optional arguments timeout and return_when have the same meaning and defaults as for concurrent.futures.wait(): timeout, if not None, specifies a timeout for the overall operation; return_when specifies when to stop. The constants FIRST_COMPLETED, FIRST_EXCEPTION, ALL_COMPLETED are defined with the same values and the same meanings as in PEP 3148:
- ALL_COMPLETED (default): Wait until all Futures are done or cancelled (or until the timeout occurs).
- FIRST_COMPLETED: Wait until at least one Future is done or cancelled (or until the timeout occurs).
- FIRST_EXCEPTION: Wait until at least one Future is done (not cancelled) with an exception set. (The exclusion of cancelled Futures from the filter is surprising, but PEP 3148 does it this way.)
tulip.as_completed(fs, timeout=None). Returns an iterator whose values are Futures; waiting for successive values waits until the next Future or coroutine from the set fs completes, and returns its result (or raises its exception). The optional argument timeout has the same meaning and default as it does for concurrent.futures.wait(): when the timeout occurs, the next Future returned by the iterator will raise TimeoutError when waited for. Example of use:
for f in as_completed(fs):
    result = yield from f  # May raise an exception.
    # Use result.
Note: if you do not wait for the futures as they are produced by the iterator, your for loop may not make progress (since you are not allowing other tasks to run).
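wait() and as_completed() survive in the modern asyncio module. This sketch uses async def/await in place of this draft's yield from, and wraps coroutines in Tasks explicitly because current asyncio.wait() no longer accepts bare coroutines. job() and its delays are illustrative; the delays are chosen far enough apart that the completion order is predictable.

```python
import asyncio


async def job(name, delay):
    await asyncio.sleep(delay)
    return name


async def main():
    slow = asyncio.ensure_future(job("slow", 0.2))
    fast = asyncio.ensure_future(job("fast", 0.01))

    done, pending = await asyncio.wait({slow, fast},
                                       return_when=asyncio.FIRST_COMPLETED)
    first = sorted(t.result() for t in done)

    # Defaults (no timeout, ALL_COMPLETED): pending comes back empty.
    done, pending = await asyncio.wait(pending)
    rest = sorted(t.result() for t in done)

    order = []
    tasks = [asyncio.ensure_future(job("b", 0.1)),
             asyncio.ensure_future(job("a", 0.01))]
    for f in asyncio.as_completed(tasks):
        order.append(await f)   # would raise here if that job had failed
    return first, rest, pending, order
```

asyncio.run(main()) should return (['fast'], ['slow'], set(), ['a', 'b']).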
The coroutine sleep(delay) returns after a given time delay.
(TBD: Should the optional second argument, result, be part of the spec?)
A Task is an object that manages an independently running coroutine. The Task interface is the same as the Future interface, and in fact Task is a subclass of Future. The task becomes done when its coroutine returns or raises an exception; if it returns a result, that becomes the task's result, if it raises an exception, that becomes the task's exception.
Cancelling a task that's not done yet prevents its coroutine from completing. In this case a CancelledError exception is thrown into the coroutine, which it may catch to propagate cancellation to other Futures. If the exception is not caught, the generator will be properly finalized anyway, as described in PEP 342.
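Cancellation as described above can be sketched with the modern asyncio module: cancel() throws CancelledError into the coroutine, which may clean up and should re-raise so the Task ends up cancelled. worker() and main() are illustrative names, and await stands in for yield from.

```python
import asyncio


async def worker(log):
    try:
        await asyncio.sleep(10)          # blocked here when cancel() arrives
    except asyncio.CancelledError:
        log.append("cleanup")
        raise                            # re-raise so the Task is marked cancelled


async def main():
    log = []
    task = asyncio.ensure_future(worker(log))
    await asyncio.sleep(0)               # let the worker start and block
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        log.append("task cancelled")
    return log, task.cancelled()
```

asyncio.run(main()) should return (['cleanup', 'task cancelled'], True).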
Tasks are also useful for interoperating between coroutines and callback-based frameworks like Twisted. After converting a coroutine into a Task, callbacks can be added to the Task.
There are two ways to convert a coroutine into a task: explicitly, by calling the coroutine function and then passing the resulting coroutine object to the tulip.Task() constructor; or implicitly, by decorating the coroutine with @tulip.task (instead of @tulip.coroutine).
You may ask, why not automatically convert all coroutines to Tasks? The @tulip.coroutine decorator could do this. However, this would slow things down considerably in the case where one coroutine calls another (and so on), as switching to a "bare" coroutine has much less overhead than switching to a Task.
The scheduler has no public interface. You interact with it by using yield from future and yield from task. In fact, there is no single object representing the scheduler -- its behavior is implemented by the Task and Future classes using only the public interface of the event loop, so it will work with third-party event loop implementations, too.
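To make the last point concrete, here is a deliberately tiny, hypothetical scheduler built only from call_soon() and Future callbacks. ToyLoop, ToyFuture and ToyTask are invented for illustration and omit exceptions, cancellation and timers. Tulip's real Task and Future classes are considerably more complete.

```python
import collections


class ToyLoop:
    """Hypothetical minimal event loop offering only call_soon()."""

    def __init__(self):
        self._ready = collections.deque()

    def call_soon(self, callback, *args):
        self._ready.append((callback, args))

    def run(self):
        while self._ready:
            callback, args = self._ready.popleft()
            callback(*args)


class ToyFuture:
    def __init__(self, loop):
        self._loop, self._done, self._result, self._callbacks = loop, False, None, []

    def result(self):
        if not self._done:
            raise RuntimeError("result not ready")
        return self._result

    def set_result(self, result):
        self._result, self._done = result, True
        for fn in self._callbacks:
            self._loop.call_soon(fn, self)   # never called immediately
        self._callbacks = []

    def add_done_callback(self, fn):
        if self._done:
            self._loop.call_soon(fn, self)
        else:
            self._callbacks.append(fn)

    def __iter__(self):
        if not self._done:
            yield self                       # hand this Future to the driving Task
        return self.result()                 # value of "yield from future"


class ToyTask(ToyFuture):
    def __init__(self, coro, loop):
        super().__init__(loop)
        self._coro = coro
        loop.call_soon(self._step)

    def _step(self, _future=None):
        try:
            waiting_on = next(self._coro)    # run until the next yield
        except StopIteration as stop:
            self.set_result(stop.value)      # the coroutine returned
        else:
            waiting_on.add_done_callback(self._step)   # resume when it is done


def demo():
    loop = ToyLoop()
    trace = []
    fut = ToyFuture(loop)

    def coro():
        trace.append("waiting")
        value = yield from fut
        trace.append("got %r" % value)
        return value * 2

    task = ToyTask(coro(), loop)
    loop.call_soon(fut.set_result, 21)
    loop.run()
    return trace, task.result()
```

demo() should return (['waiting', 'got 21'], 42): the Task suspends on the Future, is resumed through a done callback, and turns the coroutine's return value into its own result.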
The best way to use coroutines to implement an Internet protocol such as FTP is probably to use a streaming buffer that gets filled by data_received() and can be read asynchronously using methods like read(n) and readline() that are coroutines or return a Future. When the connection is closed, read() should eventually produce b'', or raise an exception if connection_lost() is called with an exception.
To write a response, the write() method (and friends) on the transport can be used -- these do not return Futures. A standard protocol implementation should be provided that sets this up and kicks off the coroutine when connection_made() is called.
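The modern asyncio module ships such a buffer as asyncio.StreamReader, with feed_data(), feed_eof() and set_exception() on the filling side and readline() on the reading side. This sketch assumes a hypothetical LineProtocol adapter and feeds it by hand, splitting lines across chunks to mimic arbitrary data_received() boundaries. The FTP-like command lines are made up.

```python
import asyncio


class LineProtocol(asyncio.Protocol):
    """Hypothetical adapter: data_received() fills a buffer that coroutines read."""

    def __init__(self):
        self.reader = asyncio.StreamReader()

    def data_received(self, data):
        self.reader.feed_data(data)

    def eof_received(self):
        self.reader.feed_eof()           # readline() will then return b''

    def connection_lost(self, exc):
        if exc is None:
            self.reader.feed_eof()
        else:
            self.reader.set_exception(exc)   # pending reads raise exc


async def main():
    proto = LineProtocol()

    async def consumer():
        lines = []
        while True:
            line = await proto.reader.readline()
            if not line:                 # b'' signals end of stream
                break
            lines.append(line)
        return lines

    task = asyncio.ensure_future(consumer())
    for chunk in (b"USER ali", b"ce\r\nPASS", b" secret\r\n"):
        proto.data_received(chunk)       # what a transport would call
        await asyncio.sleep(0)
    proto.eof_received()
    return await task
```

asyncio.run(main()) should return [b'USER alice\r\n', b'PASS secret\r\n'] regardless of how the bytes were split between chunks.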
- A fuller public API for Handle? What's the use case?
- Should we require all event loops to implement sock_recv() and friends? Is the use case strong enough? (Maybe some transports usable with both SelectorEventLoop and ProactorEventLoop use them? That'd be a good enough use case for me.)
- Should we require callers of create_connection() to create and pass an SSLContext explicitly, instead of allowing ssl=True? (For start_serving() we already require an SSLContext.)
- A debugging API? E.g. something that logs a lot of stuff, or logs unusual conditions (like queues filling up faster than they drain) or even callbacks taking too much time...
- Do we need introspection APIs? E.g. asking for the read callback given a file descriptor. Or when the next scheduled call is. Or the list of file descriptors registered with callbacks. Right now these would all require using Tulip internals.
- Locks and queues? The Tulip implementation contains implementations of most types of locks and queues modeled after the standard library threading and queue modules. Should we incorporate these in the PEP?
- Probably need more socket I/O methods, e.g. sock_sendto() and sock_recvfrom(), and perhaps others like pipe_read(). Or users can write their own (it's not rocket science).
- We may need APIs to control various timeouts. E.g. we may want to limit the time spent in DNS resolution, connecting, ssl handshake, idle connection, close/shutdown, even per session. Possibly it's sufficient to add timeout keyword arguments to some methods, and other timeouts can probably be implemented by clever use of call_later() and Task.cancel(). But it's possible that some operations need default timeouts, and we may want to change the default for a specific operation globally (i.e., per event loop).
- PEP 380 describes the semantics of yield from.
- Greg Ewing's yield from tutorials: http://www.cosc.canterbury.ac.nz/greg.ewing/python/yield-from/yield_from.html
- PEP 3148 describes concurrent.futures.Future.
- PEP 3153, while rejected, has a good write-up explaining the need to separate transports and protocols.
- PEP 418 discusses the issues of timekeeping.
- Tulip repo: http://code.google.com/p/tulip/
- Nick Coghlan wrote a nice blog post with some background, thoughts about different approaches to async I/O, gevent, and how to use futures with constructs like while, for and with: http://python-notes.boredomandlaziness.org/en/latest/pep_ideas/async_programming.html
- TBD: references to the relevant parts of Twisted, Tornado, ZeroMQ, pyftpdlib, libevent, libev, pyev, libuv, wattle, and so on.
Apart from PEP 3153, influences include PEP 380 and Greg Ewing's tutorial for yield from, Twisted, Tornado, ZeroMQ, pyftpdlib, tulip (the author's attempts at synthesis of all these), wattle (Steve Dower's counter-proposal), numerous discussions on python-ideas from September through December 2012, a Skype session with Steve Dower and Dino Viehland, email exchanges with Ben Darnell, an audience with Niels Provos (original author of libevent), and two in-person meetings with several Twisted developers, including Glyph, Brian Warner, David Reid, and Duncan McGreggor. Also, the author's previous work on async support in the NDB library for Google App Engine was an important influence.
This document has been placed in the public domain.