Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/connector.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

854 statements  

1import asyncio 

2import functools 

3import random 

4import socket 

5import sys 

6import traceback 

7import warnings 

8from collections import OrderedDict, defaultdict, deque 

9from collections.abc import Awaitable, Callable, Iterator, Sequence 

10from contextlib import suppress 

11from http import HTTPStatus 

12from itertools import chain, cycle, islice 

13from time import monotonic 

14from types import TracebackType 

15from typing import TYPE_CHECKING, Any, Literal, Optional, Union, cast 

16 

17import aiohappyeyeballs 

18from aiohappyeyeballs import AddrInfoType, SocketFactoryType 

19 

20from . import hdrs, helpers 

21from .abc import AbstractResolver, ResolveResult 

22from .client_exceptions import ( 

23 ClientConnectionError, 

24 ClientConnectorCertificateError, 

25 ClientConnectorDNSError, 

26 ClientConnectorError, 

27 ClientConnectorSSLError, 

28 ClientHttpProxyError, 

29 ClientProxyConnectionError, 

30 InvalidUrlClientError, 

31 ServerFingerprintMismatch, 

32 UnixClientConnectorError, 

33 cert_errors, 

34 ssl_errors, 

35) 

36from .client_proto import ResponseHandler 

37from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params 

38from .helpers import ( 

39 _SENTINEL, 

40 ceil_timeout, 

41 is_canonical_ipv4_address, 

42 is_ip_address, 

43 noop, 

44 sentinel, 

45 set_exception, 

46 set_result, 

47) 

48from .log import client_logger 

49from .resolver import DefaultResolver 

50 

51if sys.version_info >= (3, 12): 

52 from collections.abc import Buffer 

53else: 

54 Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"] 

55 

56if TYPE_CHECKING: 

57 import ssl 

58 

59 SSLContext = ssl.SSLContext 

60else: 

61 try: 

62 import ssl 

63 

64 SSLContext = ssl.SSLContext 

65 except ImportError: # pragma: no cover 

66 ssl = None # type: ignore[assignment] 

67 SSLContext = object # type: ignore[misc,assignment] 

68 

69EMPTY_SCHEMA_SET = frozenset({""}) 

70HTTP_SCHEMA_SET = frozenset({"http", "https"}) 

71WS_SCHEMA_SET = frozenset({"ws", "wss"}) 

72 

73HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET | EMPTY_SCHEMA_SET 

74HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET | WS_SCHEMA_SET 

75 

76NEEDS_CLEANUP_CLOSED = (3, 13, 0) <= sys.version_info < ( 

77 3, 

78 13, 

79 1, 

80) or sys.version_info < (3, 12, 8) 

81# Cleanup closed is no longer needed after https://github.com/python/cpython/pull/118960 

82# which first appeared in Python 3.12.8 and 3.13.1 

83 

84 

85__all__ = ( 

86 "BaseConnector", 

87 "TCPConnector", 

88 "UnixConnector", 

89 "NamedPipeConnector", 

90 "AddrInfoType", 

91 "SocketFactoryType", 

92) 

93 

94 

95if TYPE_CHECKING: 

96 from .client import ClientTimeout 

97 from .client_reqrep import ConnectionKey 

98 from .tracing import Trace 

99 

100 

101class _DeprecationWaiter: 

102 __slots__ = ("_awaitable", "_awaited") 

103 

104 def __init__(self, awaitable: Awaitable[Any]) -> None: 

105 self._awaitable = awaitable 

106 self._awaited = False 

107 

108 def __await__(self) -> Any: 

109 self._awaited = True 

110 return self._awaitable.__await__() 

111 

112 def __del__(self) -> None: 

113 if not self._awaited: 

114 warnings.warn( 

115 "Connector.close() is a coroutine, " 

116 "please use await connector.close()", 

117 DeprecationWarning, 

118 ) 

119 

120 

121async def _wait_for_close(waiters: list[Awaitable[object]]) -> None: 

122 """Wait for all waiters to finish closing.""" 

123 results = await asyncio.gather(*waiters, return_exceptions=True) 

124 for res in results: 

125 if isinstance(res, Exception): 

126 client_logger.debug("Error while closing connector: %r", res) 

127 

128 

129class Connection: 

130 

131 _source_traceback = None 

132 

133 def __init__( 

134 self, 

135 connector: "BaseConnector", 

136 key: "ConnectionKey", 

137 protocol: ResponseHandler, 

138 loop: asyncio.AbstractEventLoop, 

139 ) -> None: 

140 self._key = key 

141 self._connector = connector 

142 self._loop = loop 

143 self._protocol: ResponseHandler | None = protocol 

144 self._callbacks: list[Callable[[], None]] = [] 

145 

146 if loop.get_debug(): 

147 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

148 

149 def __repr__(self) -> str: 

150 return f"Connection<{self._key}>" 

151 

152 def __del__(self, _warnings: Any = warnings) -> None: 

153 if self._protocol is not None: 

154 kwargs = {"source": self} 

155 _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, **kwargs) 

156 if self._loop.is_closed(): 

157 return 

158 

159 self._connector._release(self._key, self._protocol, should_close=True) 

160 

161 context = {"client_connection": self, "message": "Unclosed connection"} 

162 if self._source_traceback is not None: 

163 context["source_traceback"] = self._source_traceback 

164 self._loop.call_exception_handler(context) 

165 

166 def __bool__(self) -> Literal[True]: 

167 """Force subclasses to not be falsy, to make checks simpler.""" 

168 return True 

169 

170 @property 

171 def loop(self) -> asyncio.AbstractEventLoop: 

172 warnings.warn( 

173 "connector.loop property is deprecated", DeprecationWarning, stacklevel=2 

174 ) 

175 return self._loop 

176 

177 @property 

178 def transport(self) -> asyncio.Transport | None: 

179 if self._protocol is None: 

180 return None 

181 return self._protocol.transport 

182 

183 @property 

184 def protocol(self) -> ResponseHandler | None: 

185 return self._protocol 

186 

187 def add_callback(self, callback: Callable[[], None]) -> None: 

188 if callback is not None: 

189 self._callbacks.append(callback) 

190 

191 def _notify_release(self) -> None: 

192 callbacks, self._callbacks = self._callbacks[:], [] 

193 

194 for cb in callbacks: 

195 with suppress(Exception): 

196 cb() 

197 

198 def close(self) -> None: 

199 self._notify_release() 

200 

201 if self._protocol is not None: 

202 self._connector._release(self._key, self._protocol, should_close=True) 

203 self._protocol = None 

204 

205 def release(self) -> None: 

206 self._notify_release() 

207 

208 if self._protocol is not None: 

209 self._connector._release(self._key, self._protocol) 

210 self._protocol = None 

211 

212 @property 

213 def closed(self) -> bool: 

214 return self._protocol is None or not self._protocol.is_connected() 

215 

216 

217class _ConnectTunnelConnection(Connection): 

218 """Special connection wrapper for CONNECT tunnels that must never be pooled. 

219 

220 This connection wraps the proxy connection that will be upgraded with TLS. 

221 It must never be released to the pool because: 

222 1. Its 'closed' future will never complete, causing session.close() to hang 

223 2. It represents an intermediate state, not a reusable connection 

224 3. The real connection (with TLS) will be created separately 

225 """ 

226 

227 def release(self) -> None: 

228 """Do nothing - don't pool or close the connection. 

229 

230 These connections are an intermediate state during the CONNECT tunnel 

231 setup and will be cleaned up naturally after the TLS upgrade. If they 

232 were to be pooled, they would never be properly closed, causing 

233 session.close() to wait forever for their 'closed' future. 

234 """ 

235 

236 

237class _TransportPlaceholder: 

238 """placeholder for BaseConnector.connect function""" 

239 

240 __slots__ = ("closed", "transport") 

241 

242 def __init__(self, closed_future: asyncio.Future[Exception | None]) -> None: 

243 """Initialize a placeholder for a transport.""" 

244 self.closed = closed_future 

245 self.transport = None 

246 

247 def close(self) -> None: 

248 """Close the placeholder.""" 

249 

250 def abort(self) -> None: 

251 """Abort the placeholder (does nothing).""" 

252 

253 

254class BaseConnector: 

