[Security-sig] Unified TLS API for Python

Christian Heimes christian at cheimes.de
Wed Jan 11 14:36:10 EST 2017


On 2017-01-11 20:01, Cory Benfield wrote:

> The ``Context`` abstract base class defines an object that allows configuration
> of TLS. It can be thought of as a factory for ``TLSWrappedSocket`` and
> ``TLSWrappedBuffer`` objects.
> 
> The ``Context`` abstract base class has the following class definition::
> 
>     TLSBufferObject = Union[TLSWrappedSocket, TLSWrappedBuffer]
>     ServerNameCallback = Callable[[TLSBufferObject, Optional[str], Context], Any]
> 
>     class _BaseContext(metaclass=ABCMeta):
> 
>         @property
>         @abstractmethod
>         def validate_certificates(self) -> bool:
>             """
>             Whether to validate the TLS certificates. This switch operates at a
>             very broad scope: either validation is enabled, in which case all
>             forms of validation are performed including hostname validation if
>             possible, or validation is disabled, in which case no validation is
>             performed.
> 
>             Not all backends support having their certificate validation
>             disabled. If a backend does not support having their certificate
>             validation disabled, attempting to set this property to ``False``
>             will throw a ``TLSError``.
>             """
> 
>         @validate_certificates.setter
>         @abstractmethod
>         def validate_certificates(self, val: bool) -> None:
>             pass


For 3.7 I'm planning to replace ssl.match_hostname() with OpenSSL
1.0.2's hostname verification API. For now the one flag is enough. Later
we can discuss settings for wildcard, IP address and CN matching.
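
As a rough illustration of what "one flag" could mean on top of the
stdlib ssl module, the sketch below maps a single boolean onto
verify_mode and check_hostname. The helper name
set_validate_certificates is hypothetical and not part of any proposed
API; only the ssl attributes it touches are real.

```python
import ssl


def set_validate_certificates(ctx: ssl.SSLContext, enabled: bool) -> None:
    """Hypothetical helper: one switch for chain and hostname validation."""
    if enabled:
        # Require a verified chain first; hostname checks depend on it.
        ctx.verify_mode = ssl.CERT_REQUIRED
        ctx.check_hostname = True
    else:
        # check_hostname has to be off before verification can be disabled.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE


ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
set_validate_certificates(ctx, False)
```

The order of the two assignments matters in each branch, which is one
reason a single broad switch is easier to get right than two
independent ones.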

> 
>         @abstractmethod
>         def register_certificates(self,
>                                   certificates: str,
>                                   key: Optional[str] = None,
>                                   password: Optional[Callable[[], Union[AnyStr, bytearray]]] = None) -> None:
>             """
>             Loads a certificate, a number of intermediate certificates, and the
>             corresponding private key. These certificates will be offered to
>             the remote peer during the handshake if required.
> 
>             The ``certificates`` argument must be a bytestring containing the
>             PEM-encoded certificates. The first PEM-encoded certificate must be
>             the leaf certificate. All subsequent certificates will be offered
>             as intermediate additional certificates.
> 
>             The ``key`` argument, if present, must contain the PEM-encoded
>             private key associated with the leaf certificate. If not present,
>             the private key will be extracted from ``certificates``.
> 
>             The ``password`` argument may be a function to call to get the
>             password for decrypting the private key. It will only be called if
>             the private key is encrypted and a password is necessary. It will
>             be called with no arguments, and it should return a string, bytes,
>             or bytearray. If the return value is a string it will be encoded as
>             UTF-8 before using it to decrypt the key. Alternatively a string,
>             bytes, or bytearray value may be supplied directly as the password
>             argument. It will be ignored if the private key is not encrypted
>             and no password is needed.
>             """

I don't think this function works for all libraries and use cases. For
some implementations the order of certificates is very important. For
NSS and PKCS#11 we would instead need to specify the slot or nickname of
the cert. For 3.7 I would also like to introduce X509 objects and an
EVP_Key wrapper, so this function would need to consume a stack of
certificates.

Since this function is only required for TLS servers and TLS client cert
authentication, I'd rather mark this function provisional or not define
it in the first version.
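
To make the ordering concern concrete, here is a minimal sketch of how a
PEM-based backend might split the ``certificates`` string into a leaf
and its intermediates. The helper split_chain is hypothetical, and it
only covers the PEM case; a backend that addresses certificates by slot
or nickname would have nothing to feed it.

```python
import re
from typing import List, Tuple

# Matches one PEM certificate block, including its BEGIN/END markers.
_PEM_CERT = re.compile(
    r"-----BEGIN CERTIFICATE-----.*?-----END CERTIFICATE-----",
    re.DOTALL,
)


def split_chain(pem_bundle: str) -> Tuple[str, List[str]]:
    """Return (leaf, intermediates), preserving the order in the bundle."""
    certs = _PEM_CERT.findall(pem_bundle)
    if not certs:
        raise ValueError("no PEM certificates found")
    # By the quoted convention the first certificate is the leaf.
    return certs[0], certs[1:]
```

Nothing here checks that the first block really is the leaf, so a
bundle in the wrong order would be accepted silently.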



>         @abstractmethod
>         def set_ciphers(self, ciphers: List[Cipher]) -> None:
>             """
>             Set the available ciphers for TLS connections created with this
>             context. ``ciphers`` should be a list of ciphers from the
>             ``Cipher`` registry. If none of the ``ciphers`` provided to this
>             object are supported or available, a ``TLSError`` will be raised.
>             """

Implementors should initialize the context with sensible default
settings, preferably system-wide settings. For example, Fedora is
currently implementing https://fedoraproject.org/wiki/Changes/CryptoPolicy
for OpenSSL, NSS and GnuTLS.
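
For comparison, this is roughly how one can inspect what a context that
nobody has customised ends up with in the stdlib ssl module. The helper
default_cipher_names is hypothetical. What it returns depends on how
Python and OpenSSL were built and configured, so the result is not
necessarily the system-wide policy.

```python
import ssl
from typing import List


def default_cipher_names() -> List[str]:
    """List the cipher suites enabled on an uncustomised default context."""
    ctx = ssl.create_default_context()
    # get_ciphers() returns one dict per enabled suite; keep only the name.
    return [entry["name"] for entry in ctx.get_ciphers()]


names = default_cipher_names()
```

An application that never calls a set_ciphers() equivalent would get
exactly this list, which is why the defaults matter more than the
setter.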



> 
>         @abstractmethod
>         def set_inner_protocols(self, protocols: List[NextProtocol]) -> None:
>             """
>             Specify which protocols the socket should advertise as supported
>             during the TLS handshake. This may be advertised using either or
>             both of ALPN or NPN.
> 
>             ``protocols`` should be a list of acceptable protocols in the form
>             of ``NextProtocol`` objects, such as ``[H2, HTTP1]``, ordered by
>             preference. The selection of the protocol will happen during the
>             handshake, and will use whatever protocol negotiation mechanisms
>             are available and supported by both peers.
> 
>             If the TLS implementation doesn't support protocol negotiation,
>             this method will raise ``NotImplementedError``.
>             """
> 
>         @abstractmethod
>         def set_sni_callback(self, callback: Optional[ServerNameCallback]) -> None:
>             """
>             Register a callback function that will be called after the TLS
>             Client Hello handshake message has been received by the TLS server
>             when the TLS client specifies a server name indication.
> 
>             Only one callback can be set per ``Context``. If ``callback`` is
>             ``None`` then the callback is disabled. Calling this function a
>             subsequent time will disable the previously registered callback.
> 
>             The ``callback`` function will be called with three arguments: the
>             first will be the ``TLSBufferObject`` for the connection; the
>             second will be a string that represents the server name that the
>             client is intending to communicate (or ``None`` if the TLS Client
>             Hello does not contain a server name); and the third argument will
>             be the original ``Context``. The server name argument will be the
>             IDNA *decoded* server name.
> 
>             The ``callback`` must return ``None`` to allow negotiation to
>             continue. Other return values signal errors. Attempting to control
>             what error is signaled by the underlying TLS implementation is not
>             specified in this API, but is up to the concrete implementation to
>             handle.
>             """
> 
>         @abstractmethod
>         def set_version_range(self, lower_bound: Optional[TLSVersion] = None,
>                               upper_bound: Optional[TLSVersion] = None) -> None:
>             """
>             Set the minimum and maximum versions of TLS that should be allowed
>             on TLS connections made by this context.
> 
>             If present, ``lower_bound`` will set the lowest acceptable TLS
>             version. If present, ``upper_bound`` will set the highest
>             acceptable TLS version. If either argument is ``None``, this will
>             leave that bound unchanged.
>             """

https://bugs.python.org/issue27876
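
The quoted semantics (a ``None`` bound leaves that bound unchanged) can
be sketched on top of the stdlib. This assumes Python 3.7 or later,
where ssl.TLSVersion and the SSLContext.minimum_version and
maximum_version attributes exist. The free function below is
illustrative only and is not the method proposed in the draft.

```python
import ssl
from typing import Optional


def set_version_range(ctx: ssl.SSLContext,
                      lower_bound: Optional[ssl.TLSVersion] = None,
                      upper_bound: Optional[ssl.TLSVersion] = None) -> None:
    """Illustrative only: apply the bounds that were given, skip the rest."""
    if lower_bound is not None:
        ctx.minimum_version = lower_bound
    if upper_bound is not None:
        ctx.maximum_version = upper_bound


ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
# Raise the floor to TLS 1.2 and leave the ceiling alone.
set_version_range(ctx, lower_bound=ssl.TLSVersion.TLSv1_2)
```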

>         @abstractmethod
>         def wrap_socket(self, socket: socket.socket, server_side: bool = False,
>                         auto_handshake: bool = True,
>                         server_hostname: Optional[str] = None) -> TLSWrappedSocket:
>             """
>             Wrap an existing Python socket object ``socket`` and return a
>             ``TLSWrappedSocket`` object. ``socket`` must be a ``SOCK_STREAM``
>             socket: all other socket types are unsupported.
> 
>             The returned SSL socket is tied to the context, its settings and
>             certificates.
> 
>             The parameter ``server_side`` is a boolean which identifies whether
>             server-side or client-side behavior is desired from this socket.
> 
>             The parameter ``auto_handshake`` specifies whether to do the SSL
>             handshake automatically after doing a ``socket.connect()``, or
>             whether the application program will call it explicitly, by
>             invoking the ``TLSWrappedSocket.do_handshake()`` method. Calling
>             ``TLSWrappedSocket.do_handshake()`` explicitly gives the program
>             control over the blocking behavior of the socket I/O involved in
>             the handshake.
> 
>             On client connections, the optional parameter ``server_hostname``
>             specifies the hostname of the service which we are connecting to.
>             This allows a single server to host multiple SSL-based services
>             with distinct certificates, quite similarly to HTTP virtual hosts.
>             Specifying ``server_hostname`` will raise a ValueError if
>             ``server_side`` is ``True``.
>             """
> 
>         @abstractmethod
>         def wrap_buffers(self, incoming: Any, outgoing: Any,
>                          server_side: bool = False,
>                          server_hostname: Optional[str] = None) -> TLSWrappedBuffer:
>             """
>             Wrap a pair of buffer objects (``incoming`` and ``outgoing``) to
>             create an in-memory stream for TLS. The SSL routines will read data
>             from ``incoming`` and decrypt it, and write encrypted data to
>             ``outgoing``.
> 
>             The ``server_side`` and ``server_hostname`` parameters have the
>             same meaning as in ``wrap_socket``.
>             """


How about not defining these methods at all? IMO it makes no sense to
support client and server connections from the same context. This is
also the gist of a PEP I'm currently working on.

Basically I want to get rid of all context protocols except for two:
PROTOCOL_TLS_CLIENT and PROTOCOL_TLS_SERVER. The client protocol only
supports the client side part of a TLS handshake and has sensible
default settings for clients (require cert and hostname verification).

The server protocol only supports the server side part of a TLS
handshake and has cert validation disabled by default. This removes the
need for the server_side argument because the value can be inferred from
the context.
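
A small sketch of the idea using the two protocol constants as they
exist in the stdlib ssl module (Python 3.6 and later). The helper
infer_server_side is hypothetical. It only shows that the role can be
read off the context instead of being passed to every wrap call.

```python
import ssl


def infer_server_side(ctx: ssl.SSLContext) -> bool:
    """Hypothetical helper: derive the handshake role from the context."""
    return ctx.protocol == ssl.PROTOCOL_TLS_SERVER


# Client contexts verify the peer's certificate and hostname by default.
client_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)

# Server contexts do not ask for a client certificate by default.
server_ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
```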


>     class ClientContext(metaclass=ABCMeta):
>         @abstractmethod
>         def wrap_socket(self, socket: socket.socket,
>                         auto_handshake: bool = True,
>                         server_hostname: Optional[str] = None) -> TLSWrappedSocket:
>             """
>             Wrap an existing Python socket object ``socket`` and return a
>             ``TLSWrappedSocket`` object. ``socket`` must be a ``SOCK_STREAM``
>             socket: all other socket types are unsupported.
> 
>             The returned SSL socket is tied to the context, its settings and
>             certificates.
> 
>             The parameter ``auto_handshake`` specifies whether to do the SSL
>             handshake automatically after doing a ``socket.connect()``, or
>             whether the application program will call it explicitly, by
>             invoking the ``TLSWrappedSocket.do_handshake()`` method. Calling
>             ``TLSWrappedSocket.do_handshake()`` explicitly gives the program
>             control over the blocking behavior of the socket I/O involved in
>             the handshake.
> 
>             The optional parameter ``server_hostname`` specifies the hostname
>             of the service which we are connecting to. This allows a single
>             server to host multiple SSL-based services with distinct
>             certificates, quite similarly to HTTP virtual hosts.
>             """
> 
>         @abstractmethod
>         def wrap_buffers(self, incoming: Any, outgoing: Any,
>                          server_hostname: Optional[str] = None) -> TLSWrappedBuffer:
>             """
>             Wrap a pair of buffer objects (``incoming`` and ``outgoing``) to
>             create an in-memory stream for TLS. The SSL routines will read data
>             from ``incoming`` and decrypt it, and write encrypted data to
>             ``outgoing``.
> 
>             The ``server_hostname`` parameter has the same meaning as in
>             ``wrap_socket``.
>             """
> 
> 
>     class ServerContext(metaclass=ABCMeta):
>         @abstractmethod
>         def wrap_socket(self, socket: socket.socket,
>                         auto_handshake: bool = True) -> TLSWrappedSocket:
>             """
>             Wrap an existing Python socket object ``socket`` and return a
>             ``TLSWrappedSocket`` object. ``socket`` must be a ``SOCK_STREAM``
>             socket: all other socket types are unsupported.
> 
>             The returned SSL socket is tied to the context, its settings and
>             certificates.
> 
>             The parameter ``auto_handshake`` specifies whether to do the SSL
>             handshake automatically after doing a ``socket.connect()``, or
>             whether the application program will call it explicitly, by
>             invoking the ``TLSWrappedSocket.do_handshake()`` method. Calling
>             ``TLSWrappedSocket.do_handshake()`` explicitly gives the program
>             control over the blocking behavior of the socket I/O involved in
>             the handshake.
>             """
> 
>         @abstractmethod
>         def wrap_buffers(self, incoming: Any, outgoing: Any) -> TLSWrappedBuffer:
>             """
>             Wrap a pair of buffer objects (``incoming`` and ``outgoing``) to
>             create an in-memory stream for TLS. The SSL routines will read data
>             from ``incoming`` and decrypt it, and write encrypted data to
>             ``outgoing``.
>             """
> 
> 
> Socket
> ~~~~~~
> 
> The socket-wrapper ABC will be defined by the ``TLSWrappedSocket`` ABC, which
> has the following definition::
> 
>     class TLSWrappedSocket(metaclass=ABCMeta):
>         # The various socket methods all must be implemented. Their definitions
>         # have been elided from this class definition in the PEP because they
>         # aren't instructive.
>         @abstractmethod
>         def do_handshake(self) -> None:
>             """
>             Performs the TLS handshake. Also performs certificate validation
>             and hostname verification.
>             """
> 
>         @abstractmethod
>         def cipher(self) -> Optional[Cipher]:
>             """
>             Returns the Cipher entry for the cipher that has been negotiated on
>             the connection. If no connection has been negotiated, returns
>             ``None``.
>             """
> 
>         @abstractmethod
>         def negotiated_protocol(self) -> Optional[NextProtocol]:
>             """
>             Returns the protocol that was selected during the TLS handshake.
>             This selection may have been made using ALPN, NPN, or some future
>             negotiation mechanism.
> 
>             If ``Context.set_inner_protocols()`` was not called, if the other
>             party does not support protocol negotiation, if this socket does
>             not support any of the peer's proposed protocols, or if the
>             handshake has not happened yet, ``None`` is returned.
>             """
> 
>         @property
>         @abstractmethod
>         def context(self) -> Context:
>             """
>             The ``Context`` object this socket is tied to.
>             """
> 
>         @context.setter
>         @abstractmethod
>         def context(self, value: Context) -> None:
>             """
>             Set the value of the ``Context`` object this socket is tied to.
>             This operation (changing the context) may not always be supported.
>             """
> 
>         @property
>         @abstractmethod
>         def negotiated_tls_version(self) -> Optional[TLSVersion]:
>             """
>             The version of TLS that has been negotiated on this connection.
>             """
> 
> 
> Buffer
> ~~~~~~
> 
> The buffer-wrapper ABC will be defined by the ``TLSWrappedBuffer`` ABC, which
> has the following definition::
> 
>     class TLSWrappedBuffer(metaclass=ABCMeta):
>         @abstractmethod
>         def read(self, len: int = 1024, buffer: Any = None) -> Union[bytes, int]:
>             """
>             Read up to ``len`` bytes of data from the input buffer and return
>             the result as a ``bytes`` instance. If ``buffer`` is specified,
>             then read into the buffer instead, and return the number of bytes
>             read.
> 
>             Raise ``WantReadError`` or ``WantWriteError`` if there is
>             insufficient data in either the input or output buffer and the
>             operation would have caused data to be written or read.
> 
>             As at any time a re-negotiation is possible, a call to ``read()``
>             can also cause write operations.
>             """
> 
>         @abstractmethod
>         def write(self, buf: Any) -> int:
>             """
>             Write ``buf`` in encrypted form to the output buffer and return the
>             number of bytes written. The ``buf`` argument must be an object
>             supporting the buffer interface.
> 
>             Raise ``WantReadError`` or ``WantWriteError`` if there is
>             insufficient data in either the input or output buffer and the
>             operation would have caused data to be written or read.
> 
>             As at any time a re-negotiation is possible, a call to ``write()``
>             can also cause read operations.
>             """
> 
>         @abstractmethod
>         def do_handshake(self) -> None:
>             """
>             Performs the TLS handshake. Also performs certificate validation
>             and hostname verification.
>             """
> 
>         @abstractmethod
>         def cipher(self) -> Optional[Cipher]:
>             """
>             Returns the Cipher entry for the cipher that has been negotiated on
>             the connection. If no connection has been negotiated, returns
>             ``None``.
>             """
> 
>         @abstractmethod
>         def negotiated_protocol(self) -> Optional[NextProtocol]:
>             """
>             Returns the protocol that was selected during the TLS handshake.
>             This selection may have been made using ALPN, NPN, or some future
>             negotiation mechanism.
> 
>             If ``Context.set_inner_protocols()`` was not called, if the other
>             party does not support protocol negotiation, if this socket does
>             not support any of the peer's proposed protocols, or if the
>             handshake has not happened yet, ``None`` is returned.
>             """
> 
>         @property
>         @abstractmethod
>         def context(self) -> Context:
>             """
>             The ``Context`` object this socket is tied to.
>             """
> 
>         @context.setter
>         @abstractmethod
>         def context(self, value: Context) -> None:
>             """
>             Set the value of the ``Context`` object this socket is tied to.
>             This operation (changing the context) may not always be supported.
>             """
> 
>         @property
>         @abstractmethod
>         def negotiated_tls_version(self) -> Optional[TLSVersion]:
>             """
>             The version of TLS that has been negotiated on this connection.
>             """
> 
> 
> Cipher Suites
> ~~~~~~~~~~~~~
> 
> Todo

Cipher suites are going to be a mess! Too bad OpenSSL and GnuTLS do not
use IANA names. :( It should be good enough to focus on a subset and use
the wire protocol values as internal identifiers.

https://raw.githubusercontent.com/tiran/tlsdb/master/tlsdb.json
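
A minimal sketch of what "wire protocol values as internal identifiers"
could look like for a small subset. The class and table names are
hypothetical. The three wire values and their IANA and OpenSSL names are
real, and a GnuTLS column would be added the same way.

```python
from enum import IntEnum
from typing import Iterable


class CipherSuite(IntEnum):
    """Hypothetical registry subset keyed by the 16-bit IANA wire value."""
    TLS_RSA_WITH_AES_128_GCM_SHA256 = 0x009C
    TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 = 0xC02B
    TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 = 0xC02F


# Backend-specific spelling of each suite; OpenSSL does not use IANA names.
OPENSSL_NAMES = {
    CipherSuite.TLS_RSA_WITH_AES_128_GCM_SHA256:
        "AES128-GCM-SHA256",
    CipherSuite.TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256:
        "ECDHE-ECDSA-AES128-GCM-SHA256",
    CipherSuite.TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256:
        "ECDHE-RSA-AES128-GCM-SHA256",
}


def openssl_cipher_string(suites: Iterable[CipherSuite]) -> str:
    """Translate registry entries into an OpenSSL cipher list string."""
    return ":".join(OPENSSL_NAMES[suite] for suite in suites)
```

A backend would then only need its own name table, while applications
keep passing the same registry members everywhere.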

> 
> Protocol Negotiation
> ~~~~~~~~~~~~~~~~~~~~
> 
> Both NPN and ALPN allow for protocol negotiation as part of the TLS
> handshake. While NPN and ALPN are, at their fundamental level, built on top of
> bytestrings, string-based APIs are frequently problematic as they allow for
> errors in typing that can be hard to detect.
> 
> For this reason, this module would define a type that protocol negotiation
> implementations can pass and be passed. This type would wrap a bytestring, but
> allow for aliases for well-known protocols. This allows us to avoid the
> problems inherent in typos for well-known protocols, while allowing the full
> extensibility of the protocol negotiation layer if needed.
> 
> ::
> 
>     NextProtocol = namedtuple('NextProtocol', ['name'])
> 
>     H2 = NextProtocol(b'h2')
>     H2C = NextProtocol(b'h2c')
>     HTTP1 = NextProtocol(b'http/1.1')
>     WEBRTC = NextProtocol(b'webrtc')
>     C_WEBRTC = NextProtocol(b'c-webrtc')
>     FTP = NextProtocol(b'ftp')
>     STUN = NextProtocol(b'stun.nat-discovery')
>     TURN = NextProtocol(b'stun.turn')
> 
> TLS Versions
> ~~~~~~~~~~~~
> 
> It is often useful to be able to restrict the versions of TLS you're willing to
> support. There are many security advantages in refusing to use old versions of
> TLS, and some misbehaving servers will mishandle TLS clients advertising
> support for newer versions.
> 
> The following enumerated type can be used to gate TLS versions. Forward-looking
> applications should almost never set a maximum TLS version unless they
> absolutely must, as a TLS backend that is newer than the Python that uses it
> may support TLS versions that are not in this enumerated type.
> 
> Additionally, this enumerated type defines two additional flags that can always
> be used to request either the lowest or highest TLS version supported by an
> implementation.
> 
> ::
> 
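>     from enum import Enum, auto
>
>     class TLSVersion(Enum):
>         MINIMUM_SUPPORTED = auto()
>         SSLv2 = auto()
>         SSLv3 = auto()
>         TLSv1 = auto()
>         TLSv1_1 = auto()
>         TLSv1_2 = auto()
>         TLSv1_3 = auto()
>         MAXIMUM_SUPPORTED = auto()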
> 
> 
> Errors
> ~~~~~~
> 
> This module would define three base classes for use with error handling. Unlike
> the other classes defined here, these classes are not *abstract*, as they have
> no behaviour. They exist simply to signal certain common behaviours. Backends
> should subclass these exceptions in their own packages, but needn't define any
> behaviour for them. These exceptions should *never* be thrown directly; they
> should always be subclassed.
> 
> The definitions of the errors are below::
> 
>     class TLSError(Exception):
>         """
>         The base exception for all TLS related errors from any backend.
>         Catching this error should be sufficient to catch *all* TLS errors,
>         regardless of what backend is used.
>         """
> 
>     class WantWriteError(TLSError):
>         """
>         A special signaling exception used only when non-blocking or
>         buffer-only I/O is used. This error signals that the requested
>         operation cannot complete until more data is written to the network,
>         or until the output buffer is drained.
>         """
> 
>     class WantReadError(TLSError):
>         """
>         A special signaling exception used only when non-blocking or
>         buffer-only I/O is used. This error signals that the requested
>         operation cannot complete until more data is read from the network, or
>         until more data is available in the input buffer.
>         """
> 
> Changes to the Standard Library
> ===============================
> 
> The portions of the standard library that interact with TLS should be revised
> to use these ABCs. This will allow them to function with other TLS backends.
> This includes the following modules:
> 
> - asyncio
> - ftplib
> - http.client
> - imaplib
> - nntplib
> - poplib
> - smtplib
> 
> 
> Future
> ======
> 
> Major future TLS features may require revisions of these ABCs. These revisions
> should be made cautiously: many backends may not be able to move forward
> swiftly, and will be invalidated by changes in these ABCs. This is acceptable,
> but wherever possible features that are specific to individual implementations
> should not be added to the ABCs. The ABCs should restrict themselves to
> high-level descriptions of IETF-specified features.
> 
> 
> ToDo
> ====
> 
> * Consider adding a new parameter (``valid_subjects``?) to ``wrap_socket`` and
>   ``wrap_buffers`` that specifies in a *typed* manner what kind of entries in
>   the SAN field are acceptable. This would break the union between SNI and
>   cert validation, which may be a good thing (you can't SNI an IP address, but
>   you can validate a cert with one if you want).
> * It's annoying that there's no type signature for fileobj. Do I really have to
>   define one as part of this PEP? Otherwise, how do I define the types of the
>   arguments to ``wrap_buffers``?
> * Do we need ways to control hostname validation?
> * Do we need to support getpeercert? Should we always return DER instead of the
>   weird semi-structured thing?
> * How do we load certs from locations on disk? What about HSMs?
> * How do we signal to load certs from the OS? What happens if an implementation
>   doesn't let you *not* load those certs?
> 
> 
> References
> ==========
> 
> .. _ssl module: https://docs.python.org/3/library/ssl.html
> .. _OpenSSL Library: https://www.openssl.org/
> .. _PyOpenSSL: https://pypi.org/project/pyOpenSSL/
> .. _certifi: https://pypi.org/project/certifi/
> .. _SSLContext: https://docs.python.org/3/library/ssl.html#ssl.SSLContext
> .. _SSLSocket: https://docs.python.org/3/library/ssl.html#ssl.SSLSocket
> .. _SSLObject: https://docs.python.org/3/library/ssl.html#ssl.SSLObject
> .. _SSLError: https://docs.python.org/3/library/ssl.html#ssl.SSLError
> 
> 
> Copyright
> =========
> 
> This document has been placed in the public domain.