Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/connector.py: 17%

670 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:52 +0000

1import asyncio 

2import dataclasses 

3import functools 

4import logging 

5import random 

6import sys 

7import traceback 

8import warnings 

9from collections import defaultdict, deque 

10from contextlib import suppress 

11from http import HTTPStatus 

12from http.cookies import SimpleCookie 

13from itertools import cycle, islice 

14from time import monotonic 

15from types import TracebackType 

16from typing import ( # noqa 

17 TYPE_CHECKING, 

18 Any, 

19 Awaitable, 

20 Callable, 

21 DefaultDict, 

22 Dict, 

23 Iterator, 

24 List, 

25 Optional, 

26 Set, 

27 Tuple, 

28 Type, 

29 Union, 

30 cast, 

31) 

32 

33from . import hdrs, helpers 

34from .abc import AbstractResolver 

35from .client_exceptions import ( 

36 ClientConnectionError, 

37 ClientConnectorCertificateError, 

38 ClientConnectorError, 

39 ClientConnectorSSLError, 

40 ClientHttpProxyError, 

41 ClientProxyConnectionError, 

42 ServerFingerprintMismatch, 

43 UnixClientConnectorError, 

44 cert_errors, 

45 ssl_errors, 

46) 

47from .client_proto import ResponseHandler 

48from .client_reqrep import SSL_ALLOWED_TYPES, ClientRequest, Fingerprint 

49from .helpers import _SENTINEL, ceil_timeout, is_ip_address, sentinel, set_result 

50from .locks import EventResultOrError 

51from .resolver import DefaultResolver 

52 

53try: 

54 import ssl 

55 

56 SSLContext = ssl.SSLContext 

57except ImportError: # pragma: no cover 

58 ssl = None # type: ignore[assignment] 

59 SSLContext = object # type: ignore[misc,assignment] 

60 

61 

62__all__ = ("BaseConnector", "TCPConnector", "UnixConnector", "NamedPipeConnector") 

63 

64 

65if TYPE_CHECKING: # pragma: no cover 

66 from .client import ClientTimeout 

67 from .client_reqrep import ConnectionKey 

68 from .tracing import Trace 

69 

70 

71class Connection: 

72 _source_traceback = None 

73 _transport = None 

74 

75 def __init__( 

76 self, 

77 connector: "BaseConnector", 

78 key: "ConnectionKey", 

79 protocol: ResponseHandler, 

80 loop: asyncio.AbstractEventLoop, 

81 ) -> None: 

82 self._key = key 

83 self._connector = connector 

84 self._loop = loop 

85 self._protocol: Optional[ResponseHandler] = protocol 

86 self._callbacks: List[Callable[[], None]] = [] 

87 

88 if loop.get_debug(): 

89 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

90 

91 def __repr__(self) -> str: 

92 return f"Connection<{self._key}>" 

93 

94 def __del__(self, _warnings: Any = warnings) -> None: 

95 if self._protocol is not None: 

96 _warnings.warn( 

97 f"Unclosed connection {self!r}", ResourceWarning, source=self 

98 ) 

99 if self._loop.is_closed(): 

100 return 

101 

102 self._connector._release(self._key, self._protocol, should_close=True) 

103 

104 context = {"client_connection": self, "message": "Unclosed connection"} 

105 if self._source_traceback is not None: 

106 context["source_traceback"] = self._source_traceback 

107 self._loop.call_exception_handler(context) 

108 

109 @property 

110 def transport(self) -> Optional[asyncio.Transport]: 

111 if self._protocol is None: 

112 return None 

113 return self._protocol.transport 

114 

115 @property 

116 def protocol(self) -> Optional[ResponseHandler]: 

117 return self._protocol 

118 

119 def add_callback(self, callback: Callable[[], None]) -> None: 

120 if callback is not None: 

121 self._callbacks.append(callback) 

122 

123 def _notify_release(self) -> None: 

124 callbacks, self._callbacks = self._callbacks[:], [] 

125 

126 for cb in callbacks: 

127 with suppress(Exception): 

128 cb() 

129 

130 def close(self) -> None: 

131 self._notify_release() 

132 

133 if self._protocol is not None: 

134 self._connector._release(self._key, self._protocol, should_close=True) 

135 self._protocol = None 

136 

137 def release(self) -> None: 

138 self._notify_release() 

139 

140 if self._protocol is not None: 

141 self._connector._release( 

142 self._key, self._protocol, should_close=self._protocol.should_close 

143 ) 

144 self._protocol = None 

145 

146 @property 

147 def closed(self) -> bool: 

148 return self._protocol is None or not self._protocol.is_connected() 

149 

150 

151class _TransportPlaceholder: 

152 """placeholder for BaseConnector.connect function""" 

153 

154 def __init__(self, loop: asyncio.AbstractEventLoop) -> None: 

155 fut = loop.create_future() 

156 fut.set_result(None) 

157 self.closed: asyncio.Future[Optional[Exception]] = fut 

158 

159 def close(self) -> None: 

160 pass 

161 

162 

163class BaseConnector: 

164 """Base connector class. 

165 

166 keepalive_timeout - (optional) Keep-alive timeout. 

167 force_close - Set to True to force close and do reconnect 

168 after each request (and between redirects). 

169 limit - The total number of simultaneous connections. 

170 limit_per_host - Number of simultaneous connections to one host. 

171 enable_cleanup_closed - Enables clean-up closed ssl transports. 

172 Disabled by default. 

173 timeout_ceil_threshold - Trigger ceiling of timeout values when 

174 it's above timeout_ceil_threshold. 

175 loop - Optional event loop. 

176 """ 

177 

178 _closed = True # prevent AttributeError in __del__ if ctor was failed 

179 _source_traceback = None 

180 

181 # abort transport after 2 seconds (cleanup broken connections) 

182 _cleanup_closed_period = 2.0 

183 

