1""" 
    2Module for using pyOpenSSL as a TLS backend. This module was relevant before 
    3the standard library ``ssl`` module supported SNI, but now that we've dropped 
    4support for Python 2.7 all relevant Python versions support SNI so 
    5**this module is no longer recommended**. 
    6 
    7This needs the following packages installed: 
    8 
    9* `pyOpenSSL`_ (tested with 16.0.0) 
    10* `cryptography`_ (minimum 1.3.4, from pyopenssl) 
    11* `idna`_ (minimum 2.0) 
    12 
    13However, pyOpenSSL depends on cryptography, so while we use all three directly here we 
    14end up having relatively few packages required. 
    15 
    16You can install them with the following command: 
    17 
    18.. code-block:: bash 
    19 
    20    $ python -m pip install pyopenssl cryptography idna 
    21 
    22To activate certificate checking, call 
    23:func:`~urllib3.contrib.pyopenssl.inject_into_urllib3` from your Python code 
    24before you begin making HTTP requests. This can be done in a ``sitecustomize`` 
    25module, or at any other time before your application begins using ``urllib3``, 
    26like this: 
    27 
    28.. code-block:: python 
    29 
    30    try: 
    31        import urllib3.contrib.pyopenssl 
    32        urllib3.contrib.pyopenssl.inject_into_urllib3() 
    33    except ImportError: 
    34        pass 
    35 
    36.. _pyopenssl: https://www.pyopenssl.org 
    37.. _cryptography: https://cryptography.io 
    38.. _idna: https://github.com/kjd/idna 
    39""" 
    40 
    41from __future__ import annotations 
    42 
    43import OpenSSL.SSL  # type: ignore[import-untyped] 
    44from cryptography import x509 
    45 
try:
    from cryptography.x509 import UnsupportedExtension  # type: ignore[attr-defined]
except ImportError:
    # UnsupportedExtension is gone in cryptography >= 2.1.0
    # Define a stand-in so that code in this module can still name the
    # exception in an ``except`` clause; nothing in this module raises it.
    class UnsupportedExtension(Exception):  # type: ignore[no-redef]
        pass
    52 
    53 
    54import logging 
    55import ssl 
    56import typing 
    57from io import BytesIO 
    58from socket import socket as socket_cls 
    59from socket import timeout 
    60 
    61from .. import util 
    62 
    63if typing.TYPE_CHECKING: 
    64    from OpenSSL.crypto import X509  # type: ignore[import-untyped] 
    65 
    66 
    67__all__ = ["inject_into_urllib3", "extract_from_urllib3"] 
    68 
    69# Map from urllib3 to PyOpenSSL compatible parameter-values. 
    70_openssl_versions: dict[int, int] = { 
    71    util.ssl_.PROTOCOL_TLS: OpenSSL.SSL.SSLv23_METHOD,  # type: ignore[attr-defined] 
    72    util.ssl_.PROTOCOL_TLS_CLIENT: OpenSSL.SSL.SSLv23_METHOD,  # type: ignore[attr-defined] 
    73    ssl.PROTOCOL_TLSv1: OpenSSL.SSL.TLSv1_METHOD, 
    74} 
    75 
    76if hasattr(ssl, "PROTOCOL_TLSv1_1") and hasattr(OpenSSL.SSL, "TLSv1_1_METHOD"): 
    77    _openssl_versions[ssl.PROTOCOL_TLSv1_1] = OpenSSL.SSL.TLSv1_1_METHOD 
    78 
    79if hasattr(ssl, "PROTOCOL_TLSv1_2") and hasattr(OpenSSL.SSL, "TLSv1_2_METHOD"): 
    80    _openssl_versions[ssl.PROTOCOL_TLSv1_2] = OpenSSL.SSL.TLSv1_2_METHOD 
    81 
    82 
    83_stdlib_to_openssl_verify = { 
    84    ssl.CERT_NONE: OpenSSL.SSL.VERIFY_NONE, 
    85    ssl.CERT_OPTIONAL: OpenSSL.SSL.VERIFY_PEER, 
    86    ssl.CERT_REQUIRED: OpenSSL.SSL.VERIFY_PEER 
    87    + OpenSSL.SSL.VERIFY_FAIL_IF_NO_PEER_CERT, 
    88} 
    89_openssl_to_stdlib_verify = {v: k for k, v in _stdlib_to_openssl_verify.items()} 
    90 