255 """Base connector class. 

256 

257 keepalive_timeout - (optional) Keep-alive timeout. 

258 force_close - Set to True to force close and do reconnect 

259 after each request (and between redirects). 

260 limit - The total number of simultaneous connections. 

261 limit_per_host - Number of simultaneous connections to one host. 

262 enable_cleanup_closed - Enables clean-up closed ssl transports. 

263 Disabled by default. 

264 timeout_ceil_threshold - Trigger ceiling of timeout values when 

265 it's above timeout_ceil_threshold. 

266 loop - Optional event loop. 

267 """ 

268 

269 _closed = True # prevent AttributeError in __del__ if ctor was failed 

270 _source_traceback = None 

271 

272 # abort transport after 2 seconds (cleanup broken connections) 

273 _cleanup_closed_period = 2.0 

274 

275 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET 

276 

277 def __init__( 

278 self, 

279 *, 

280 keepalive_timeout: object | None | float = sentinel, 

281 force_close: bool = False, 

282 limit: int = 100, 

283 limit_per_host: int = 0, 

284 enable_cleanup_closed: bool = False, 

285 loop: asyncio.AbstractEventLoop | None = None, 

286 timeout_ceil_threshold: float = 5, 

287 ) -> None: 

288 

289 if force_close: 

290 if keepalive_timeout is not None and keepalive_timeout is not sentinel: 

291 raise ValueError( 

292 "keepalive_timeout cannot be set if force_close is True" 

293 ) 

294 else: 

295 if keepalive_timeout is sentinel: 

296 keepalive_timeout = 15.0 

297 

298 loop = loop or asyncio.get_running_loop() 

299 self._timeout_ceil_threshold = timeout_ceil_threshold 

300 

301 self._closed = False 

302 if loop.get_debug(): 

303 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

304 

305 # Connection pool of reusable connections. 

306 # We use a deque to store connections because it has O(1) popleft() 

307 # and O(1) append() operations to implement a FIFO queue. 

308 self._conns: defaultdict[ 

309 ConnectionKey, deque[tuple[ResponseHandler, float]] 

310 ] = defaultdict(deque) 

311 self._limit = limit 

312 self._limit_per_host = limit_per_host 

313 self._acquired: set[ResponseHandler] = set() 

314 self._acquired_per_host: defaultdict[ConnectionKey, set[ResponseHandler]] = ( 

315 defaultdict(set) 

316 ) 

317 self._keepalive_timeout = cast(float, keepalive_timeout) 

318 self._force_close = force_close 

319 

320 # {host_key: FIFO list of waiters} 

321 # The FIFO is implemented with an OrderedDict with None keys because 

322 # python does not have an ordered set. 

323 self._waiters: defaultdict[ 

324 ConnectionKey, OrderedDict[asyncio.Future[None], None] 

325 ] = defaultdict(OrderedDict) 

326 

327 self._loop = loop 

328 self._factory = functools.partial(ResponseHandler, loop=loop) 

329 

330 # start keep-alive connection cleanup task 

331 self._cleanup_handle: asyncio.TimerHandle | None = None 

332 

333 # start cleanup closed transports task 

334 self._cleanup_closed_handle: asyncio.TimerHandle | None = None 

335 

336 if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED: 

337 warnings.warn( 

338 "enable_cleanup_closed ignored because " 

339 "https://github.com/python/cpython/pull/118960 is fixed " 

340 f"in Python version {sys.version_info}", 

341 DeprecationWarning, 

342 stacklevel=2, 

343 ) 

344 enable_cleanup_closed = False 

345 

346 self._cleanup_closed_disabled = not enable_cleanup_closed 

347 self._cleanup_closed_transports: list[asyncio.Transport | None] = [] 

348 self._placeholder_future: asyncio.Future[Exception | None] = ( 

349 loop.create_future() 

350 ) 

351 self._placeholder_future.set_result(None) 

352 self._cleanup_closed() 

353 

354 def __del__(self, _warnings: Any = warnings) -> None: 

355 if self._closed: 

356 return 

357 if not self._conns: 

358 return 

359 

360 conns = [repr(c) for c in self._conns.values()] 

361 

362 self._close() 

363 

364 kwargs = {"source": self} 

365 _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs) 

366 context = { 

367 "connector": self, 

368 "connections": conns, 

369 "message": "Unclosed connector", 

370 } 

371 if self._source_traceback is not None: 

372 context["source_traceback"] = self._source_traceback 

373 self._loop.call_exception_handler(context) 

374 

375 def __enter__(self) -> "BaseConnector": 

376 warnings.warn( 

377 '"with Connector():" is deprecated, ' 

378 'use "async with Connector():" instead', 

379 DeprecationWarning, 

380 ) 

381 return self 

382 

383 def __exit__(self, *exc: Any) -> None: 

384 self._close() 

385 

386 async def __aenter__(self) -> "BaseConnector": 

387 return self 

388 

389 async def __aexit__( 

390 self, 

391 exc_type: type[BaseException] | None = None, 

392 exc_value: BaseException | None = None, 

393 exc_traceback: TracebackType | None = None, 

394 ) -> None: 

395 await self.close() 

396 

397 @property 

398 def force_close(self) -> bool: 

399 """Ultimately close connection on releasing if True.""" 

400 return self._force_close 

401 

402 @property 

403 def limit(self) -> int: 

404 """The total number for simultaneous connections. 

405 

406 If limit is 0 the connector has no limit. 

407 The default limit size is 100. 

408 """ 

409 return self._limit 

410 

411 @property 

412 def limit_per_host(self) -> int: 

413 """The limit for simultaneous connections to the same endpoint. 

414 

415 Endpoints are the same if they are have equal 

416 (host, port, is_ssl) triple. 

417 """ 

418 return self._limit_per_host 

419 

420 def _cleanup(self) -> None: 

421 """Cleanup unused transports.""" 

422 if self._cleanup_handle: 

423 self._cleanup_handle.cancel() 

424 # _cleanup_handle should be unset, otherwise _release() will not 

425 # recreate it ever! 

426 self._cleanup_handle = None 

427 

428 now = monotonic() 

429 timeout = self._keepalive_timeout 

430 

431 if self._conns: 

432 connections = defaultdict(deque) 

433 deadline = now - timeout 

434 for key, conns in self._conns.items(): 

435 alive: deque[tuple[ResponseHandler, float]] = deque() 

436 for proto, use_time in conns: 

437 if proto.is_connected() and use_time - deadline >= 0: 

438 alive.append((proto, use_time)) 

439 continue 

440 transport = proto.transport 

441 proto.close() 

442 if not self._cleanup_closed_disabled and key.is_ssl: 

443 self._cleanup_closed_transports.append(transport) 

444 

445 if alive: 

446 connections[key] = alive 

447 

448 self._conns = connections 

449 

450 if self._conns: 

451 self._cleanup_handle = helpers.weakref_handle( 

452 self, 

453 "_cleanup", 

454 timeout, 

455 self._loop, 

456 timeout_ceil_threshold=self._timeout_ceil_threshold, 

457 ) 

458 

459 def _cleanup_closed(self) -> None: 

460 """Double confirmation for transport close. 

461 

462 Some broken ssl servers may leave socket open without proper close. 

463 """ 

464 if self._cleanup_closed_handle: 

465 self._cleanup_closed_handle.cancel() 

466 

467 for transport in self._cleanup_closed_transports: 

468 if transport is not None: 

469 transport.abort() 

470 

471 self._cleanup_closed_transports = [] 

472 

473 if not self._cleanup_closed_disabled: 

474 self._cleanup_closed_handle = helpers.weakref_handle( 

475 self, 

476 "_cleanup_closed", 

477 self._cleanup_closed_period, 

478 self._loop, 

479 timeout_ceil_threshold=self._timeout_ceil_threshold, 

480 ) 

481 

482 def close(self, *, abort_ssl: bool = False) -> Awaitable[None]: 

483 """Close all opened transports. 

484 

485 :param abort_ssl: If True, SSL connections will be aborted immediately 

486 without performing the shutdown handshake. This provides 

487 faster cleanup at the cost of less graceful disconnection. 

488 """ 

489 if not (waiters := self._close(abort_ssl=abort_ssl)): 

490 # If there are no connections to close, we can return a noop 

491 # awaitable to avoid scheduling a task on the event loop. 

492 return _DeprecationWaiter(noop()) 

493 coro = _wait_for_close(waiters) 

494 if sys.version_info >= (3, 12): 

495 # Optimization for Python 3.12, try to close connections 

496 # immediately to avoid having to schedule the task on the event loop. 

497 task = asyncio.Task(coro, loop=self._loop, eager_start=True) 

498 else: 

499 task = self._loop.create_task(coro) 

500 return _DeprecationWaiter(task) 