184 def __init__( 

185 self, 

186 *, 

187 keepalive_timeout: Union[_SENTINEL, None, float] = sentinel, 

188 force_close: bool = False, 

189 limit: int = 100, 

190 limit_per_host: int = 0, 

191 enable_cleanup_closed: bool = False, 

192 timeout_ceil_threshold: float = 5, 

193 ) -> None: 

194 if force_close: 

195 if keepalive_timeout is not None and keepalive_timeout is not sentinel: 

196 raise ValueError( 

197 "keepalive_timeout cannot " "be set if force_close is True" 

198 ) 

199 else: 

200 if keepalive_timeout is sentinel: 

201 keepalive_timeout = 15.0 

202 

203 self._timeout_ceil_threshold = timeout_ceil_threshold 

204 

205 loop = asyncio.get_running_loop() 

206 

207 self._closed = False 

208 if loop.get_debug(): 

209 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

210 

211 self._conns: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {} 

212 self._limit = limit 

213 self._limit_per_host = limit_per_host 

214 self._acquired: Set[ResponseHandler] = set() 

215 self._acquired_per_host: DefaultDict[ 

216 ConnectionKey, Set[ResponseHandler] 

217 ] = defaultdict(set) 

218 self._keepalive_timeout = cast(float, keepalive_timeout) 

219 self._force_close = force_close 

220 

221 # {host_key: FIFO list of waiters} 

222 self._waiters = defaultdict(deque) # type: ignore[var-annotated] 

223 

224 self._loop = loop 

225 self._factory = functools.partial(ResponseHandler, loop=loop) 

226 

227 self.cookies: SimpleCookie[str] = SimpleCookie() 

228 

229 # start keep-alive connection cleanup task 

230 self._cleanup_handle: Optional[asyncio.TimerHandle] = None 

231 

232 # start cleanup closed transports task 

233 self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None 

234 self._cleanup_closed_disabled = not enable_cleanup_closed 

235 self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = [] 

236 self._cleanup_closed() 

237 

238 def __del__(self, _warnings: Any = warnings) -> None: 

239 if self._closed: 

240 return 

241 if not self._conns: 

242 return 

243 

244 conns = [repr(c) for c in self._conns.values()] 

245 

246 self._close_immediately() 

247 

248 _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, source=self) 

249 context = { 

250 "connector": self, 

251 "connections": conns, 

252 "message": "Unclosed connector", 

253 } 

254 if self._source_traceback is not None: 

255 context["source_traceback"] = self._source_traceback 

256 self._loop.call_exception_handler(context) 

257 

258 async def __aenter__(self) -> "BaseConnector": 

259 return self 

260 

261 async def __aexit__( 

262 self, 

263 exc_type: Optional[Type[BaseException]] = None, 

264 exc_value: Optional[BaseException] = None, 

265 exc_traceback: Optional[TracebackType] = None, 

266 ) -> None: 

267 await self.close() 

268 

269 @property 

270 def force_close(self) -> bool: 

271 """Ultimately close connection on releasing if True.""" 

272 return self._force_close 

273 

274 @property 

275 def limit(self) -> int: 

276 """The total number for simultaneous connections. 

277 

278 If limit is 0 the connector has no limit. 

279 The default limit size is 100. 

280 """ 

281 return self._limit 

282 

283 @property 

284 def limit_per_host(self) -> int: 

285 """The limit for simultaneous connections to the same endpoint. 

286 

287 Endpoints are the same if they are have equal 

288 (host, port, is_ssl) triple. 

289 """ 

290 return self._limit_per_host 

291 

292 def _cleanup(self) -> None: 

293 """Cleanup unused transports.""" 

294 if self._cleanup_handle: 

295 self._cleanup_handle.cancel() 

296 # _cleanup_handle should be unset, otherwise _release() will not 

297 # recreate it ever! 

298 self._cleanup_handle = None 

299 

300 now = self._loop.time() 

301 timeout = self._keepalive_timeout 

302 

303 if self._conns: 

304 connections = {} 

305 deadline = now - timeout 

306 for key, conns in self._conns.items(): 

307 alive = [] 

308 for proto, use_time in conns: 

309 if proto.is_connected(): 

310 if use_time - deadline < 0: 

311 transport = proto.transport 

312 proto.close() 

313 if key.is_ssl and not self._cleanup_closed_disabled: 

314 self._cleanup_closed_transports.append(transport) 

315 else: 

316 alive.append((proto, use_time)) 

317 else: 

318 transport = proto.transport 

319 proto.close() 

320 if key.is_ssl and not self._cleanup_closed_disabled: 

321 self._cleanup_closed_transports.append(transport) 

322 

323 if alive: 

324 connections[key] = alive 

325 

326 self._conns = connections 

327 

328 if self._conns: 

329 self._cleanup_handle = helpers.weakref_handle( 

330 self, 

331 "_cleanup", 

332 timeout, 

333 self._loop, 

334 timeout_ceil_threshold=self._timeout_ceil_threshold, 

335 ) 

336 

337 def _drop_acquired_per_host( 

338 self, key: "ConnectionKey", val: ResponseHandler 

339 ) -> None: 

340 acquired_per_host = self._acquired_per_host 

341 if key not in acquired_per_host: 

342 return 

343 conns = acquired_per_host[key] 

344 conns.remove(val) 

345 if not conns: 

346 del self._acquired_per_host[key] 

347 

348 def _cleanup_closed(self) -> None: 

349 """Double confirmation for transport close. 

350 

351 Some broken ssl servers may leave socket open without proper close. 

352 """ 

353 if self._cleanup_closed_handle: 

354 self._cleanup_closed_handle.cancel() 

355 

356 for transport in self._cleanup_closed_transports: 

357 if transport is not None: 

358 transport.abort() 

359 

360 self._cleanup_closed_transports = [] 

361 

362 if not self._cleanup_closed_disabled: 

