Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/ssl.py: 37%

777 statements  

« prev     ^ index     » next       coverage.py v7.0.1, created at 2022-12-25 06:11 +0000

1# Wrapper module for _ssl, providing some additional facilities 

2# implemented in Python. Written by Bill Janssen. 

3 

4"""This module provides some more Pythonic support for SSL. 

5 

6Object types: 

7 

8 SSLSocket -- subtype of socket.socket which does SSL over the socket 

9 

10Exceptions: 

11 

12 SSLError -- exception raised for I/O errors 

13 

14Functions: 

15 

16 cert_time_to_seconds -- convert time string used for certificate 

17 notBefore and notAfter functions to integer 

18 seconds past the Epoch (the time values 

19 returned from time.time()) 

20 

21 fetch_server_certificate (HOST, PORT) -- fetch the certificate provided 

22 by the server running on HOST at port PORT. No 

23 validation of the certificate is performed. 

24 

25Integer constants: 

26 

27SSL_ERROR_ZERO_RETURN 

28SSL_ERROR_WANT_READ 

29SSL_ERROR_WANT_WRITE 

30SSL_ERROR_WANT_X509_LOOKUP 

31SSL_ERROR_SYSCALL 

32SSL_ERROR_SSL 

33SSL_ERROR_WANT_CONNECT 

34 

35SSL_ERROR_EOF 

36SSL_ERROR_INVALID_ERROR_CODE 

37 

38The following group define certificate requirements that one side is 

39allowing/requiring from the other side: 

40 

41CERT_NONE - no certificates from the other side are required (or will 

42 be looked at if provided) 

43CERT_OPTIONAL - certificates are not required, but if provided will be 

44 validated, and if validation fails, the connection will 

45 also fail 

46CERT_REQUIRED - certificates are required, and will be validated, and 

47 if validation fails, the connection will also fail 

48 

49The following constants identify various SSL protocol variants: 

50 

51PROTOCOL_SSLv2 

52PROTOCOL_SSLv3 

53PROTOCOL_SSLv23 

54PROTOCOL_TLS 

55PROTOCOL_TLS_CLIENT 

56PROTOCOL_TLS_SERVER 

57PROTOCOL_TLSv1 

58PROTOCOL_TLSv1_1 

59PROTOCOL_TLSv1_2 

60 

61The following constants identify various SSL alert message descriptions as per 

62http://www.iana.org/assignments/tls-parameters/tls-parameters.xml#tls-parameters-6 

63 

64ALERT_DESCRIPTION_CLOSE_NOTIFY 

65ALERT_DESCRIPTION_UNEXPECTED_MESSAGE 

66ALERT_DESCRIPTION_BAD_RECORD_MAC 

67ALERT_DESCRIPTION_RECORD_OVERFLOW 

68ALERT_DESCRIPTION_DECOMPRESSION_FAILURE 

69ALERT_DESCRIPTION_HANDSHAKE_FAILURE 

70ALERT_DESCRIPTION_BAD_CERTIFICATE 

71ALERT_DESCRIPTION_UNSUPPORTED_CERTIFICATE 

72ALERT_DESCRIPTION_CERTIFICATE_REVOKED 

73ALERT_DESCRIPTION_CERTIFICATE_EXPIRED 

74ALERT_DESCRIPTION_CERTIFICATE_UNKNOWN 

75ALERT_DESCRIPTION_ILLEGAL_PARAMETER 

76ALERT_DESCRIPTION_UNKNOWN_CA 

77ALERT_DESCRIPTION_ACCESS_DENIED 

78ALERT_DESCRIPTION_DECODE_ERROR 

79ALERT_DESCRIPTION_DECRYPT_ERROR 

80ALERT_DESCRIPTION_PROTOCOL_VERSION 

81ALERT_DESCRIPTION_INSUFFICIENT_SECURITY 

82ALERT_DESCRIPTION_INTERNAL_ERROR 

83ALERT_DESCRIPTION_USER_CANCELLED 

84ALERT_DESCRIPTION_NO_RENEGOTIATION 

85ALERT_DESCRIPTION_UNSUPPORTED_EXTENSION 

86ALERT_DESCRIPTION_CERTIFICATE_UNOBTAINABLE 

87ALERT_DESCRIPTION_UNRECOGNIZED_NAME 

88ALERT_DESCRIPTION_BAD_CERTIFICATE_STATUS_RESPONSE 

89ALERT_DESCRIPTION_BAD_CERTIFICATE_HASH_VALUE 

90ALERT_DESCRIPTION_UNKNOWN_PSK_IDENTITY 

91""" 

92 

93import sys 

94import os 

95from collections import namedtuple 

96from enum import Enum as _Enum, IntEnum as _IntEnum, IntFlag as _IntFlag 

97 

98import _ssl # if we can't import it, let the error propagate 

99 

100from _ssl import OPENSSL_VERSION_NUMBER, OPENSSL_VERSION_INFO, OPENSSL_VERSION 

101from _ssl import _SSLContext, MemoryBIO, SSLSession 

102from _ssl import ( 

103 SSLError, SSLZeroReturnError, SSLWantReadError, SSLWantWriteError, 

104 SSLSyscallError, SSLEOFError, SSLCertVerificationError 

105 ) 

106from _ssl import txt2obj as _txt2obj, nid2obj as _nid2obj 

107from _ssl import RAND_status, RAND_add, RAND_bytes, RAND_pseudo_bytes 

108try: 

109 from _ssl import RAND_egd 

110except ImportError: 

111 # LibreSSL does not provide RAND_egd 

112 pass 

113 

114 

115from _ssl import ( 

116 HAS_SNI, HAS_ECDH, HAS_NPN, HAS_ALPN, HAS_SSLv2, HAS_SSLv3, HAS_TLSv1, 

117 HAS_TLSv1_1, HAS_TLSv1_2, HAS_TLSv1_3 

118) 

119from _ssl import _DEFAULT_CIPHERS, _OPENSSL_API_VERSION 

120 

121 

122_IntEnum._convert_( 

123 '_SSLMethod', __name__, 

124 lambda name: name.startswith('PROTOCOL_') and name != 'PROTOCOL_SSLv23', 

125 source=_ssl) 

126 

127_IntFlag._convert_( 

128 'Options', __name__, 

129 lambda name: name.startswith('OP_'), 

130 source=_ssl) 

131 

132_IntEnum._convert_( 

133 'AlertDescription', __name__, 

134 lambda name: name.startswith('ALERT_DESCRIPTION_'), 

135 source=_ssl) 

136 

137_IntEnum._convert_( 

138 'SSLErrorNumber', __name__, 

139 lambda name: name.startswith('SSL_ERROR_'), 

140 source=_ssl) 

141 

142_IntFlag._convert_( 

143 'VerifyFlags', __name__, 

144 lambda name: name.startswith('VERIFY_'), 

145 source=_ssl) 

146 

147_IntEnum._convert_( 

148 'VerifyMode', __name__, 

149 lambda name: name.startswith('CERT_'), 

150 source=_ssl) 

151 

152PROTOCOL_SSLv23 = _SSLMethod.PROTOCOL_SSLv23 = _SSLMethod.PROTOCOL_TLS 

153_PROTOCOL_NAMES = {value: name for name, value in _SSLMethod.__members__.items()} 

154 

155_SSLv2_IF_EXISTS = getattr(_SSLMethod, 'PROTOCOL_SSLv2', None) 