501 

502 def _close(self, *, abort_ssl: bool = False) -> list[Awaitable[object]]: 

503 waiters: list[Awaitable[object]] = [] 

504 

505 if self._closed: 

506 return waiters 

507 

508 self._closed = True 

509 

510 try: 

511 if self._loop.is_closed(): 

512 return waiters 

513 

514 # cancel cleanup task 

515 if self._cleanup_handle: 

516 self._cleanup_handle.cancel() 

517 

518 # cancel cleanup close task 

519 if self._cleanup_closed_handle: 

520 self._cleanup_closed_handle.cancel() 

521 

522 for data in self._conns.values(): 

523 for proto, _ in data: 

524 if ( 

525 abort_ssl 

526 and proto.transport 

527 and proto.transport.get_extra_info("sslcontext") is not None 

528 ): 

529 proto.abort() 

530 else: 

531 proto.close() 

532 if closed := proto.closed: 

533 waiters.append(closed) 

534 

535 for proto in self._acquired: 

536 if ( 

537 abort_ssl 

538 and proto.transport 

539 and proto.transport.get_extra_info("sslcontext") is not None 

540 ): 

541 proto.abort() 

542 else: 

543 proto.close() 

544 if closed := proto.closed: 

545 waiters.append(closed) 

546 

547 for transport in self._cleanup_closed_transports: 

548 if transport is not None: 

549 transport.abort() 

550 

551 return waiters 

552 

553 finally: 

554 self._conns.clear() 

555 self._acquired.clear() 

556 for keyed_waiters in self._waiters.values(): 

557 for keyed_waiter in keyed_waiters: 

558 keyed_waiter.cancel() 

559 self._waiters.clear() 

560 self._cleanup_handle = None 

561 self._cleanup_closed_transports.clear() 

562 self._cleanup_closed_handle = None 

563 

564 @property 

565 def closed(self) -> bool: 

566 """Is connector closed. 

567 

568 A readonly property. 

569 """ 

570 return self._closed 

571 

572 def _available_connections(self, key: "ConnectionKey") -> int: 

573 """ 

574 Return number of available connections. 

575 

576 The limit, limit_per_host and the connection key are taken into account. 

577 

578 If it returns less than 1 means that there are no connections 

579 available. 

580 """ 

581 # check total available connections 

582 # If there are no limits, this will always return 1 

583 total_remain = 1 

584 

585 if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0: 

586 return total_remain 

587 

588 # check limit per host 

589 if host_remain := self._limit_per_host: 

590 if acquired := self._acquired_per_host.get(key): 

591 host_remain -= len(acquired) 

592 if total_remain > host_remain: 

593 return host_remain 

594 

595 return total_remain 

596 

597 def _update_proxy_auth_header_and_build_proxy_req( 

598 self, req: ClientRequest 

599 ) -> ClientRequest: 

600 """Set Proxy-Authorization header for non-SSL proxy requests and builds the proxy request for SSL proxy requests.""" 

601 url = req.proxy 

602 assert url is not None 

603 headers: dict[str, str] = {} 

604 if req.proxy_headers is not None: 

605 headers = req.proxy_headers # type: ignore[assignment] 

606 headers[hdrs.HOST] = req.headers[hdrs.HOST] 

607 proxy_req = ClientRequest( 

608 hdrs.METH_GET, 

609 url, 

610 headers=headers, 

611 auth=req.proxy_auth, 

612 loop=self._loop, 

613 ssl=req.ssl, 

614 ) 

615 auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None) 

616 if auth is not None: 

617 if not req.is_ssl(): 

618 req.headers[hdrs.PROXY_AUTHORIZATION] = auth 

619 else: 

620 proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth 

621 return proxy_req 

622 

623 async def connect( 

624 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout" 

625 ) -> Connection: 

626 """Get from pool or create new connection.""" 

627 key = req.connection_key 

628 if (conn := await self._get(key, traces)) is not None: 

629 # If we do not have to wait and we can get a connection from the pool 

630 # we can avoid the timeout ceil logic and directly return the connection 

631 if req.proxy: 

632 self._update_proxy_auth_header_and_build_proxy_req(req) 

633 return conn 

634 

635 async with ceil_timeout(timeout.connect, timeout.ceil_threshold): 

636 if self._available_connections(key) <= 0: 

637 await self._wait_for_available_connection(key, traces) 

638 if (conn := await self._get(key, traces)) is not None: 

639 if req.proxy: 

640 self._update_proxy_auth_header_and_build_proxy_req(req) 

641 return conn 

642 

643 placeholder = cast( 

644 ResponseHandler, _TransportPlaceholder(self._placeholder_future) 

645 ) 

646 self._acquired.add(placeholder) 

647 if self._limit_per_host: 

648 self._acquired_per_host[key].add(placeholder) 

649 

650 try: 

651 # Traces are done inside the try block to ensure that the 

652 # that the placeholder is still cleaned up if an exception 

653 # is raised. 

654 if traces: 

655 for trace in traces: 

656 await trace.send_connection_create_start() 

657 proto = await self._create_connection(req, traces, timeout) 

658 if traces: 

659 for trace in traces: 

660 await trace.send_connection_create_end() 

661 except BaseException: 

662 self._release_acquired(key, placeholder) 

663 raise 

664 else: 

665 if self._closed: 

666 proto.close() 

667 raise ClientConnectionError("Connector is closed.") 

668 

669 # The connection was successfully created, drop the placeholder 

670 # and add the real connection to the acquired set. There should 

671 # be no awaits after the proto is added to the acquired set 

672 # to ensure that the connection is not left in the acquired set 

673 # on cancellation. 

674 self._acquired.remove(placeholder) 

675 self._acquired.add(proto) 

676 if self._limit_per_host: 

677 acquired_per_host = self._acquired_per_host[key] 

678 acquired_per_host.remove(placeholder) 

679 acquired_per_host.add(proto) 

680 return Connection(self, key, proto, self._loop) 

681 

682 async def _wait_for_available_connection( 

683 self, key: "ConnectionKey", traces: list["Trace"] 

684 ) -> None: 

685 """Wait for an available connection slot.""" 

686 # We loop here because there is a race between 

687 # the connection limit check and the connection 

688 # being acquired. If the connection is acquired 

689 # between the check and the await statement, we 

690 # need to loop again to check if the connection 

691 # slot is still available. 

692 attempts = 0 

693 while True: 

694 fut: asyncio.Future[None] = self._loop.create_future() 

695 keyed_waiters = self._waiters[key] 

696 keyed_waiters[fut] = None 

697 if attempts: 

698 # If we have waited before, we need to move the waiter 

699 # to the front of the queue as otherwise we might get 

700 # starved and hit the timeout. 

701 keyed_waiters.move_to_end(fut, last=False) 

702 

703 try: 

704 # Traces happen in the try block to ensure that the 

705 # the waiter is still cleaned up if an exception is raised. 

706 if traces: 

707 for trace in traces: 

708 await trace.send_connection_queued_start() 

709 await fut 

710 if traces: 

711 for trace in traces: 

712 await trace.send_connection_queued_end() 

713 finally: 

714 # pop the waiter from the queue if its still 

715 # there and not already removed by _release_waiter 

716 keyed_waiters.pop(fut, None) 

717 if not self._waiters.get(key, True): 

718 del self._waiters[key] 

719 

720 if self._available_connections(key) > 0: 

721 break 

722 attempts += 1 

723 

724 async def _get( 

725 self, key: "ConnectionKey", traces: list["Trace"] 

726 ) -> Connection | None: 

727 """Get next reusable connection for the key or None. 

728 

729 The connection will be marked as acquired. 

730 """ 

731 if (conns := self._conns.get(key)) is None: 

732 return None 

733 

734 t1 = monotonic() 

735 while conns: 

736 proto, t0 = conns.popleft() 

737 # We will we reuse the connection if its connected and 

738 # the keepalive timeout has not been exceeded 

739 if proto.is_connected() and t1 - t0 <= self._keepalive_timeout: 

740 if not conns: 

741 # The very last connection was reclaimed: drop the key 

742 del self._conns[key] 

743 self._acquired.add(proto) 

744 if self._limit_per_host: 

745 self._acquired_per_host[key].add(proto) 

746 if traces: 

747 for trace in traces: 

748 try: 

749 await trace.send_connection_reuseconn() 

750 except BaseException: 

751 self._release_acquired(key, proto) 

752 raise 

753 return Connection(self, key, proto, self._loop) 

754 