363 self._cleanup_closed_handle = helpers.weakref_handle( 

364 self, 

365 "_cleanup_closed", 

366 self._cleanup_closed_period, 

367 self._loop, 

368 timeout_ceil_threshold=self._timeout_ceil_threshold, 

369 ) 

370 

371 async def close(self) -> None: 

372 """Close all opened transports.""" 

373 waiters = self._close_immediately() 

374 if waiters: 

375 results = await asyncio.gather(*waiters, return_exceptions=True) 

376 for res in results: 

377 if isinstance(res, Exception): 

378 err_msg = "Error while closing connector: " + repr(res) 

379 logging.error(err_msg) 

380 

381 def _close_immediately(self) -> List["asyncio.Future[None]"]: 

382 waiters: List["asyncio.Future[None]"] = [] 

383 

384 if self._closed: 

385 return waiters 

386 

387 self._closed = True 

388 

389 try: 

390 if self._loop.is_closed(): 

391 return waiters 

392 

393 # cancel cleanup task 

394 if self._cleanup_handle: 

395 self._cleanup_handle.cancel() 

396 

397 # cancel cleanup close task 

398 if self._cleanup_closed_handle: 

399 self._cleanup_closed_handle.cancel() 

400 

401 for data in self._conns.values(): 

402 for proto, t0 in data: 

403 proto.close() 

404 waiters.append(proto.closed) 

405 

406 for proto in self._acquired: 

407 proto.close() 

408 waiters.append(proto.closed) 

409 

410 # TODO (A.Yushovskiy, 24-May-2019) collect transp. closing futures 

411 for transport in self._cleanup_closed_transports: 

412 if transport is not None: 

413 transport.abort() 

414 

415 return waiters 

416 

417 finally: 

418 self._conns.clear() 

419 self._acquired.clear() 

420 self._waiters.clear() 

421 self._cleanup_handle = None 

422 self._cleanup_closed_transports.clear() 

423 self._cleanup_closed_handle = None 

424 

425 @property 

426 def closed(self) -> bool: 

427 """Is connector closed. 

428 

429 A readonly property. 

430 """ 

431 return self._closed 

432 

433 def _available_connections(self, key: "ConnectionKey") -> int: 

434 """ 

435 Return number of available connections. 

436 

437 The limit, limit_per_host and the connection key are taken into account. 

438 

439 If it returns less than 1 means that there are no connections 

440 available. 

441 """ 

442 if self._limit: 

443 # total calc available connections 

444 available = self._limit - len(self._acquired) 

445 

446 # check limit per host 

447 if ( 

448 self._limit_per_host 

449 and available > 0 

450 and key in self._acquired_per_host 

451 ): 

452 acquired = self._acquired_per_host.get(key) 

453 assert acquired is not None 

454 available = self._limit_per_host - len(acquired) 

455 

456 elif self._limit_per_host and key in self._acquired_per_host: 

457 # check limit per host 

458 acquired = self._acquired_per_host.get(key) 

459 assert acquired is not None 

460 available = self._limit_per_host - len(acquired) 

461 else: 

462 available = 1 

463 

464 return available 

465 

466 async def connect( 

467 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout" 

468 ) -> Connection: 

469 """Get from pool or create new connection.""" 

470 key = req.connection_key 

471 available = self._available_connections(key) 

472 

473 # Wait if there are no available connections or if there are/were 

474 # waiters (i.e. don't steal connection from a waiter about to wake up) 

475 if available <= 0 or key in self._waiters: 

476 fut = self._loop.create_future() 

477 

478 # This connection will now count towards the limit. 

479 self._waiters[key].append(fut) 

480 

481 if traces: 

482 for trace in traces: 

483 await trace.send_connection_queued_start() 

484 

485 try: 

486 await fut 

487 except BaseException as e: 

488 if key in self._waiters: 

489 # remove a waiter even if it was cancelled, normally it's 

490 # removed when it's notified 

491 try: 

492 self._waiters[key].remove(fut) 

493 except ValueError: # fut may no longer be in list 

494 pass 

495 

496 raise e 

497 finally: 

498 if key in self._waiters and not self._waiters[key]: 

499 del self._waiters[key] 

500 

501 if traces: 

502 for trace in traces: 

503 await trace.send_connection_queued_end() 

504 

505 proto = self._get(key) 

506 if proto is None: 

507 placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop)) 

508 self._acquired.add(placeholder) 

509 self._acquired_per_host[key].add(placeholder) 

510 

511 if traces: 

512 for trace in traces: 

513 await trace.send_connection_create_start() 

514 

515 try: 

516 proto = await self._create_connection(req, traces, timeout) 

517 if self._closed: 

518 proto.close() 

519 raise ClientConnectionError("Connector is closed.") 

520 except BaseException: 

521 if not self._closed: 

522 self._acquired.remove(placeholder) 

523 self._drop_acquired_per_host(key, placeholder) 

524 self._release_waiter() 

525 raise 

526 else: 

527 if not self._closed: 

528 self._acquired.remove(placeholder) 

529 self._drop_acquired_per_host(key, placeholder) 

530 

531 if traces: 

532 for trace in traces: 

533 await trace.send_connection_create_end() 

534 else: 

535 if traces: 

536 # Acquire the connection to prevent race conditions with limits 

537 placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop)) 

538 self._acquired.add(placeholder) 

539 self._acquired_per_host[key].add(placeholder) 

540 for trace in traces: 

541 await trace.send_connection_reuseconn() 

542 self._acquired.remove(placeholder) 

543 self._drop_acquired_per_host(key, placeholder) 

544 

545 self._acquired.add(proto) 

546 self._acquired_per_host[key].add(proto) 

547 return Connection(self, key, proto, self._loop) 

548 

549 def _get(self, key: "ConnectionKey") -> Optional[ResponseHandler]: 

550 try: 

551 conns = self._conns[key] 

552 except KeyError: 

553 return None 

554 

555 t1 = self._loop.time() 