156 

157 

158class TLSVersion(_IntEnum): 

159 MINIMUM_SUPPORTED = _ssl.PROTO_MINIMUM_SUPPORTED 

160 SSLv3 = _ssl.PROTO_SSLv3 

161 TLSv1 = _ssl.PROTO_TLSv1 

162 TLSv1_1 = _ssl.PROTO_TLSv1_1 

163 TLSv1_2 = _ssl.PROTO_TLSv1_2 

164 TLSv1_3 = _ssl.PROTO_TLSv1_3 

165 MAXIMUM_SUPPORTED = _ssl.PROTO_MAXIMUM_SUPPORTED 

166 

167 

168class _TLSContentType(_IntEnum): 

169 """Content types (record layer) 

170 

171 See RFC 8446, section B.1 

172 """ 

173 CHANGE_CIPHER_SPEC = 20 

174 ALERT = 21 

175 HANDSHAKE = 22 

176 APPLICATION_DATA = 23 

177 # pseudo content types 

178 HEADER = 0x100 

179 INNER_CONTENT_TYPE = 0x101 

180 

181 

182class _TLSAlertType(_IntEnum): 

183 """Alert types for TLSContentType.ALERT messages 

184 

185 See RFC 8466, section B.2 

186 """ 

187 CLOSE_NOTIFY = 0 

188 UNEXPECTED_MESSAGE = 10 

189 BAD_RECORD_MAC = 20 

190 DECRYPTION_FAILED = 21 

191 RECORD_OVERFLOW = 22 

192 DECOMPRESSION_FAILURE = 30 

193 HANDSHAKE_FAILURE = 40 

194 NO_CERTIFICATE = 41 

195 BAD_CERTIFICATE = 42 

196 UNSUPPORTED_CERTIFICATE = 43 

197 CERTIFICATE_REVOKED = 44 

198 CERTIFICATE_EXPIRED = 45 

199 CERTIFICATE_UNKNOWN = 46 

200 ILLEGAL_PARAMETER = 47 

201 UNKNOWN_CA = 48 

202 ACCESS_DENIED = 49 

203 DECODE_ERROR = 50 

204 DECRYPT_ERROR = 51 

205 EXPORT_RESTRICTION = 60 

206 PROTOCOL_VERSION = 70 

207 INSUFFICIENT_SECURITY = 71 

208 INTERNAL_ERROR = 80 

209 INAPPROPRIATE_FALLBACK = 86 

210 USER_CANCELED = 90 

211 NO_RENEGOTIATION = 100 

212 MISSING_EXTENSION = 109 

213 UNSUPPORTED_EXTENSION = 110 

214 CERTIFICATE_UNOBTAINABLE = 111 

215 UNRECOGNIZED_NAME = 112 

216 BAD_CERTIFICATE_STATUS_RESPONSE = 113 

217 BAD_CERTIFICATE_HASH_VALUE = 114 

218 UNKNOWN_PSK_IDENTITY = 115 

219 CERTIFICATE_REQUIRED = 116 

220 NO_APPLICATION_PROTOCOL = 120 

221 

222 

223class _TLSMessageType(_IntEnum): 

224 """Message types (handshake protocol) 

225 

226 See RFC 8446, section B.3 

227 """ 

228 HELLO_REQUEST = 0 

229 CLIENT_HELLO = 1 

230 SERVER_HELLO = 2 

231 HELLO_VERIFY_REQUEST = 3 

232 NEWSESSION_TICKET = 4 

233 END_OF_EARLY_DATA = 5 

234 HELLO_RETRY_REQUEST = 6 

235 ENCRYPTED_EXTENSIONS = 8 

236 CERTIFICATE = 11 

237 SERVER_KEY_EXCHANGE = 12 

238 CERTIFICATE_REQUEST = 13 

239 SERVER_DONE = 14 

240 CERTIFICATE_VERIFY = 15 

241 CLIENT_KEY_EXCHANGE = 16 

242 FINISHED = 20 

243 CERTIFICATE_URL = 21 

244 CERTIFICATE_STATUS = 22 

245 SUPPLEMENTAL_DATA = 23 

246 KEY_UPDATE = 24 

247 NEXT_PROTO = 67 

248 MESSAGE_HASH = 254 

249 CHANGE_CIPHER_SPEC = 0x0101 

250 

251 

252if sys.platform == "win32": 

253 from _ssl import enum_certificates, enum_crls 

254 

255from socket import socket, AF_INET, SOCK_STREAM, create_connection 

256from socket import SOL_SOCKET, SO_TYPE 

257import socket as _socket 

258import base64 # for DER-to-PEM translation 

259import errno 

260import warnings 

261 

262 

263socket_error = OSError # keep that public name in module namespace 

264 

265CHANNEL_BINDING_TYPES = ['tls-unique'] 

266 

267HAS_NEVER_CHECK_COMMON_NAME = hasattr(_ssl, 'HOSTFLAG_NEVER_CHECK_SUBJECT') 

268 

269 

270_RESTRICTED_SERVER_CIPHERS = _DEFAULT_CIPHERS 

271 

272CertificateError = SSLCertVerificationError 

273 

274 

275def _dnsname_match(dn, hostname): 

276 """Matching according to RFC 6125, section 6.4.3 

277 

278 - Hostnames are compared lower case. 

279 - For IDNA, both dn and hostname must be encoded as IDN A-label (ACE). 

280 - Partial wildcards like 'www*.example.org', multiple wildcards, sole 

281 wildcard or wildcards in labels other then the left-most label are not 

282 supported and a CertificateError is raised. 

283 - A wildcard must match at least one character. 

284 """ 

285 if not dn: 

286 return False 

287 

288 wildcards = dn.count('*') 

289 # speed up common case w/o wildcards 

290 if not wildcards: 

291 return dn.lower() == hostname.lower() 

292 

293 if wildcards > 1: 

294 raise CertificateError( 

295 "too many wildcards in certificate DNS name: {!r}.".format(dn)) 

296 

297 dn_leftmost, sep, dn_remainder = dn.partition('.') 

298 

299 if '*' in dn_remainder: 

300 # Only match wildcard in leftmost segment. 

301 raise CertificateError( 

302 "wildcard can only be present in the leftmost label: " 

303 "{!r}.".format(dn)) 

304 

305 if not sep: 

306 # no right side 

307 raise CertificateError( 

308 "sole wildcard without additional labels are not support: " 

309 "{!r}.".format(dn)) 

310 

311 if dn_leftmost != '*': 

312 # no partial wildcard matching 

313 raise CertificateError( 

314 "partial wildcards in leftmost label are not supported: " 

315 "{!r}.".format(dn)) 

316 

317 hostname_leftmost, sep, hostname_remainder = hostname.partition('.') 

318 if not hostname_leftmost or not sep: 

319 # wildcard must match at least one char 

320 return False 

321 return dn_remainder.lower() == hostname_remainder.lower() 

322 

323 

324def _inet_paton(ipname): 

325 """Try to convert an IP address to packed binary form 

326 

327 Supports IPv4 addresses on all platforms and IPv6 on platforms with IPv6 

328 support. 

329 """ 

330 # inet_aton() also accepts strings like '1', '127.1', some also trailing 

331 # data like '127.0.0.1 whatever'. 

332 try: 

333 addr = _socket.inet_aton(ipname) 

