[Python-checkins] bpo-43880: Show DeprecationWarnings for deprecated ssl module features (GH-25455)

tiran webhook-mailer at python.org
Mon Apr 19 01:27:19 EDT 2021


https://github.com/python/cpython/commit/2875c603b2a7691b55c2046aca54831c91efda8e
commit: 2875c603b2a7691b55c2046aca54831c91efda8e
branch: master
author: Christian Heimes <christian at python.org>
committer: tiran <christian at python.org>
date: 2021-04-19T07:27:10+02:00
summary:

bpo-43880: Show DeprecationWarnings for deprecated ssl module features (GH-25455)

* ssl.OP_NO_SSLv2
* ssl.OP_NO_SSLv3
* ssl.OP_NO_TLSv1
* ssl.OP_NO_TLSv1_1
* ssl.OP_NO_TLSv1_2
* ssl.OP_NO_TLSv1_3
* ssl.PROTOCOL_SSLv2
* ssl.PROTOCOL_SSLv3
* ssl.PROTOCOL_SSLv23 (alias for PROTOCOL_TLS)
* ssl.PROTOCOL_TLS
* ssl.PROTOCOL_TLSv1
* ssl.PROTOCOL_TLSv1_1
* ssl.PROTOCOL_TLSv1_2
* ssl.TLSVersion.SSLv3
* ssl.TLSVersion.TLSv1
* ssl.TLSVersion.TLSv1_1
* ssl.wrap_socket()
* ssl.RAND_pseudo_bytes()
* ssl.RAND_egd() (already removed since it's not supported by OpenSSL 1.1.1)
* ssl.SSLContext() without a protocol argument
* ssl.match_hostname()
* hashlib.pbkdf2_hmac() (pure Python implementation, fast OpenSSL
  function will stay)

Signed-off-by: Christian Heimes <christian at python.org>
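
For readers migrating away from the features listed above, the following is a minimal sketch and not part of the commit. It assumes Python 3.10 or newer. The helper names make_client_context() and deprecation_messages() are hypothetical and exist only for illustration. The sketch builds a client context with an explicit protocol and a version bound, and records whichever DeprecationWarnings a given call emits.

```python
import ssl
import warnings


def make_client_context():
    """Build a client context without any of the deprecated spellings.

    Hypothetical helper for illustration; not part of the ssl module.
    """
    # Explicit client protocol instead of SSLContext() or PROTOCOL_TLS.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    # Version bound instead of OP_NO_TLSv1 / OP_NO_TLSv1_1 option flags.
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    return ctx


def deprecation_messages(func):
    """Call func() and return the DeprecationWarning messages it emitted.

    Hypothetical helper for illustration; not part of the ssl module.
    """
    with warnings.catch_warnings(record=True) as caught:
        warnings.simplefilter("always")
        func()
    return [str(w.message) for w in caught
            if issubclass(w.category, DeprecationWarning)]


if __name__ == "__main__":
    # Deprecated form: no protocol argument.
    print(deprecation_messages(ssl.SSLContext))
    # Replacement form: explicit protocol and version bound.
    print(deprecation_messages(make_client_context))
```

A PROTOCOL_TLS_CLIENT context enables check_hostname and sets verify_mode to CERT_REQUIRED by default, so code that previously relied on an unverified PROTOCOL_TLS context has to relax those settings explicitly, as several of the test changes in the diff below do.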

files:
A Misc/NEWS.d/next/Library/2021-04-17-13-53-33.bpo-43880.-fC2JD.rst
M Doc/library/hashlib.rst
M Doc/library/ssl.rst
M Lib/hashlib.py
M Lib/ssl.py
M Lib/test/pythoninfo.py
M Lib/test/test_asyncio/utils.py
M Lib/test/test_ftplib.py
M Lib/test/test_hashlib.py
M Lib/test/test_imaplib.py
M Lib/test/test_nntplib.py
M Lib/test/test_poplib.py
M Lib/test/test_ssl.py
M Modules/_ssl.c

diff --git a/Doc/library/hashlib.rst b/Doc/library/hashlib.rst
index 48c0bab1eb306..d77a2e5912d5e 100644
--- a/Doc/library/hashlib.rst
+++ b/Doc/library/hashlib.rst
@@ -266,6 +266,12 @@ include a `salt <https://en.wikipedia.org/wiki/Salt_%28cryptography%29>`_.
       Python implementation uses an inline version of :mod:`hmac`. It is about
       three times slower and doesn't release the GIL.
 
+   .. deprecated:: 3.10
+
+      Slow Python implementation of *pbkdf2_hmac* is deprecated. In the
+      future the function will only be available when Python is compiled
+      with OpenSSL.
+
 .. function:: scrypt(password, *, salt, n, r, p, maxmem=0, dklen=64)
 
    The function provides scrypt password-based key derivation function as
diff --git a/Doc/library/ssl.rst b/Doc/library/ssl.rst
index 8bac365ffc0e4..c954d9c8febb0 100644
--- a/Doc/library/ssl.rst
+++ b/Doc/library/ssl.rst
@@ -25,8 +25,8 @@ probably additional platforms, as long as OpenSSL is installed on that platform.
 
    Some behavior may be platform dependent, since calls are made to the
    operating system socket APIs.  The installed version of OpenSSL may also
-   cause variations in behavior. For example, TLSv1.1 and TLSv1.2 come with
-   openssl version 1.0.1.
+   cause variations in behavior. For example, TLSv1.3 with OpenSSL version
+   1.1.1.
 
 .. warning::
    Don't use this module without reading the :ref:`ssl-security`.  Doing so
@@ -63,6 +63,8 @@ by SSL sockets created through the :meth:`SSLContext.wrap_socket` method.
    :pep:`644` has been implemented. The ssl module requires OpenSSL 1.1.1
    or newer.
 
+   Use of deprecated constants and functions result in deprecation warnings.
+
 
 Functions, Constants, and Exceptions
 ------------------------------------
@@ -136,8 +138,9 @@ purposes.
    :const:`None`, this function can choose to trust the system's default
    CA certificates instead.
 
-   The settings are: :data:`PROTOCOL_TLS`, :data:`OP_NO_SSLv2`, and
-   :data:`OP_NO_SSLv3` with high encryption cipher suites without RC4 and
+   The settings are: :data:`PROTOCOL_TLS_CLIENT` or
+   :data:`PROTOCOL_TLS_SERVER`, :data:`OP_NO_SSLv2`, and :data:`OP_NO_SSLv3`
+   with high encryption cipher suites without RC4 and
    without unauthenticated cipher suites. Passing :data:`~Purpose.SERVER_AUTH`
    as *purpose* sets :data:`~SSLContext.verify_mode` to :data:`CERT_REQUIRED`
    and either loads CA certificates (when at least one of *cafile*, *capath* or
@@ -185,6 +188,12 @@ purposes.
 
       Support for key logging to :envvar:`SSLKEYLOGFILE` was added.
 
+   .. versionchanged:: 3.10
+
+      The context now uses :data:`PROTOCOL_TLS_CLIENT` or
+      :data:`PROTOCOL_TLS_SERVER` protocol instead of generic
+      :data:`PROTOCOL_TLS`.
+
 
 Exceptions
 ^^^^^^^^^^
@@ -417,7 +426,7 @@ Certificate handling
       previously. Return an integer (no fractions of a second in the
       input format)
 
-.. function:: get_server_certificate(addr, ssl_version=PROTOCOL_TLS, ca_certs=None)
+.. function:: get_server_certificate(addr, ssl_version=PROTOCOL_TLS_CLIENT, ca_certs=None)
 
    Given the address ``addr`` of an SSL-protected server, as a (*hostname*,
    *port-number*) pair, fetches the server's certificate, and returns it as a
@@ -654,6 +663,8 @@ Constants
 
    .. versionadded:: 3.6
 
+   .. deprecated:: 3.10
+
 .. data:: PROTOCOL_TLS_CLIENT
 
    Auto-negotiate the highest protocol version like :data:`PROTOCOL_TLS`,
@@ -707,7 +718,10 @@ Constants
    .. deprecated:: 3.6
 
       OpenSSL has deprecated all version specific protocols. Use the default
-      protocol :data:`PROTOCOL_TLS` with flags like :data:`OP_NO_SSLv3` instead.
+      protocol :data:`PROTOCOL_TLS_SERVER` or :data:`PROTOCOL_TLS_CLIENT`
+      with :attr:`SSLContext.minimum_version` and
+      :attr:`SSLContext.maximum_version` instead.
+
 
 .. data:: PROTOCOL_TLSv1
 
@@ -715,8 +729,7 @@ Constants
 
    .. deprecated:: 3.6
 
-      OpenSSL has deprecated all version specific protocols. Use the default
-      protocol :data:`PROTOCOL_TLS` with flags like :data:`OP_NO_SSLv3` instead.
+      OpenSSL has deprecated all version specific protocols.
 
 .. data:: PROTOCOL_TLSv1_1
 
@@ -727,8 +740,7 @@ Constants
 
    .. deprecated:: 3.6
 
-      OpenSSL has deprecated all version specific protocols. Use the default
-      protocol :data:`PROTOCOL_TLS` with flags like :data:`OP_NO_SSLv3` instead.
+      OpenSSL has deprecated all version specific protocols.
 
 .. data:: PROTOCOL_TLSv1_2
 
@@ -739,8 +751,7 @@ Constants
 
    .. deprecated:: 3.6
 
-      OpenSSL has deprecated all version specific protocols. Use the default
-      protocol :data:`PROTOCOL_TLS` with flags like :data:`OP_NO_SSLv3` instead.
+      OpenSSL has deprecated all version specific protocols.
 
 .. data:: OP_ALL
 
@@ -762,7 +773,6 @@ Constants
 
       SSLv2 is deprecated
 
-
 .. data:: OP_NO_SSLv3
 
    Prevents an SSLv3 connection.  This option is only applicable in
@@ -1068,6 +1078,11 @@ Constants
 
    SSL 3.0 to TLS 1.3.
 
+   .. deprecated:: 3.10
+
+      All :class:`TLSVersion` members except :attr:`TLSVersion.TLSv1_2` and
+      :attr:`TLSVersion.TLSv1_3` are deprecated.
+
 
 SSL Sockets
 -----------
@@ -1423,7 +1438,7 @@ such as SSL configuration options, certificate(s) and private key(s).
 It also manages a cache of SSL sessions for server-side sockets, in order
 to speed up repeated connections from the same clients.
 
-.. class:: SSLContext(protocol=PROTOCOL_TLS)
+.. class:: SSLContext(protocol=None)
 
    Create a new SSL context.  You may pass *protocol* which must be one
    of the ``PROTOCOL_*`` constants defined in this module.  The parameter
@@ -1472,6 +1487,12 @@ to speed up repeated connections from the same clients.
       ciphers, no ``NULL`` ciphers and no ``MD5`` ciphers (except for
       :data:`PROTOCOL_SSLv2`).
 
+   .. deprecated:: 3.10
+
+      :class:`SSLContext` without protocol argument is deprecated. The
+      context class will either require :data:`PROTOCOL_TLS_CLIENT` or
+      :data:`PROTOCOL_TLS_SERVER` protocol in the future.
+
 
 :class:`SSLContext` objects have the following methods and attributes:
 
@@ -1934,7 +1955,7 @@ to speed up repeated connections from the same clients.
 .. attribute:: SSLContext.num_tickets
 
    Control the number of TLS 1.3 session tickets of a
-   :attr:`TLS_PROTOCOL_SERVER` context. The setting has no impact on TLS
+   :attr:`PROTOCOL_TLS_SERVER` context. The setting has no impact on TLS
    1.0 to 1.2 connections.
 
    .. versionadded:: 3.8
@@ -1951,6 +1972,12 @@ to speed up repeated connections from the same clients.
          >>> ssl.create_default_context().options  # doctest: +SKIP
          <Options.OP_ALL|OP_NO_SSLv3|OP_NO_SSLv2|OP_NO_COMPRESSION: 2197947391>
 
+   .. deprecated:: 3.7
+
+      All ``OP_NO_SSL*`` and ``OP_NO_TLS*`` options have been deprecated since
+      Python 3.7. Use :attr:`SSLContext.minimum_version` and
+      :attr:`SSLContext.maximum_version` instead.
+
 .. attribute:: SSLContext.post_handshake_auth
 
    Enable TLS 1.3 post-handshake client authentication. Post-handshake auth
@@ -2623,8 +2650,8 @@ disabled by default.
 ::
 
    >>> client_context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
-   >>> client_context.options |= ssl.OP_NO_TLSv1
-   >>> client_context.options |= ssl.OP_NO_TLSv1_1
+   >>> client_context.minimum_version = ssl.TLSVersion.TLSv1_3
+   >>> client_context.maximum_version = ssl.TLSVersion.TLSv1_3
 
 
 The SSL context created above will only allow TLSv1.2 and later (if
diff --git a/Lib/hashlib.py b/Lib/hashlib.py
index ffa3be049a4f3..21a73f3bf6cb6 100644
--- a/Lib/hashlib.py
+++ b/Lib/hashlib.py
@@ -181,6 +181,7 @@ def __hash_new(name, data=b'', **kwargs):
     # OpenSSL's PKCS5_PBKDF2_HMAC requires OpenSSL 1.0+ with HMAC and SHA
     from _hashlib import pbkdf2_hmac
 except ImportError:
+    from warnings import warn as _warn
     _trans_5C = bytes((x ^ 0x5C) for x in range(256))
     _trans_36 = bytes((x ^ 0x36) for x in range(256))
 
@@ -191,6 +192,11 @@ def pbkdf2_hmac(hash_name, password, salt, iterations, dklen=None):
         as OpenSSL's PKCS5_PBKDF2_HMAC for short passwords and much faster
         for long passwords.
         """
+        _warn(
+            "Python implementation of pbkdf2_hmac() is deprecated.",
+            category=DeprecationWarning,
+            stacklevel=2
+        )
         if not isinstance(hash_name, str):
             raise TypeError(hash_name)
 
diff --git a/Lib/ssl.py b/Lib/ssl.py
index 99d0852dad18d..d631805d6cabe 100644
--- a/Lib/ssl.py
+++ b/Lib/ssl.py
@@ -381,6 +381,11 @@ def match_hostname(cert, hostname):
     CertificateError is raised on failure. On success, the function
     returns nothing.
     """
+    warnings.warn(
+        "ssl module: match_hostname() is deprecated",
+        category=DeprecationWarning,
+        stacklevel=2
+    )
     if not cert:
         raise ValueError("empty or no certificate, match_hostname needs a "
                          "SSL socket or SSL context with either "
@@ -479,7 +484,15 @@ class SSLContext(_SSLContext):
     sslsocket_class = None  # SSLSocket is assigned later.
     sslobject_class = None  # SSLObject is assigned later.
 
-    def __new__(cls, protocol=PROTOCOL_TLS, *args, **kwargs):
+    def __new__(cls, protocol=None, *args, **kwargs):
+        if protocol is None:
+            warnings.warn(
+                "ssl module: "
+                "SSLContext() without protocol argument is deprecated.",
+                category=DeprecationWarning,
+                stacklevel=2
+            )
+            protocol = PROTOCOL_TLS
         self = _SSLContext.__new__(cls, protocol)
         return self
 
@@ -518,6 +531,7 @@ def wrap_bio(self, incoming, outgoing, server_side=False,
         )
 
     def set_npn_protocols(self, npn_protocols):
+        warnings.warn("NPN is deprecated, use ALPN instead", stacklevel=2)
         protos = bytearray()
         for protocol in npn_protocols:
             b = bytes(protocol, 'ascii')
@@ -734,12 +748,15 @@ def create_default_context(purpose=Purpose.SERVER_AUTH, *, cafile=None,
     # SSLContext sets OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_COMPRESSION,
     # OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_DH_USE and OP_SINGLE_ECDH_USE
     # by default.
-    context = SSLContext(PROTOCOL_TLS)
-
     if purpose == Purpose.SERVER_AUTH:
         # verify certs and host name in client mode
+        context = SSLContext(PROTOCOL_TLS_CLIENT)
         context.verify_mode = CERT_REQUIRED
         context.check_hostname = True
+    elif purpose == Purpose.CLIENT_AUTH:
+        context = SSLContext(PROTOCOL_TLS_SERVER)
+    else:
+        raise ValueError(purpose)
 
     if cafile or capath or cadata:
         context.load_verify_locations(cafile, capath, cadata)
@@ -755,7 +772,7 @@ def create_default_context(purpose=Purpose.SERVER_AUTH, *, cafile=None,
             context.keylog_filename = keylogfile
     return context
 
-def _create_unverified_context(protocol=PROTOCOL_TLS, *, cert_reqs=CERT_NONE,
+def _create_unverified_context(protocol=None, *, cert_reqs=CERT_NONE,
                            check_hostname=False, purpose=Purpose.SERVER_AUTH,
                            certfile=None, keyfile=None,
                            cafile=None, capath=None, cadata=None):
@@ -772,10 +789,18 @@ def _create_unverified_context(protocol=PROTOCOL_TLS, *, cert_reqs=CERT_NONE,
     # SSLContext sets OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_COMPRESSION,
     # OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_DH_USE and OP_SINGLE_ECDH_USE
     # by default.
-    context = SSLContext(protocol)
+    if purpose == Purpose.SERVER_AUTH:
+        # verify certs and host name in client mode
+        if protocol is None:
+            protocol = PROTOCOL_TLS_CLIENT
+    elif purpose == Purpose.CLIENT_AUTH:
+        if protocol is None:
+            protocol = PROTOCOL_TLS_SERVER
+    else:
+        raise ValueError(purpose)
 
-    if not check_hostname:
-        context.check_hostname = False
+    context = SSLContext(protocol)
+    context.check_hostname = check_hostname
     if cert_reqs is not None:
         context.verify_mode = cert_reqs
     if check_hostname:
@@ -909,6 +934,9 @@ def selected_npn_protocol(self):
         """Return the currently selected NPN protocol as a string, or ``None``
         if a next protocol was not negotiated or if NPN is not supported by one
         of the peers."""
+        warnings.warn(
+            "ssl module: NPN is deprecated, use ALPN instead", stacklevel=2
+        )
 
     def selected_alpn_protocol(self):
         """Return the currently selected ALPN protocol as a string, or ``None``
@@ -1123,6 +1151,9 @@ def getpeercert(self, binary_form=False):
     @_sslcopydoc
     def selected_npn_protocol(self):
         self._checkClosed()
+        warnings.warn(
+            "ssl module: NPN is deprecated, use ALPN instead", stacklevel=2
+        )
         return None
 
     @_sslcopydoc
@@ -1382,7 +1413,11 @@ def wrap_socket(sock, keyfile=None, certfile=None,
                 do_handshake_on_connect=True,
                 suppress_ragged_eofs=True,
                 ciphers=None):
-
+    warnings.warn(
+        "ssl module: wrap_socket is deprecated, use SSLContext.wrap_socket()",
+        category=DeprecationWarning,
+        stacklevel=2
+    )
     if server_side and not certfile:
         raise ValueError("certfile must be specified for server-side "
                          "operations")
@@ -1460,7 +1495,7 @@ def PEM_cert_to_DER_cert(pem_cert_string):
     d = pem_cert_string.strip()[len(PEM_HEADER):-len(PEM_FOOTER)]
     return base64.decodebytes(d.encode('ASCII', 'strict'))
 
-def get_server_certificate(addr, ssl_version=PROTOCOL_TLS, ca_certs=None):
+def get_server_certificate(addr, ssl_version=PROTOCOL_TLS_CLIENT, ca_certs=None):
     """Retrieve the certificate from the server at the specified address,
     and return it as a PEM-encoded string.
     If 'ca_certs' is specified, validate the server cert against it.
diff --git a/Lib/test/pythoninfo.py b/Lib/test/pythoninfo.py
index cc228fb3b54c9..278dfe7f7da7a 100644
--- a/Lib/test/pythoninfo.py
+++ b/Lib/test/pythoninfo.py
@@ -504,7 +504,7 @@ def format_attr(attr, value):
     copy_attributes(info_add, ssl, 'ssl.%s', attributes, formatter=format_attr)
 
     for name, ctx in (
-        ('SSLContext', ssl.SSLContext()),
+        ('SSLContext', ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)),
         ('default_https_context', ssl._create_default_https_context()),
         ('stdlib_context', ssl._create_stdlib_context()),
     ):
diff --git a/Lib/test/test_asyncio/utils.py b/Lib/test/test_asyncio/utils.py
index 0032c9a331f75..3765194cd0dd2 100644
--- a/Lib/test/test_asyncio/utils.py
+++ b/Lib/test/test_asyncio/utils.py
@@ -91,7 +91,7 @@ def dummy_ssl_context():
     if ssl is None:
         return None
     else:
-        return ssl.SSLContext(ssl.PROTOCOL_TLS)
+        return simple_client_sslcontext(disable_verify=True)
 
 
 def run_briefly(loop):
@@ -158,7 +158,7 @@ def finish_request(self, request, client_address):
         # contains the ssl key and certificate files) differs
         # between the stdlib and stand-alone asyncio.
         # Prefer our own if we can find it.
-        context = ssl.SSLContext()
+        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
         context.load_cert_chain(ONLYCERT, ONLYKEY)
 
         ssock = context.wrap_socket(request, server_side=True)
diff --git a/Lib/test/test_ftplib.py b/Lib/test/test_ftplib.py
index 154dce15e2c37..a48b429ca3802 100644
--- a/Lib/test/test_ftplib.py
+++ b/Lib/test/test_ftplib.py
@@ -324,7 +324,7 @@ class SSLConnection(asyncore.dispatcher):
         _ssl_closing = False
 
         def secure_connection(self):
-            context = ssl.SSLContext()
+            context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
             context.load_cert_chain(CERTFILE)
             socket = context.wrap_socket(self.socket,
                                          suppress_ragged_eofs=False,
diff --git a/Lib/test/test_hashlib.py b/Lib/test/test_hashlib.py
index bf9f559400460..9e9c874445c27 100644
--- a/Lib/test/test_hashlib.py
+++ b/Lib/test/test_hashlib.py
@@ -21,6 +21,7 @@
 from test.support import _4G, bigmemtest
 from test.support.import_helper import import_fresh_module
 from test.support import threading_helper
+from test.support import warnings_helper
 from http.client import HTTPException
 
 # Were we compiled --with-pydebug or with #define Py_DEBUG?
@@ -1021,7 +1022,10 @@ def _test_pbkdf2_hmac(self, pbkdf2, supported):
 
     @unittest.skipIf(builtin_hashlib is None, "test requires builtin_hashlib")
     def test_pbkdf2_hmac_py(self):
-        self._test_pbkdf2_hmac(builtin_hashlib.pbkdf2_hmac, builtin_hashes)
+        with warnings_helper.check_warnings():
+            self._test_pbkdf2_hmac(
+                builtin_hashlib.pbkdf2_hmac, builtin_hashes
+            )
 
     @unittest.skipUnless(hasattr(openssl_hashlib, 'pbkdf2_hmac'),
                      '   test requires OpenSSL > 1.0')
diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py
index 0cab7897a96dc..c2b935f58164e 100644
--- a/Lib/test/test_imaplib.py
+++ b/Lib/test/test_imaplib.py
@@ -96,7 +96,7 @@ class SecureTCPServer(socketserver.TCPServer):
 
         def get_request(self):
             newsocket, fromaddr = self.socket.accept()
-            context = ssl.SSLContext()
+            context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
             context.load_cert_chain(CERTFILE)
             connstream = context.wrap_socket(newsocket, server_side=True)
             return connstream, fromaddr
diff --git a/Lib/test/test_nntplib.py b/Lib/test/test_nntplib.py
index 4dbf941036f09..230a444389467 100644
--- a/Lib/test/test_nntplib.py
+++ b/Lib/test/test_nntplib.py
@@ -1602,7 +1602,7 @@ def run_server(self, sock):
                 elif cmd == b'STARTTLS\r\n':
                     reader.close()
                     client.sendall(b'382 Begin TLS negotiation now\r\n')
-                    context = ssl.SSLContext()
+                    context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                     context.load_cert_chain(certfile)
                     client = context.wrap_socket(
                         client, server_side=True)
diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py
index 548868362a397..c5ae9f77e4f00 100644
--- a/Lib/test/test_poplib.py
+++ b/Lib/test/test_poplib.py
@@ -155,7 +155,7 @@ def cmd_utf8(self, arg):
         def cmd_stls(self, arg):
             if self.tls_active is False:
                 self.push('+OK Begin TLS negotiation')
-                context = ssl.SSLContext()
+                context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
                 context.load_cert_chain(CERTFILE)
                 tls_sock = context.wrap_socket(self.socket,
                                                server_side=True,
diff --git a/Lib/test/test_ssl.py b/Lib/test/test_ssl.py
index 7b70979026dcf..a2c79ff5927f1 100644
--- a/Lib/test/test_ssl.py
+++ b/Lib/test/test_ssl.py
@@ -224,7 +224,7 @@ def has_tls_version(version):
 
     # check runtime and dynamic crypto policy settings. A TLS version may
     # be compiled in but disabled by a policy or config option.
-    ctx = ssl.SSLContext()
+    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
     if (
             hasattr(ctx, 'minimum_version') and
             ctx.minimum_version != ssl.TLSVersion.MINIMUM_SUPPORTED and
@@ -306,12 +306,20 @@ def asn1time(cert_time):
 
 needs_sni = unittest.skipUnless(ssl.HAS_SNI, "SNI support needed for this test")
 
+ignore_deprecation = warnings_helper.ignore_warnings(
+    category=DeprecationWarning
+)
+
 
-def test_wrap_socket(sock, ssl_version=ssl.PROTOCOL_TLS, *,
+def test_wrap_socket(sock, *,
                      cert_reqs=ssl.CERT_NONE, ca_certs=None,
                      ciphers=None, certfile=None, keyfile=None,
                      **kwargs):
-    context = ssl.SSLContext(ssl_version)
+    if not kwargs.get("server_side"):
+        kwargs["server_hostname"] = SIGNED_CERTFILE_HOSTNAME
+        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+    else:
+        context = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
     if cert_reqs is not None:
         if cert_reqs == ssl.CERT_NONE:
             context.check_hostname = False
@@ -378,8 +386,8 @@ def test_private_init(self):
     def test_str_for_enums(self):
         # Make sure that the PROTOCOL_* constants have enum-like string
         # reprs.
-        proto = ssl.PROTOCOL_TLS
-        self.assertEqual(str(proto), 'PROTOCOL_TLS')
+        proto = ssl.PROTOCOL_TLS_CLIENT
+        self.assertEqual(str(proto), 'PROTOCOL_TLS_CLIENT')
         ctx = ssl.SSLContext(proto)
         self.assertIs(ctx.protocol, proto)
 
@@ -390,7 +398,8 @@ def test_random(self):
                              % (v, (v and "sufficient randomness") or
                                 "insufficient randomness"))
 
-        data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
+        with warnings_helper.check_warnings():
+            data, is_cryptographic = ssl.RAND_pseudo_bytes(16)
         self.assertEqual(len(data), 16)
         self.assertEqual(is_cryptographic, v == 1)
         if v:
@@ -401,48 +410,13 @@ def test_random(self):
 
         # negative num is invalid
         self.assertRaises(ValueError, ssl.RAND_bytes, -5)
-        self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
+        with warnings_helper.check_warnings():
+            self.assertRaises(ValueError, ssl.RAND_pseudo_bytes, -5)
 
-        if hasattr(ssl, 'RAND_egd'):
-            self.assertRaises(TypeError, ssl.RAND_egd, 1)
-            self.assertRaises(TypeError, ssl.RAND_egd, 'foo', 1)
         ssl.RAND_add("this is a random string", 75.0)
         ssl.RAND_add(b"this is a random bytes object", 75.0)
         ssl.RAND_add(bytearray(b"this is a random bytearray object"), 75.0)
 
-    @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork')
-    def test_random_fork(self):
-        status = ssl.RAND_status()
-        if not status:
-            self.fail("OpenSSL's PRNG has insufficient randomness")
-
-        rfd, wfd = os.pipe()
-        pid = os.fork()
-        if pid == 0:
-            try:
-                os.close(rfd)
-                child_random = ssl.RAND_pseudo_bytes(16)[0]
-                self.assertEqual(len(child_random), 16)
-                os.write(wfd, child_random)
-                os.close(wfd)
-            except BaseException:
-                os._exit(1)
-            else:
-                os._exit(0)
-        else:
-            os.close(wfd)
-            self.addCleanup(os.close, rfd)
-            support.wait_process(pid, exitcode=0)
-
-            child_random = os.read(rfd, 16)
-            self.assertEqual(len(child_random), 16)
-            parent_random = ssl.RAND_pseudo_bytes(16)[0]
-            self.assertEqual(len(parent_random), 16)
-
-            self.assertNotEqual(child_random, parent_random)
-
-    maxDiff = None
-
     def test_parse_cert(self):
         # note that this uses an 'unofficial' function in _ssl.c,
         # provided solely for this test, to exercise the certificate
@@ -624,6 +598,7 @@ def test_timeout(self):
             with test_wrap_socket(s) as ss:
                 self.assertEqual(timeout, ss.gettimeout())
 
+    @ignore_deprecation
     def test_errors_sslwrap(self):
         sock = socket.socket()
         self.assertRaisesRegex(ValueError,
@@ -675,6 +650,7 @@ def test_malformed_key(self):
         """Wrapping with a badly formatted key (syntax error)"""
         self.bad_cert_test("badkey.pem")
 
+    @ignore_deprecation
     def test_match_hostname(self):
         def ok(cert, hostname):
             ssl.match_hostname(cert, hostname)
@@ -1126,17 +1102,15 @@ class ContextTests(unittest.TestCase):
 
     def test_constructor(self):
         for protocol in PROTOCOLS:
-            ssl.SSLContext(protocol)
-        ctx = ssl.SSLContext()
+            with warnings_helper.check_warnings():
+                ctx = ssl.SSLContext(protocol)
+            self.assertEqual(ctx.protocol, protocol)
+        with warnings_helper.check_warnings():
+            ctx = ssl.SSLContext()
         self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
         self.assertRaises(ValueError, ssl.SSLContext, -1)
         self.assertRaises(ValueError, ssl.SSLContext, 42)
 
-    def test_protocol(self):
-        for proto in PROTOCOLS:
-            ctx = ssl.SSLContext(proto)
-            self.assertEqual(ctx.protocol, proto)
-
     def test_ciphers(self):
         ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
         ctx.set_ciphers("ALL")
@@ -1174,16 +1148,19 @@ def test_options(self):
                     OP_ENABLE_MIDDLEBOX_COMPAT |
                     OP_IGNORE_UNEXPECTED_EOF)
         self.assertEqual(default, ctx.options)
-        ctx.options |= ssl.OP_NO_TLSv1
+        with warnings_helper.check_warnings():
+            ctx.options |= ssl.OP_NO_TLSv1
         self.assertEqual(default | ssl.OP_NO_TLSv1, ctx.options)
-        ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
+        with warnings_helper.check_warnings():
+            ctx.options = (ctx.options & ~ssl.OP_NO_TLSv1)
         self.assertEqual(default, ctx.options)
         ctx.options = 0
         # Ubuntu has OP_NO_SSLv3 forced on by default
         self.assertEqual(0, ctx.options & ~ssl.OP_NO_SSLv3)
 
     def test_verify_mode_protocol(self):
-        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
+        with warnings_helper.check_warnings():
+            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
         # Default value
         self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
         ctx.verify_mode = ssl.CERT_OPTIONAL
@@ -1221,6 +1198,7 @@ def test_hostname_checks_common_name(self):
 
     @requires_minimum_version
     @unittest.skipIf(IS_LIBRESSL, "see bpo-34001")
+    @ignore_deprecation
     def test_min_max_version(self):
         ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_SERVER)
         # OpenSSL default is MINIMUM_SUPPORTED, however some vendors like
@@ -1304,7 +1282,7 @@ def test_min_max_version(self):
         "requires OpenSSL >= 1.1.0"
     )
     def test_security_level(self):
-        ctx = ssl.SSLContext()
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
         # The default security callback allows for levels between 0-5
         # with OpenSSL defaulting to 1, however some vendors override the
         # default value (e.g. Debian defaults to 2)
@@ -1513,7 +1491,7 @@ def test_load_dh_params(self):
             ctx.load_dh_params(CERTFILE)
 
     def test_session_stats(self):
-        for proto in PROTOCOLS:
+        for proto in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
             ctx = ssl.SSLContext(proto)
             self.assertEqual(ctx.session_stats(), {
                 'number': 0,
@@ -1673,7 +1651,7 @@ def _assert_context_options(self, ctx):
     def test_create_default_context(self):
         ctx = ssl.create_default_context()
 
-        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
+        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_CLIENT)
         self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
         self.assertTrue(ctx.check_hostname)
         self._assert_context_options(ctx)
@@ -1682,42 +1660,49 @@ def test_create_default_context(self):
             cadata = f.read()
         ctx = ssl.create_default_context(cafile=SIGNING_CA, capath=CAPATH,
                                          cadata=cadata)
-        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
+        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_CLIENT)
         self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
         self._assert_context_options(ctx)
 
         ctx = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
-        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
+        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_SERVER)
         self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
         self._assert_context_options(ctx)
 
+
+
     def test__create_stdlib_context(self):
         ctx = ssl._create_stdlib_context()
-        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
+        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_CLIENT)
         self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
         self.assertFalse(ctx.check_hostname)
         self._assert_context_options(ctx)
 
-        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
+        with warnings_helper.check_warnings():
+            ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1)
         self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
         self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
         self._assert_context_options(ctx)
 
-        ctx = ssl._create_stdlib_context(ssl.PROTOCOL_TLSv1,
-                                         cert_reqs=ssl.CERT_REQUIRED,
-                                         check_hostname=True)
-        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1)
+        with warnings_helper.check_warnings():
+            ctx = ssl._create_stdlib_context(
+                ssl.PROTOCOL_TLSv1_2,
+                cert_reqs=ssl.CERT_REQUIRED,
+                check_hostname=True
+            )
+        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLSv1_2)
         self.assertEqual(ctx.verify_mode, ssl.CERT_REQUIRED)
         self.assertTrue(ctx.check_hostname)
         self._assert_context_options(ctx)
 
         ctx = ssl._create_stdlib_context(purpose=ssl.Purpose.CLIENT_AUTH)
-        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS)
+        self.assertEqual(ctx.protocol, ssl.PROTOCOL_TLS_SERVER)
         self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
         self._assert_context_options(ctx)
 
     def test_check_hostname(self):
-        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
+        with warnings_helper.check_warnings():
+            ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
         self.assertFalse(ctx.check_hostname)
         self.assertEqual(ctx.verify_mode, ssl.CERT_NONE)
 
@@ -2042,7 +2027,9 @@ def test_non_blocking_connect_ex(self):
 
     def test_connect_with_context(self):
         # Same as test_connect, but with a separately created context
-        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+        ctx.check_hostname = False
+        ctx.verify_mode = ssl.CERT_NONE
         with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
             s.connect(self.server_addr)
             self.assertEqual({}, s.getpeercert())
@@ -2062,9 +2049,11 @@ def test_connect_with_context_fail(self):
         # This should fail because we have no verification certs. Connection
         # failure crashes ThreadedEchoServer, so run this in an independent
         # test method.
-        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
-        ctx.verify_mode = ssl.CERT_REQUIRED
-        s = ctx.wrap_socket(socket.socket(socket.AF_INET))
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+        s = ctx.wrap_socket(
+            socket.socket(socket.AF_INET),
+            server_hostname=SIGNED_CERTFILE_HOSTNAME
+        )
         self.addCleanup(s.close)
         self.assertRaisesRegex(ssl.SSLError, "certificate verify failed",
                                 s.connect, self.server_addr)
@@ -2075,19 +2064,19 @@ def test_connect_capath(self):
         # OpenSSL 0.9.8n and 1.0.0, as a result the capath directory must
         # contain both versions of each certificate (same content, different
         # filename) for this test to be portable across OpenSSL releases.
-        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
-        ctx.verify_mode = ssl.CERT_REQUIRED
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
         ctx.load_verify_locations(capath=CAPATH)
-        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+        with ctx.wrap_socket(socket.socket(socket.AF_INET),
+                             server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
             s.connect(self.server_addr)
             cert = s.getpeercert()
             self.assertTrue(cert)
 
         # Same with a bytes `capath` argument
-        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
-        ctx.verify_mode = ssl.CERT_REQUIRED
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
         ctx.load_verify_locations(capath=BYTES_CAPATH)
-        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+        with ctx.wrap_socket(socket.socket(socket.AF_INET),
+                             server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
             s.connect(self.server_addr)
             cert = s.getpeercert()
             self.assertTrue(cert)
@@ -2096,19 +2085,19 @@ def test_connect_cadata(self):
         with open(SIGNING_CA) as f:
             pem = f.read()
         der = ssl.PEM_cert_to_DER_cert(pem)
-        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
-        ctx.verify_mode = ssl.CERT_REQUIRED
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
         ctx.load_verify_locations(cadata=pem)
-        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+        with ctx.wrap_socket(socket.socket(socket.AF_INET),
+                             server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
             s.connect(self.server_addr)
             cert = s.getpeercert()
             self.assertTrue(cert)
 
         # same with DER
-        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
-        ctx.verify_mode = ssl.CERT_REQUIRED
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
         ctx.load_verify_locations(cadata=der)
-        with ctx.wrap_socket(socket.socket(socket.AF_INET)) as s:
+        with ctx.wrap_socket(socket.socket(socket.AF_INET),
+                             server_hostname=SIGNED_CERTFILE_HOSTNAME) as s:
             s.connect(self.server_addr)
             cert = s.getpeercert()
             self.assertTrue(cert)
@@ -2302,7 +2291,8 @@ def test_bio_read_write_data(self):
         sock.connect(self.server_addr)
         incoming = ssl.MemoryBIO()
         outgoing = ssl.MemoryBIO()
-        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
+        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+        ctx.check_hostname = False
         ctx.verify_mode = ssl.CERT_NONE
         sslobj = ctx.wrap_bio(incoming, outgoing, False)
         self.ssl_io_loop(sock, incoming, outgoing, sslobj.do_handshake)
@@ -2384,7 +2374,6 @@ def wrap_conn(self):
             try:
                 self.sslconn = self.server.context.wrap_socket(
                     self.sock, server_side=True)
-                self.server.selected_npn_protocols.append(self.sslconn.selected_npn_protocol())
                 self.server.selected_alpn_protocols.append(self.sslconn.selected_alpn_protocol())
             except (ConnectionResetError, BrokenPipeError, ConnectionAbortedError) as e:
                 # We treat ConnectionResetError as though it were an
@@ -2433,8 +2422,6 @@ def wrap_conn(self):
                 cipher = self.sslconn.cipher()
                 if support.verbose and self.server.chatty:
                     sys.stdout.write(" server: connection cipher is now " + str(cipher) + "\n")
-                    sys.stdout.write(" server: selected protocol is now "
-                            + str(self.sslconn.selected_npn_protocol()) + "\n")
                 return True
 
         def read(self):
@@ -2562,7 +2549,7 @@ def run(self):
     def __init__(self, certificate=None, ssl_version=None,
                  certreqs=None, cacerts=None,
                  chatty=True, connectionchatty=False, starttls_server=False,
-                 npn_protocols=None, alpn_protocols=None,
+                 alpn_protocols=None,
                  ciphers=None, context=None):
         if context:
             self.context = context
@@ -2576,8 +2563,6 @@ def __init__(self, certificate=None, ssl_version=None,
                 self.context.load_verify_locations(cacerts)
             if certificate:
                 self.context.load_cert_chain(certificate)
-            if npn_protocols:
-                self.context.set_npn_protocols(npn_protocols)
             if alpn_protocols:
                 self.context.set_alpn_protocols(alpn_protocols)
             if ciphers:
@@ -2589,7 +2574,6 @@ def __init__(self, certificate=None, ssl_version=None,
         self.port = socket_helper.bind_port(self.sock)
         self.flag = None
         self.active = False
-        self.selected_npn_protocols = []
         self.selected_alpn_protocols = []
         self.shared_ciphers = []
         self.conn_errors = []
@@ -2796,14 +2780,12 @@ def server_params_test(client_context, server_context, indata=b"FOO\n",
                 'cipher': s.cipher(),
                 'peercert': s.getpeercert(),
                 'client_alpn_protocol': s.selected_alpn_protocol(),
-                'client_npn_protocol': s.selected_npn_protocol(),
                 'version': s.version(),
                 'session_reused': s.session_reused,
                 'session': s.session,
             })
             s.close()
         stats['server_alpn_protocols'] = server.selected_alpn_protocols
-        stats['server_npn_protocols'] = server.selected_npn_protocols
         stats['server_shared_ciphers'] = server.shared_ciphers
     return stats
 
@@ -2829,21 +2811,26 @@ def try_protocol_combo(server_protocol, client_protocol, expect_success,
                          (ssl.get_protocol_name(client_protocol),
                           ssl.get_protocol_name(server_protocol),
                           certtype))
-    client_context = ssl.SSLContext(client_protocol)
-    client_context.options |= client_options
-    server_context = ssl.SSLContext(server_protocol)
-    server_context.options |= server_options
+
+    with warnings_helper.check_warnings():
+        # ignore DeprecationWarnings raised by legacy protocol constants
+        client_context = ssl.SSLContext(client_protocol)
+        client_context.options |= client_options
+        server_context = ssl.SSLContext(server_protocol)
+        server_context.options |= server_options
 
     min_version = PROTOCOL_TO_TLS_VERSION.get(client_protocol, None)
     if (min_version is not None
-    # SSLContext.minimum_version is only available on recent OpenSSL
-    # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
-    and hasattr(server_context, 'minimum_version')
-    and server_protocol == ssl.PROTOCOL_TLS
-    and server_context.minimum_version > min_version):
+        # SSLContext.minimum_version is only available on recent OpenSSL
+        # (setter added in OpenSSL 1.1.0, getter added in OpenSSL 1.1.1)
+        and hasattr(server_context, 'minimum_version')
+        and server_protocol == ssl.PROTOCOL_TLS
+        and server_context.minimum_version > min_version
+    ):
         # If OpenSSL configuration is strict and requires more recent TLS
         # version, we have to change the minimum to test old TLS versions.
-        server_context.minimum_version = min_version
+        with warnings_helper.check_warnings():
+            server_context.minimum_version = min_version
 
     # NOTE: we must enable "ALL" ciphers on the client, otherwise an
     # SSLv23 client will send an SSLv3 hello (rather than SSLv2)
@@ -2886,17 +2873,6 @@ def test_echo(self):
         """Basic test of an SSL client connecting to a server"""
         if support.verbose:
             sys.stdout.write("\n")
-        for protocol in PROTOCOLS:
-            if protocol in {ssl.PROTOCOL_TLS_CLIENT, ssl.PROTOCOL_TLS_SERVER}:
-                continue
-            if not has_tls_protocol(protocol):
-                continue
-            with self.subTest(protocol=ssl._PROTOCOL_NAMES[protocol]):
-                context = ssl.SSLContext(protocol)
-                context.load_cert_chain(CERTFILE)
-                seclevel_workaround(context)
-                server_params_test(context, context,
-                                   chatty=True, connectionchatty=True)
 
         client_context, server_context, hostname = testing_context()
 
@@ -3565,8 +3541,7 @@ def test_recv_send(self):
                                 server_side=False,
                                 certfile=CERTFILE,
                                 ca_certs=CERTFILE,
-                                cert_reqs=ssl.CERT_NONE,
-                                ssl_version=ssl.PROTOCOL_TLS_CLIENT)
+                                cert_reqs=ssl.CERT_NONE)
             s.connect((HOST, server.port))
             # helper methods for standardising recv* method signatures
             def _recv_into():
@@ -3718,8 +3693,7 @@ def test_nonblocking_send(self):
                                 server_side=False,
                                 certfile=CERTFILE,
                                 ca_certs=CERTFILE,
-                                cert_reqs=ssl.CERT_NONE,
-                                ssl_version=ssl.PROTOCOL_TLS_CLIENT)
+                                cert_reqs=ssl.CERT_NONE)
             s.connect((HOST, server.port))
             s.setblocking(False)
 
@@ -3788,14 +3762,11 @@ def serve():
     def test_server_accept(self):
         # Issue #16357: accept() on a SSLSocket created through
         # SSLContext.wrap_socket().
-        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
-        context.verify_mode = ssl.CERT_REQUIRED
-        context.load_verify_locations(SIGNING_CA)
-        context.load_cert_chain(SIGNED_CERTFILE)
+        client_ctx, server_ctx, hostname = testing_context()
         server = socket.socket(socket.AF_INET)
         host = "127.0.0.1"
         port = socket_helper.bind_port(server)
-        server = context.wrap_socket(server, server_side=True)
+        server = server_ctx.wrap_socket(server, server_side=True)
         self.assertTrue(server.server_side)
 
         evt = threading.Event()
@@ -3813,8 +3784,10 @@ def serve():
         t.start()
         # Client wait until server setup and perform a connect.
         evt.wait()
-        client = context.wrap_socket(socket.socket())
-        client.connect((host, port))
+        client = client_ctx.wrap_socket(
+            socket.socket(), server_hostname=hostname
+        )
+        client.connect((hostname, port))
         client.send(b'data')
         client.recv()
         client_addr = client.getsockname()
@@ -3827,14 +3800,16 @@ def serve():
         self.assertEqual(peer, client_addr)
 
     def test_getpeercert_enotconn(self):
-        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
+        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+        context.check_hostname = False
         with context.wrap_socket(socket.socket()) as sock:
             with self.assertRaises(OSError) as cm:
                 sock.getpeercert()
             self.assertEqual(cm.exception.errno, errno.ENOTCONN)
 
     def test_do_handshake_enotconn(self):
-        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
+        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
+        context.check_hostname = False
         with context.wrap_socket(socket.socket()) as sock:
             with self.assertRaises(OSError) as cm:
                 sock.do_handshake()
@@ -3875,13 +3850,11 @@ def test_version_basic(self):
 
     @requires_tls_version('TLSv1_3')
     def test_tls1_3(self):
-        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
-        context.load_cert_chain(CERTFILE)
-        context.options |= (
-            ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1 | ssl.OP_NO_TLSv1_2
-        )
-        with ThreadedEchoServer(context=context) as server:
-            with context.wrap_socket(socket.socket()) as s:
+        client_context, server_context, hostname = testing_context()
+        client_context.minimum_version = ssl.TLSVersion.TLSv1_3
+        with ThreadedEchoServer(context=server_context) as server:
+            with client_context.wrap_socket(socket.socket(),
+                                            server_hostname=hostname) as s:
                 s.connect((HOST, server.port))
                 self.assertIn(s.cipher()[0], {
                     'TLS_AES_256_GCM_SHA384',
@@ -3892,6 +3865,8 @@ def test_tls1_3(self):
 
     @requires_minimum_version
     @requires_tls_version('TLSv1_2')
+    @requires_tls_version('TLSv1')
+    @ignore_deprecation
     def test_min_max_version_tlsv1_2(self):
         client_context, server_context, hostname = testing_context()
         # client TLSv1.0 to 1.2
@@ -3909,6 +3884,7 @@ def test_min_max_version_tlsv1_2(self):
 
     @requires_minimum_version
     @requires_tls_version('TLSv1_1')
+    @ignore_deprecation
     def test_min_max_version_tlsv1_1(self):
         client_context, server_context, hostname = testing_context()
         # client 1.0 to 1.2, server 1.0 to 1.1
@@ -3927,6 +3903,7 @@ def test_min_max_version_tlsv1_1(self):
     @requires_minimum_version
     @requires_tls_version('TLSv1_2')
     @requires_tls_version('TLSv1')
+    @ignore_deprecation
     def test_min_max_version_mismatch(self):
         client_context, server_context, hostname = testing_context()
         # client 1.0, server 1.2 (mismatch)
@@ -3962,17 +3939,17 @@ def test_min_max_version_sslv3(self):
     def test_default_ecdh_curve(self):
         # Issue #21015: elliptic curve-based Diffie Hellman key exchange
         # should be enabled by default on SSL contexts.
-        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
-        context.load_cert_chain(CERTFILE)
+        client_context, server_context, hostname = testing_context()
         # TLSv1.3 defaults to PFS key agreement and no longer has KEA in
         # cipher name.
-        context.options |= ssl.OP_NO_TLSv1_3
+        client_context.maximum_version = ssl.TLSVersion.TLSv1_2
         # Prior to OpenSSL 1.0.0, ECDH ciphers have to be enabled
         # explicitly using the 'ECCdraft' cipher alias.  Otherwise,
         # our default cipher list should prefer ECDH-based ciphers
         # automatically.
-        with ThreadedEchoServer(context=context) as server:
-            with context.wrap_socket(socket.socket()) as s:
+        with ThreadedEchoServer(context=server_context) as server:
+            with client_context.wrap_socket(socket.socket(),
+                                            server_hostname=hostname) as s:
                 s.connect((HOST, server.port))
                 self.assertIn("ECDH", s.cipher()[0])
 
@@ -4159,14 +4136,6 @@ def test_alpn_protocols(self):
             self.assertEqual(server_result, expected,
                              msg % (server_result, "server"))
 
-    def test_selected_npn_protocol(self):
-        # selected_npn_protocol() is None unless NPN is used
-        client_context, server_context, hostname = testing_context()
-        stats = server_params_test(client_context, server_context,
-                                   chatty=True, connectionchatty=True,
-                                   sni_name=hostname)
-        self.assertIs(stats['client_npn_protocol'], None)
-
     def test_npn_protocols(self):
         assert not ssl.HAS_NPN
 
@@ -4313,13 +4282,11 @@ def test_sendfile(self):
         with open(os_helper.TESTFN, 'wb') as f:
             f.write(TEST_DATA)
         self.addCleanup(os_helper.unlink, os_helper.TESTFN)
-        context = ssl.SSLContext(ssl.PROTOCOL_TLS)
-        context.verify_mode = ssl.CERT_REQUIRED
-        context.load_verify_locations(SIGNING_CA)
-        context.load_cert_chain(SIGNED_CERTFILE)
-        server = ThreadedEchoServer(context=context, chatty=False)
+        client_context, server_context, hostname = testing_context()
+        server = ThreadedEchoServer(context=server_context, chatty=False)
         with server:
-            with context.wrap_socket(socket.socket()) as s:
+            with client_context.wrap_socket(socket.socket(),
+                                            server_hostname=hostname) as s:
                 s.connect((HOST, server.port))
                 with open(os_helper.TESTFN, 'rb') as file:
                     s.sendfile(file)
@@ -4437,7 +4404,7 @@ def test_session_handling(self):
 class TestPostHandshakeAuth(unittest.TestCase):
     def test_pha_setter(self):
         protocols = [
-            ssl.PROTOCOL_TLS, ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
+            ssl.PROTOCOL_TLS_SERVER, ssl.PROTOCOL_TLS_CLIENT
         ]
         for protocol in protocols:
             ctx = ssl.SSLContext(protocol)
diff --git a/Misc/NEWS.d/next/Library/2021-04-17-13-53-33.bpo-43880.-fC2JD.rst b/Misc/NEWS.d/next/Library/2021-04-17-13-53-33.bpo-43880.-fC2JD.rst
new file mode 100644
index 0000000000000..b50680afc23fd
--- /dev/null
+++ b/Misc/NEWS.d/next/Library/2021-04-17-13-53-33.bpo-43880.-fC2JD.rst
@@ -0,0 +1,3 @@
+:mod:`ssl` now raises DeprecationWarning for OP_NO_SSL/TLS* options, old TLS
+versions, old protocols, and other features that have been deprecated since
+Python 3.6, 3.7, or OpenSSL 1.1.0.
diff --git a/Modules/_ssl.c b/Modules/_ssl.c
index f371d4210a488..934c59e26d2d4 100644
--- a/Modules/_ssl.c
+++ b/Modules/_ssl.c
@@ -682,6 +682,17 @@ _setSSLError (_sslmodulestate *state, const char *errstr, int errcode, const cha
     return NULL;
 }
 
+static int
+_ssl_deprecated(const char* name, int stacklevel) {
+    return PyErr_WarnFormat(
+        PyExc_DeprecationWarning, stacklevel,
+        "ssl module: %s is deprecated", name
+    );
+}
+
+#define PY_SSL_DEPRECATED(name, stacklevel, ret) \
+    if (_ssl_deprecated((name), (stacklevel)) == -1) return (ret)
+
 /*
  * SSL objects
  */
@@ -2863,6 +2874,7 @@ _ssl__SSLContext_impl(PyTypeObject *type, int proto_version)
 {
     PySSLContext *self;
     long options;
+    const SSL_METHOD *method = NULL;
     SSL_CTX *ctx = NULL;
     X509_VERIFY_PARAM *params;
     int result;
@@ -2876,54 +2888,62 @@ _ssl__SSLContext_impl(PyTypeObject *type, int proto_version)
         return NULL;
     }
 
-    PySSL_BEGIN_ALLOW_THREADS
     switch(proto_version) {
 #if defined(SSL3_VERSION) && !defined(OPENSSL_NO_SSL3)
     case PY_SSL_VERSION_SSL3:
-        ctx = SSL_CTX_new(SSLv3_method());
+        PY_SSL_DEPRECATED("PROTOCOL_SSLv3", 2, NULL);
+        method = SSLv3_method();
         break;
 #endif
 #if (defined(TLS1_VERSION) && \
         !defined(OPENSSL_NO_TLS1) && \
         !defined(OPENSSL_NO_TLS1_METHOD))
     case PY_SSL_VERSION_TLS1:
-        ctx = SSL_CTX_new(TLSv1_method());
+        PY_SSL_DEPRECATED("PROTOCOL_TLSv1", 2, NULL);
+        method = TLSv1_method();
         break;
 #endif
 #if (defined(TLS1_1_VERSION) && \
         !defined(OPENSSL_NO_TLS1_1) && \
         !defined(OPENSSL_NO_TLS1_1_METHOD))
     case PY_SSL_VERSION_TLS1_1:
-        ctx = SSL_CTX_new(TLSv1_1_method());
+        PY_SSL_DEPRECATED("PROTOCOL_TLSv1_1", 2, NULL);
+        method = TLSv1_1_method();
         break;
 #endif
 #if (defined(TLS1_2_VERSION) && \
         !defined(OPENSSL_NO_TLS1_2) && \
         !defined(OPENSSL_NO_TLS1_2_METHOD))
     case PY_SSL_VERSION_TLS1_2:
-        ctx = SSL_CTX_new(TLSv1_2_method());
+        PY_SSL_DEPRECATED("PROTOCOL_TLSv1_2", 2, NULL);
+        method = TLSv1_2_method();
         break;
 #endif
     case PY_SSL_VERSION_TLS:
-        /* SSLv23 */
-        ctx = SSL_CTX_new(TLS_method());
+        PY_SSL_DEPRECATED("PROTOCOL_TLS", 2, NULL);
+        method = TLS_method();
         break;
     case PY_SSL_VERSION_TLS_CLIENT:
-        ctx = SSL_CTX_new(TLS_client_method());
+        method = TLS_client_method();
         break;
     case PY_SSL_VERSION_TLS_SERVER:
-        ctx = SSL_CTX_new(TLS_server_method());
+        method = TLS_server_method();
         break;
     default:
-        proto_version = -1;
+        method = NULL;
     }
-    PySSL_END_ALLOW_THREADS
 
-    if (proto_version == -1) {
-        PyErr_SetString(PyExc_ValueError,
-                        "invalid or unsupported protocol version");
+    if (method == NULL) {
+        PyErr_Format(PyExc_ValueError,
+                     "invalid or unsupported protocol version %i",
+                     proto_version);
         return NULL;
     }
+
+    PySSL_BEGIN_ALLOW_THREADS
+    ctx = SSL_CTX_new(method);
+    PySSL_END_ALLOW_THREADS
+
     if (ctx == NULL) {
         _setSSLError(get_ssl_state(module), NULL, 0, __FILE__, __LINE__);
         return NULL;
@@ -3299,6 +3319,29 @@ set_min_max_proto_version(PySSLContext *self, PyObject *arg, int what)
         return -1;
     }
 
+    /* check for deprecations and supported values */
+    switch(v) {
+        case PY_PROTO_SSLv3:
+            PY_SSL_DEPRECATED("TLSVersion.SSLv3", 2, -1);
+            break;
+        case PY_PROTO_TLSv1:
+            PY_SSL_DEPRECATED("TLSVersion.TLSv1", 2, -1);
+            break;
+        case PY_PROTO_TLSv1_1:
+            PY_SSL_DEPRECATED("TLSVersion.TLSv1_1", 2, -1);
+            break;
+        case PY_PROTO_MINIMUM_SUPPORTED:
+        case PY_PROTO_MAXIMUM_SUPPORTED:
+        case PY_PROTO_TLSv1_2:
+        case PY_PROTO_TLSv1_3:
+            /* ok */
+            break;
+        default:
+            PyErr_Format(PyExc_ValueError,
+                     "Unsupported TLS/SSL version 0x%x", v);
+            return -1;
+    }
+
     if (what == 0) {
         switch(v) {
         case PY_PROTO_MINIMUM_SUPPORTED:
@@ -3417,11 +3460,23 @@ static int
 set_options(PySSLContext *self, PyObject *arg, void *c)
 {
     long new_opts, opts, set, clear;
+    long opt_no = (
+        SSL_OP_NO_SSLv2 | SSL_OP_NO_SSLv3 | SSL_OP_NO_TLSv1 |
+        SSL_OP_NO_TLSv1_1 | SSL_OP_NO_TLSv1_2 | SSL_OP_NO_TLSv1_3
+    );
+
     if (!PyArg_Parse(arg, "l", &new_opts))
         return -1;
     opts = SSL_CTX_get_options(self->ctx);
     clear = opts & ~new_opts;
     set = ~opts & new_opts;
+
+    if ((set & opt_no) != 0) {
+        if (_ssl_deprecated("Setting OP_NO_SSL* or OP_NO_TLS* options",
+                            2) < 0) {
+            return -1;
+        }
+    }
     if (clear) {
         SSL_CTX_clear_options(self->ctx, clear);
     }
@@ -4961,6 +5016,7 @@ static PyObject *
 _ssl_RAND_pseudo_bytes_impl(PyObject *module, int n)
 /*[clinic end generated code: output=b1509e937000e52d input=58312bd53f9bbdd0]*/
 {
+    PY_SSL_DEPRECATED("RAND_pseudo_bytes", 1, NULL);
     return PySSL_RAND(module, n, 1);
 }
 
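The test changes above repeatedly swap ssl.PROTOCOL_TLS plus OP_NO_* flags for a role-specific protocol and minimum_version. The following is a minimal sketch of that migration for calling code, not part of the commit. The helper names (deprecation_messages, legacy_context, modern_context) are hypothetical, and the sketch assumes an interpreter that still exposes the deprecated constants.

```python
import ssl
import warnings


def deprecation_messages(factory):
    """Call factory() and return (result, DeprecationWarning messages seen)."""
    with warnings.catch_warnings(record=True) as caught:
        # Record every warning, even ones hidden by the default filters.
        warnings.simplefilter("always")
        result = factory()
    messages = [
        str(w.message) for w in caught
        if issubclass(w.category, DeprecationWarning)
    ]
    return result, messages


def legacy_context():
    # Old style: generic protocol constant plus OP_NO_* flags.
    # With this change applied, both lines are expected to warn.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS)
    ctx.options |= ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1
    return ctx


def modern_context():
    # New style: role-specific protocol and an explicit version floor.
    # PROTOCOL_TLS_CLIENT enables check_hostname and CERT_REQUIRED by default.
    ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
    ctx.minimum_version = ssl.TLSVersion.TLSv1_2
    return ctx


if __name__ == "__main__":
    _, old_msgs = deprecation_messages(legacy_context)
    _, new_msgs = deprecation_messages(modern_context)
    print("legacy warnings:", old_msgs)
    print("modern warnings:", new_msgs)
```

On an interpreter without this change the legacy list may simply be empty. The modern variant should stay silent either way, which is why the tests in this diff move to it wherever they do not exercise a legacy protocol on purpose.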