556 while conns: 

557 proto, t0 = conns.pop() 

558 if proto.is_connected(): 

559 if t1 - t0 > self._keepalive_timeout: 

560 transport = proto.transport 

561 proto.close() 

562 # only for SSL transports 

563 if key.is_ssl and not self._cleanup_closed_disabled: 

564 self._cleanup_closed_transports.append(transport) 

565 else: 

566 if not conns: 

567 # The very last connection was reclaimed: drop the key 

568 del self._conns[key] 

569 return proto 

570 else: 

571 transport = proto.transport 

572 proto.close() 

573 if key.is_ssl and not self._cleanup_closed_disabled: 

574 self._cleanup_closed_transports.append(transport) 

575 

576 # No more connections: drop the key 

577 del self._conns[key] 

578 return None 

579 

580 def _release_waiter(self) -> None: 

581 """ 

582 Iterates over all waiters until one to be released is found. 

583 

584 The one to be released is not finished and 

585 belongs to a host that has available connections. 

586 """ 

587 if not self._waiters: 

588 return 

589 

590 # Having the dict keys ordered this avoids to iterate 

591 # at the same order at each call. 

592 queues = list(self._waiters.keys()) 

593 random.shuffle(queues) 

594 

595 for key in queues: 

596 if self._available_connections(key) < 1: 

597 continue 

598 

599 waiters = self._waiters[key] 

600 while waiters: 

601 waiter = waiters.popleft() 

602 if not waiter.done(): 

603 waiter.set_result(None) 

604 return 

605 

606 def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None: 

607 if self._closed: 

608 # acquired connection is already released on connector closing 

609 return 

610 

611 try: 

612 self._acquired.remove(proto) 

613 self._drop_acquired_per_host(key, proto) 

614 except KeyError: # pragma: no cover 

615 # this may be result of undetermenistic order of objects 

616 # finalization due garbage collection. 

617 pass 

618 else: 

619 self._release_waiter() 

620 

621 def _release( 

622 self, 

623 key: "ConnectionKey", 

624 protocol: ResponseHandler, 

625 *, 

626 should_close: bool = False, 

627 ) -> None: 

628 if self._closed: 

629 # acquired connection is already released on connector closing 

630 return 

631 

632 self._release_acquired(key, protocol) 

633 

634 if self._force_close: 

635 should_close = True 

636 

637 if should_close or protocol.should_close: 

638 transport = protocol.transport 

639 protocol.close() 

640 # TODO: Remove once fixed: https://bugs.python.org/issue39951 

641 # See PR #6321 

642 set_result(protocol.closed, None) 

643 

644 if key.is_ssl and not self._cleanup_closed_disabled: 

645 self._cleanup_closed_transports.append(transport) 

646 else: 

647 conns = self._conns.get(key) 

648 if conns is None: 

649 conns = self._conns[key] = [] 

650 conns.append((protocol, self._loop.time())) 

651 

652 if self._cleanup_handle is None: 

653 self._cleanup_handle = helpers.weakref_handle( 

654 self, 

655 "_cleanup", 

656 self._keepalive_timeout, 

657 self._loop, 

658 timeout_ceil_threshold=self._timeout_ceil_threshold, 

659 ) 

660 

661 async def _create_connection( 

662 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout" 

663 ) -> ResponseHandler: 

664 raise NotImplementedError() 

665 

666 

667class _DNSCacheTable: 

668 def __init__(self, ttl: Optional[float] = None) -> None: 

669 self._addrs_rr: Dict[Tuple[str, int], Tuple[Iterator[Dict[str, Any]], int]] = {} 

670 self._timestamps: Dict[Tuple[str, int], float] = {} 

671 self._ttl = ttl 

672 

673 def __contains__(self, host: object) -> bool: 

674 return host in self._addrs_rr 

675 

676 def add(self, key: Tuple[str, int], addrs: List[Dict[str, Any]]) -> None: 

677 self._addrs_rr[key] = (cycle(addrs), len(addrs)) 

678 

679 if self._ttl is not None: 

680 self._timestamps[key] = monotonic() 

681 

682 def remove(self, key: Tuple[str, int]) -> None: 

683 self._addrs_rr.pop(key, None) 

684 

685 if self._ttl is not None: 

686 self._timestamps.pop(key, None) 

687 

688 def clear(self) -> None: 

689 self._addrs_rr.clear() 

690 self._timestamps.clear() 

691 

692 def next_addrs(self, key: Tuple[str, int]) -> List[Dict[str, Any]]: 

693 loop, length = self._addrs_rr[key] 

694 addrs = list(islice(loop, length)) 

695 # Consume one more element to shift internal state of `cycle` 

696 next(loop) 

697 return addrs 

698 

699 def expired(self, key: Tuple[str, int]) -> bool: 

700 if self._ttl is None: 

701 return False 

702 

703 return self._timestamps[key] + self._ttl < monotonic() 

704 

705 

706class TCPConnector(BaseConnector): 

707 """TCP connector. 

708 

709 verify_ssl - Set to True to check ssl certifications. 

710 fingerprint - Pass the binary sha256 

711 digest of the expected certificate in DER format to verify 

712 that the certificate the server presents matches. See also 

713 https://en.wikipedia.org/wiki/Transport_Layer_Security#Certificate_pinning 

714 resolver - Enable DNS lookups and use this 

715 resolver 

716 use_dns_cache - Use memory cache for DNS lookups. 

717 ttl_dns_cache - Max seconds having cached a DNS entry, None forever. 

718 family - socket address family 

719 local_addr - local tuple of (host, port) to bind socket to 

720 

721 keepalive_timeout - (optional) Keep-alive timeout. 

722 force_close - Set to True to force close and do reconnect 

723 after each request (and between redirects). 

724 limit - The total number of simultaneous connections. 

725 limit_per_host - Number of simultaneous connections to one host. 

726 enable_cleanup_closed - Enables clean-up closed ssl transports. 

727 Disabled by default. 

728 loop - Optional event loop. 

729 """ 