# The SSLvX values are the most likely to be missing in the future
# but we check them all just to be sure.
# Any option that PyOpenSSL does not define falls back to 0, which makes it a
# no-op when OR-ed into an options bit mask.
_OP_NO_SSLv2_OR_SSLv3: int = getattr(OpenSSL.SSL, "OP_NO_SSLv2", 0) | getattr(
    OpenSSL.SSL, "OP_NO_SSLv3", 0
)
_OP_NO_TLSv1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1", 0)
_OP_NO_TLSv1_1: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_1", 0)
_OP_NO_TLSv1_2: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_2", 0)
_OP_NO_TLSv1_3: int = getattr(OpenSSL.SSL, "OP_NO_TLSv1_3", 0)

# NOTE: despite the "openssl_to_ssl" in the names, both tables below are keyed
# by stdlib ``ssl.TLSVersion`` members and hold OpenSSL ``OP_NO_*`` bit masks.
#
# Minimum version: the mask disables every protocol *older* than the key.
# SSLv2/SSLv3 are disabled in every entry. MAXIMUM_SUPPORTED is given the same
# mask as TLSv1_3.
_openssl_to_ssl_minimum_version: dict[int, int] = {
    ssl.TLSVersion.MINIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.TLSv1: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1,
    ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1,
    ssl.TLSVersion.TLSv1_3: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
    ),
    ssl.TLSVersion.MAXIMUM_SUPPORTED: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2
    ),
}
# Maximum version: the mask disables every protocol *newer* than the key.
# MINIMUM_SUPPORTED additionally disables TLSv1 itself, i.e. every OP_NO_*
# flag defined above is set for it.
_openssl_to_ssl_maximum_version: dict[int, int] = {
    ssl.TLSVersion.MINIMUM_SUPPORTED: (
        _OP_NO_SSLv2_OR_SSLv3
        | _OP_NO_TLSv1
        | _OP_NO_TLSv1_1
        | _OP_NO_TLSv1_2
        | _OP_NO_TLSv1_3
    ),
    ssl.TLSVersion.TLSv1: (
        _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_1 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3
    ),
    ssl.TLSVersion.TLSv1_1: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_2 | _OP_NO_TLSv1_3,
    ssl.TLSVersion.TLSv1_2: _OP_NO_SSLv2_OR_SSLv3 | _OP_NO_TLSv1_3,
    ssl.TLSVersion.TLSv1_3: _OP_NO_SSLv2_OR_SSLv3,
    ssl.TLSVersion.MAXIMUM_SUPPORTED: _OP_NO_SSLv2_OR_SSLv3,
}
    129 
# OpenSSL will only write 16K at a time
# (``WrappedSocket.sendall`` slices outgoing data into chunks of this size).
SSL_WRITE_BLOCKSIZE = 16384

# The ``SSLContext`` urllib3 was using when this module was imported; kept so
# that ``extract_from_urllib3`` can put it back.
orig_util_SSLContext = util.ssl_.SSLContext