334 except OSError: 

335 # not an IPv4 address 

336 pass 

337 else: 

338 if _socket.inet_ntoa(addr) == ipname: 

339 # only accept injective ipnames 

340 return addr 

341 else: 

342 # refuse for short IPv4 notation and additional trailing data 

343 raise ValueError( 

344 "{!r} is not a quad-dotted IPv4 address.".format(ipname) 

345 ) 

346 

347 try: 

348 return _socket.inet_pton(_socket.AF_INET6, ipname) 

349 except OSError: 

350 raise ValueError("{!r} is neither an IPv4 nor an IP6 " 

351 "address.".format(ipname)) 

352 except AttributeError: 

353 # AF_INET6 not available 

354 pass 

355 

356 raise ValueError("{!r} is not an IPv4 address.".format(ipname)) 

357 

358 

359def _ipaddress_match(cert_ipaddress, host_ip): 

360 """Exact matching of IP addresses. 

361 

362 RFC 6125 explicitly doesn't define an algorithm for this 

363 (section 1.7.2 - "Out of Scope"). 

364 """ 

365 # OpenSSL may add a trailing newline to a subjectAltName's IP address, 

366 # commonly woth IPv6 addresses. Strip off trailing \n. 

367 ip = _inet_paton(cert_ipaddress.rstrip()) 

368 return ip == host_ip 

369 

370 

371def match_hostname(cert, hostname): 

372 """Verify that *cert* (in decoded format as returned by 

373 SSLSocket.getpeercert()) matches the *hostname*. RFC 2818 and RFC 6125 

374 rules are followed. 

375 

376 The function matches IP addresses rather than dNSNames if hostname is a 

377 valid ipaddress string. IPv4 addresses are supported on all platforms. 

378 IPv6 addresses are supported on platforms with IPv6 support (AF_INET6 

379 and inet_pton). 

380 

381 CertificateError is raised on failure. On success, the function 

382 returns nothing. 

383 """ 

384 if not cert: 

385 raise ValueError("empty or no certificate, match_hostname needs a " 

386 "SSL socket or SSL context with either " 

387 "CERT_OPTIONAL or CERT_REQUIRED") 

388 try: 

389 host_ip = _inet_paton(hostname) 

390 except ValueError: 

391 # Not an IP address (common case) 

392 host_ip = None 

393 dnsnames = [] 

394 san = cert.get('subjectAltName', ()) 

395 for key, value in san: 

396 if key == 'DNS': 

397 if host_ip is None and _dnsname_match(value, hostname): 

398 return 

399 dnsnames.append(value) 

400 elif key == 'IP Address': 

401 if host_ip is not None and _ipaddress_match(value, host_ip): 

402 return 

403 dnsnames.append(value) 

404 if not dnsnames: 

405 # The subject is only checked when there is no dNSName entry 

406 # in subjectAltName 

407 for sub in cert.get('subject', ()): 

408 for key, value in sub: 

409 # XXX according to RFC 2818, the most specific Common Name 

410 # must be used. 

411 if key == 'commonName': 

412 if _dnsname_match(value, hostname): 

413 return 

414 dnsnames.append(value) 

415 if len(dnsnames) > 1: 

416 raise CertificateError("hostname %r " 

417 "doesn't match either of %s" 

418 % (hostname, ', '.join(map(repr, dnsnames)))) 

419 elif len(dnsnames) == 1: 

420 raise CertificateError("hostname %r " 

421 "doesn't match %r" 

422 % (hostname, dnsnames[0])) 

423 else: 

424 raise CertificateError("no appropriate commonName or " 

425 "subjectAltName fields were found") 

426 

427 

428DefaultVerifyPaths = namedtuple("DefaultVerifyPaths", 

429 "cafile capath openssl_cafile_env openssl_cafile openssl_capath_env " 

430 "openssl_capath") 

431 

432def get_default_verify_paths(): 

433 """Return paths to default cafile and capath. 

434 """ 

435 parts = _ssl.get_default_verify_paths() 

436 

437 # environment vars shadow paths 

438 cafile = os.environ.get(parts[0], parts[1]) 

439 capath = os.environ.get(parts[2], parts[3]) 

440 

441 return DefaultVerifyPaths(cafile if os.path.isfile(cafile) else None, 

442 capath if os.path.isdir(capath) else None, 

443 *parts) 

444 

445 

446class _ASN1Object(namedtuple("_ASN1Object", "nid shortname longname oid")): 

447 """ASN.1 object identifier lookup 

448 """ 

449 __slots__ = () 

450 

451 def __new__(cls, oid): 

452 return super().__new__(cls, *_txt2obj(oid, name=False)) 

453 

454 @classmethod 

455 def fromnid(cls, nid): 

456 """Create _ASN1Object from OpenSSL numeric ID 

457 """ 

458 return super().__new__(cls, *_nid2obj(nid)) 

459 

460 @classmethod 

461 def fromname(cls, name): 

462 """Create _ASN1Object from short name, long name or OID 

463 """ 

464 return super().__new__(cls, *_txt2obj(name, name=True)) 

465 

466 

467class Purpose(_ASN1Object, _Enum): 

468 """SSLContext purpose flags with X509v3 Extended Key Usage objects 

469 """ 

470 SERVER_AUTH = '1.3.6.1.5.5.7.3.1' 

471 CLIENT_AUTH = '1.3.6.1.5.5.7.3.2' 

472 

473 

474class SSLContext(_SSLContext): 

475 """An SSLContext holds various SSL-related configuration options and 

476 data, such as certificates and possibly a private key.""" 

477 _windows_cert_stores = ("CA", "ROOT") 

478 

479 sslsocket_class = None # SSLSocket is assigned later. 

480 sslobject_class = None # SSLObject is assigned later. 

481 

482 def __new__(cls, protocol=PROTOCOL_TLS, *args, **kwargs): 

483 self = _SSLContext.__new__(cls, protocol) 

484 return self 

485 

486 def _encode_hostname(self, hostname): 

487 if hostname is None: 

488 return None 

489 elif isinstance(hostname, str): 

490 return hostname.encode('idna').decode('ascii') 

491 else: 

492 return hostname.decode('ascii') 

493 

494 def wrap_socket(self, sock, server_side=False, 

495 do_handshake_on_connect=True, 

496 suppress_ragged_eofs=True, 

497 server_hostname=None, session=None): 

498 # SSLSocket class handles server_hostname encoding before it calls 

499 # ctx._wrap_socket() 

500 return self.sslsocket_class._create( 

501 sock=sock, 

502 server_side=server_side, 

503 do_handshake_on_connect=do_handshake_on_connect, 

504 suppress_ragged_eofs=suppress_ragged_eofs, 

505 server_hostname=server_hostname, 

506 context=self, 

507 session=session 

508 ) 

509 

510 def wrap_bio(self, incoming, outgoing, server_side=False, 

511 server_hostname=None, session=None): 

512 # Need to encode server_hostname here because _wrap_bio() can only 

513 # handle ASCII str. 

514 return self.sslobject_class._create( 

515 incoming, outgoing, server_side=server_side, 

516 server_hostname=self._encode_hostname(server_hostname), 

517 session=session, context=self, 

518 ) 

519 

520 def set_npn_protocols(self, npn_protocols): 

521 protos = bytearray() 

522 for protocol in npn_protocols: 