730 

731 def __init__( 

732 self, 

733 *, 

734 use_dns_cache: bool = True, 

735 ttl_dns_cache: Optional[int] = 10, 

736 family: int = 0, 

737 ssl: Union[None, bool, Fingerprint, SSLContext] = None, 

738 local_addr: Optional[Tuple[str, int]] = None, 

739 resolver: Optional[AbstractResolver] = None, 

740 keepalive_timeout: Union[None, float, _SENTINEL] = sentinel, 

741 force_close: bool = False, 

742 limit: int = 100, 

743 limit_per_host: int = 0, 

744 enable_cleanup_closed: bool = False, 

745 timeout_ceil_threshold: float = 5, 

746 ) -> None: 

747 super().__init__( 

748 keepalive_timeout=keepalive_timeout, 

749 force_close=force_close, 

750 limit=limit, 

751 limit_per_host=limit_per_host, 

752 enable_cleanup_closed=enable_cleanup_closed, 

753 timeout_ceil_threshold=timeout_ceil_threshold, 

754 ) 

755 

756 if not isinstance(ssl, SSL_ALLOWED_TYPES): 

757 raise TypeError( 

758 "ssl should be SSLContext, bool, Fingerprint, " 

759 "or None, got {!r} instead.".format(ssl) 

760 ) 

761 self._ssl = ssl 

762 if resolver is None: 

763 resolver = DefaultResolver() 

764 self._resolver: AbstractResolver = resolver 

765 

766 self._use_dns_cache = use_dns_cache 

767 self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache) 

768 self._throttle_dns_events: Dict[Tuple[str, int], EventResultOrError] = {} 

769 self._family = family 

770 self._local_addr = local_addr 

771 

772 def _close_immediately(self) -> List["asyncio.Future[None]"]: 

773 for ev in self._throttle_dns_events.values(): 

774 ev.cancel() 

775 return super()._close_immediately() 

776 

777 @property 

778 def family(self) -> int: 

779 """Socket family like AF_INET.""" 

780 return self._family 

781 

782 @property 

783 def use_dns_cache(self) -> bool: 

784 """True if local DNS caching is enabled.""" 

785 return self._use_dns_cache 

786 

787 def clear_dns_cache( 

788 self, host: Optional[str] = None, port: Optional[int] = None 

789 ) -> None: 

790 """Remove specified host/port or clear all dns local cache.""" 

791 if host is not None and port is not None: 

792 self._cached_hosts.remove((host, port)) 

793 elif host is not None or port is not None: 

794 raise ValueError("either both host and port " "or none of them are allowed") 

795 else: 

796 self._cached_hosts.clear() 

797 

798 async def _resolve_host( 

799 self, host: str, port: int, traces: Optional[List["Trace"]] = None 

800 ) -> List[Dict[str, Any]]: 

801 if is_ip_address(host): 

802 return [ 

803 { 

804 "hostname": host, 

805 "host": host, 

806 "port": port, 

807 "family": self._family, 

808 "proto": 0, 

809 "flags": 0, 

810 } 

811 ] 

812 

813 if not self._use_dns_cache: 

814 if traces: 

815 for trace in traces: 

816 await trace.send_dns_resolvehost_start(host) 

817 

818 res = await self._resolver.resolve(host, port, family=self._family) 

819 

820 if traces: 

821 for trace in traces: 

822 await trace.send_dns_resolvehost_end(host) 

823 

824 return res 

825 

826 key = (host, port) 

827 

828 if (key in self._cached_hosts) and (not self._cached_hosts.expired(key)): 

829 # get result early, before any await (#4014) 

830 result = self._cached_hosts.next_addrs(key) 

831 

832 if traces: 

833 for trace in traces: 

834 await trace.send_dns_cache_hit(host) 

835 return result 

836 

837 if key in self._throttle_dns_events: 

838 # get event early, before any await (#4014) 

839 event = self._throttle_dns_events[key] 

840 if traces: 

841 for trace in traces: 

842 await trace.send_dns_cache_hit(host) 

843 await event.wait() 

844 else: 

845 # update dict early, before any await (#4014) 

846 self._throttle_dns_events[key] = EventResultOrError(self._loop) 

847 if traces: 

848 for trace in traces: 

849 await trace.send_dns_cache_miss(host) 

850 try: 

851 if traces: 

852 for trace in traces: 

853 await trace.send_dns_resolvehost_start(host) 

854 

855 addrs = await self._resolver.resolve(host, port, family=self._family) 

856 if traces: 

857 for trace in traces: 

858 await trace.send_dns_resolvehost_end(host) 

859 

860 self._cached_hosts.add(key, addrs) 

861 self._throttle_dns_events[key].set() 

862 except BaseException as e: 

863 # any DNS exception, independently of the implementation 

864 # is set for the waiters to raise the same exception. 

865 self._throttle_dns_events[key].set(exc=e) 

866 raise 

867 finally: 

868 self._throttle_dns_events.pop(key) 

869 

870 return self._cached_hosts.next_addrs(key) 

871 

872 async def _create_connection( 

873 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout" 

874 ) -> ResponseHandler: 

875 """Create connection. 

876 

877 Has same keyword arguments as BaseEventLoop.create_connection. 

878 """ 

879 if req.proxy: 

880 _, proto = await self._create_proxy_connection(req, traces, timeout) 

881 else: 

882 _, proto = await self._create_direct_connection(req, traces, timeout) 

883 

884 return proto 

885 

886 @staticmethod 

887 @functools.lru_cache(None) 

888 def _make_ssl_context(verified: bool) -> SSLContext: 

889 if verified: 

890 return ssl.create_default_context() 

891 else: 

892 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 

893 sslcontext.options |= ssl.OP_NO_SSLv2 

894 sslcontext.options |= ssl.OP_NO_SSLv3 

