Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/urllib3/connection.py: 29%

228 statements  

coverage.py v7.3.2, created at 2023-12-08 06:51 +0000

from __future__ import absolute_import

import datetime
import logging
import os
import re
import socket
import warnings
from socket import error as SocketError
from socket import timeout as SocketTimeout

from .packages import six
from .packages.six.moves.http_client import HTTPConnection as _HTTPConnection
from .packages.six.moves.http_client import HTTPException  # noqa: F401
from .util.proxy import create_proxy_ssl_context

try:  # Compiled with SSL?
    import ssl

    BaseSSLError = ssl.SSLError
except (ImportError, AttributeError):  # Platform-specific: No SSL.
    ssl = None

    class BaseSSLError(BaseException):
        pass


try:
    # Python 3: not a no-op, we're adding this to the namespace so it can be imported.
    ConnectionError = ConnectionError
except NameError:
    # Python 2
    class ConnectionError(Exception):
        pass


try:  # Python 3:
    # Not a no-op, we're adding this to the namespace so it can be imported.
    BrokenPipeError = BrokenPipeError
except NameError:  # Python 2:

    class BrokenPipeError(Exception):
        pass


from ._collections import HTTPHeaderDict  # noqa (historical, removed in v2)
from ._version import __version__
from .exceptions import (
    ConnectTimeoutError,
    NewConnectionError,
    SubjectAltNameWarning,
    SystemTimeWarning,
)
from .util import SKIP_HEADER, SKIPPABLE_HEADERS, connection
from .util.ssl_ import (
    assert_fingerprint,
    create_urllib3_context,
    is_ipaddress,
    resolve_cert_reqs,
    resolve_ssl_version,
    ssl_wrap_socket,
)
from .util.ssl_match_hostname import CertificateError, match_hostname

log = logging.getLogger(__name__)

port_by_scheme = {"http": 80, "https": 443}

# When it comes time to update this value as a part of regular maintenance
# (ie test_recent_date is failing) update it to ~6 months before the current date.
RECENT_DATE = datetime.date(2022, 1, 1)

_CONTAINS_CONTROL_CHAR_RE = re.compile(r"[^-!#$%&'*+.^_`|~0-9a-zA-Z]")


class HTTPConnection(_HTTPConnection, object):
    """
    Based on :class:`http.client.HTTPConnection` but provides an extra constructor
    backwards-compatibility layer between older and newer Pythons.

    Additional keyword parameters are used to configure attributes of the connection.
    Accepted parameters include:

    - ``strict``: See the documentation on :class:`urllib3.connectionpool.HTTPConnectionPool`
    - ``source_address``: Set the source address for the current connection.
    - ``socket_options``: Set specific options on the underlying socket. If not specified, then
      defaults are loaded from ``HTTPConnection.default_socket_options`` which includes disabling
      Nagle's algorithm (sets TCP_NODELAY to 1) unless the connection is behind a proxy.

      For example, if you wish to enable TCP Keep Alive in addition to the defaults,
      you might pass:

      .. code-block:: python

         HTTPConnection.default_socket_options + [
             (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
         ]

      Or you may want to disable the defaults by passing an empty list (e.g., ``[]``).
    """

    default_port = port_by_scheme["http"]

    #: Disable Nagle's algorithm by default.
    #: ``[(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]``
    default_socket_options = [(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)]

    #: Whether this connection verifies the host's certificate.
    is_verified = False

    #: Whether this proxy connection (if used) verifies the proxy host's
    #: certificate.
    proxy_is_verified = None

    def __init__(self, *args, **kw):
        if not six.PY2:
            kw.pop("strict", None)

        # Pre-set source_address.
        self.source_address = kw.get("source_address")

        #: The socket options provided by the user. If no options are
        #: provided, we use the default options.
        self.socket_options = kw.pop("socket_options", self.default_socket_options)

        # Proxy options provided by the user.
        self.proxy = kw.pop("proxy", None)
        self.proxy_config = kw.pop("proxy_config", None)

        _HTTPConnection.__init__(self, *args, **kw)

    @property
    def host(self):
        """
        Getter method to remove any trailing dots that indicate the hostname is an FQDN.

        In general, SSL certificates don't include the trailing dot indicating a
        fully-qualified domain name, and thus, they don't validate properly when
        checked against a domain name that includes the dot. In addition, some
        servers may not expect to receive the trailing dot when provided.

        However, the hostname with trailing dot is critical to DNS resolution; doing a
        lookup with the trailing dot will properly only resolve the appropriate FQDN,
        whereas a lookup without a trailing dot will search the system's search domain
        list. Thus, it's important to keep the original host around for use only in
        those cases where it's appropriate (i.e., when doing DNS lookup to establish the
        actual TCP connection across which we're going to send HTTP requests).
        """
        return self._dns_host.rstrip(".")

    @host.setter
    def host(self, value):
        """
        Setter for the `host` property.

        We assume that only urllib3 uses the _dns_host attribute; httplib itself
        only uses `host`, and it seems reasonable that other libraries follow suit.
        """
        self._dns_host = value

    def _new_conn(self):
        """Establish a socket connection and set nodelay settings on it.

        :return: New socket connection.
        """
        extra_kw = {}
        if self.source_address:
            extra_kw["source_address"] = self.source_address

        if self.socket_options:
            extra_kw["socket_options"] = self.socket_options

        try:
            conn = connection.create_connection(
                (self._dns_host, self.port), self.timeout, **extra_kw
            )

        except SocketTimeout:
            raise ConnectTimeoutError(
                self,
                "Connection to %s timed out. (connect timeout=%s)"
                % (self.host, self.timeout),
            )

        except SocketError as e:
            raise NewConnectionError(
                self, "Failed to establish a new connection: %s" % e
            )

        return conn

    def _is_using_tunnel(self):
        # Google App Engine's httplib does not define _tunnel_host
        return getattr(self, "_tunnel_host", None)

    def _prepare_conn(self, conn):
        self.sock = conn
        if self._is_using_tunnel():
            # TODO: Fix tunnel so it doesn't depend on self.sock state.
            self._tunnel()
            # Mark this connection as not reusable
            self.auto_open = 0

    def connect(self):
        conn = self._new_conn()
        self._prepare_conn(conn)

    def putrequest(self, method, url, *args, **kwargs):
        """ """
        # Empty docstring because the indentation of CPython's implementation
        # is broken but we don't want this method in our documentation.
        match = _CONTAINS_CONTROL_CHAR_RE.search(method)
        if match:
            raise ValueError(
                "Method cannot contain non-token characters %r (found at least %r)"
                % (method, match.group())
            )

        return _HTTPConnection.putrequest(self, method, url, *args, **kwargs)

    def putheader(self, header, *values):
        """ """
        if not any(isinstance(v, str) and v == SKIP_HEADER for v in values):
            _HTTPConnection.putheader(self, header, *values)
        elif six.ensure_str(header.lower()) not in SKIPPABLE_HEADERS:
            raise ValueError(
                "urllib3.util.SKIP_HEADER only supports '%s'"
                % ("', '".join(map(str.title, sorted(SKIPPABLE_HEADERS))),)
            )

    def request(self, method, url, body=None, headers=None):
        # Update the inner socket's timeout value to send the request.
        # This only triggers if the connection is re-used.
        if getattr(self, "sock", None) is not None:
            self.sock.settimeout(self.timeout)

        if headers is None:
            headers = {}
        else:
            # Avoid modifying the headers passed into .request()
            headers = headers.copy()
        if "user-agent" not in (six.ensure_str(k.lower()) for k in headers):
            headers["User-Agent"] = _get_default_user_agent()
        super(HTTPConnection, self).request(method, url, body=body, headers=headers)

    def request_chunked(self, method, url, body=None, headers=None):
        """
        Alternative to the common request method, which sends the
        body with chunked encoding and not as one block
        """
        headers = headers or {}
        header_keys = set([six.ensure_str(k.lower()) for k in headers])
        skip_accept_encoding = "accept-encoding" in header_keys
        skip_host = "host" in header_keys
        self.putrequest(
            method, url, skip_accept_encoding=skip_accept_encoding, skip_host=skip_host
        )
        if "user-agent" not in header_keys:
            self.putheader("User-Agent", _get_default_user_agent())
        for header, value in headers.items():
            self.putheader(header, value)
        if "transfer-encoding" not in header_keys:
            self.putheader("Transfer-Encoding", "chunked")
        self.endheaders()

        if body is not None:
            stringish_types = six.string_types + (bytes,)
            if isinstance(body, stringish_types):
                body = (body,)
            for chunk in body:
                if not chunk:
                    continue
                if not isinstance(chunk, bytes):
                    chunk = chunk.encode("utf8")
                len_str = hex(len(chunk))[2:]
                to_send = bytearray(len_str.encode())
                to_send += b"\r\n"
                to_send += chunk
                to_send += b"\r\n"
                self.send(to_send)

        # After the if clause, to always have a closed body
        self.send(b"0\r\n\r\n")

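The keyword parameters documented in the class docstring can be exercised directly. The following is a minimal usage sketch, not part of the module above; it assumes the installed urllib3 shown in this report and uses "example.com" purely as a placeholder host.

    import socket

    from urllib3.connection import HTTPConnection

    # Enable TCP keep-alive in addition to the default TCP_NODELAY option,
    # as suggested by the class docstring above.
    options = HTTPConnection.default_socket_options + [
        (socket.SOL_SOCKET, socket.SO_KEEPALIVE, 1),
    ]

    # socket_options is popped from **kw in __init__ above; "example.com" is a placeholder.
    conn = HTTPConnection("example.com", port=80, socket_options=options)
    conn.request("GET", "/")
    response = conn.getresponse()  # inherited from http.client.HTTPConnection
    print(response.status)
    conn.close()

Passing an empty list for socket_options instead would disable the TCP_NODELAY default entirely, as the docstring notes.
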

class HTTPSConnection(HTTPConnection):
    """
    Many of the parameters to this constructor are passed to the underlying SSL
    socket by means of :py:func:`urllib3.util.ssl_wrap_socket`.
    """

    default_port = port_by_scheme["https"]

    cert_reqs = None
    ca_certs = None
    ca_cert_dir = None
    ca_cert_data = None
    ssl_version = None
    assert_fingerprint = None
    tls_in_tls_required = False

    def __init__(
        self,
        host,
        port=None,
        key_file=None,
        cert_file=None,
        key_password=None,
        strict=None,
        timeout=socket._GLOBAL_DEFAULT_TIMEOUT,
        ssl_context=None,
        server_hostname=None,
        **kw
    ):

        HTTPConnection.__init__(self, host, port, strict=strict, timeout=timeout, **kw)

        self.key_file = key_file
        self.cert_file = cert_file
        self.key_password = key_password
        self.ssl_context = ssl_context
        self.server_hostname = server_hostname

        # Required property for Google AppEngine 1.9.0 which otherwise causes
        # HTTPS requests to go out as HTTP. (See Issue #356)
        self._protocol = "https"

    def set_cert(
        self,
        key_file=None,
        cert_file=None,
        cert_reqs=None,
        key_password=None,
        ca_certs=None,
        assert_hostname=None,
        assert_fingerprint=None,
        ca_cert_dir=None,
        ca_cert_data=None,
    ):
        """
        This method should only be called once, before the connection is used.
        """
        # If cert_reqs is not provided we'll assume CERT_REQUIRED unless we also
        # have an SSLContext object in which case we'll use its verify_mode.
        if cert_reqs is None:
            if self.ssl_context is not None:
                cert_reqs = self.ssl_context.verify_mode
            else:
                cert_reqs = resolve_cert_reqs(None)

        self.key_file = key_file
        self.cert_file = cert_file
        self.cert_reqs = cert_reqs
        self.key_password = key_password
        self.assert_hostname = assert_hostname
        self.assert_fingerprint = assert_fingerprint
        self.ca_certs = ca_certs and os.path.expanduser(ca_certs)
        self.ca_cert_dir = ca_cert_dir and os.path.expanduser(ca_cert_dir)
        self.ca_cert_data = ca_cert_data

    def connect(self):
        # Add certificate verification
        self.sock = conn = self._new_conn()
        hostname = self.host
        tls_in_tls = False

        if self._is_using_tunnel():
            if self.tls_in_tls_required:
                self.sock = conn = self._connect_tls_proxy(hostname, conn)
                tls_in_tls = True

            # Calls self._set_hostport(), so self.host is
            # self._tunnel_host below.
            self._tunnel()
            # Mark this connection as not reusable
            self.auto_open = 0

            # Override the host with the one we're requesting data from.
            hostname = self._tunnel_host

        server_hostname = hostname
        if self.server_hostname is not None:
            server_hostname = self.server_hostname

        is_time_off = datetime.date.today() < RECENT_DATE
        if is_time_off:
            warnings.warn(
                (
                    "System time is way off (before {0}). This will probably "
                    "lead to SSL verification errors"
                ).format(RECENT_DATE),
                SystemTimeWarning,
            )

        # Wrap socket using verification with the root certs in
        # trusted_root_certs
        default_ssl_context = False
        if self.ssl_context is None:
            default_ssl_context = True
            self.ssl_context = create_urllib3_context(
                ssl_version=resolve_ssl_version(self.ssl_version),
                cert_reqs=resolve_cert_reqs(self.cert_reqs),
            )

        context = self.ssl_context
        context.verify_mode = resolve_cert_reqs(self.cert_reqs)

        # Try to load OS default certs if none are given.
        # Works well on Windows (requires Python3.4+)
        if (
            not self.ca_certs
            and not self.ca_cert_dir
            and not self.ca_cert_data
            and default_ssl_context
            and hasattr(context, "load_default_certs")
        ):
            context.load_default_certs()

        self.sock = ssl_wrap_socket(
            sock=conn,
            keyfile=self.key_file,
            certfile=self.cert_file,
            key_password=self.key_password,
            ca_certs=self.ca_certs,
            ca_cert_dir=self.ca_cert_dir,
            ca_cert_data=self.ca_cert_data,
            server_hostname=server_hostname,
            ssl_context=context,
            tls_in_tls=tls_in_tls,
        )

        # If we're using all defaults and the connection
        # is TLSv1 or TLSv1.1 we throw a DeprecationWarning
        # for the host.
        if (
            default_ssl_context
            and self.ssl_version is None
            and hasattr(self.sock, "version")
            and self.sock.version() in {"TLSv1", "TLSv1.1"}
        ):
            warnings.warn(
                "Negotiating TLSv1/TLSv1.1 by default is deprecated "
                "and will be disabled in urllib3 v2.0.0. Connecting to "
                "'%s' with '%s' can be enabled by explicitly opting-in "
                "with 'ssl_version'" % (self.host, self.sock.version()),
                DeprecationWarning,
            )

        if self.assert_fingerprint:
            assert_fingerprint(
                self.sock.getpeercert(binary_form=True), self.assert_fingerprint
            )
        elif (
            context.verify_mode != ssl.CERT_NONE
            and not getattr(context, "check_hostname", False)
            and self.assert_hostname is not False
        ):
            # While urllib3 attempts to always turn off hostname matching from
            # the TLS library, this cannot always be done. So we check whether
            # the TLS Library still thinks it's matching hostnames.
            cert = self.sock.getpeercert()
            if not cert.get("subjectAltName", ()):
                warnings.warn(
                    (
                        "Certificate for {0} has no `subjectAltName`, falling back to check for a "
                        "`commonName` for now. This feature is being removed by major browsers and "
                        "deprecated by RFC 2818. (See https://github.com/urllib3/urllib3/issues/497 "
                        "for details.)".format(hostname)
                    ),
                    SubjectAltNameWarning,
                )
            _match_hostname(cert, self.assert_hostname or server_hostname)

        self.is_verified = (
            context.verify_mode == ssl.CERT_REQUIRED
            or self.assert_fingerprint is not None
        )

    def _connect_tls_proxy(self, hostname, conn):
        """
        Establish a TLS connection to the proxy using the provided SSL context.
        """
        proxy_config = self.proxy_config
        ssl_context = proxy_config.ssl_context
        if ssl_context:
            # If the user provided a proxy context, we assume CA and client
            # certificates have already been set
            return ssl_wrap_socket(
                sock=conn,
                server_hostname=hostname,
                ssl_context=ssl_context,
            )

        ssl_context = create_proxy_ssl_context(
            self.ssl_version,
            self.cert_reqs,
            self.ca_certs,
            self.ca_cert_dir,
            self.ca_cert_data,
        )

        # If no cert was provided, use only the default options for server
        # certificate validation
        socket = ssl_wrap_socket(
            sock=conn,
            ca_certs=self.ca_certs,
            ca_cert_dir=self.ca_cert_dir,
            ca_cert_data=self.ca_cert_data,
            server_hostname=hostname,
            ssl_context=ssl_context,
        )

        if ssl_context.verify_mode != ssl.CERT_NONE and not getattr(
            ssl_context, "check_hostname", False
        ):
            # While urllib3 attempts to always turn off hostname matching from
            # the TLS library, this cannot always be done. So we check whether
            # the TLS Library still thinks it's matching hostnames.
            cert = socket.getpeercert()
            if not cert.get("subjectAltName", ()):
                warnings.warn(
                    (
                        "Certificate for {0} has no `subjectAltName`, falling back to check for a "
                        "`commonName` for now. This feature is being removed by major browsers and "
                        "deprecated by RFC 2818. (See https://github.com/urllib3/urllib3/issues/497 "
                        "for details.)".format(hostname)
                    ),
                    SubjectAltNameWarning,
                )
            _match_hostname(cert, hostname)

        self.proxy_is_verified = ssl_context.verify_mode == ssl.CERT_REQUIRED
        return socket

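For the HTTPS path, the intended order is to construct the connection, call set_cert() once, then connect() before issuing a request. A minimal sketch under the same assumptions as above (placeholder host, installed urllib3); with no CA material supplied and a default SSL context, connect() above falls back to context.load_default_certs(), so verification uses the OS trust store:

    from urllib3.connection import HTTPSConnection

    conn = HTTPSConnection("example.com", port=443, timeout=10)  # placeholder host

    # Called once before use; resolve_cert_reqs() accepts the string form.
    conn.set_cert(cert_reqs="CERT_REQUIRED", assert_hostname="example.com")

    conn.connect()
    print(conn.is_verified)  # True when verify_mode ended up as CERT_REQUIRED

    conn.request("GET", "/")
    print(conn.getresponse().status)
    conn.close()
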

def _match_hostname(cert, asserted_hostname):
    # Our upstream implementation of ssl.match_hostname()
    # only applies this normalization to IP addresses so it doesn't
    # match DNS SANs so we do the same thing!
    stripped_hostname = asserted_hostname.strip("u[]")
    if is_ipaddress(stripped_hostname):
        asserted_hostname = stripped_hostname

    try:
        match_hostname(cert, asserted_hostname)
    except CertificateError as e:
        log.warning(
            "Certificate did not match expected hostname: %s. Certificate: %s",
            asserted_hostname,
            cert,
        )
        # Add cert to exception and reraise so client code can inspect
        # the cert when catching the exception, if they want to
        e._peer_cert = cert
        raise


def _get_default_user_agent():
    return "python-urllib3/%s" % __version__


class DummyConnection(object):
    """Used to detect a failed ConnectionCls import."""

    pass


if not ssl:
    HTTPSConnection = DummyConnection  # noqa: F811


VerifiedHTTPSConnection = HTTPSConnection
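
Finally, request_chunked() accepts any iterable of str or bytes chunks and writes the chunked framing itself, as the hex-length/CRLF logic in the method shows. A sketch streaming a generator body over plain HTTP; the host and path below are placeholders:

    from urllib3.connection import HTTPConnection


    def body_chunks():
        # Each yielded item becomes one chunk; empty chunks are skipped by
        # request_chunked(), and str chunks are encoded as UTF-8.
        for i in range(3):
            yield "part %d\n" % i


    conn = HTTPConnection("example.com", port=80)  # placeholder host
    conn.request_chunked("POST", "/upload", body=body_chunks())  # placeholder path
    response = conn.getresponse()
    print(response.status)
    conn.close()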