523 b = bytes(protocol, 'ascii') 

524 if len(b) == 0 or len(b) > 255: 

525 raise SSLError('NPN protocols must be 1 to 255 in length') 

526 protos.append(len(b)) 

527 protos.extend(b) 

528 

529 self._set_npn_protocols(protos) 

530 

531 def set_servername_callback(self, server_name_callback): 

532 if server_name_callback is None: 

533 self.sni_callback = None 

534 else: 

535 if not callable(server_name_callback): 

536 raise TypeError("not a callable object") 

537 

538 def shim_cb(sslobj, servername, sslctx): 

539 servername = self._encode_hostname(servername) 

540 return server_name_callback(sslobj, servername, sslctx) 

541 

542 self.sni_callback = shim_cb 

543 

544 def set_alpn_protocols(self, alpn_protocols): 

545 protos = bytearray() 

546 for protocol in alpn_protocols: 

547 b = bytes(protocol, 'ascii') 

548 if len(b) == 0 or len(b) > 255: 

549 raise SSLError('ALPN protocols must be 1 to 255 in length') 

550 protos.append(len(b)) 

551 protos.extend(b) 

552 

553 self._set_alpn_protocols(protos) 

554 

555 def _load_windows_store_certs(self, storename, purpose): 

556 certs = bytearray() 

557 try: 

558 for cert, encoding, trust in enum_certificates(storename): 

559 # CA certs are never PKCS#7 encoded 

560 if encoding == "x509_asn": 

561 if trust is True or purpose.oid in trust: 

562 certs.extend(cert) 

563 except PermissionError: 

564 warnings.warn("unable to enumerate Windows certificate store") 

565 if certs: 

566 self.load_verify_locations(cadata=certs) 

567 return certs 

568 

569 def load_default_certs(self, purpose=Purpose.SERVER_AUTH): 

570 if not isinstance(purpose, _ASN1Object): 

571 raise TypeError(purpose) 

572 if sys.platform == "win32": 

573 for storename in self._windows_cert_stores: 

574 self._load_windows_store_certs(storename, purpose) 

575 self.set_default_verify_paths() 

576 

577 if hasattr(_SSLContext, 'minimum_version'): 

578 @property 

579 def minimum_version(self): 

580 return TLSVersion(super().minimum_version) 

581 

582 @minimum_version.setter 

583 def minimum_version(self, value): 

584 if value == TLSVersion.SSLv3: 

585 self.options &= ~Options.OP_NO_SSLv3 

586 super(SSLContext, SSLContext).minimum_version.__set__(self, value) 

587 

588 @property 

589 def maximum_version(self): 

590 return TLSVersion(super().maximum_version) 

591 

592 @maximum_version.setter 

593 def maximum_version(self, value): 

594 super(SSLContext, SSLContext).maximum_version.__set__(self, value) 

595 

596 @property 

597 def options(self): 

598 return Options(super().options) 

599 

600 @options.setter 

601 def options(self, value): 

602 super(SSLContext, SSLContext).options.__set__(self, value) 

603 

604 if hasattr(_ssl, 'HOSTFLAG_NEVER_CHECK_SUBJECT'): 

605 @property 

606 def hostname_checks_common_name(self): 

607 ncs = self._host_flags & _ssl.HOSTFLAG_NEVER_CHECK_SUBJECT 

608 return ncs != _ssl.HOSTFLAG_NEVER_CHECK_SUBJECT 

609 

610 @hostname_checks_common_name.setter 

611 def hostname_checks_common_name(self, value): 

612 if value: 

613 self._host_flags &= ~_ssl.HOSTFLAG_NEVER_CHECK_SUBJECT 

614 else: 

615 self._host_flags |= _ssl.HOSTFLAG_NEVER_CHECK_SUBJECT 

616 else: 

617 @property 

618 def hostname_checks_common_name(self): 

619 return True 

620 

621 @property 

622 def _msg_callback(self): 

623 """TLS message callback 

624 

625 The message callback provides a debugging hook to analyze TLS 

626 connections. The callback is called for any TLS protocol message 

627 (header, handshake, alert, and more), but not for application data. 

628 Due to technical limitations, the callback can't be used to filter 

629 traffic or to abort a connection. Any exception raised in the 

630 callback is delayed until the handshake, read, or write operation 

631 has been performed. 

632 

633 def msg_cb(conn, direction, version, content_type, msg_type, data): 

634 pass 

635 

636 conn 

637 :class:`SSLSocket` or :class:`SSLObject` instance 

638 direction 

639 ``read`` or ``write`` 

640 version 

641 :class:`TLSVersion` enum member or int for unknown version. For a 

642 frame header, it's the header version. 

643 content_type 

644 :class:`_TLSContentType` enum member or int for unsupported 

645 content type. 

646 msg_type 

647 Either a :class:`_TLSContentType` enum number for a header 

648 message, a :class:`_TLSAlertType` enum member for an alert 

649 message, a :class:`_TLSMessageType` enum member for other 

650 messages, or int for unsupported message types. 

651 data 

652 Raw, decrypted message content as bytes 

653 """ 

654 inner = super()._msg_callback 

655 if inner is not None: 

656 return inner.user_function 

657 else: 

658 return None 

659 

660 @_msg_callback.setter 

661 def _msg_callback(self, callback): 

662 if callback is None: 

663 super(SSLContext, SSLContext)._msg_callback.__set__(self, None) 

664 return 

665 

666 if not hasattr(callback, '__call__'): 

667 raise TypeError(f"{callback} is not callable.") 

668 

669 def inner(conn, direction, version, content_type, msg_type, data): 

670 try: 

671 version = TLSVersion(version) 

672 except ValueError: 

673 pass 

674 

675 try: 

676 content_type = _TLSContentType(content_type) 

677 except ValueError: 

678 pass 

679 

680 if content_type == _TLSContentType.HEADER: 

681 msg_enum = _TLSContentType 

682 elif content_type == _TLSContentType.ALERT: 

683 msg_enum = _TLSAlertType 

684 else: 

685 msg_enum = _TLSMessageType 

686 try: 

687 msg_type = msg_enum(msg_type) 

688 except ValueError: 

689 pass 

690 

691 return callback(conn, direction, version, 

692 content_type, msg_type, data) 

693 

694 inner.user_function = callback 

695 

696 super(SSLContext, SSLContext)._msg_callback.__set__(self, inner) 

697 

698 @property 

699 def protocol(self): 

700 return _SSLMethod(super().protocol) 

701 

702 @property 

703 def verify_flags(self): 

704 return VerifyFlags(super().verify_flags) 

705 

706 @verify_flags.setter 

707 def verify_flags(self, value): 

708 super(SSLContext, SSLContext).verify_flags.__set__(self, value) 

709 

710 @property 

711 def verify_mode(self): 

712 value = super().verify_mode 

713 try: 

714 return VerifyMode(value) 

715 except ValueError: 

716 return value 

717 

718 @verify_mode.setter 

719 def verify_mode(self, value): 

720 super(SSLContext, SSLContext).verify_mode.__set__(self, value) 

721 

722 

723def create_default_context(purpose=Purpose.SERVER_AUTH, *, cafile=None, 

724 capath=None, cadata=None): 

725 """Create a SSLContext object with default settings. 

726 

727 NOTE: The protocol and settings may change anytime without prior 

728 deprecation. The values represent a fair balance between maximum 

729 compatibility and security. 

730 """ 