895 sslcontext.check_hostname = False 

896 sslcontext.verify_mode = ssl.CERT_NONE 

897 try: 

898 sslcontext.options |= ssl.OP_NO_COMPRESSION 

899 except AttributeError as attr_err: 

900 warnings.warn( 

901 "{!s}: The Python interpreter is compiled " 

902 "against OpenSSL < 1.0.0. Ref: " 

903 "https://docs.python.org/3/library/ssl.html" 

904 "#ssl.OP_NO_COMPRESSION".format(attr_err), 

905 ) 

906 sslcontext.set_default_verify_paths() 

907 return sslcontext 

908 

909 def _get_ssl_context(self, req: "ClientRequest") -> Optional[SSLContext]: 

910 """Logic to get the correct SSL context 

911 

912 0. if req.ssl is false, return None 

913 

914 1. if ssl_context is specified in req, use it 

915 2. if _ssl_context is specified in self, use it 

916 3. otherwise: 

917 1. if verify_ssl is not specified in req, use self.ssl_context 

918 (will generate a default context according to self.verify_ssl) 

919 2. if verify_ssl is True in req, generate a default SSL context 

920 3. if verify_ssl is False in req, generate a SSL context that 

921 won't verify 

922 """ 

923 if req.is_ssl(): 

924 if ssl is None: # pragma: no cover 

925 raise RuntimeError("SSL is not supported.") 

926 sslcontext = req.ssl 

927 if isinstance(sslcontext, ssl.SSLContext): 

928 return sslcontext 

929 if sslcontext is not None: 

930 # not verified or fingerprinted 

931 return self._make_ssl_context(False) 

932 sslcontext = self._ssl 

933 if isinstance(sslcontext, ssl.SSLContext): 

934 return sslcontext 

935 if sslcontext is not None: 

936 # not verified or fingerprinted 

937 return self._make_ssl_context(False) 

938 return self._make_ssl_context(True) 

939 else: 

940 return None 

941 

942 def _get_fingerprint(self, req: "ClientRequest") -> Optional["Fingerprint"]: 

943 ret = req.ssl 

944 if isinstance(ret, Fingerprint): 

945 return ret 

946 ret = self._ssl 

947 if isinstance(ret, Fingerprint): 

948 return ret 

949 return None 

950 

951 async def _wrap_create_connection( 

952 self, 

953 *args: Any, 

954 req: "ClientRequest", 

955 timeout: "ClientTimeout", 

956 client_error: Type[Exception] = ClientConnectorError, 

957 **kwargs: Any, 

958 ) -> Tuple[asyncio.Transport, ResponseHandler]: 

959 try: 

960 async with ceil_timeout( 

961 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

962 ): 

963 return await self._loop.create_connection(*args, **kwargs) # type: ignore[return-value] # noqa 

964 except cert_errors as exc: 

965 raise ClientConnectorCertificateError(req.connection_key, exc) from exc 

966 except ssl_errors as exc: 

967 raise ClientConnectorSSLError(req.connection_key, exc) from exc 

968 except OSError as exc: 

969 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

970 raise 

971 raise client_error(req.connection_key, exc) from exc 

972 

973 def _warn_about_tls_in_tls( 

974 self, 

975 underlying_transport: asyncio.Transport, 

976 req: "ClientRequest", 

977 ) -> None: 

978 """Issue a warning if the requested URL has HTTPS scheme.""" 

979 if req.request_info.url.scheme != "https": 

980 return 

981 

982 asyncio_supports_tls_in_tls = getattr( 

983 underlying_transport, 

984 "_start_tls_compatible", 

985 False, 

986 ) 

987 

988 if asyncio_supports_tls_in_tls: 

989 return 

990 

991 warnings.warn( 

992 "An HTTPS request is being sent through an HTTPS proxy. " 

993 "This support for TLS in TLS is known to be disabled " 

994 "in the stdlib asyncio. This is why you'll probably see " 

995 "an error in the log below.\n\n" 

996 "It is possible to enable it via monkeypatching under " 

997 "Python 3.7 or higher. For more details, see:\n" 

998 "* https://bugs.python.org/issue37179\n" 

999 "* https://github.com/python/cpython/pull/28073\n\n" 

1000 "You can temporarily patch this as follows:\n" 

1001 "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n" 

1002 "* https://github.com/aio-libs/aiohttp/discussions/6044\n", 

1003 RuntimeWarning, 

1004 source=self, 

1005 # Why `4`? At least 3 of the calls in the stack originate 

1006 # from the methods in this class. 

1007 stacklevel=3, 

1008 ) 

1009 

1010 async def _start_tls_connection( 

1011 self, 

1012 underlying_transport: asyncio.Transport, 

1013 req: "ClientRequest", 

1014 timeout: "ClientTimeout", 

1015 client_error: Type[Exception] = ClientConnectorError, 

1016 ) -> Tuple[asyncio.BaseTransport, ResponseHandler]: 

1017 """Wrap the raw TCP transport with TLS.""" 

1018 tls_proto = self._factory() # Create a brand new proto for TLS 

1019 

1020 # Safety of the `cast()` call here is based on the fact that 

1021 # internally `_get_ssl_context()` only returns `None` when 

1022 # `req.is_ssl()` evaluates to `False` which is never gonna happen 

1023 # in this code path. Of course, it's rather fragile 

1024 # maintainability-wise but this is to be solved separately. 

1025 sslcontext = cast(ssl.SSLContext, self._get_ssl_context(req)) 

1026 

1027 try: 

1028 async with ceil_timeout( 

1029 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1030 ): 

1031 try: 

1032 tls_transport = await self._loop.start_tls( 

1033 underlying_transport, 

1034 tls_proto, 

1035 sslcontext, 

1036 server_hostname=req.host, 

1037 ssl_handshake_timeout=timeout.total, 

1038 ) 

1039 except BaseException: 