755 # Connection cannot be reused, close it 

756 transport = proto.transport 

757 proto.close() 

758 # only for SSL transports 

759 if not self._cleanup_closed_disabled and key.is_ssl: 

760 self._cleanup_closed_transports.append(transport) 

761 

762 # No more connections: drop the key 

763 del self._conns[key] 

764 return None 

765 

766 def _release_waiter(self) -> None: 

767 """ 

768 Iterates over all waiters until one to be released is found. 

769 

770 The one to be released is not finished and 

771 belongs to a host that has available connections. 

772 """ 

773 if not self._waiters: 

774 return 

775 

776 # Having the dict keys ordered this avoids to iterate 

777 # at the same order at each call. 

778 queues = list(self._waiters) 

779 random.shuffle(queues) 

780 

781 for key in queues: 

782 if self._available_connections(key) < 1: 

783 continue 

784 

785 waiters = self._waiters[key] 

786 while waiters: 

787 waiter, _ = waiters.popitem(last=False) 

788 if not waiter.done(): 

789 waiter.set_result(None) 

790 return 

791 

792 def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None: 

793 """Release acquired connection.""" 

794 if self._closed: 

795 # acquired connection is already released on connector closing 

796 return 

797 

798 self._acquired.discard(proto) 

799 if self._limit_per_host and (conns := self._acquired_per_host.get(key)): 

800 conns.discard(proto) 

801 if not conns: 

802 del self._acquired_per_host[key] 

803 self._release_waiter() 

804 

805 def _release( 

806 self, 

807 key: "ConnectionKey", 

808 protocol: ResponseHandler, 

809 *, 

810 should_close: bool = False, 

811 ) -> None: 

812 if self._closed: 

813 # acquired connection is already released on connector closing 

814 return 

815 

816 self._release_acquired(key, protocol) 

817 

818 if self._force_close or should_close or protocol.should_close: 

819 transport = protocol.transport 

820 protocol.close() 

821 

822 if key.is_ssl and not self._cleanup_closed_disabled: 

823 self._cleanup_closed_transports.append(transport) 

824 return 

825 

826 self._conns[key].append((protocol, monotonic())) 

827 

828 if self._cleanup_handle is None: 

829 self._cleanup_handle = helpers.weakref_handle( 

830 self, 

831 "_cleanup", 

832 self._keepalive_timeout, 

833 self._loop, 

834 timeout_ceil_threshold=self._timeout_ceil_threshold, 

835 ) 

836 

837 async def _create_connection( 

838 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout" 

839 ) -> ResponseHandler: 

840 raise NotImplementedError() 

841 

842 

843class _DNSCacheTable: 

844 def __init__(self, ttl: float | None = None, max_size: int = 1000) -> None: 

845 self._addrs_rr: OrderedDict[ 

846 tuple[str, int], tuple[Iterator[ResolveResult], int] 

847 ] = OrderedDict() 

848 self._timestamps: dict[tuple[str, int], float] = {} 

849 self._ttl = ttl 

850 self._max_size = max_size 

851 

852 def __contains__(self, host: object) -> bool: 

853 return host in self._addrs_rr 

854 

855 def add(self, key: tuple[str, int], addrs: list[ResolveResult]) -> None: 

856 if key in self._addrs_rr: 

857 self._addrs_rr.move_to_end(key) 

858 

859 self._addrs_rr[key] = (cycle(addrs), len(addrs)) 

860 

861 if self._ttl is not None: 

862 self._timestamps[key] = monotonic() 

863 

864 if len(self._addrs_rr) > self._max_size: 

865 oldest_key, _ = self._addrs_rr.popitem(last=False) 

866 self._timestamps.pop(oldest_key, None) 

867 

868 def remove(self, key: tuple[str, int]) -> None: 

869 self._addrs_rr.pop(key, None) 

870 self._timestamps.pop(key, None) 

871 

872 def clear(self) -> None: 

873 self._addrs_rr.clear() 

874 self._timestamps.clear() 

875 

876 def next_addrs(self, key: tuple[str, int]) -> list[ResolveResult]: 

877 loop, length = self._addrs_rr[key] 

878 addrs = list(islice(loop, length)) 

879 # Consume one more element to shift internal state of `cycle` 

880 next(loop) 

881 self._addrs_rr.move_to_end(key) 

882 return addrs 

883 

884 def expired(self, key: tuple[str, int]) -> bool: 

885 if self._ttl is None: 

886 return False 

887 

888 return self._timestamps[key] + self._ttl < monotonic() 

889 

890 

891def _make_ssl_context(verified: bool) -> SSLContext: 

892 """Create SSL context. 

893 

894 This method is not async-friendly and should be called from a thread 

895 because it will load certificates from disk and do other blocking I/O. 

896 """ 

897 if ssl is None: 

898 # No ssl support 

899 return None 

900 if verified: 

901 sslcontext = ssl.create_default_context() 

902 else: 

903 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 

904 sslcontext.options |= ssl.OP_NO_SSLv2 

905 sslcontext.options |= ssl.OP_NO_SSLv3 

906 sslcontext.check_hostname = False 

907 sslcontext.verify_mode = ssl.CERT_NONE 

908 sslcontext.options |= ssl.OP_NO_COMPRESSION 

909 sslcontext.set_default_verify_paths() 

910 sslcontext.set_alpn_protocols(("http/1.1",)) 

911 return sslcontext 

912 

913 

914# The default SSLContext objects are created at import time 

915# since they do blocking I/O to load certificates from disk, 

916# and imports should always be done before the event loop starts 

917# or in a thread. 

918_SSL_CONTEXT_VERIFIED = _make_ssl_context(True) 

919_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False) 

920 

921 

922class TCPConnector(BaseConnector): 

923 """TCP connector. 

924 

925 verify_ssl - Set to True to check ssl certifications. 

926 fingerprint - Pass the binary sha256 

927 digest of the expected certificate in DER format to verify 

928 that the certificate the server presents matches. See also 

929 https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning 

930 resolver - Enable DNS lookups and use this 

931 resolver 

932 use_dns_cache - Use memory cache for DNS lookups. 

933 ttl_dns_cache - Max seconds having cached a DNS entry, None forever. 

934 family - socket address family 

935 local_addr - local tuple of (host, port) to bind socket to 

936 

937 keepalive_timeout - (optional) Keep-alive timeout. 

938 force_close - Set to True to force close and do reconnect 

939 after each request (and between redirects). 

940 limit - The total number of simultaneous connections. 

941 limit_per_host - Number of simultaneous connections to one host. 

942 enable_cleanup_closed - Enables clean-up closed ssl transports. 

943 Disabled by default. 

944 happy_eyeballs_delay - This is the “Connection Attempt Delay” 

945 as defined in RFC 8305. To disable 

946 the happy eyeballs algorithm, set to None. 

947 interleave - “First Address Family Count” as defined in RFC 8305 

948 loop - Optional event loop. 

949 socket_factory - A SocketFactoryType function that, if supplied, 

950 will be used to create sockets given an 

951 AddrInfoType. 

952 ssl_shutdown_timeout - DEPRECATED. Will be removed in aiohttp 4.0. 

953 Grace period for SSL shutdown handshake on TLS 

954 connections. Default is 0 seconds (immediate abort). 

955 This parameter allowed for a clean SSL shutdown by 

956 notifying the remote peer of connection closure, 

957 while avoiding excessive delays during connector cleanup. 

958 Note: Only takes effect on Python 3.11+. 

959 """ 

960 

961 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"}) 

962 

963 def __init__( 

964 self, 

965 *, 

966 verify_ssl: bool = True, 

967 fingerprint: bytes | None = None, 

968 use_dns_cache: bool = True, 

969 ttl_dns_cache: int | None = 10, 

970 dns_cache_max_size: int = 1000, 

971 family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC, 

972 ssl_context: SSLContext | None = None, 

973 ssl: bool | Fingerprint | SSLContext = True, 

974 local_addr: tuple[str, int] | None = None, 

975 resolver: AbstractResolver | None = None, 

976 keepalive_timeout: None | float | object = sentinel, 

977 force_close: bool = False, 

978 limit: int = 100, 

979 limit_per_host: int = 0, 

980 enable_cleanup_closed: bool = False, 

981 loop: asyncio.AbstractEventLoop | None = None, 

982 timeout_ceil_threshold: float = 5, 

983 happy_eyeballs_delay: float | None = 0.25, 

984 interleave: int | None = None, 

985 socket_factory: SocketFactoryType | None = None, 

986 ssl_shutdown_timeout: _SENTINEL | None | float = sentinel, 

987 ): 