731 if not isinstance(purpose, _ASN1Object): 

732 raise TypeError(purpose) 

733 

734 # SSLContext sets OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_COMPRESSION, 

735 # OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_DH_USE and OP_SINGLE_ECDH_USE 

736 # by default. 

737 context = SSLContext(PROTOCOL_TLS) 

738 

739 if purpose == Purpose.SERVER_AUTH: 

740 # verify certs and host name in client mode 

741 context.verify_mode = CERT_REQUIRED 

742 context.check_hostname = True 

743 

744 if cafile or capath or cadata: 

745 context.load_verify_locations(cafile, capath, cadata) 

746 elif context.verify_mode != CERT_NONE: 

747 # no explicit cafile, capath or cadata but the verify mode is 

748 # CERT_OPTIONAL or CERT_REQUIRED. Let's try to load default system 

749 # root CA certificates for the given purpose. This may fail silently. 

750 context.load_default_certs(purpose) 

751 # OpenSSL 1.1.1 keylog file 

752 if hasattr(context, 'keylog_filename'): 

753 keylogfile = os.environ.get('SSLKEYLOGFILE') 

754 if keylogfile and not sys.flags.ignore_environment: 

755 context.keylog_filename = keylogfile 

756 return context 

757 

758def _create_unverified_context(protocol=PROTOCOL_TLS, *, cert_reqs=CERT_NONE, 

759 check_hostname=False, purpose=Purpose.SERVER_AUTH, 

760 certfile=None, keyfile=None, 

761 cafile=None, capath=None, cadata=None): 

762 """Create a SSLContext object for Python stdlib modules 

763 

764 All Python stdlib modules shall use this function to create SSLContext 

765 objects in order to keep common settings in one place. The configuration 

766 is less restrict than create_default_context()'s to increase backward 

767 compatibility. 

768 """ 

769 if not isinstance(purpose, _ASN1Object): 

770 raise TypeError(purpose) 

771 

772 # SSLContext sets OP_NO_SSLv2, OP_NO_SSLv3, OP_NO_COMPRESSION, 

773 # OP_CIPHER_SERVER_PREFERENCE, OP_SINGLE_DH_USE and OP_SINGLE_ECDH_USE 

774 # by default. 

775 context = SSLContext(protocol) 

776 

777 if not check_hostname: 

778 context.check_hostname = False 

779 if cert_reqs is not None: 

780 context.verify_mode = cert_reqs 

781 if check_hostname: 

782 context.check_hostname = True 

783 

784 if keyfile and not certfile: 

785 raise ValueError("certfile must be specified") 

786 if certfile or keyfile: 

787 context.load_cert_chain(certfile, keyfile) 

788 

789 # load CA root certs 

790 if cafile or capath or cadata: 

791 context.load_verify_locations(cafile, capath, cadata) 

792 elif context.verify_mode != CERT_NONE: 

793 # no explicit cafile, capath or cadata but the verify mode is 

794 # CERT_OPTIONAL or CERT_REQUIRED. Let's try to load default system 

795 # root CA certificates for the given purpose. This may fail silently. 

796 context.load_default_certs(purpose) 

797 # OpenSSL 1.1.1 keylog file 

798 if hasattr(context, 'keylog_filename'): 

799 keylogfile = os.environ.get('SSLKEYLOGFILE') 

800 if keylogfile and not sys.flags.ignore_environment: 

801 context.keylog_filename = keylogfile 

802 return context 

803 

804# Used by http.client if no context is explicitly passed. 

805_create_default_https_context = create_default_context 

806 

807 

808# Backwards compatibility alias, even though it's not a public name. 

809_create_stdlib_context = _create_unverified_context 

810 

811 

812class SSLObject: 

813 """This class implements an interface on top of a low-level SSL object as 

814 implemented by OpenSSL. This object captures the state of an SSL connection 

815 but does not provide any network IO itself. IO needs to be performed 

816 through separate "BIO" objects which are OpenSSL's IO abstraction layer. 

817 

818 This class does not have a public constructor. Instances are returned by 

819 ``SSLContext.wrap_bio``. This class is typically used by framework authors 

820 that want to implement asynchronous IO for SSL through memory buffers. 

821 

822 When compared to ``SSLSocket``, this object lacks the following features: 

823 

824 * Any form of network IO, including methods such as ``recv`` and ``send``. 

825 * The ``do_handshake_on_connect`` and ``suppress_ragged_eofs`` machinery. 

826 """ 

827 def __init__(self, *args, **kwargs): 

828 raise TypeError( 

829 f"{self.__class__.__name__} does not have a public " 

830 f"constructor. Instances are returned by SSLContext.wrap_bio()." 

831 ) 

832 

833 @classmethod 

834 def _create(cls, incoming, outgoing, server_side=False, 

835 server_hostname=None, session=None, context=None): 

836 self = cls.__new__(cls) 

837 sslobj = context._wrap_bio( 

838 incoming, outgoing, server_side=server_side, 

839 server_hostname=server_hostname, 

840 owner=self, session=session 

841 ) 

842 self._sslobj = sslobj 

843 return self 

844 

845 @property 

846 def context(self): 

847 """The SSLContext that is currently in use.""" 

848 return self._sslobj.context 

849 

850 @context.setter 

851 def context(self, ctx): 

852 self._sslobj.context = ctx 

853 

854 @property 

855 def session(self): 

856 """The SSLSession for client socket.""" 

857 return self._sslobj.session 

858 

859 @session.setter 

860 def session(self, session): 

861 self._sslobj.session = session 

862 

863 @property 

864 def session_reused(self): 

865 """Was the client session reused during handshake""" 

866 return self._sslobj.session_reused 

867 

868 @property 

869 def server_side(self): 

870 """Whether this is a server-side socket.""" 

871 return self._sslobj.server_side 

872 

873 @property 

874 def server_hostname(self): 

875 """The currently set server hostname (for SNI), or ``None`` if no 

876 server hostname is set.""" 

877 return self._sslobj.server_hostname 

878 

879 def read(self, len=1024, buffer=None): 

880 """Read up to 'len' bytes from the SSL object and return them. 

881 

882 If 'buffer' is provided, read into this buffer and return the number of 

883 bytes read. 

884 """ 

885 if buffer is not None: 

886 v = self._sslobj.read(len, buffer) 

887 else: 

888 v = self._sslobj.read(len) 

889 return v 

890 

891 def write(self, data): 

892 """Write 'data' to the SSL object and return the number of bytes 

893 written. 

894 

895 The 'data' argument must support the buffer interface. 

896 """ 

897 return self._sslobj.write(data) 

898 

899 def getpeercert(self, binary_form=False): 

900 """Returns a formatted version of the data in the certificate provided 

901 by the other end of the SSL channel. 

902 

903 Return None if no certificate was provided, {} if a certificate was 

904 provided, but not validated. 

905 """ 

906 return self._sslobj.getpeercert(binary_form) 

907 

908 def selected_npn_protocol(self): 

909 """Return the currently selected NPN protocol as a string, or ``None`` 

910 if a next protocol was not negotiated or if NPN is not supported by one 

911 of the peers.""" 

912 if _ssl.HAS_NPN: 

913 return self._sslobj.selected_npn_protocol() 

914 

915 def selected_alpn_protocol(self): 