1040 # We need to close the underlying transport since 

1041 # `start_tls()` probably failed before it had a 

1042 # chance to do this: 

1043 underlying_transport.close() 

1044 raise 

1045 except cert_errors as exc: 

1046 raise ClientConnectorCertificateError(req.connection_key, exc) from exc 

1047 except ssl_errors as exc: 

1048 raise ClientConnectorSSLError(req.connection_key, exc) from exc 

1049 except OSError as exc: 

1050 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1051 raise 

1052 raise client_error(req.connection_key, exc) from exc 

1053 except TypeError as type_err: 

1054 # Example cause looks like this: 

1055 # TypeError: transport <asyncio.sslproto._SSLProtocolTransport 

1056 # object at 0x7f760615e460> is not supported by start_tls() 

1057 

1058 raise ClientConnectionError( 

1059 "Cannot initialize a TLS-in-TLS connection to host " 

1060 f"{req.host!s}:{req.port:d} through an underlying connection " 

1061 f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} " 

1062 f"[{type_err!s}]" 

1063 ) from type_err 

1064 else: 

1065 tls_proto.connection_made( 

1066 tls_transport 

1067 ) # Kick the state machine of the new TLS protocol 

1068 

1069 return tls_transport, tls_proto 

1070 

1071 async def _create_direct_connection( 

1072 self, 

1073 req: "ClientRequest", 

1074 traces: List["Trace"], 

1075 timeout: "ClientTimeout", 

1076 *, 

1077 client_error: Type[Exception] = ClientConnectorError, 

1078 ) -> Tuple[asyncio.Transport, ResponseHandler]: 

1079 sslcontext = self._get_ssl_context(req) 

1080 fingerprint = self._get_fingerprint(req) 

1081 

1082 host = req.url.raw_host 

1083 assert host is not None 

1084 port = req.port 

1085 assert port is not None 

1086 host_resolved = asyncio.ensure_future( 

1087 self._resolve_host(host, port, traces=traces), loop=self._loop 

1088 ) 

1089 try: 

1090 # Cancelling this lookup should not cancel the underlying lookup 

1091 # or else the cancel event will get broadcast to all the waiters 

1092 # across all connections. 

1093 hosts = await asyncio.shield(host_resolved) 

1094 except asyncio.CancelledError: 

1095 

1096 def drop_exception(fut: "asyncio.Future[List[Dict[str, Any]]]") -> None: 

1097 with suppress(Exception, asyncio.CancelledError): 

1098 fut.result() 

1099 

1100 host_resolved.add_done_callback(drop_exception) 

1101 raise 

1102 except OSError as exc: 

1103 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1104 raise 

1105 # in case of proxy it is not ClientProxyConnectionError 

1106 # it is problem of resolving proxy ip itself 

1107 raise ClientConnectorError(req.connection_key, exc) from exc 

1108 

1109 last_exc: Optional[Exception] = None 

1110 

1111 for hinfo in hosts: 

1112 host = hinfo["host"] 

1113 port = hinfo["port"] 

1114 

1115 try: 

1116 transp, proto = await self._wrap_create_connection( 

1117 self._factory, 

1118 host, 

1119 port, 

1120 timeout=timeout, 

1121 ssl=sslcontext, 

1122 family=hinfo["family"], 

1123 proto=hinfo["proto"], 

1124 flags=hinfo["flags"], 

1125 server_hostname=hinfo["hostname"] if sslcontext else None, 

1126 local_addr=self._local_addr, 

1127 req=req, 

1128 client_error=client_error, 

1129 ) 

1130 except ClientConnectorError as exc: 

1131 last_exc = exc 

1132 continue 

1133 

1134 if req.is_ssl() and fingerprint: 

1135 try: 

1136 fingerprint.check(transp) 

1137 except ServerFingerprintMismatch as exc: 

1138 transp.close() 

1139 if not self._cleanup_closed_disabled: 

1140 self._cleanup_closed_transports.append(transp) 

1141 last_exc = exc 

1142 continue 

1143 

1144 return transp, proto 

1145 assert last_exc is not None 

1146 raise last_exc 

1147 

1148 async def _create_proxy_connection( 

1149 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout" 

1150 ) -> Tuple[asyncio.BaseTransport, ResponseHandler]: 

1151 headers: Dict[str, str] = {} 

1152 if req.proxy_headers is not None: 

1153 headers = req.proxy_headers # type: ignore[assignment] 

1154 headers[hdrs.HOST] = req.headers[hdrs.HOST] 

1155 

1156 url = req.proxy 

1157 assert url is not None 

1158 proxy_req = ClientRequest( 

1159 hdrs.METH_GET, 

1160 url, 

1161 headers=headers, 

1162 auth=req.proxy_auth, 

1163 loop=self._loop, 

1164 ssl=req.ssl, 

1165 ) 

1166 

1167 # create connection to proxy server 

1168 transport, proto = await self._create_direct_connection( 

1169 proxy_req, [], timeout, client_error=ClientProxyConnectionError 

1170 ) 

1171 

1172 # Many HTTP proxies has buggy keepalive support. Let's not 

1173 # reuse connection but close it after processing every 

1174 # response. 

1175 proto.force_close() 

1176 

1177 auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None) 

1178 if auth is not None: 

1179 if not req.is_ssl(): 

1180 req.headers[hdrs.PROXY_AUTHORIZATION] = auth 

1181 else: 

1182 proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth 

1183 

1184 if req.is_ssl(): 

1185 self._warn_about_tls_in_tls(transport, req) 

1186 

1187 # For HTTPS requests over HTTP proxy 

1188 # we must notify proxy to tunnel connection 

1189 # so we send CONNECT command: 

1190 # CONNECT www.python.org:443 HTTP/1.1 

1191 # Host: www.python.org 

1192 # 

1193 # next we must do TLS handshake and so on 

1194 # to do this we must wrap raw socket into secure one 