988 super().__init__( 

989 keepalive_timeout=keepalive_timeout, 

990 force_close=force_close, 

991 limit=limit, 

992 limit_per_host=limit_per_host, 

993 enable_cleanup_closed=enable_cleanup_closed, 

994 loop=loop, 

995 timeout_ceil_threshold=timeout_ceil_threshold, 

996 ) 

997 

998 self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint) 

999 

1000 self._resolver: AbstractResolver 

1001 if resolver is None: 

1002 self._resolver = DefaultResolver(loop=self._loop) 

1003 self._resolver_owner = True 

1004 else: 

1005 self._resolver = resolver 

1006 self._resolver_owner = False 

1007 

1008 self._use_dns_cache = use_dns_cache 

1009 self._cached_hosts = _DNSCacheTable( 

1010 ttl=ttl_dns_cache, max_size=dns_cache_max_size 

1011 ) 

1012 self._throttle_dns_futures: dict[tuple[str, int], set[asyncio.Future[None]]] = ( 

1013 {} 

1014 ) 

1015 self._family = family 

1016 self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr) 

1017 self._happy_eyeballs_delay = happy_eyeballs_delay 

1018 self._interleave = interleave 

1019 self._resolve_host_tasks: set[asyncio.Task[list[ResolveResult]]] = set() 

1020 self._socket_factory = socket_factory 

1021 self._ssl_shutdown_timeout: float | None 

1022 # Handle ssl_shutdown_timeout with warning for Python < 3.11 

1023 if ssl_shutdown_timeout is sentinel: 

1024 self._ssl_shutdown_timeout = 0 

1025 else: 

1026 # Deprecation warning for ssl_shutdown_timeout parameter 

1027 warnings.warn( 

1028 "The ssl_shutdown_timeout parameter is deprecated and will be removed in aiohttp 4.0", 

1029 DeprecationWarning, 

1030 stacklevel=2, 

1031 ) 

1032 if ( 

1033 sys.version_info < (3, 11) 

1034 and ssl_shutdown_timeout is not None 

1035 and ssl_shutdown_timeout != 0 

1036 ): 

1037 warnings.warn( 

1038 f"ssl_shutdown_timeout={ssl_shutdown_timeout} is ignored on Python < 3.11; " 

1039 "only ssl_shutdown_timeout=0 is supported. The timeout will be ignored.", 

1040 RuntimeWarning, 

1041 stacklevel=2, 

1042 ) 

1043 self._ssl_shutdown_timeout = ssl_shutdown_timeout 

1044 

1045 def _close(self, *, abort_ssl: bool = False) -> list[Awaitable[object]]: 

1046 """Close all ongoing DNS calls.""" 

1047 for fut in chain.from_iterable(self._throttle_dns_futures.values()): 

1048 fut.cancel() 

1049 

1050 waiters = super()._close(abort_ssl=abort_ssl) 

1051 

1052 for t in self._resolve_host_tasks: 

1053 t.cancel() 

1054 waiters.append(t) 

1055 

1056 return waiters 

1057 

1058 async def close(self, *, abort_ssl: bool = False) -> None: 

1059 """ 

1060 Close all opened transports. 

1061 

1062 :param abort_ssl: If True, SSL connections will be aborted immediately 

1063 without performing the shutdown handshake. If False (default), 

1064 the behavior is determined by ssl_shutdown_timeout: 

1065 - If ssl_shutdown_timeout=0: connections are aborted 

1066 - If ssl_shutdown_timeout>0: graceful shutdown is performed 

1067 """ 

1068 # Use abort_ssl param if explicitly set, otherwise use ssl_shutdown_timeout default 

1069 await super().close(abort_ssl=abort_ssl or self._ssl_shutdown_timeout == 0) 

1070 if self._resolver_owner: 

1071 await self._resolver.close() 

1072 

1073 @property 

1074 def family(self) -> int: 

1075 """Socket family like AF_INET.""" 

1076 return self._family 

1077 

1078 @property 

1079 def use_dns_cache(self) -> bool: 

1080 """True if local DNS caching is enabled.""" 

1081 return self._use_dns_cache 

1082 

1083 def clear_dns_cache(self, host: str | None = None, port: int | None = None) -> None: 

1084 """Remove specified host/port or clear all dns local cache.""" 

1085 if host is not None and port is not None: 

1086 self._cached_hosts.remove((host, port)) 

1087 elif host is not None or port is not None: 

1088 raise ValueError("either both host and port or none of them are allowed") 

1089 else: 

1090 self._cached_hosts.clear() 

1091 

1092 async def _resolve_host( 

1093 self, host: str, port: int, traces: Sequence["Trace"] | None = None 

1094 ) -> list[ResolveResult]: 

1095 """Resolve host and return list of addresses.""" 

1096 if is_ip_address(host): 

1097 # Reject legacy numeric IPv4 forms (e.g. 2130706433, 127.1) that 

1098 # socket would map onto an address, slipping past a connector-level 

1099 # policy that only sees the raw host. 

1100 if ":" not in host and not is_canonical_ipv4_address(host): 

1101 raise InvalidUrlClientError(host, "is not a canonical IPv4 address") 

1102 return [ 

1103 { 

1104 "hostname": host, 

1105 "host": host, 

1106 "port": port, 

1107 "family": self._family, 

1108 "proto": 0, 

1109 "flags": 0, 

1110 } 

1111 ] 

1112 

1113 if not self._use_dns_cache: 

1114 

1115 if traces: 

1116 for trace in traces: 

1117 await trace.send_dns_resolvehost_start(host) 

1118 

1119 if self._closed: 

1120 raise ClientConnectionError("Connector is closed") 

1121 

1122 res = await self._resolver.resolve(host, port, family=self._family) 

1123 

1124 if traces: 

1125 for trace in traces: 

1126 await trace.send_dns_resolvehost_end(host) 

1127 

1128 return res 

1129 

1130 key = (host, port) 

1131 if key in self._cached_hosts and not self._cached_hosts.expired(key): 

1132 # get result early, before any await (#4014) 

1133 result = self._cached_hosts.next_addrs(key) 

1134 

1135 if traces: 

1136 for trace in traces: 

1137 await trace.send_dns_cache_hit(host) 

1138 return result 

1139 

1140 futures: set[asyncio.Future[None]] 

1141 # 

1142 # If multiple connectors are resolving the same host, we wait 

1143 # for the first one to resolve and then use the result for all of them. 

1144 # We use a throttle to ensure that we only resolve the host once 

1145 # and then use the result for all the waiters. 

1146 # 

1147 if key in self._throttle_dns_futures: 

1148 # get futures early, before any await (#4014) 

1149 futures = self._throttle_dns_futures[key] 

1150 future: asyncio.Future[None] = self._loop.create_future() 

1151 futures.add(future) 

1152 if traces: 

1153 for trace in traces: 

1154 await trace.send_dns_cache_hit(host) 

1155 try: 

1156 await future 

1157 finally: 

1158 futures.discard(future) 

1159 return self._cached_hosts.next_addrs(key) 

1160 

1161 # update dict early, before any await (#4014) 

1162 self._throttle_dns_futures[key] = futures = set() 

1163 # In this case we need to create a task to ensure that we can shield 

1164 # the task from cancellation as cancelling this lookup should not cancel 

1165 # the underlying lookup or else the cancel event will get broadcast to 

1166 # all the waiters across all connections. 

1167 # 

1168 coro = self._resolve_host_with_throttle(key, host, port, futures, traces) 

1169 loop = asyncio.get_running_loop() 

1170 if sys.version_info >= (3, 12): 

1171 # Optimization for Python 3.12, try to send immediately 

1172 resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True) 

1173 else: 

1174 resolved_host_task = loop.create_task(coro) 

1175 

1176 if not resolved_host_task.done(): 

1177 self._resolve_host_tasks.add(resolved_host_task) 

1178 resolved_host_task.add_done_callback(self._resolve_host_tasks.discard) 

1179 

1180 try: 

1181 return await asyncio.shield(resolved_host_task) 

1182 except asyncio.CancelledError: 

1183 

1184 def drop_exception(fut: "asyncio.Future[list[ResolveResult]]") -> None: 

1185 with suppress(Exception, asyncio.CancelledError): 

1186 fut.result() 

1187 

1188 resolved_host_task.add_done_callback(drop_exception) 

1189 raise 

1190 