916 """Return the currently selected ALPN protocol as a string, or ``None`` 

917 if a next protocol was not negotiated or if ALPN is not supported by one 

918 of the peers.""" 

919 if _ssl.HAS_ALPN: 

920 return self._sslobj.selected_alpn_protocol() 

921 

922 def cipher(self): 

923 """Return the currently selected cipher as a 3-tuple ``(name, 

924 ssl_version, secret_bits)``.""" 

925 return self._sslobj.cipher() 

926 

927 def shared_ciphers(self): 

928 """Return a list of ciphers shared by the client during the handshake or 

929 None if this is not a valid server connection. 

930 """ 

931 return self._sslobj.shared_ciphers() 

932 

933 def compression(self): 

934 """Return the current compression algorithm in use, or ``None`` if 

935 compression was not negotiated or not supported by one of the peers.""" 

936 return self._sslobj.compression() 

937 

938 def pending(self): 

939 """Return the number of bytes that can be read immediately.""" 

940 return self._sslobj.pending() 

941 

942 def do_handshake(self): 

943 """Start the SSL/TLS handshake.""" 

944 self._sslobj.do_handshake() 

945 

946 def unwrap(self): 

947 """Start the SSL shutdown handshake.""" 

948 return self._sslobj.shutdown() 

949 

950 def get_channel_binding(self, cb_type="tls-unique"): 

951 """Get channel binding data for current connection. Raise ValueError 

952 if the requested `cb_type` is not supported. Return bytes of the data 

953 or None if the data is not available (e.g. before the handshake).""" 

954 return self._sslobj.get_channel_binding(cb_type) 

955 

956 def version(self): 

957 """Return a string identifying the protocol version used by the 

958 current SSL channel. """ 

959 return self._sslobj.version() 

960 

961 def verify_client_post_handshake(self): 

962 return self._sslobj.verify_client_post_handshake() 

963 

964 

965def _sslcopydoc(func): 

966 """Copy docstring from SSLObject to SSLSocket""" 

967 func.__doc__ = getattr(SSLObject, func.__name__).__doc__ 

968 return func 

969 

970 

971class SSLSocket(socket): 

972 """This class implements a subtype of socket.socket that wraps 

973 the underlying OS socket in an SSL context when necessary, and 

974 provides read and write methods over that channel. """ 

975 

976 def __init__(self, *args, **kwargs): 

977 raise TypeError( 

978 f"{self.__class__.__name__} does not have a public " 

979 f"constructor. Instances are returned by " 

980 f"SSLContext.wrap_socket()." 

981 ) 

982 

983 @classmethod 

984 def _create(cls, sock, server_side=False, do_handshake_on_connect=True, 

985 suppress_ragged_eofs=True, server_hostname=None, 

986 context=None, session=None): 

987 if sock.getsockopt(SOL_SOCKET, SO_TYPE) != SOCK_STREAM: 

988 raise NotImplementedError("only stream sockets are supported") 

989 if server_side: 

990 if server_hostname: 

991 raise ValueError("server_hostname can only be specified " 

992 "in client mode") 

993 if session is not None: 

994 raise ValueError("session can only be specified in " 

995 "client mode") 

996 if context.check_hostname and not server_hostname: 

997 raise ValueError("check_hostname requires server_hostname") 

998 

999 kwargs = dict( 

1000 family=sock.family, type=sock.type, proto=sock.proto, 

1001 fileno=sock.fileno() 

1002 ) 

1003 self = cls.__new__(cls, **kwargs) 

1004 super(SSLSocket, self).__init__(**kwargs) 

1005 self.settimeout(sock.gettimeout()) 

1006 sock.detach() 

1007 

1008 self._context = context 

1009 self._session = session 

1010 self._closed = False 

1011 self._sslobj = None 

1012 self.server_side = server_side 

1013 self.server_hostname = context._encode_hostname(server_hostname) 

1014 self.do_handshake_on_connect = do_handshake_on_connect 

1015 self.suppress_ragged_eofs = suppress_ragged_eofs 

1016 

1017 # See if we are connected 

1018 try: 

1019 self.getpeername() 

1020 except OSError as e: 

1021 if e.errno != errno.ENOTCONN: 

1022 raise 

1023 connected = False 

1024 else: 

1025 connected = True 

1026 

1027 self._connected = connected 

1028 if connected: 

1029 # create the SSL object 

1030 try: 

1031 self._sslobj = self._context._wrap_socket( 

1032 self, server_side, self.server_hostname, 

1033 owner=self, session=self._session, 

1034 ) 

1035 if do_handshake_on_connect: 

1036 timeout = self.gettimeout() 

1037 if timeout == 0.0: 

1038 # non-blocking 

1039 raise ValueError("do_handshake_on_connect should not be specified for non-blocking sockets") 

1040 self.do_handshake() 

1041 except (OSError, ValueError): 

1042 self.close() 

1043 raise 

1044 return self 

1045 

1046 @property 

1047 @_sslcopydoc 

1048 def context(self): 

1049 return self._context 

1050 

1051 @context.setter 

1052 def context(self, ctx): 

1053 self._context = ctx 

1054 self._sslobj.context = ctx 

1055 

1056 @property 

1057 @_sslcopydoc 

1058 def session(self): 

1059 if self._sslobj is not None: 

1060 return self._sslobj.session 

1061 

1062 @session.setter 

1063 def session(self, session): 

1064 self._session = session 

1065 if self._sslobj is not None: 

1066 self._sslobj.session = session 

1067 

1068 @property 

1069 @_sslcopydoc 

1070 def session_reused(self): 

1071 if self._sslobj is not None: 

1072 return self._sslobj.session_reused 

1073 

1074 def dup(self): 

1075 raise NotImplementedError("Can't dup() %s instances" % 

1076 self.__class__.__name__) 

1077 

1078 def _checkClosed(self, msg=None): 

1079 # raise an exception here if you wish to check for spurious closes 

1080 pass 

1081 

1082 def _check_connected(self): 

1083 if not self._connected: 

1084 # getpeername() will raise ENOTCONN if the socket is really 

1085 # not connected; note that we can be connected even without 

1086 # _connected being set, e.g. if connect() first returned 

1087 # EAGAIN. 

1088 self.getpeername() 

1089 

1090 def read(self, len=1024, buffer=None): 

1091 """Read up to LEN bytes and return them. 

1092 Return zero-length string on EOF.""" 

1093 

1094 self._checkClosed() 

1095 if self._sslobj is None: 

1096 raise ValueError("Read on closed or unwrapped SSL socket.") 

1097 try: 

1098 if buffer is not None: 

1099 return self._sslobj.read(len, buffer) 

1100 else: 

1101 return self._sslobj.read(len) 

1102 except SSLError as x: 

1103 if x.args[0] == SSL_ERROR_EOF and self.suppress_ragged_eofs: 

1104 if buffer is not None: 

1105 return 0 

1106 else: 

1107 return b'' 

1108 else: 

1109 raise 

1110 

1111 def write(self, data): 

1112 """Write DATA to the underlying SSL channel. Returns 

1113 number of bytes of DATA actually transmitted.""" 

1114 

1115 self._checkClosed() 

1116 if self._sslobj is None: 

1117 raise ValueError("Write on closed or unwrapped SSL socket.") 

1118 return self._sslobj.write(data) 

1119 

1120 @_sslcopydoc 

1121 def getpeercert(self, binary_form=False): 