1195 # asyncio handles this perfectly 

1196 proxy_req.method = hdrs.METH_CONNECT 

1197 proxy_req.url = req.url 

1198 key = dataclasses.replace( 

1199 req.connection_key, proxy=None, proxy_auth=None, proxy_headers_hash=None 

1200 ) 

1201 conn = Connection(self, key, proto, self._loop) 

1202 proxy_resp = await proxy_req.send(conn) 

1203 try: 

1204 protocol = conn._protocol 

1205 assert protocol is not None 

1206 

1207 # read_until_eof=True will ensure the connection isn't closed 

1208 # once the response is received and processed allowing 

1209 # START_TLS to work on the connection below. 

1210 protocol.set_response_params( 

1211 read_until_eof=True, 

1212 timeout_ceil_threshold=self._timeout_ceil_threshold, 

1213 ) 

1214 resp = await proxy_resp.start(conn) 

1215 except BaseException: 

1216 proxy_resp.close() 

1217 conn.close() 

1218 raise 

1219 else: 

1220 conn._protocol = None 

1221 conn._transport = None 

1222 try: 

1223 if resp.status != 200: 

1224 message = resp.reason 

1225 if message is None: 

1226 message = HTTPStatus(resp.status).phrase 

1227 raise ClientHttpProxyError( 

1228 proxy_resp.request_info, 

1229 resp.history, 

1230 status=resp.status, 

1231 message=message, 

1232 headers=resp.headers, 

1233 ) 

1234 except BaseException: 

1235 # It shouldn't be closed in `finally` because it's fed to 

1236 # `loop.start_tls()` and the docs say not to touch it after 

1237 # passing there. 

1238 transport.close() 

1239 raise 

1240 

1241 return await self._start_tls_connection( 

1242 # Access the old transport for the last time before it's 

1243 # closed and forgotten forever: 

1244 transport, 

1245 req=req, 

1246 timeout=timeout, 

1247 ) 

1248 finally: 

1249 proxy_resp.close() 

1250 

1251 return transport, proto 

1252 

1253 

1254class UnixConnector(BaseConnector): 

1255 """Unix socket connector. 

1256 

1257 path - Unix socket path. 

1258 keepalive_timeout - (optional) Keep-alive timeout. 

1259 force_close - Set to True to force close and do reconnect 

1260 after each request (and between redirects). 

1261 limit - The total number of simultaneous connections. 

1262 limit_per_host - Number of simultaneous connections to one host. 

1263 loop - Optional event loop. 

1264 """ 

1265 

1266 def __init__( 

1267 self, 

1268 path: str, 

1269 force_close: bool = False, 

1270 keepalive_timeout: Union[_SENTINEL, float, None] = sentinel, 

1271 limit: int = 100, 

1272 limit_per_host: int = 0, 

1273 ) -> None: 

1274 super().__init__( 

1275 force_close=force_close, 

1276 keepalive_timeout=keepalive_timeout, 

1277 limit=limit, 

1278 limit_per_host=limit_per_host, 

1279 ) 

1280 self._path = path 

1281 

1282 @property 

1283 def path(self) -> str: 

1284 """Path to unix socket.""" 

1285 return self._path 

1286 

1287 async def _create_connection( 

1288 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout" 

1289 ) -> ResponseHandler: 

1290 try: 

1291 async with ceil_timeout( 

1292 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1293 ): 

1294 _, proto = await self._loop.create_unix_connection( 

1295 self._factory, self._path 

1296 ) 

1297 except OSError as exc: 

1298 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1299 raise 

1300 raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc 

1301 

1302 return proto 

1303 

1304 

1305class NamedPipeConnector(BaseConnector): 

1306 """Named pipe connector. 

1307 

1308 Only supported by the proactor event loop. 

1309 See also: https://docs.python.org/3/library/asyncio-eventloop.html 

1310 

1311 path - Windows named pipe path. 

1312 keepalive_timeout - (optional) Keep-alive timeout. 

1313 force_close - Set to True to force close and do reconnect 

1314 after each request (and between redirects). 

1315 limit - The total number of simultaneous connections. 

1316 limit_per_host - Number of simultaneous connections to one host. 

1317 loop - Optional event loop. 

1318 """ 

1319 

1320 def __init__( 

1321 self, 

1322 path: str, 

1323 force_close: bool = False, 

1324 keepalive_timeout: Union[_SENTINEL, float, None] = sentinel, 

1325 limit: int = 100, 

1326 limit_per_host: int = 0, 

1327 ) -> None: 

1328 super().__init__( 

1329 force_close=force_close, 

1330 keepalive_timeout=keepalive_timeout, 

1331 limit=limit, 

1332 limit_per_host=limit_per_host, 

1333 ) 

1334 if not isinstance( 

1335 self._loop, asyncio.ProactorEventLoop # type: ignore[attr-defined] 

1336 ): 

1337 raise RuntimeError( 

1338 "Named Pipes only available in proactor " "loop under windows" 

1339 ) 

1340 self._path = path 

1341 

1342 @property 

1343 def path(self) -> str: 

1344 """Path to the named pipe.""" 

1345 return self._path 

1346 

1347 async def _create_connection( 

1348 self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout" 

1349 ) -> ResponseHandler: 

1350 try: 

1351 async with ceil_timeout( 

1352 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1353 ): 

1354 _, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined] # noqa: E501 

1355 self._factory, self._path 

1356 ) 

1357 # the drain is required so that the connection_made is called 

1358 # and transport is set otherwise it is not set before the 

1359 # `assert conn.transport is not None` 

1360 # in client.py's _request method 

1361 await asyncio.sleep(0) 

1362 # other option is to manually set transport like 

1363 # `proto.transport = trans` 

1364 except OSError as exc: 

1365 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1366 raise 

1367 raise ClientConnectorError(req.connection_key, exc) from exc 

1368 

1369 return cast(ResponseHandler, proto)