1191 async def _resolve_host_with_throttle( 

1192 self, 

1193 key: tuple[str, int], 

1194 host: str, 

1195 port: int, 

1196 futures: set["asyncio.Future[None]"], 

1197 traces: Sequence["Trace"] | None, 

1198 ) -> list[ResolveResult]: 

1199 """Resolve host and set result for all waiters. 

1200 

1201 This method must be run in a task and shielded from cancellation 

1202 to avoid cancelling the underlying lookup. 

1203 """ 

1204 try: 

1205 if traces: 

1206 for trace in traces: 

1207 await trace.send_dns_cache_miss(host) 

1208 

1209 for trace in traces: 

1210 await trace.send_dns_resolvehost_start(host) 

1211 

1212 addrs = await self._resolver.resolve(host, port, family=self._family) 

1213 if traces: 

1214 for trace in traces: 

1215 await trace.send_dns_resolvehost_end(host) 

1216 

1217 self._cached_hosts.add(key, addrs) 

1218 for fut in futures: 

1219 set_result(fut, None) 

1220 except BaseException as e: 

1221 # any DNS exception is set for the waiters to raise the same exception. 

1222 # This coro is always run in task that is shielded from cancellation so 

1223 # we should never be propagating cancellation here. 

1224 for fut in futures: 

1225 set_exception(fut, e) 

1226 raise 

1227 finally: 

1228 self._throttle_dns_futures.pop(key) 

1229 

1230 return self._cached_hosts.next_addrs(key) 

1231 

1232 async def _create_connection( 

1233 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout" 

1234 ) -> ResponseHandler: 

1235 """Create connection. 

1236 

1237 Has same keyword arguments as BaseEventLoop.create_connection. 

1238 """ 

1239 if req.proxy: 

1240 _, proto = await self._create_proxy_connection(req, traces, timeout) 

1241 else: 

1242 _, proto = await self._create_direct_connection(req, traces, timeout) 

1243 

1244 return proto 

1245 

1246 def _get_ssl_context(self, req: ClientRequest) -> SSLContext | None: 

1247 """Logic to get the correct SSL context 

1248 

1249 0. if req.ssl is false, return None 

1250 

1251 1. if ssl_context is specified in req, use it 

1252 2. if _ssl_context is specified in self, use it 

1253 3. otherwise: 

1254 1. if verify_ssl is not specified in req, use self.ssl_context 

1255 (will generate a default context according to self.verify_ssl) 

1256 2. if verify_ssl is True in req, generate a default SSL context 

1257 3. if verify_ssl is False in req, generate a SSL context that 

1258 won't verify 

1259 """ 

1260 if not req.is_ssl(): 

1261 return None 

1262 

1263 if ssl is None: # pragma: no cover 

1264 raise RuntimeError("SSL is not supported.") 

1265 sslcontext = req.ssl 

1266 if isinstance(sslcontext, ssl.SSLContext): 

1267 return sslcontext 

1268 if sslcontext is not True: 

1269 # not verified or fingerprinted 

1270 return _SSL_CONTEXT_UNVERIFIED 

1271 sslcontext = self._ssl 

1272 if isinstance(sslcontext, ssl.SSLContext): 

1273 return sslcontext 

1274 if sslcontext is not True: 

1275 # not verified or fingerprinted 

1276 return _SSL_CONTEXT_UNVERIFIED 

1277 return _SSL_CONTEXT_VERIFIED 

1278 

1279 def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]: 

1280 ret = req.ssl 

1281 if isinstance(ret, Fingerprint): 

1282 return ret 

1283 ret = self._ssl 

1284 if isinstance(ret, Fingerprint): 

1285 return ret 

1286 return None 

1287 

1288 async def _wrap_create_connection( 

1289 self, 

1290 *args: Any, 

1291 addr_infos: list[AddrInfoType], 

1292 req: ClientRequest, 

1293 timeout: "ClientTimeout", 

1294 client_error: type[Exception] = ClientConnectorError, 

1295 **kwargs: Any, 

1296 ) -> tuple[asyncio.Transport, ResponseHandler]: 

1297 try: 

1298 async with ceil_timeout( 

1299 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1300 ): 

1301 sock = await aiohappyeyeballs.start_connection( 

1302 addr_infos=addr_infos, 

1303 local_addr_infos=self._local_addr_infos, 

1304 happy_eyeballs_delay=self._happy_eyeballs_delay, 

1305 interleave=self._interleave, 

1306 loop=self._loop, 

1307 socket_factory=self._socket_factory, 

1308 ) 

1309 # Add ssl_shutdown_timeout for Python 3.11+ when SSL is used 

1310 if ( 

1311 kwargs.get("ssl") 

1312 and self._ssl_shutdown_timeout 

1313 and sys.version_info >= (3, 11) 

1314 ): 

1315 kwargs["ssl_shutdown_timeout"] = self._ssl_shutdown_timeout 

1316 return await self._loop.create_connection(*args, **kwargs, sock=sock) 

1317 except cert_errors as exc: 

1318 raise ClientConnectorCertificateError(req.connection_key, exc) from exc 

1319 except ssl_errors as exc: 

1320 raise ClientConnectorSSLError(req.connection_key, exc) from exc 

1321 except OSError as exc: 

1322 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1323 raise 

1324 raise client_error(req.connection_key, exc) from exc 

1325 

1326 async def _wrap_existing_connection( 

1327 self, 

1328 *args: Any, 

1329 req: ClientRequest, 

1330 timeout: "ClientTimeout", 

1331 client_error: type[Exception] = ClientConnectorError, 

1332 **kwargs: Any, 

1333 ) -> tuple[asyncio.Transport, ResponseHandler]: 

1334 try: 

1335 async with ceil_timeout( 

1336 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1337 ): 

1338 return await self._loop.create_connection(*args, **kwargs) 

1339 except cert_errors as exc: 

1340 raise ClientConnectorCertificateError(req.connection_key, exc) from exc 

1341 except ssl_errors as exc: 

1342 raise ClientConnectorSSLError(req.connection_key, exc) from exc 

1343 except OSError as exc: 

1344 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1345 raise 

1346 raise client_error(req.connection_key, exc) from exc 

1347 

1348 def _fail_on_no_start_tls(self, req: "ClientRequest") -> None: 

1349 """Raise a :py:exc:`RuntimeError` on missing ``start_tls()``. 

1350 

1351 It is necessary for TLS-in-TLS so that it is possible to 

1352 send HTTPS queries through HTTPS proxies. 

1353 

1354 This doesn't affect regular HTTP requests, though. 

1355 """ 

1356 if not req.is_ssl(): 

1357 return 

1358 

1359 proxy_url = req.proxy 

1360 assert proxy_url is not None 

1361 if proxy_url.scheme != "https": 

1362 return 

1363 

1364 self._check_loop_for_start_tls() 

1365 

1366 def _check_loop_for_start_tls(self) -> None: 

1367 try: 

1368 self._loop.start_tls 

1369 except AttributeError as attr_exc: 

1370 raise RuntimeError( 

1371 "An HTTPS request is being sent through an HTTPS proxy. " 

1372 "This needs support for TLS in TLS but it is not implemented " 

1373 "in your runtime for the stdlib asyncio.\n\n" 

1374 "Please upgrade to Python 3.11 or higher. For more details, " 

1375 "please see:\n" 

1376 "* https://bugs.python.org/issue37179\n" 

1377 "* https://github.com/python/cpython/pull/28073\n" 

1378 "* https://docs.aiohttp.org/en/stable/" 

1379 "client_advanced.html#proxy-support\n" 

1380 "* https://github.com/aio-libs/aiohttp/discussions/6044\n", 

1381 ) from attr_exc 

1382 

1383 def _loop_supports_start_tls(self) -> bool: 

1384 try: 

1385 self._check_loop_for_start_tls() 

1386 except RuntimeError: 

1387 return False 

1388 else: 

1389 return True 

1390 

1391 def _warn_about_tls_in_tls( 

1392 self, 

1393 underlying_transport: asyncio.Transport, 

1394 req: ClientRequest, 

1395 ) -> None: 

1396 """Issue a warning if the requested URL has HTTPS scheme.""" 

1397 if req.request_info.url.scheme != "https": 

1398 return 

1399 

1400 # TLS-in-TLS only applies when the proxy itself is HTTPS. 

1401 # When the proxy is HTTP, start_tls upgrades a plain TCP connection, 

1402 # which is standard TLS and works on all event loops and Python versions. 