1122 self._checkClosed() 

1123 self._check_connected() 

1124 return self._sslobj.getpeercert(binary_form) 

1125 

1126 @_sslcopydoc 

1127 def selected_npn_protocol(self): 

1128 self._checkClosed() 

1129 if self._sslobj is None or not _ssl.HAS_NPN: 

1130 return None 

1131 else: 

1132 return self._sslobj.selected_npn_protocol() 

1133 

1134 @_sslcopydoc 

1135 def selected_alpn_protocol(self): 

1136 self._checkClosed() 

1137 if self._sslobj is None or not _ssl.HAS_ALPN: 

1138 return None 

1139 else: 

1140 return self._sslobj.selected_alpn_protocol() 

1141 

1142 @_sslcopydoc 

1143 def cipher(self): 

1144 self._checkClosed() 

1145 if self._sslobj is None: 

1146 return None 

1147 else: 

1148 return self._sslobj.cipher() 

1149 

1150 @_sslcopydoc 

1151 def shared_ciphers(self): 

1152 self._checkClosed() 

1153 if self._sslobj is None: 

1154 return None 

1155 else: 

1156 return self._sslobj.shared_ciphers() 

1157 

1158 @_sslcopydoc 

1159 def compression(self): 

1160 self._checkClosed() 

1161 if self._sslobj is None: 

1162 return None 

1163 else: 

1164 return self._sslobj.compression() 

1165 

1166 def send(self, data, flags=0): 

1167 self._checkClosed() 

1168 if self._sslobj is not None: 

1169 if flags != 0: 

1170 raise ValueError( 

1171 "non-zero flags not allowed in calls to send() on %s" % 

1172 self.__class__) 

1173 return self._sslobj.write(data) 

1174 else: 

1175 return super().send(data, flags) 

1176 

1177 def sendto(self, data, flags_or_addr, addr=None): 

1178 self._checkClosed() 

1179 if self._sslobj is not None: 

1180 raise ValueError("sendto not allowed on instances of %s" % 

1181 self.__class__) 

1182 elif addr is None: 

1183 return super().sendto(data, flags_or_addr) 

1184 else: 

1185 return super().sendto(data, flags_or_addr, addr) 

1186 

1187 def sendmsg(self, *args, **kwargs): 

1188 # Ensure programs don't send data unencrypted if they try to 

1189 # use this method. 

1190 raise NotImplementedError("sendmsg not allowed on instances of %s" % 

1191 self.__class__) 

1192 

1193 def sendall(self, data, flags=0): 

1194 self._checkClosed() 

1195 if self._sslobj is not None: 

1196 if flags != 0: 

1197 raise ValueError( 

1198 "non-zero flags not allowed in calls to sendall() on %s" % 

1199 self.__class__) 

1200 count = 0 

1201 with memoryview(data) as view, view.cast("B") as byte_view: 

1202 amount = len(byte_view) 

1203 while count < amount: 

1204 v = self.send(byte_view[count:]) 

1205 count += v 

1206 else: 

1207 return super().sendall(data, flags) 

1208 

1209 def sendfile(self, file, offset=0, count=None): 

1210 """Send a file, possibly by using os.sendfile() if this is a 

1211 clear-text socket. Return the total number of bytes sent. 

1212 """ 

1213 if self._sslobj is not None: 

1214 return self._sendfile_use_send(file, offset, count) 

1215 else: 

1216 # os.sendfile() works with plain sockets only 

1217 return super().sendfile(file, offset, count) 

1218 

1219 def recv(self, buflen=1024, flags=0): 

1220 self._checkClosed() 

1221 if self._sslobj is not None: 

1222 if flags != 0: 

1223 raise ValueError( 

1224 "non-zero flags not allowed in calls to recv() on %s" % 

1225 self.__class__) 

1226 return self.read(buflen) 

1227 else: 

1228 return super().recv(buflen, flags) 

1229 

1230 def recv_into(self, buffer, nbytes=None, flags=0): 

1231 self._checkClosed() 

1232 if buffer and (nbytes is None): 

1233 nbytes = len(buffer) 

1234 elif nbytes is None: 

1235 nbytes = 1024 

1236 if self._sslobj is not None: 

1237 if flags != 0: 

1238 raise ValueError( 

1239 "non-zero flags not allowed in calls to recv_into() on %s" % 

1240 self.__class__) 

1241 return self.read(nbytes, buffer) 

1242 else: 

1243 return super().recv_into(buffer, nbytes, flags) 

1244 

1245 def recvfrom(self, buflen=1024, flags=0): 

1246 self._checkClosed() 

1247 if self._sslobj is not None: 

1248 raise ValueError("recvfrom not allowed on instances of %s" % 

1249 self.__class__) 

1250 else: 

1251 return super().recvfrom(buflen, flags) 

1252 

1253 def recvfrom_into(self, buffer, nbytes=None, flags=0): 

1254 self._checkClosed() 

1255 if self._sslobj is not None: 

1256 raise ValueError("recvfrom_into not allowed on instances of %s" % 

1257 self.__class__) 

1258 else: 

1259 return super().recvfrom_into(buffer, nbytes, flags) 

1260 

1261 def recvmsg(self, *args, **kwargs): 

1262 raise NotImplementedError("recvmsg not allowed on instances of %s" % 

1263 self.__class__) 

1264 

1265 def recvmsg_into(self, *args, **kwargs): 

1266 raise NotImplementedError("recvmsg_into not allowed on instances of " 

1267 "%s" % self.__class__) 

1268 

1269 @_sslcopydoc 

1270 def pending(self): 

1271 self._checkClosed() 

1272 if self._sslobj is not None: 

1273 return self._sslobj.pending() 

1274 else: 

1275 return 0 

1276 

1277 def shutdown(self, how): 

1278 self._checkClosed() 

1279 self._sslobj = None 

1280 super().shutdown(how) 

1281 

1282 @_sslcopydoc 

1283 def unwrap(self): 

1284 if self._sslobj: 

1285 s = self._sslobj.shutdown() 

1286 self._sslobj = None 

1287 return s 

1288 else: 

1289 raise ValueError("No SSL wrapper around " + str(self)) 

1290 

1291 @_sslcopydoc 

1292 def verify_client_post_handshake(self): 

1293 if self._sslobj: 

1294 return self._sslobj.verify_client_post_handshake() 

1295 else: 

1296 raise ValueError("No SSL wrapper around " + str(self)) 

1297 

1298 def _real_close(self): 

1299 self._sslobj = None 

1300 super()._real_close() 

1301 

1302 @_sslcopydoc 

1303 def do_handshake(self, block=False): 

1304 self._check_connected() 

1305 timeout = self.gettimeout() 

1306 try: 

1307 if timeout == 0.0 and block: 

1308 self.settimeout(None) 

1309 self._sslobj.do_handshake() 

1310 finally: 

1311 self.settimeout(timeout) 

1312 

1313 def _real_connect(self, addr, connect_ex): 

1314 if self.server_side: 

1315 raise ValueError("can't connect in server-side mode") 

1316 # Here we assume that the socket is client-side, and not 

1317 # connected at the time of the call. We connect it, then wrap it. 

1318 if self._connected or self._sslobj is not None: 

1319 raise ValueError("attempt to connect already-connected SSLSocket!") 

1320 self._sslobj = self.context._wrap_socket( 

1321 self, False, self.server_hostname, 

1322 owner=self, session=self._session 

1323 ) 