# Module-level logger, named after this module.
log = logging.getLogger(__name__)
    137 
    138 
    139def inject_into_urllib3() -> None: 
    140    "Monkey-patch urllib3 with PyOpenSSL-backed SSL-support." 
    141 
    142    _validate_dependencies_met() 
    143 
    144    util.SSLContext = PyOpenSSLContext  # type: ignore[assignment] 
    145    util.ssl_.SSLContext = PyOpenSSLContext  # type: ignore[assignment] 
    146    util.IS_PYOPENSSL = True 
    147    util.ssl_.IS_PYOPENSSL = True 
    148 
    149 
    150def extract_from_urllib3() -> None: 
    151    "Undo monkey-patching by :func:`inject_into_urllib3`." 
    152 
    153    util.SSLContext = orig_util_SSLContext 
    154    util.ssl_.SSLContext = orig_util_SSLContext 
    155    util.IS_PYOPENSSL = False 
    156    util.ssl_.IS_PYOPENSSL = False 
    157 
    158 
    159def _validate_dependencies_met() -> None: 
    160    """ 
    161    Verifies that PyOpenSSL's package-level dependencies have been met. 
    162    Throws `ImportError` if they are not met. 
    163    """ 
    164    # Method added in `cryptography==1.1`; not available in older versions 
    165    from cryptography.x509.extensions import Extensions 
    166 
    167    if getattr(Extensions, "get_extension_for_class", None) is None: 
    168        raise ImportError( 
    169            "'cryptography' module missing required functionality.  " 
    170            "Try upgrading to v1.3.4 or newer." 
    171        ) 
    172 
    173    # pyOpenSSL 0.14 and above use cryptography for OpenSSL bindings. The _x509 
    174    # attribute is only present on those versions. 
    175    from OpenSSL.crypto import X509 
    176 
    177    x509 = X509() 
    178    if getattr(x509, "_x509", None) is None: 
    179        raise ImportError( 
    180            "'pyOpenSSL' module missing required functionality. " 
    181            "Try upgrading to v0.14 or newer." 
    182        ) 
    183 
    184 
    185def _dnsname_to_stdlib(name: str) -> str | None: 
    186    """ 
    187    Converts a dNSName SubjectAlternativeName field to the form used by the 
    188    standard library on the given Python version. 
    189 
    190    Cryptography produces a dNSName as a unicode string that was idna-decoded 
    191    from ASCII bytes. We need to idna-encode that string to get it back, and 
    192    then on Python 3 we also need to convert to unicode via UTF-8 (the stdlib 
    193    uses PyUnicode_FromStringAndSize on it, which decodes via UTF-8). 
    194 
    195    If the name cannot be idna-encoded then we return None signalling that 
    196    the name given should be skipped. 
    197    """ 
    198 
    199    def idna_encode(name: str) -> bytes | None: 
    200        """ 
    201        Borrowed wholesale from the Python Cryptography Project. It turns out 
    202        that we can't just safely call `idna.encode`: it can explode for 
    203        wildcard names. This avoids that problem. 
    204        """ 
    205        import idna 
    206 
    207        try: 
    208            for prefix in ["*.", "."]: 
    209                if name.startswith(prefix): 
    210                    name = name[len(prefix) :] 
    211                    return prefix.encode("ascii") + idna.encode(name) 
    212            return idna.encode(name) 
    213        except idna.core.IDNAError: 
    214            return None 
    215 
    216    # Don't send IPv6 addresses through the IDNA encoder. 
    217    if ":" in name: 
    218        return name 
    219 
    220    encoded_name = idna_encode(name) 
    221    if encoded_name is None: 
    222        return None 
    223    return encoded_name.decode("utf-8") 
    224 
    225 
    226def get_subj_alt_name(peer_cert: X509) -> list[tuple[str, str]]: 
    227    """ 
    228    Given an PyOpenSSL certificate, provides all the subject alternative names. 
    229    """ 
    230    cert = peer_cert.to_cryptography() 
    231 
    232    # We want to find the SAN extension. Ask Cryptography to locate it (it's 
    233    # faster than looping in Python) 
    234    try: 
    235        ext = cert.extensions.get_extension_for_class(x509.SubjectAlternativeName).value 
    236    except x509.ExtensionNotFound: 
    237        # No such extension, return the empty list. 
    238        return [] 
    239    except ( 
    240        x509.DuplicateExtension, 
    241        UnsupportedExtension, 
    242        x509.UnsupportedGeneralNameType, 
    243        UnicodeError, 
    244    ) as e: 
    245        # A problem has been found with the quality of the certificate. Assume 
    246        # no SAN field is present. 
    247        log.warning( 
    248            "A problem was encountered with the certificate that prevented " 
    249            "urllib3 from finding the SubjectAlternativeName field. This can " 
    250            "affect certificate validation. The error was %s", 
    251            e, 
    252        ) 
    253        return [] 
    254 
    255    # We want to return dNSName and iPAddress fields. We need to cast the IPs 
    256    # back to strings because the match_hostname function wants them as 
    257    # strings. 
    258    # Sadly the DNS names need to be idna encoded and then, on Python 3, UTF-8 
    259    # decoded. This is pretty frustrating, but that's what the standard library 
    260    # does with certificates, and so we need to attempt to do the same. 
    261    # We also want to skip over names which cannot be idna encoded. 
    262    names = [ 
    263        ("DNS", name) 
    264        for name in map(_dnsname_to_stdlib, ext.get_values_for_type(x509.DNSName)) 
    265        if name is not None 
    266    ] 
    267    names.extend( 
    268        ("IP Address", str(name)) for name in ext.get_values_for_type(x509.IPAddress) 
    269    ) 
    270 
    271    return names 
    272 
    273 
    274class WrappedSocket: 
    275    """API-compatibility wrapper for Python OpenSSL's Connection-class.""" 
    276 
    277    def __init__( 
    278        self, 
    279        connection: OpenSSL.SSL.Connection, 
    280        socket: socket_cls, 
    281        suppress_ragged_eofs: bool = True, 
    282    ) -> None: 
    283        self.connection = connection 
    284        self.socket = socket 
    285        self.suppress_ragged_eofs = suppress_ragged_eofs 
    286        self._io_refs = 0 
    287        self._closed = False 
    288 
    289    def fileno(self) -> int: 
    290        return self.socket.fileno() 
    291 
    292    # Copy-pasted from Python 3.5 source code 
    293    def _decref_socketios(self) -> None: 
    294        if self._io_refs > 0: 
    295            self._io_refs -= 1 
    296        if self._closed: 
    297            self.close() 
    298 
    299    def recv(self, *args: typing.Any, **kwargs: typing.Any) -> bytes: 
    300        try: 
    301            data = self.connection.recv(*args, **kwargs) 
    302        except OpenSSL.SSL.SysCallError as e: 
    303            if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"): 
    304                return b"" 
    305            else: 
    306                raise OSError(e.args[0], str(e)) from e 
    307        except OpenSSL.SSL.ZeroReturnError: 
    308            if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN: 
    309                return b"" 
    310            else: 
    311                raise 
    312        except OpenSSL.SSL.WantReadError as e: 
    313            if not util.wait_for_read(self.socket, self.socket.gettimeout()): 
    314                raise timeout("The read operation timed out") from e 
    315            else: 
    316                return self.recv(*args, **kwargs) 
    317 
    318        # TLS 1.3 post-handshake authentication 
    319        except OpenSSL.SSL.Error as e: 
    320            raise ssl.SSLError(f"read error: {e!r}") from e 
    321        else: 
    322            return data  # type: ignore[no-any-return] 
    323 
    324    def recv_into(self, *args: typing.Any, **kwargs: typing.Any) -> int: 
    325        try: 
    326            return self.connection.recv_into(*args, **kwargs)  # type: ignore[no-any-return] 
    327        except OpenSSL.SSL.SysCallError as e: 
    328            if self.suppress_ragged_eofs and e.args == (-1, "Unexpected EOF"): 
    329                return 0 
    330            else: 
    331                raise OSError(e.args[0], str(e)) from e 
    332        except OpenSSL.SSL.ZeroReturnError: 
    333            if self.connection.get_shutdown() == OpenSSL.SSL.RECEIVED_SHUTDOWN: 
    334                return 0 
    335            else: 
    336                raise 
    337        except OpenSSL.SSL.WantReadError as e: 
    338            if not util.wait_for_read(self.socket, self.socket.gettimeout()): 
    339                raise timeout("The read operation timed out") from e 
    340            else: 
    341                return self.recv_into(*args, **kwargs) 
    342 
    343        # TLS 1.3 post-handshake authentication 
    344        except OpenSSL.SSL.Error as e: 
    345            raise ssl.SSLError(f"read error: {e!r}") from e 
    346 
    347    def settimeout(self, timeout: float) -> None: 
    348        return self.socket.settimeout(timeout) 
    349 
    350    def _send_until_done(self, data: bytes) -> int: 
    351        while True: 
    352            try: 
    353                return self.connection.send(data)  # type: ignore[no-any-return] 
    354            except OpenSSL.SSL.WantWriteError as e: 
    355                if not util.wait_for_write(self.socket, self.socket.gettimeout()): 
    356                    raise timeout() from e 
    357                continue 
    358            except OpenSSL.SSL.SysCallError as e: 
    359                raise OSError(e.args[0], str(e)) from e 
    360 
    361    def sendall(self, data: bytes) -> None: 
    362        total_sent = 0 
    363        while total_sent < len(data): 
    364            sent = self._send_until_done( 
    365                data[total_sent : total_sent + SSL_WRITE_BLOCKSIZE] 
    366            ) 
    367            total_sent += sent 
    368 
    369    def shutdown(self, how: int) -> None: 
    370        try: 
    371            self.connection.shutdown() 
    372        except OpenSSL.SSL.Error as e: 
    373            raise ssl.SSLError(f"shutdown error: {e!r}") from e 
    374 
    375    def close(self) -> None: 
    376        self._closed = True 
    377        if self._io_refs <= 0: 
    378            self._real_close() 
    379 
    380    def _real_close(self) -> None: 
    381        try: 
    382            return self.connection.close()  # type: ignore[no-any-return] 
    383        except OpenSSL.SSL.Error: 
    384            return 
    385 
    386    def getpeercert( 
    387        self, binary_form: bool = False 
    388    ) -> dict[str, list[typing.Any]] | None: 
    389        x509 = self.connection.get_peer_certificate() 
    390 
    391        if not x509: 
    392            return x509  # type: ignore[no-any-return] 
    393 
    394        if binary_form: 
    395            return OpenSSL.crypto.dump_certificate(OpenSSL.crypto.FILETYPE_ASN1, x509)  # type: ignore[no-any-return] 
    396 
    397        return { 
    398            "subject": ((("commonName", x509.get_subject().CN),),),  # type: ignore[dict-item] 
    399            "subjectAltName": get_subj_alt_name(x509), 
    400        } 
    401 
    402    def version(self) -> str: 
    403        return self.connection.get_protocol_version_name()  # type: ignore[no-any-return] 
    404 
    405    def selected_alpn_protocol(self) -> str | None: 
    406        alpn_proto = self.connection.get_alpn_proto_negotiated() 
    407        return alpn_proto.decode() if alpn_proto else None 
    408 
    409 
    410WrappedSocket.makefile = socket_cls.makefile  # type: ignore[attr-defined] 
    411 
    412 
    413class PyOpenSSLContext: 
    414    """ 
    415    I am a wrapper class for the PyOpenSSL ``Context`` object. I am responsible 
    416    for translating the interface of the standard library ``SSLContext`` object 
    417    to calls into PyOpenSSL. 
    418    """ 
    419 
    420    def __init__(self, protocol: int) -> None: 
    421        self.protocol = _openssl_versions[protocol] 
    422        self._ctx = OpenSSL.SSL.Context(self.protocol) 
    423        self._options = 0 
    424        self.check_hostname = False 
    425        self._minimum_version: int = ssl.TLSVersion.MINIMUM_SUPPORTED 
    426        self._maximum_version: int = ssl.TLSVersion.MAXIMUM_SUPPORTED 
    427        self._verify_flags: int = ssl.VERIFY_X509_TRUSTED_FIRST 
    428 
    429    @property 
    430    def options(self) -> int: 
    431        return self._options 
    432 
    433    @options.setter 
    434    def options(self, value: int) -> None: 
    435        self._options = value 
    436        self._set_ctx_options() 
    437 
    438    @property 
    439    def verify_flags(self) -> int: 
    440        return self._verify_flags 
    441 
    442    @verify_flags.setter 
    443    def verify_flags(self, value: int) -> None: 
    444        self._verify_flags = value 
    445        self._ctx.get_cert_store().set_flags(self._verify_flags) 
    446 
    447    @property 
    448    def verify_mode(self) -> int: 
    449        return _openssl_to_stdlib_verify[self._ctx.get_verify_mode()] 
    450 
    451    @verify_mode.setter 
    452    def verify_mode(self, value: ssl.VerifyMode) -> None: 
    453        self._ctx.set_verify(_stdlib_to_openssl_verify[value], _verify_callback) 
    454 
    455    def set_default_verify_paths(self) -> None: 
    456        self._ctx.set_default_verify_paths() 
    457 
    458    def set_ciphers(self, ciphers: bytes | str) -> None: 
    459        if isinstance(ciphers, str): 
    460            ciphers = ciphers.encode("utf-8") 
    461        self._ctx.set_cipher_list(ciphers) 
    462 
    463    def load_verify_locations( 
    464        self, 
    465        cafile: str | None = None, 
    466        capath: str | None = None, 
    467        cadata: bytes | None = None, 
    468    ) -> None: 
    469        if cafile is not None: 
    470            cafile = cafile.encode("utf-8")  # type: ignore[assignment] 
    471        if capath is not None: 
    472            capath = capath.encode("utf-8")  # type: ignore[assignment] 
    473        try: 
    474            self._ctx.load_verify_locations(cafile, capath) 
    475            if cadata is not None: 
    476                self._ctx.load_verify_locations(BytesIO(cadata)) 
    477        except OpenSSL.SSL.Error as e: 
    478            raise ssl.SSLError(f"unable to load trusted certificates: {e!r}") from e 
    479 
    480    def load_cert_chain( 
    481        self, 
    482        certfile: str, 
    483        keyfile: str | None = None, 
    484        password: str | None = None, 
    485    ) -> None: 
    486        try: 
    487            self._ctx.use_certificate_chain_file(certfile) 
    488            if password is not None: 
    489                if not isinstance(password, bytes): 
    490                    password = password.encode("utf-8")  # type: ignore[assignment] 
    491                self._ctx.set_passwd_cb(lambda *_: password) 
    492            self._ctx.use_privatekey_file(keyfile or certfile) 
    493        except OpenSSL.SSL.Error as e: 
    494            raise ssl.SSLError(f"Unable to load certificate chain: {e!r}") from e 
    495 
    496    def set_alpn_protocols(self, protocols: list[bytes | str]) -> None: 
    497        protocols = [util.util.to_bytes(p, "ascii") for p in protocols] 
    498        return self._ctx.set_alpn_protos(protocols)  # type: ignore[no-any-return] 
    499 
    500    def wrap_socket( 
    501        self, 
    502        sock: socket_cls, 
    503        server_side: bool = False, 
    504        do_handshake_on_connect: bool = True, 
    505        suppress_ragged_eofs: bool = True, 
    506        server_hostname: bytes | str | None = None, 
    507    ) -> WrappedSocket: 
    508        cnx = OpenSSL.SSL.Connection(self._ctx, sock) 
    509 
    510        # If server_hostname is an IP, don't use it for SNI, per RFC6066 Section 3 
    511        if server_hostname and not util.ssl_.is_ipaddress(server_hostname): 
    512            if isinstance(server_hostname, str): 
    513                server_hostname = server_hostname.encode("utf-8") 
    514            cnx.set_tlsext_host_name(server_hostname) 
    515 
    516        cnx.set_connect_state() 
    517 
    518        while True: 
    519            try: 
    520                cnx.do_handshake() 
    521            except OpenSSL.SSL.WantReadError as e: 
    522                if not util.wait_for_read(sock, sock.gettimeout()): 
    523                    raise timeout("select timed out") from e 
    524                continue 
    525            except OpenSSL.SSL.Error as e: 
    526                raise ssl.SSLError(f"bad handshake: {e!r}") from e 
    527            break 
    528 
    529        return WrappedSocket(cnx, sock) 
    530 
    531    def _set_ctx_options(self) -> None: 
    532        self._ctx.set_options( 
    533            self._options 
    534            | _openssl_to_ssl_minimum_version[self._minimum_version] 
    535            | _openssl_to_ssl_maximum_version[self._maximum_version] 
    536        ) 
    537 
    538    @property 
    539    def minimum_version(self) -> int: 
    540        return self._minimum_version 
    541 
    542    @minimum_version.setter 
    543    def minimum_version(self, minimum_version: int) -> None: 
    544        self._minimum_version = minimum_version 
    545        self._set_ctx_options() 
    546 
    547    @property 
    548    def maximum_version(self) -> int: 
    549        return self._maximum_version 
    550 
    551    @maximum_version.setter 
    552    def maximum_version(self, maximum_version: int) -> None: 
    553        self._maximum_version = maximum_version 
    554        self._set_ctx_options() 
    555 
    556 
    557def _verify_callback( 
    558    cnx: OpenSSL.SSL.Connection, 
    559    x509: X509, 
    560    err_no: int, 
    561    err_depth: int, 
    562    return_code: int, 
    563) -> bool: 
    564    return err_no == 0