1403 if req.proxy is None or req.proxy.scheme != "https": 

1404 return 

1405 

1406 # Check if uvloop is being used, which supports TLS in TLS, 

1407 # otherwise assume that asyncio's native transport is being used. 

1408 if type(underlying_transport).__module__.startswith("uvloop"): 

1409 return 

1410 

1411 # Support in asyncio was added in Python 3.11 (bpo-44011) 

1412 asyncio_supports_tls_in_tls = sys.version_info >= (3, 11) or getattr( 

1413 underlying_transport, 

1414 "_start_tls_compatible", 

1415 False, 

1416 ) 

1417 

1418 if asyncio_supports_tls_in_tls: 

1419 return 

1420 

1421 warnings.warn( 

1422 "An HTTPS request is being sent through an HTTPS proxy. " 

1423 "This support for TLS in TLS is known to be disabled " 

1424 "in the stdlib asyncio (Python <3.11). This is why you'll probably see " 

1425 "an error in the log below.\n\n" 

1426 "It is possible to enable it via monkeypatching. " 

1427 "For more details, see:\n" 

1428 "* https://bugs.python.org/issue37179\n" 

1429 "* https://github.com/python/cpython/pull/28073\n\n" 

1430 "You can temporarily patch this as follows:\n" 

1431 "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n" 

1432 "* https://github.com/aio-libs/aiohttp/discussions/6044\n", 

1433 RuntimeWarning, 

1434 source=self, 

1435 # Why `4`? At least 3 of the calls in the stack originate 

1436 # from the methods in this class. 

1437 stacklevel=3, 

1438 ) 

1439 

1440 async def _start_tls_connection( 

1441 self, 

1442 underlying_transport: asyncio.Transport, 

1443 req: ClientRequest, 

1444 timeout: "ClientTimeout", 

1445 client_error: type[Exception] = ClientConnectorError, 

1446 ) -> tuple[asyncio.BaseTransport, ResponseHandler]: 

1447 """Wrap the raw TCP transport with TLS.""" 

1448 tls_proto = self._factory() # Create a brand new proto for TLS 

1449 sslcontext = self._get_ssl_context(req) 

1450 if TYPE_CHECKING: 

1451 # _start_tls_connection is unreachable in the current code path 

1452 # if sslcontext is None. 

1453 assert sslcontext is not None 

1454 

1455 try: 

1456 async with ceil_timeout( 

1457 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1458 ): 

1459 try: 

1460 # ssl_shutdown_timeout is only available in Python 3.11+ 

1461 if sys.version_info >= (3, 11) and self._ssl_shutdown_timeout: 

1462 tls_transport = await self._loop.start_tls( 

1463 underlying_transport, 

1464 tls_proto, 

1465 sslcontext, 

1466 server_hostname=req.server_hostname or req.host, 

1467 ssl_handshake_timeout=timeout.total or None, 

1468 ssl_shutdown_timeout=self._ssl_shutdown_timeout, 

1469 ) 

1470 else: 

1471 tls_transport = await self._loop.start_tls( 

1472 underlying_transport, 

1473 tls_proto, 

1474 sslcontext, 

1475 server_hostname=req.server_hostname or req.host, 

1476 ssl_handshake_timeout=timeout.total or None, 

1477 ) 

1478 except BaseException: 

1479 # We need to close the underlying transport since 

1480 # `start_tls()` probably failed before it had a 

1481 # chance to do this: 

1482 if self._ssl_shutdown_timeout == 0: 

1483 underlying_transport.abort() 

1484 else: 

1485 underlying_transport.close() 

1486 raise 

1487 if isinstance(tls_transport, asyncio.Transport): 

1488 fingerprint = self._get_fingerprint(req) 

1489 if fingerprint: 

1490 try: 

1491 fingerprint.check(tls_transport) 

1492 except ServerFingerprintMismatch: 

1493 tls_transport.close() 

1494 if not self._cleanup_closed_disabled: 

1495 self._cleanup_closed_transports.append(tls_transport) 

1496 raise 

1497 except cert_errors as exc: 

1498 raise ClientConnectorCertificateError(req.connection_key, exc) from exc 

1499 except ssl_errors as exc: 

1500 raise ClientConnectorSSLError(req.connection_key, exc) from exc 

1501 except OSError as exc: 

1502 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1503 raise 

1504 raise client_error(req.connection_key, exc) from exc 

1505 except TypeError as type_err: 

1506 # Example cause looks like this: 

1507 # TypeError: transport <asyncio.sslproto._SSLProtocolTransport 

1508 # object at 0x7f760615e460> is not supported by start_tls() 

1509 

1510 raise ClientConnectionError( 

1511 "Cannot initialize a TLS-in-TLS connection to host " 

1512 f"{req.host!s}:{req.port:d} through an underlying connection " 

1513 f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} " 

1514 f"[{type_err!s}]" 

1515 ) from type_err 

1516 else: 

1517 if tls_transport is None: 

1518 msg = "Failed to start TLS (possibly caused by closing transport)" 

1519 raise client_error(req.connection_key, OSError(msg)) 

1520 tls_proto.connection_made( 

1521 tls_transport 

1522 ) # Kick the state machine of the new TLS protocol 

1523 

1524 return tls_transport, tls_proto 

1525 

1526 def _convert_hosts_to_addr_infos( 

1527 self, hosts: list[ResolveResult] 

1528 ) -> list[AddrInfoType]: 

1529 """Converts the list of hosts to a list of addr_infos. 

1530 

1531 The list of hosts is the result of a DNS lookup. The list of 

1532 addr_infos is the result of a call to `socket.getaddrinfo()`. 

1533 """ 

1534 addr_infos: list[AddrInfoType] = [] 

1535 for hinfo in hosts: 

1536 host = hinfo["host"] 

1537 is_ipv6 = ":" in host 

1538 family = socket.AF_INET6 if is_ipv6 else socket.AF_INET 

1539 if self._family and self._family != family: 

1540 continue 

1541 addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"]) 

1542 addr_infos.append( 

1543 (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr) 

1544 ) 

1545 return addr_infos 

1546 

1547 async def _create_direct_connection( 

1548 self, 

1549 req: ClientRequest, 

1550 traces: list["Trace"], 

1551 timeout: "ClientTimeout", 

1552 *, 

1553 client_error: type[Exception] = ClientConnectorError, 

1554 ) -> tuple[asyncio.Transport, ResponseHandler]: 

1555 sslcontext = self._get_ssl_context(req) 

1556 fingerprint = self._get_fingerprint(req) 

1557 

1558 host = req.url.raw_host 

1559 assert host is not None 

1560 # Replace multiple trailing dots with a single one. 

1561 # A trailing dot is only present for fully-qualified domain names. 

1562 # See https://github.com/aio-libs/aiohttp/pull/7364. 

1563 if host.endswith(".."): 

1564 host = host.rstrip(".") + "." 

1565 port = req.port 

1566 assert port is not None 

1567 try: 

1568 # Cancelling this lookup should not cancel the underlying lookup 

1569 # or else the cancel event will get broadcast to all the waiters 

1570 # across all connections. 

1571 hosts = await self._resolve_host(host, port, traces=traces) 

1572 except OSError as exc: 

1573 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1574 raise 

1575 # in case of proxy it is not ClientProxyConnectionError 

1576 # it is problem of resolving proxy ip itself 

1577 raise ClientConnectorDNSError(req.connection_key, exc) from exc 

1578 

1579 last_exc: Exception | None = None 

1580 addr_infos = self._convert_hosts_to_addr_infos(hosts) 

1581 while addr_infos: 

1582 # Strip trailing dots, certificates contain FQDN without dots. 

1583 # See https://github.com/aio-libs/aiohttp/issues/3636 

1584 server_hostname = ( 

1585 (req.server_hostname or host).rstrip(".") if sslcontext else None 

1586 ) 

1587 

1588 try: 

1589 transp, proto = await self._wrap_create_connection( 

1590 self._factory, 

1591 timeout=timeout, 

1592 ssl=sslcontext, 

1593 addr_infos=addr_infos, 

1594 server_hostname=server_hostname, 

1595 req=req, 

1596 client_error=client_error, 

1597 ) 

1598 except (ClientConnectorError, asyncio.TimeoutError) as exc: 

1599 last_exc = exc 

1600 aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave) 

1601 continue 

1602 

1603 if req.is_ssl() and fingerprint: 

1604 try: 

1605 fingerprint.check(transp) 

1606 except ServerFingerprintMismatch as exc: 