1324 try: 

1325 if connect_ex: 

1326 rc = super().connect_ex(addr) 

1327 else: 

1328 rc = None 

1329 super().connect(addr) 

1330 if not rc: 

1331 self._connected = True 

1332 if self.do_handshake_on_connect: 

1333 self.do_handshake() 

1334 return rc 

1335 except (OSError, ValueError): 

1336 self._sslobj = None 

1337 raise 

1338 

1339 def connect(self, addr): 

1340 """Connects to remote ADDR, and then wraps the connection in 

1341 an SSL channel.""" 

1342 self._real_connect(addr, False) 

1343 

1344 def connect_ex(self, addr): 

1345 """Connects to remote ADDR, and then wraps the connection in 

1346 an SSL channel.""" 

1347 return self._real_connect(addr, True) 

1348 

1349 def accept(self): 

1350 """Accepts a new connection from a remote client, and returns 

1351 a tuple containing that new connection wrapped with a server-side 

1352 SSL channel, and the address of the remote client.""" 

1353 

1354 newsock, addr = super().accept() 

1355 newsock = self.context.wrap_socket(newsock, 

1356 do_handshake_on_connect=self.do_handshake_on_connect, 

1357 suppress_ragged_eofs=self.suppress_ragged_eofs, 

1358 server_side=True) 

1359 return newsock, addr 

1360 

1361 @_sslcopydoc 

1362 def get_channel_binding(self, cb_type="tls-unique"): 

1363 if self._sslobj is not None: 

1364 return self._sslobj.get_channel_binding(cb_type) 

1365 else: 

1366 if cb_type not in CHANNEL_BINDING_TYPES: 

1367 raise ValueError( 

1368 "{0} channel binding type not implemented".format(cb_type) 

1369 ) 

1370 return None 

1371 

1372 @_sslcopydoc 

1373 def version(self): 

1374 if self._sslobj is not None: 

1375 return self._sslobj.version() 

1376 else: 

1377 return None 

1378 

1379 

1380# Python does not support forward declaration of types. 

1381SSLContext.sslsocket_class = SSLSocket 

1382SSLContext.sslobject_class = SSLObject 

1383 

1384 

1385def wrap_socket(sock, keyfile=None, certfile=None, 

1386 server_side=False, cert_reqs=CERT_NONE, 

1387 ssl_version=PROTOCOL_TLS, ca_certs=None, 

1388 do_handshake_on_connect=True, 

1389 suppress_ragged_eofs=True, 

1390 ciphers=None): 

1391 

1392 if server_side and not certfile: 

1393 raise ValueError("certfile must be specified for server-side " 

1394 "operations") 

1395 if keyfile and not certfile: 

1396 raise ValueError("certfile must be specified") 

1397 context = SSLContext(ssl_version) 

1398 context.verify_mode = cert_reqs 

1399 if ca_certs: 

1400 context.load_verify_locations(ca_certs) 

1401 if certfile: 

1402 context.load_cert_chain(certfile, keyfile) 

1403 if ciphers: 

1404 context.set_ciphers(ciphers) 

1405 return context.wrap_socket( 

1406 sock=sock, server_side=server_side, 

1407 do_handshake_on_connect=do_handshake_on_connect, 

1408 suppress_ragged_eofs=suppress_ragged_eofs 

1409 ) 

1410 

1411# some utility functions 

1412 

1413def cert_time_to_seconds(cert_time): 

1414 """Return the time in seconds since the Epoch, given the timestring 

1415 representing the "notBefore" or "notAfter" date from a certificate 

1416 in ``"%b %d %H:%M:%S %Y %Z"`` strptime format (C locale). 

1417 

1418 "notBefore" or "notAfter" dates must use UTC (RFC 5280). 

1419 

1420 Month is one of: Jan Feb Mar Apr May Jun Jul Aug Sep Oct Nov Dec 

1421 UTC should be specified as GMT (see ASN1_TIME_print()) 

1422 """ 

1423 from time import strptime 

1424 from calendar import timegm 

1425 

1426 months = ( 

1427 "Jan","Feb","Mar","Apr","May","Jun", 

1428 "Jul","Aug","Sep","Oct","Nov","Dec" 

1429 ) 

1430 time_format = ' %d %H:%M:%S %Y GMT' # NOTE: no month, fixed GMT 

1431 try: 

1432 month_number = months.index(cert_time[:3].title()) + 1 

1433 except ValueError: 

1434 raise ValueError('time data %r does not match ' 

1435 'format "%%b%s"' % (cert_time, time_format)) 

1436 else: 

1437 # found valid month 

1438 tt = strptime(cert_time[3:], time_format) 

1439 # return an integer, the previous mktime()-based implementation 

1440 # returned a float (fractional seconds are always zero here). 

1441 return timegm((tt[0], month_number) + tt[2:6]) 

1442 

1443PEM_HEADER = "-----BEGIN CERTIFICATE-----" 

1444PEM_FOOTER = "-----END CERTIFICATE-----" 

1445 

1446def DER_cert_to_PEM_cert(der_cert_bytes): 

1447 """Takes a certificate in binary DER format and returns the 

1448 PEM version of it as a string.""" 

1449 

1450 f = str(base64.standard_b64encode(der_cert_bytes), 'ASCII', 'strict') 

1451 ss = [PEM_HEADER] 

1452 ss += [f[i:i+64] for i in range(0, len(f), 64)] 

1453 ss.append(PEM_FOOTER + '\n') 

1454 return '\n'.join(ss) 

1455 

1456def PEM_cert_to_DER_cert(pem_cert_string): 

1457 """Takes a certificate in ASCII PEM format and returns the 

1458 DER-encoded version of it as a byte sequence""" 

1459 

1460 if not pem_cert_string.startswith(PEM_HEADER): 

1461 raise ValueError("Invalid PEM encoding; must start with %s" 

1462 % PEM_HEADER) 

1463 if not pem_cert_string.strip().endswith(PEM_FOOTER): 

1464 raise ValueError("Invalid PEM encoding; must end with %s" 

1465 % PEM_FOOTER) 

1466 d = pem_cert_string.strip()[len(PEM_HEADER):-len(PEM_FOOTER)] 

1467 return base64.decodebytes(d.encode('ASCII', 'strict')) 

1468 

1469def get_server_certificate(addr, ssl_version=PROTOCOL_TLS, ca_certs=None): 

1470 """Retrieve the certificate from the server at the specified address, 

1471 and return it as a PEM-encoded string. 

1472 If 'ca_certs' is specified, validate the server cert against it. 

1473 If 'ssl_version' is specified, use it in the connection attempt.""" 

1474 

1475 host, port = addr 

1476 if ca_certs is not None: 

1477 cert_reqs = CERT_REQUIRED 

1478 else: 

1479 cert_reqs = CERT_NONE 

1480 context = _create_stdlib_context(ssl_version, 

1481 cert_reqs=cert_reqs, 

1482 cafile=ca_certs) 

1483 with create_connection(addr) as sock: 

1484 with context.wrap_socket(sock) as sslsock: 

1485 dercert = sslsock.getpeercert(True) 

1486 return DER_cert_to_PEM_cert(dercert) 

1487 

1488def get_protocol_name(protocol_code): 

1489 return _PROTOCOL_NAMES.get(protocol_code, '<unknown>')