1607 transp.close() 

1608 if not self._cleanup_closed_disabled: 

1609 self._cleanup_closed_transports.append(transp) 

1610 last_exc = exc 

1611 # Remove the bad peer from the list of addr_infos 

1612 sock: socket.socket = transp.get_extra_info("socket") 

1613 bad_peer = sock.getpeername() 

1614 aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer) 

1615 continue 

1616 

1617 return transp, proto 

1618 else: 

1619 assert last_exc is not None 

1620 raise last_exc 

1621 

1622 async def _create_proxy_connection( 

1623 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout" 

1624 ) -> tuple[asyncio.BaseTransport, ResponseHandler]: 

1625 self._fail_on_no_start_tls(req) 

1626 runtime_has_start_tls = self._loop_supports_start_tls() 

1627 proxy_req = self._update_proxy_auth_header_and_build_proxy_req(req) 

1628 

1629 # create connection to proxy server 

1630 transport, proto = await self._create_direct_connection( 

1631 proxy_req, [], timeout, client_error=ClientProxyConnectionError 

1632 ) 

1633 

1634 if req.is_ssl(): 

1635 if runtime_has_start_tls: 

1636 self._warn_about_tls_in_tls(transport, req) 

1637 

1638 # For HTTPS requests over HTTP proxy 

1639 # we must notify proxy to tunnel connection 

1640 # so we send CONNECT command: 

1641 # CONNECT www.python.org:443 HTTP/1.1 

1642 # Host: www.python.org 

1643 # 

1644 # next we must do TLS handshake and so on 

1645 # to do this we must wrap raw socket into secure one 

1646 # asyncio handles this perfectly 

1647 proxy_req.method = hdrs.METH_CONNECT 

1648 proxy_req.url = req.url 

1649 key = req.connection_key._replace( 

1650 proxy=None, proxy_auth=None, proxy_headers_hash=None 

1651 ) 

1652 conn = _ConnectTunnelConnection(self, key, proto, self._loop) 

1653 proxy_resp = await proxy_req.send(conn) 

1654 try: 

1655 protocol = conn._protocol 

1656 assert protocol is not None 

1657 

1658 # read_until_eof=True will ensure the connection isn't closed 

1659 # once the response is received and processed allowing 

1660 # START_TLS to work on the connection below. 

1661 protocol.set_response_params( 

1662 read_until_eof=runtime_has_start_tls, 

1663 timeout_ceil_threshold=self._timeout_ceil_threshold, 

1664 ) 

1665 resp = await proxy_resp.start(conn) 

1666 except BaseException: 

1667 proxy_resp.close() 

1668 conn.close() 

1669 raise 

1670 else: 

1671 conn._protocol = None 

1672 try: 

1673 if resp.status != 200: 

1674 message = resp.reason 

1675 if message is None: 

1676 message = HTTPStatus(resp.status).phrase 

1677 raise ClientHttpProxyError( 

1678 proxy_resp.request_info, 

1679 resp.history, 

1680 status=resp.status, 

1681 message=message, 

1682 headers=resp.headers, 

1683 ) 

1684 if not runtime_has_start_tls: 

1685 rawsock = transport.get_extra_info("socket", default=None) 

1686 if rawsock is None: 

1687 raise RuntimeError( 

1688 "Transport does not expose socket instance" 

1689 ) 

1690 # Duplicate the socket, so now we can close proxy transport 

1691 rawsock = rawsock.dup() 

1692 except BaseException: 

1693 # It shouldn't be closed in `finally` because it's fed to 

1694 # `loop.start_tls()` and the docs say not to touch it after 

1695 # passing there. 

1696 transport.close() 

1697 raise 

1698 finally: 

1699 if not runtime_has_start_tls: 

1700 transport.close() 

1701 

1702 if not runtime_has_start_tls: 

1703 # HTTP proxy with support for upgrade to HTTPS 

1704 sslcontext = self._get_ssl_context(req) 

1705 return await self._wrap_existing_connection( 

1706 self._factory, 

1707 timeout=timeout, 

1708 ssl=sslcontext, 

1709 sock=rawsock, 

1710 server_hostname=req.host, 

1711 req=req, 

1712 ) 

1713 

1714 return await self._start_tls_connection( 

1715 # Access the old transport for the last time before it's 

1716 # closed and forgotten forever: 

1717 transport, 

1718 req=req, 

1719 timeout=timeout, 

1720 ) 

1721 finally: 

1722 proxy_resp.close() 

1723 

1724 return transport, proto 

1725 

1726 

1727class UnixConnector(BaseConnector): 

1728 """Unix socket connector. 

1729 

1730 path - Unix socket path. 

1731 keepalive_timeout - (optional) Keep-alive timeout. 

1732 force_close - Set to True to force close and do reconnect 

1733 after each request (and between redirects). 

1734 limit - The total number of simultaneous connections. 

1735 limit_per_host - Number of simultaneous connections to one host. 

1736 loop - Optional event loop. 

1737 """ 

1738 

1739 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"}) 

1740 

1741 def __init__( 

1742 self, 

1743 path: str, 

1744 force_close: bool = False, 

1745 keepalive_timeout: object | float | None = sentinel, 

1746 limit: int = 100, 

1747 limit_per_host: int = 0, 

1748 loop: asyncio.AbstractEventLoop | None = None, 

1749 ) -> None: 

1750 super().__init__( 

1751 force_close=force_close, 

1752 keepalive_timeout=keepalive_timeout, 

1753 limit=limit, 

1754 limit_per_host=limit_per_host, 

1755 loop=loop, 

1756 ) 

1757 self._path = path 

1758 

1759 @property 

1760 def path(self) -> str: 

1761 """Path to unix socket.""" 

1762 return self._path 

1763 

1764 async def _create_connection( 

1765 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout" 

1766 ) -> ResponseHandler: 

1767 try: 

1768 async with ceil_timeout( 

1769 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1770 ): 

1771 _, proto = await self._loop.create_unix_connection( 

1772 self._factory, self._path 

1773 ) 

1774 except OSError as exc: 

1775 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1776 raise 

1777 raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc 

1778 

1779 return proto 

1780 

1781 

1782class NamedPipeConnector(BaseConnector): 

1783 """Named pipe connector. 

1784 

1785 Only supported by the proactor event loop. 

1786 See also: https://docs.python.org/3/library/asyncio-eventloop.html 

1787 

1788 path - Windows named pipe path. 

1789 keepalive_timeout - (optional) Keep-alive timeout. 

1790 force_close - Set to True to force close and do reconnect 

1791 after each request (and between redirects). 

1792 limit - The total number of simultaneous connections. 

1793 limit_per_host - Number of simultaneous connections to one host. 

1794 loop - Optional event loop. 

1795 """ 

1796 

1797 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"}) 

1798 

1799 def __init__( 

1800 self, 

1801 path: str, 

1802 force_close: bool = False, 

1803 keepalive_timeout: object | float | None = sentinel, 

1804 limit: int = 100, 

1805 limit_per_host: int = 0, 

1806 loop: asyncio.AbstractEventLoop | None = None, 

1807 ) -> None: 

1808 super().__init__( 

1809 force_close=force_close, 

1810 keepalive_timeout=keepalive_timeout, 

1811 limit=limit, 

1812 limit_per_host=limit_per_host, 

1813 loop=loop, 

1814 ) 

1815 if not isinstance( 

1816 self._loop, 

1817 asyncio.ProactorEventLoop, # type: ignore[attr-defined] 

1818 ): 

1819 raise RuntimeError( 

1820 "Named Pipes only available in proactor loop under windows" 

1821 ) 

1822 self._path = path 

1823 

1824 @property 

1825 def path(self) -> str: 

1826 """Path to the named pipe.""" 

1827 return self._path 

1828 

1829 async def _create_connection( 

1830 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout" 

1831 ) -> ResponseHandler: 

1832 try: 

1833 async with ceil_timeout( 

1834 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1835 ): 

1836 _, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined] 

1837 self._factory, self._path 

1838 ) 

1839 # the drain is required so that the connection_made is called 

1840 # and transport is set otherwise it is not set before the 

1841 # `assert conn.transport is not None` 

1842 # in client.py's _request method 

1843 await asyncio.sleep(0) 

1844 # other option is to manually set transport like 

1845 # `proto.transport = trans` 

1846 except OSError as exc: 

1847 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1848 raise 

1849 raise ClientConnectorError(req.connection_key, exc) from exc 

1850 

1851 return cast(ResponseHandler, proto)