Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/connector.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

776 statements  

1import asyncio 

2import functools 

3import random 

4import socket 

5import sys 

6import traceback 

7import warnings 

8from collections import OrderedDict, defaultdict, deque 

9from collections.abc import Awaitable, Callable, Iterator, Sequence 

10from contextlib import suppress 

11from http import HTTPStatus 

12from itertools import chain, cycle, islice 

13from time import monotonic 

14from types import TracebackType 

15from typing import TYPE_CHECKING, Any, Literal, cast 

16 

17import aiohappyeyeballs 

18from aiohappyeyeballs import AddrInfoType, SocketFactoryType 

19from multidict import CIMultiDict 

20 

21from . import hdrs, helpers 

22from .abc import AbstractResolver, ResolveResult 

23from .client_exceptions import ( 

24 ClientConnectionError, 

25 ClientConnectorCertificateError, 

26 ClientConnectorDNSError, 

27 ClientConnectorError, 

28 ClientConnectorSSLError, 

29 ClientHttpProxyError, 

30 ClientProxyConnectionError, 

31 ServerFingerprintMismatch, 

32 UnixClientConnectorError, 

33 cert_errors, 

34 ssl_errors, 

35) 

36from .client_proto import ResponseHandler 

37from .client_reqrep import ( 

38 SSL_ALLOWED_TYPES, 

39 ClientRequest, 

40 ClientRequestBase, 

41 Fingerprint, 

42) 

43from .helpers import ( 

44 _SENTINEL, 

45 ceil_timeout, 

46 is_ip_address, 

47 sentinel, 

48 set_exception, 

49 set_result, 

50) 

51from .log import client_logger 

52from .resolver import DefaultResolver 

53 

54if sys.version_info >= (3, 12): 

55 from collections.abc import Buffer 

56else: 

57 Buffer = "bytes | bytearray | memoryview[int] | memoryview[bytes]" 

58 

59try: 

60 import ssl 

61 

62 SSLContext = ssl.SSLContext 

63except ImportError: # pragma: no cover 

64 ssl = None # type: ignore[assignment] 

65 SSLContext = object # type: ignore[misc,assignment] 

66 

67EMPTY_SCHEMA_SET = frozenset({""}) 

68HTTP_SCHEMA_SET = frozenset({"http", "https"}) 

69WS_SCHEMA_SET = frozenset({"ws", "wss"}) 

70 

71HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET | EMPTY_SCHEMA_SET 

72HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET | WS_SCHEMA_SET 

73 

74NEEDS_CLEANUP_CLOSED = (3, 13, 0) <= sys.version_info < ( 

75 3, 

76 13, 

77 1, 

78) or sys.version_info < (3, 12, 8) 

79# Cleanup closed is no longer needed after https://github.com/python/cpython/pull/118960 

80# which first appeared in Python 3.12.8 and 3.13.1 

81 

82 

83__all__ = ( 

84 "BaseConnector", 

85 "TCPConnector", 

86 "UnixConnector", 

87 "NamedPipeConnector", 

88 "AddrInfoType", 

89 "SocketFactoryType", 

90) 

91 

92 

93if TYPE_CHECKING: 

94 from .client import ClientTimeout 

95 from .client_reqrep import ConnectionKey 

96 from .tracing import Trace 

97 

98 

99class Connection: 

100 """Represents a single connection.""" 

101 

102 __slots__ = ( 

103 "_key", 

104 "_connector", 

105 "_loop", 

106 "_protocol", 

107 "_callbacks", 

108 "_source_traceback", 

109 ) 

110 

111 def __init__( 

112 self, 

113 connector: "BaseConnector", 

114 key: "ConnectionKey", 

115 protocol: ResponseHandler, 

116 loop: asyncio.AbstractEventLoop, 

117 ) -> None: 

118 self._key = key 

119 self._connector = connector 

120 self._loop = loop 

121 self._protocol: ResponseHandler | None = protocol 

122 self._callbacks: list[Callable[[], None]] = [] 

123 self._source_traceback = ( 

124 traceback.extract_stack(sys._getframe(1)) if loop.get_debug() else None 

125 ) 

126 

127 def __repr__(self) -> str: 

128 return f"Connection<{self._key}>" 

129 

130 def __del__(self, _warnings: Any = warnings) -> None: 

131 if self._protocol is not None: 

132 _warnings.warn( 

133 f"Unclosed connection {self!r}", ResourceWarning, source=self 

134 ) 

135 if self._loop.is_closed(): 

136 return 

137 

138 self._connector._release(self._key, self._protocol, should_close=True) 

139 

140 context = {"client_connection": self, "message": "Unclosed connection"} 

141 if self._source_traceback is not None: 

142 context["source_traceback"] = self._source_traceback 

143 self._loop.call_exception_handler(context) 

144 

145 def __bool__(self) -> Literal[True]: 

146 """Force subclasses to not be falsy, to make checks simpler.""" 

147 return True 

148 

149 @property 

150 def transport(self) -> asyncio.Transport | None: 

151 if self._protocol is None: 

152 return None 

153 return self._protocol.transport 

154 

155 @property 

156 def protocol(self) -> ResponseHandler | None: 

157 return self._protocol 

158 

159 def add_callback(self, callback: Callable[[], None]) -> None: 

160 if callback is not None: 

161 self._callbacks.append(callback) 

162 

163 def _notify_release(self) -> None: 

164 callbacks, self._callbacks = self._callbacks[:], [] 

165 

166 for cb in callbacks: 

167 with suppress(Exception): 

168 cb() 

169 

170 def close(self) -> None: 

171 self._notify_release() 

172 

173 if self._protocol is not None: 

174 self._connector._release(self._key, self._protocol, should_close=True) 

175 self._protocol = None 

176 

177 def release(self) -> None: 

178 self._notify_release() 

179 

180 if self._protocol is not None: 

181 self._connector._release(self._key, self._protocol) 

182 self._protocol = None 

183 

184 @property 

185 def closed(self) -> bool: 

186 return self._protocol is None or not self._protocol.is_connected() 

187 

188 

189class _ConnectTunnelConnection(Connection): 

190 """Special connection wrapper for CONNECT tunnels that must never be pooled. 

191 

192 This connection wraps the proxy connection that will be upgraded with TLS. 

193 It must never be released to the pool because: 

194 1. Its 'closed' future will never complete, causing session.close() to hang 

195 2. It represents an intermediate state, not a reusable connection 

196 3. The real connection (with TLS) will be created separately 

197 """ 

198 

199 def release(self) -> None: 

200 """Do nothing - don't pool or close the connection. 

201 

202 These connections are an intermediate state during the CONNECT tunnel 

203 setup and will be cleaned up naturally after the TLS upgrade. If they 

204 were to be pooled, they would never be properly closed, causing 

205 session.close() to wait forever for their 'closed' future. 

206 """ 

207 

208 

209class _TransportPlaceholder: 

210 """placeholder for BaseConnector.connect function""" 

211 

212 __slots__ = ("closed", "transport") 

213 

214 def __init__(self, closed_future: asyncio.Future[Exception | None]) -> None: 

215 """Initialize a placeholder for a transport.""" 

216 self.closed = closed_future 

217 self.transport = None 

218 

219 def close(self) -> None: 

220 """Close the placeholder.""" 

221 

222 def abort(self) -> None: 

223 """Abort the placeholder (does nothing).""" 

224 

225 

226class BaseConnector: 

227 """Base connector class. 

228 

229 keepalive_timeout - (optional) Keep-alive timeout. 

230 force_close - Set to True to force close and do reconnect 

231 after each request (and between redirects). 

232 limit - The total number of simultaneous connections. 

233 limit_per_host - Number of simultaneous connections to one host. 

234 enable_cleanup_closed - Enables clean-up closed ssl transports. 

235 Disabled by default. 

236 timeout_ceil_threshold - Trigger ceiling of timeout values when 

237 it's above timeout_ceil_threshold. 

238 loop - Optional event loop. 

239 """ 

240 

241 _closed = True # prevent AttributeError in __del__ if ctor was failed 

242 _source_traceback = None 

243 

244 # abort transport after 2 seconds (cleanup broken connections) 

245 _cleanup_closed_period = 2.0 

246 

247 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET 

248 

249 def __init__( 

250 self, 

251 *, 

252 keepalive_timeout: _SENTINEL | None | float = sentinel, 

253 force_close: bool = False, 

254 limit: int = 100, 

255 limit_per_host: int = 0, 

256 enable_cleanup_closed: bool = False, 

257 timeout_ceil_threshold: float = 5, 

258 ) -> None: 

259 if force_close: 

260 if keepalive_timeout is not None and keepalive_timeout is not sentinel: 

261 raise ValueError( 

262 "keepalive_timeout cannot be set if force_close is True" 

263 ) 

264 else: 

265 if keepalive_timeout is sentinel: 

266 keepalive_timeout = 15.0 

267 

268 self._timeout_ceil_threshold = timeout_ceil_threshold 

269 

270 loop = asyncio.get_running_loop() 

271 

272 self._closed = False 

273 if loop.get_debug(): 

274 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

275 

276 # Connection pool of reusable connections. 

277 # We use a deque to store connections because it has O(1) popleft() 

278 # and O(1) append() operations to implement a FIFO queue. 

279 self._conns: defaultdict[ 

280 ConnectionKey, deque[tuple[ResponseHandler, float]] 

281 ] = defaultdict(deque) 

282 self._limit = limit 

283 self._limit_per_host = limit_per_host 

284 self._acquired: set[ResponseHandler] = set() 

285 self._acquired_per_host: defaultdict[ConnectionKey, set[ResponseHandler]] = ( 

286 defaultdict(set) 

287 ) 

288 self._keepalive_timeout = cast(float, keepalive_timeout) 

289 self._force_close = force_close 

290 

291 # {host_key: FIFO list of waiters} 

292 # The FIFO is implemented with an OrderedDict with None keys because 

293 # python does not have an ordered set. 

294 self._waiters: defaultdict[ 

295 ConnectionKey, OrderedDict[asyncio.Future[None], None] 

296 ] = defaultdict(OrderedDict) 

297 

298 self._loop = loop 

299 self._factory = functools.partial(ResponseHandler, loop=loop) 

300 

301 # start keep-alive connection cleanup task 

302 self._cleanup_handle: asyncio.TimerHandle | None = None 

303 

304 # start cleanup closed transports task 

305 self._cleanup_closed_handle: asyncio.TimerHandle | None = None 

306 

307 if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED: 

308 warnings.warn( 

309 "enable_cleanup_closed ignored because " 

310 "https://github.com/python/cpython/pull/118960 is fixed " 

311 f"in Python version {sys.version_info}", 

312 DeprecationWarning, 

313 stacklevel=2, 

314 ) 

315 enable_cleanup_closed = False 

316 

317 self._cleanup_closed_disabled = not enable_cleanup_closed 

318 self._cleanup_closed_transports: list[asyncio.Transport | None] = [] 

319 

320 self._placeholder_future: asyncio.Future[Exception | None] = ( 

321 loop.create_future() 

322 ) 

323 self._placeholder_future.set_result(None) 

324 self._cleanup_closed() 

325 

326 def __del__(self, _warnings: Any = warnings) -> None: 

327 if self._closed: 

328 return 

329 if not self._conns: 

330 return 

331 

332 conns = [repr(c) for c in self._conns.values()] 

333 

334 self._close_immediately() 

335 

336 _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, source=self) 

337 context = { 

338 "connector": self, 

339 "connections": conns, 

340 "message": "Unclosed connector", 

341 } 

342 if self._source_traceback is not None: 

343 context["source_traceback"] = self._source_traceback 

344 self._loop.call_exception_handler(context) 

345 

346 async def __aenter__(self) -> "BaseConnector": 

347 return self 

348 

349 async def __aexit__( 

350 self, 

351 exc_type: type[BaseException] | None = None, 

352 exc_value: BaseException | None = None, 

353 exc_traceback: TracebackType | None = None, 

354 ) -> None: 

355 await self.close() 

356 

357 @property 

358 def force_close(self) -> bool: 

359 """Ultimately close connection on releasing if True.""" 

360 return self._force_close 

361 

362 @property 

363 def limit(self) -> int: 

364 """The total number for simultaneous connections. 

365 

366 If limit is 0 the connector has no limit. 

367 The default limit size is 100. 

368 """ 

369 return self._limit 

370 

371 @property 

372 def limit_per_host(self) -> int: 

373 """The limit for simultaneous connections to the same endpoint. 

374 

375 Endpoints are the same if they are have equal 

376 (host, port, is_ssl) triple. 

377 """ 

378 return self._limit_per_host 

379 

380 def _cleanup(self) -> None: 

381 """Cleanup unused transports.""" 

382 if self._cleanup_handle: 

383 self._cleanup_handle.cancel() 

384 # _cleanup_handle should be unset, otherwise _release() will not 

385 # recreate it ever! 

386 self._cleanup_handle = None 

387 

388 now = monotonic() 

389 timeout = self._keepalive_timeout 

390 

391 if self._conns: 

392 connections = defaultdict(deque) 

393 deadline = now - timeout 

394 for key, conns in self._conns.items(): 

395 alive: deque[tuple[ResponseHandler, float]] = deque() 

396 for proto, use_time in conns: 

397 if proto.is_connected() and use_time - deadline >= 0: 

398 alive.append((proto, use_time)) 

399 continue 

400 transport = proto.transport 

401 proto.close() 

402 if not self._cleanup_closed_disabled and key.is_ssl: 

403 self._cleanup_closed_transports.append(transport) 

404 

405 if alive: 

406 connections[key] = alive 

407 

408 self._conns = connections 

409 

410 if self._conns: 

411 self._cleanup_handle = helpers.weakref_handle( 

412 self, 

413 "_cleanup", 

414 timeout, 

415 self._loop, 

416 timeout_ceil_threshold=self._timeout_ceil_threshold, 

417 ) 

418 

419 def _cleanup_closed(self) -> None: 

420 """Double confirmation for transport close. 

421 

422 Some broken ssl servers may leave socket open without proper close. 

423 """ 

424 if self._cleanup_closed_handle: 

425 self._cleanup_closed_handle.cancel() 

426 

427 for transport in self._cleanup_closed_transports: 

428 if transport is not None: 

429 transport.abort() 

430 

431 self._cleanup_closed_transports = [] 

432 

433 if not self._cleanup_closed_disabled: 

434 self._cleanup_closed_handle = helpers.weakref_handle( 

435 self, 

436 "_cleanup_closed", 

437 self._cleanup_closed_period, 

438 self._loop, 

439 timeout_ceil_threshold=self._timeout_ceil_threshold, 

440 ) 

441 

442 async def close(self, *, abort_ssl: bool = False) -> None: 

443 """Close all opened transports. 

444 

445 :param abort_ssl: If True, SSL connections will be aborted immediately 

446 without performing the shutdown handshake. This provides 

447 faster cleanup at the cost of less graceful disconnection. 

448 """ 

449 waiters = self._close_immediately(abort_ssl=abort_ssl) 

450 if waiters: 

451 results = await asyncio.gather(*waiters, return_exceptions=True) 

452 for res in results: 

453 if isinstance(res, Exception): 

454 err_msg = "Error while closing connector: " + repr(res) 

455 client_logger.debug(err_msg) 

456 

457 def _close_immediately(self, *, abort_ssl: bool = False) -> list[Awaitable[object]]: 

458 waiters: list[Awaitable[object]] = [] 

459 

460 if self._closed: 

461 return waiters 

462 

463 self._closed = True 

464 

465 try: 

466 if self._loop.is_closed(): 

467 return waiters 

468 

469 # cancel cleanup task 

470 if self._cleanup_handle: 

471 self._cleanup_handle.cancel() 

472 

473 # cancel cleanup close task 

474 if self._cleanup_closed_handle: 

475 self._cleanup_closed_handle.cancel() 

476 

477 for data in self._conns.values(): 

478 for proto, _ in data: 

479 if ( 

480 abort_ssl 

481 and proto.transport 

482 and proto.transport.get_extra_info("sslcontext") is not None 

483 ): 

484 proto.abort() 

485 else: 

486 proto.close() 

487 if closed := proto.closed: 

488 waiters.append(closed) 

489 

490 for proto in self._acquired: 

491 if ( 

492 abort_ssl 

493 and proto.transport 

494 and proto.transport.get_extra_info("sslcontext") is not None 

495 ): 

496 proto.abort() 

497 else: 

498 proto.close() 

499 if closed := proto.closed: 

500 waiters.append(closed) 

501 

502 # TODO (A.Yushovskiy, 24-May-2019) collect transp. closing futures 

503 for transport in self._cleanup_closed_transports: 

504 if transport is not None: 

505 transport.abort() 

506 

507 return waiters 

508 

509 finally: 

510 self._conns.clear() 

511 self._acquired.clear() 

512 for keyed_waiters in self._waiters.values(): 

513 for keyed_waiter in keyed_waiters: 

514 keyed_waiter.cancel() 

515 self._waiters.clear() 

516 self._cleanup_handle = None 

517 self._cleanup_closed_transports.clear() 

518 self._cleanup_closed_handle = None 

519 

520 @property 

521 def closed(self) -> bool: 

522 """Is connector closed. 

523 

524 A readonly property. 

525 """ 

526 return self._closed 

527 

528 def _available_connections(self, key: "ConnectionKey") -> int: 

529 """ 

530 Return number of available connections. 

531 

532 The limit, limit_per_host and the connection key are taken into account. 

533 

534 If it returns less than 1 means that there are no connections 

535 available. 

536 """ 

537 # check total available connections 

538 # If there are no limits, this will always return 1 

539 total_remain = 1 

540 

541 if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0: 

542 return total_remain 

543 

544 # check limit per host 

545 if host_remain := self._limit_per_host: 

546 if acquired := self._acquired_per_host.get(key): 

547 host_remain -= len(acquired) 

548 if total_remain > host_remain: 

549 return host_remain 

550 

551 return total_remain 

552 

553 def _update_proxy_auth_header_and_build_proxy_req( 

554 self, req: ClientRequest 

555 ) -> ClientRequestBase: 

556 """Set Proxy-Authorization header for non-SSL proxy requests and builds the proxy request for SSL proxy requests.""" 

557 url = req.proxy 

558 assert url is not None 

559 headers = req.proxy_headers or CIMultiDict[str]() 

560 headers[hdrs.HOST] = req.headers[hdrs.HOST] 

561 proxy_req = ClientRequestBase( 

562 hdrs.METH_GET, 

563 url, 

564 headers=headers, 

565 auth=req.proxy_auth, 

566 loop=self._loop, 

567 ssl=req.ssl, 

568 ) 

569 auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None) 

570 if auth is not None: 

571 if not req.is_ssl(): 

572 req.headers[hdrs.PROXY_AUTHORIZATION] = auth 

573 else: 

574 proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth 

575 return proxy_req 

576 

577 async def connect( 

578 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout" 

579 ) -> Connection: 

580 """Get from pool or create new connection.""" 

581 key = req.connection_key 

582 if (conn := await self._get(key, traces)) is not None: 

583 # If we do not have to wait and we can get a connection from the pool 

584 # we can avoid the timeout ceil logic and directly return the connection 

585 if req.proxy: 

586 self._update_proxy_auth_header_and_build_proxy_req(req) 

587 return conn 

588 

589 async with ceil_timeout(timeout.connect, timeout.ceil_threshold): 

590 if self._available_connections(key) <= 0: 

591 await self._wait_for_available_connection(key, traces) 

592 if (conn := await self._get(key, traces)) is not None: 

593 if req.proxy: 

594 self._update_proxy_auth_header_and_build_proxy_req(req) 

595 return conn 

596 

597 placeholder = cast( 

598 ResponseHandler, _TransportPlaceholder(self._placeholder_future) 

599 ) 

600 self._acquired.add(placeholder) 

601 if self._limit_per_host: 

602 self._acquired_per_host[key].add(placeholder) 

603 

604 try: 

605 # Traces are done inside the try block to ensure that the 

606 # that the placeholder is still cleaned up if an exception 

607 # is raised. 

608 if traces: 

609 for trace in traces: 

610 await trace.send_connection_create_start() 

611 proto = await self._create_connection(req, traces, timeout) 

612 if traces: 

613 for trace in traces: 

614 await trace.send_connection_create_end() 

615 except BaseException: 

616 self._release_acquired(key, placeholder) 

617 raise 

618 else: 

619 if self._closed: 

620 proto.close() 

621 raise ClientConnectionError("Connector is closed.") 

622 

623 # The connection was successfully created, drop the placeholder 

624 # and add the real connection to the acquired set. There should 

625 # be no awaits after the proto is added to the acquired set 

626 # to ensure that the connection is not left in the acquired set 

627 # on cancellation. 

628 self._acquired.remove(placeholder) 

629 self._acquired.add(proto) 

630 if self._limit_per_host: 

631 acquired_per_host = self._acquired_per_host[key] 

632 acquired_per_host.remove(placeholder) 

633 acquired_per_host.add(proto) 

634 return Connection(self, key, proto, self._loop) 

635 

636 async def _wait_for_available_connection( 

637 self, key: "ConnectionKey", traces: list["Trace"] 

638 ) -> None: 

639 """Wait for an available connection slot.""" 

640 # We loop here because there is a race between 

641 # the connection limit check and the connection 

642 # being acquired. If the connection is acquired 

643 # between the check and the await statement, we 

644 # need to loop again to check if the connection 

645 # slot is still available. 

646 attempts = 0 

647 while True: 

648 fut: asyncio.Future[None] = self._loop.create_future() 

649 keyed_waiters = self._waiters[key] 

650 keyed_waiters[fut] = None 

651 if attempts: 

652 # If we have waited before, we need to move the waiter 

653 # to the front of the queue as otherwise we might get 

654 # starved and hit the timeout. 

655 keyed_waiters.move_to_end(fut, last=False) 

656 

657 try: 

658 # Traces happen in the try block to ensure that the 

659 # the waiter is still cleaned up if an exception is raised. 

660 if traces: 

661 for trace in traces: 

662 await trace.send_connection_queued_start() 

663 await fut 

664 if traces: 

665 for trace in traces: 

666 await trace.send_connection_queued_end() 

667 finally: 

668 # pop the waiter from the queue if its still 

669 # there and not already removed by _release_waiter 

670 keyed_waiters.pop(fut, None) 

671 if not self._waiters.get(key, True): 

672 del self._waiters[key] 

673 

674 if self._available_connections(key) > 0: 

675 break 

676 attempts += 1 

677 

678 async def _get( 

679 self, key: "ConnectionKey", traces: list["Trace"] 

680 ) -> Connection | None: 

681 """Get next reusable connection for the key or None. 

682 

683 The connection will be marked as acquired. 

684 """ 

685 if (conns := self._conns.get(key)) is None: 

686 return None 

687 

688 t1 = monotonic() 

689 while conns: 

690 proto, t0 = conns.popleft() 

691 # We will we reuse the connection if its connected and 

692 # the keepalive timeout has not been exceeded 

693 if proto.is_connected() and t1 - t0 <= self._keepalive_timeout: 

694 if not conns: 

695 # The very last connection was reclaimed: drop the key 

696 del self._conns[key] 

697 self._acquired.add(proto) 

698 if self._limit_per_host: 

699 self._acquired_per_host[key].add(proto) 

700 if traces: 

701 for trace in traces: 

702 try: 

703 await trace.send_connection_reuseconn() 

704 except BaseException: 

705 self._release_acquired(key, proto) 

706 raise 

707 return Connection(self, key, proto, self._loop) 

708 

709 # Connection cannot be reused, close it 

710 transport = proto.transport 

711 proto.close() 

712 # only for SSL transports 

713 if not self._cleanup_closed_disabled and key.is_ssl: 

714 self._cleanup_closed_transports.append(transport) 

715 

716 # No more connections: drop the key 

717 del self._conns[key] 

718 return None 

719 

720 def _release_waiter(self) -> None: 

721 """ 

722 Iterates over all waiters until one to be released is found. 

723 

724 The one to be released is not finished and 

725 belongs to a host that has available connections. 

726 """ 

727 if not self._waiters: 

728 return 

729 

730 # Having the dict keys ordered this avoids to iterate 

731 # at the same order at each call. 

732 queues = list(self._waiters) 

733 random.shuffle(queues) 

734 

735 for key in queues: 

736 if self._available_connections(key) < 1: 

737 continue 

738 

739 waiters = self._waiters[key] 

740 while waiters: 

741 waiter, _ = waiters.popitem(last=False) 

742 if not waiter.done(): 

743 waiter.set_result(None) 

744 return 

745 

746 def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None: 

747 """Release acquired connection.""" 

748 if self._closed: 

749 # acquired connection is already released on connector closing 

750 return 

751 

752 self._acquired.discard(proto) 

753 if self._limit_per_host and (conns := self._acquired_per_host.get(key)): 

754 conns.discard(proto) 

755 if not conns: 

756 del self._acquired_per_host[key] 

757 self._release_waiter() 

758 

759 def _release( 

760 self, 

761 key: "ConnectionKey", 

762 protocol: ResponseHandler, 

763 *, 

764 should_close: bool = False, 

765 ) -> None: 

766 if self._closed: 

767 # acquired connection is already released on connector closing 

768 return 

769 

770 self._release_acquired(key, protocol) 

771 

772 if self._force_close or should_close or protocol.should_close: 

773 transport = protocol.transport 

774 protocol.close() 

775 if key.is_ssl and not self._cleanup_closed_disabled: 

776 self._cleanup_closed_transports.append(transport) 

777 return 

778 

779 self._conns[key].append((protocol, monotonic())) 

780 

781 if self._cleanup_handle is None: 

782 self._cleanup_handle = helpers.weakref_handle( 

783 self, 

784 "_cleanup", 

785 self._keepalive_timeout, 

786 self._loop, 

787 timeout_ceil_threshold=self._timeout_ceil_threshold, 

788 ) 

789 

790 async def _create_connection( 

791 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout" 

792 ) -> ResponseHandler: 

793 raise NotImplementedError() 

794 

795 

796class _DNSCacheTable: 

797 def __init__(self, ttl: float | None = None, max_size: int = 1000) -> None: 

798 self._addrs_rr: OrderedDict[ 

799 tuple[str, int], tuple[Iterator[ResolveResult], int] 

800 ] = OrderedDict() 

801 self._timestamps: dict[tuple[str, int], float] = {} 

802 self._ttl = ttl 

803 self._max_size = max_size 

804 

805 def __contains__(self, host: object) -> bool: 

806 return host in self._addrs_rr 

807 

808 def add(self, key: tuple[str, int], addrs: list[ResolveResult]) -> None: 

809 if key in self._addrs_rr: 

810 self._addrs_rr.move_to_end(key) 

811 

812 self._addrs_rr[key] = (cycle(addrs), len(addrs)) 

813 

814 if self._ttl is not None: 

815 self._timestamps[key] = monotonic() 

816 

817 if len(self._addrs_rr) > self._max_size: 

818 oldest_key, _ = self._addrs_rr.popitem(last=False) 

819 self._timestamps.pop(oldest_key, None) 

820 

821 def remove(self, key: tuple[str, int]) -> None: 

822 self._addrs_rr.pop(key, None) 

823 self._timestamps.pop(key, None) 

824 

825 def clear(self) -> None: 

826 self._addrs_rr.clear() 

827 self._timestamps.clear() 

828 

829 def next_addrs(self, key: tuple[str, int]) -> list[ResolveResult]: 

830 loop, length = self._addrs_rr[key] 

831 addrs = list(islice(loop, length)) 

832 # Consume one more element to shift internal state of `cycle` 

833 next(loop) 

834 self._addrs_rr.move_to_end(key) 

835 return addrs 

836 

837 def expired(self, key: tuple[str, int]) -> bool: 

838 if self._ttl is None: 

839 return False 

840 

841 return self._timestamps[key] + self._ttl < monotonic() 

842 

843 

844def _make_ssl_context(verified: bool) -> SSLContext: 

845 """Create SSL context. 

846 

847 This method is not async-friendly and should be called from a thread 

848 because it will load certificates from disk and do other blocking I/O. 

849 """ 

850 if ssl is None: 

851 # No ssl support 

852 return None # type: ignore[unreachable] 

853 if verified: 

854 sslcontext = ssl.create_default_context() 

855 else: 

856 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 

857 sslcontext.options |= ssl.OP_NO_SSLv2 

858 sslcontext.options |= ssl.OP_NO_SSLv3 

859 sslcontext.check_hostname = False 

860 sslcontext.verify_mode = ssl.CERT_NONE 

861 sslcontext.options |= ssl.OP_NO_COMPRESSION 

862 sslcontext.set_default_verify_paths() 

863 sslcontext.set_alpn_protocols(("http/1.1",)) 

864 return sslcontext 

865 

866 

867# The default SSLContext objects are created at import time 

868# since they do blocking I/O to load certificates from disk, 

869# and imports should always be done before the event loop starts 

870# or in a thread. 

871_SSL_CONTEXT_VERIFIED = _make_ssl_context(True) 

872_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False) 

873 

874 

875class TCPConnector(BaseConnector): 

876 """TCP connector. 

877 

878 verify_ssl - Set to True to check ssl certifications. 

879 fingerprint - Pass the binary sha256 

880 digest of the expected certificate in DER format to verify 

881 that the certificate the server presents matches. See also 

882 https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning 

883 resolver - Enable DNS lookups and use this 

884 resolver 

885 use_dns_cache - Use memory cache for DNS lookups. 

886 ttl_dns_cache - Max seconds having cached a DNS entry, None forever. 

887 family - socket address family 

888 local_addr - local tuple of (host, port) to bind socket to 

889 

890 keepalive_timeout - (optional) Keep-alive timeout. 

891 force_close - Set to True to force close and do reconnect 

892 after each request (and between redirects). 

893 limit - The total number of simultaneous connections. 

894 limit_per_host - Number of simultaneous connections to one host. 

895 enable_cleanup_closed - Enables clean-up closed ssl transports. 

896 Disabled by default. 

897 happy_eyeballs_delay - This is the “Connection Attempt Delay” 

898 as defined in RFC 8305. To disable 

899 the happy eyeballs algorithm, set to None. 

900 interleave - “First Address Family Count” as defined in RFC 8305 

901 loop - Optional event loop. 

902 socket_factory - A SocketFactoryType function that, if supplied, 

903 will be used to create sockets given an 

904 AddrInfoType. 

905 ssl_shutdown_timeout - DEPRECATED. Will be removed in aiohttp 4.0. 

906 Grace period for SSL shutdown handshake on TLS 

907 connections. Default is 0 seconds (immediate abort). 

908 This parameter allowed for a clean SSL shutdown by 

909 notifying the remote peer of connection closure, 

910 while avoiding excessive delays during connector cleanup. 

911 Note: Only takes effect on Python 3.11+. 

912 """ 

913 

914 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"}) 

915 

916 def __init__( 

917 self, 

918 *, 

919 use_dns_cache: bool = True, 

920 ttl_dns_cache: int | None = 10, 

921 dns_cache_max_size: int = 1000, 

922 family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC, 

923 ssl: bool | Fingerprint | SSLContext = True, 

924 local_addr: tuple[str, int] | None = None, 

925 resolver: AbstractResolver | None = None, 

926 keepalive_timeout: None | float | _SENTINEL = sentinel, 

927 force_close: bool = False, 

928 limit: int = 100, 

929 limit_per_host: int = 0, 

930 enable_cleanup_closed: bool = False, 

931 timeout_ceil_threshold: float = 5, 

932 happy_eyeballs_delay: float | None = 0.25, 

933 interleave: int | None = None, 

934 socket_factory: SocketFactoryType | None = None, 

935 ssl_shutdown_timeout: _SENTINEL | None | float = sentinel, 

936 ): 

937 super().__init__( 

938 keepalive_timeout=keepalive_timeout, 

939 force_close=force_close, 

940 limit=limit, 

941 limit_per_host=limit_per_host, 

942 enable_cleanup_closed=enable_cleanup_closed, 

943 timeout_ceil_threshold=timeout_ceil_threshold, 

944 ) 

945 

946 if not isinstance(ssl, SSL_ALLOWED_TYPES): 

947 raise TypeError( 

948 "ssl should be SSLContext, Fingerprint, or bool, " 

949 f"got {ssl!r} instead." 

950 ) 

951 self._ssl = ssl 

952 

953 self._resolver: AbstractResolver 

954 if resolver is None: 

955 self._resolver = DefaultResolver() 

956 self._resolver_owner = True 

957 else: 

958 self._resolver = resolver 

959 self._resolver_owner = False 

960 

961 self._use_dns_cache = use_dns_cache 

962 self._cached_hosts = _DNSCacheTable( 

963 ttl=ttl_dns_cache, max_size=dns_cache_max_size 

964 ) 

965 self._throttle_dns_futures: dict[tuple[str, int], set[asyncio.Future[None]]] = ( 

966 {} 

967 ) 

968 self._family = family 

969 self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr) 

970 self._happy_eyeballs_delay = happy_eyeballs_delay 

971 self._interleave = interleave 

972 self._resolve_host_tasks: set[asyncio.Task[list[ResolveResult]]] = set() 

973 self._socket_factory = socket_factory 

974 self._ssl_shutdown_timeout: float | None 

975 

976 # Handle ssl_shutdown_timeout with warning for Python < 3.11 

977 if ssl_shutdown_timeout is sentinel: 

978 self._ssl_shutdown_timeout = 0 

979 else: 

980 # Deprecation warning for ssl_shutdown_timeout parameter 

981 warnings.warn( 

982 "The ssl_shutdown_timeout parameter is deprecated and will be removed in aiohttp 4.0", 

983 DeprecationWarning, 

984 stacklevel=2, 

985 ) 

986 if ( 

987 sys.version_info < (3, 11) 

988 and ssl_shutdown_timeout is not None 

989 and ssl_shutdown_timeout != 0 

990 ): 

991 warnings.warn( 

992 f"ssl_shutdown_timeout={ssl_shutdown_timeout} is ignored on Python < 3.11; " 

993 "only ssl_shutdown_timeout=0 is supported. The timeout will be ignored.", 

994 RuntimeWarning, 

995 stacklevel=2, 

996 ) 

997 self._ssl_shutdown_timeout = ssl_shutdown_timeout 

998 

999 async def close(self, *, abort_ssl: bool = False) -> None: 

1000 """Close all opened transports. 

1001 

1002 :param abort_ssl: If True, SSL connections will be aborted immediately 

1003 without performing the shutdown handshake. If False (default), 

1004 the behavior is determined by ssl_shutdown_timeout: 

1005 - If ssl_shutdown_timeout=0: connections are aborted 

1006 - If ssl_shutdown_timeout>0: graceful shutdown is performed 

1007 """ 

1008 if self._resolver_owner: 

1009 await self._resolver.close() 

1010 # Use abort_ssl param if explicitly set, otherwise use ssl_shutdown_timeout default 

1011 await super().close(abort_ssl=abort_ssl or self._ssl_shutdown_timeout == 0) 

1012 

1013 def _close_immediately(self, *, abort_ssl: bool = False) -> list[Awaitable[object]]: 

1014 for fut in chain.from_iterable(self._throttle_dns_futures.values()): 

1015 fut.cancel() 

1016 

1017 waiters = super()._close_immediately(abort_ssl=abort_ssl) 

1018 

1019 for t in self._resolve_host_tasks: 

1020 t.cancel() 

1021 waiters.append(t) 

1022 

1023 return waiters 

1024 

1025 @property 

1026 def family(self) -> int: 

1027 """Socket family like AF_INET.""" 

1028 return self._family 

1029 

1030 @property 

1031 def use_dns_cache(self) -> bool: 

1032 """True if local DNS caching is enabled.""" 

1033 return self._use_dns_cache 

1034 

1035 def clear_dns_cache(self, host: str | None = None, port: int | None = None) -> None: 

1036 """Remove specified host/port or clear all dns local cache.""" 

1037 if host is not None and port is not None: 

1038 self._cached_hosts.remove((host, port)) 

1039 elif host is not None or port is not None: 

1040 raise ValueError("either both host and port or none of them are allowed") 

1041 else: 

1042 self._cached_hosts.clear() 

1043 

1044 async def _resolve_host( 

1045 self, host: str, port: int, traces: Sequence["Trace"] | None = None 

1046 ) -> list[ResolveResult]: 

1047 """Resolve host and return list of addresses.""" 

1048 if is_ip_address(host): 

1049 return [ 

1050 { 

1051 "hostname": host, 

1052 "host": host, 

1053 "port": port, 

1054 "family": self._family, 

1055 "proto": 0, 

1056 "flags": 0, 

1057 } 

1058 ] 

1059 

1060 if not self._use_dns_cache: 

1061 if traces: 

1062 for trace in traces: 

1063 await trace.send_dns_resolvehost_start(host) 

1064 

1065 res = await self._resolver.resolve(host, port, family=self._family) 

1066 

1067 if traces: 

1068 for trace in traces: 

1069 await trace.send_dns_resolvehost_end(host) 

1070 

1071 return res 

1072 

1073 key = (host, port) 

1074 if key in self._cached_hosts and not self._cached_hosts.expired(key): 

1075 # get result early, before any await (#4014) 

1076 result = self._cached_hosts.next_addrs(key) 

1077 

1078 if traces: 

1079 for trace in traces: 

1080 await trace.send_dns_cache_hit(host) 

1081 return result 

1082 

1083 futures: set[asyncio.Future[None]] 

1084 # 

1085 # If multiple connectors are resolving the same host, we wait 

1086 # for the first one to resolve and then use the result for all of them. 

1087 # We use a throttle to ensure that we only resolve the host once 

1088 # and then use the result for all the waiters. 

1089 # 

1090 if key in self._throttle_dns_futures: 

1091 # get futures early, before any await (#4014) 

1092 futures = self._throttle_dns_futures[key] 

1093 future: asyncio.Future[None] = self._loop.create_future() 

1094 futures.add(future) 

1095 if traces: 

1096 for trace in traces: 

1097 await trace.send_dns_cache_hit(host) 

1098 try: 

1099 await future 

1100 finally: 

1101 futures.discard(future) 

1102 return self._cached_hosts.next_addrs(key) 

1103 

1104 # update dict early, before any await (#4014) 

1105 self._throttle_dns_futures[key] = futures = set() 

1106 # In this case we need to create a task to ensure that we can shield 

1107 # the task from cancellation as cancelling this lookup should not cancel 

1108 # the underlying lookup or else the cancel event will get broadcast to 

1109 # all the waiters across all connections. 

1110 # 

1111 coro = self._resolve_host_with_throttle(key, host, port, futures, traces) 

1112 loop = asyncio.get_running_loop() 

1113 if sys.version_info >= (3, 12): 

1114 # Optimization for Python 3.12, try to send immediately 

1115 resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True) 

1116 else: 

1117 resolved_host_task = loop.create_task(coro) 

1118 

1119 if not resolved_host_task.done(): 

1120 self._resolve_host_tasks.add(resolved_host_task) 

1121 resolved_host_task.add_done_callback(self._resolve_host_tasks.discard) 

1122 

1123 try: 

1124 return await asyncio.shield(resolved_host_task) 

1125 except asyncio.CancelledError: 

1126 

1127 def drop_exception(fut: "asyncio.Future[list[ResolveResult]]") -> None: 

1128 with suppress(Exception, asyncio.CancelledError): 

1129 fut.result() 

1130 

1131 resolved_host_task.add_done_callback(drop_exception) 

1132 raise 

1133 

1134 async def _resolve_host_with_throttle( 

1135 self, 

1136 key: tuple[str, int], 

1137 host: str, 

1138 port: int, 

1139 futures: set[asyncio.Future[None]], 

1140 traces: Sequence["Trace"] | None, 

1141 ) -> list[ResolveResult]: 

1142 """Resolve host and set result for all waiters. 

1143 

1144 This method must be run in a task and shielded from cancellation 

1145 to avoid cancelling the underlying lookup. 

1146 """ 

1147 try: 

1148 if traces: 

1149 for trace in traces: 

1150 await trace.send_dns_cache_miss(host) 

1151 

1152 for trace in traces: 

1153 await trace.send_dns_resolvehost_start(host) 

1154 

1155 addrs = await self._resolver.resolve(host, port, family=self._family) 

1156 if traces: 

1157 for trace in traces: 

1158 await trace.send_dns_resolvehost_end(host) 

1159 

1160 self._cached_hosts.add(key, addrs) 

1161 for fut in futures: 

1162 set_result(fut, None) 

1163 except BaseException as e: 

1164 # any DNS exception is set for the waiters to raise the same exception. 

1165 # This coro is always run in task that is shielded from cancellation so 

1166 # we should never be propagating cancellation here. 

1167 for fut in futures: 

1168 set_exception(fut, e) 

1169 raise 

1170 finally: 

1171 self._throttle_dns_futures.pop(key) 

1172 

1173 return self._cached_hosts.next_addrs(key) 

1174 

1175 async def _create_connection( 

1176 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout" 

1177 ) -> ResponseHandler: 

1178 """Create connection. 

1179 

1180 Has same keyword arguments as BaseEventLoop.create_connection. 

1181 """ 

1182 if req.proxy: 

1183 _, proto = await self._create_proxy_connection(req, traces, timeout) 

1184 else: 

1185 _, proto = await self._create_direct_connection(req, traces, timeout) 

1186 

1187 return proto 

1188 

1189 def _get_ssl_context(self, req: ClientRequestBase) -> SSLContext | None: 

1190 """Logic to get the correct SSL context 

1191 

1192 0. if req.ssl is false, return None 

1193 

1194 1. if ssl_context is specified in req, use it 

1195 2. if _ssl_context is specified in self, use it 

1196 3. otherwise: 

1197 1. if verify_ssl is not specified in req, use self.ssl_context 

1198 (will generate a default context according to self.verify_ssl) 

1199 2. if verify_ssl is True in req, generate a default SSL context 

1200 3. if verify_ssl is False in req, generate a SSL context that 

1201 won't verify 

1202 """ 

1203 if not req.is_ssl(): 

1204 return None 

1205 

1206 if ssl is None: # pragma: no cover 

1207 raise RuntimeError("SSL is not supported.") 

1208 sslcontext = req.ssl 

1209 if isinstance(sslcontext, ssl.SSLContext): 

1210 return sslcontext 

1211 if sslcontext is not True: 

1212 # not verified or fingerprinted 

1213 return _SSL_CONTEXT_UNVERIFIED 

1214 sslcontext = self._ssl 

1215 if isinstance(sslcontext, ssl.SSLContext): 

1216 return sslcontext 

1217 if sslcontext is not True: 

1218 # not verified or fingerprinted 

1219 return _SSL_CONTEXT_UNVERIFIED 

1220 return _SSL_CONTEXT_VERIFIED 

1221 

1222 def _get_fingerprint(self, req: ClientRequestBase) -> "Fingerprint | None": 

1223 ret = req.ssl 

1224 if isinstance(ret, Fingerprint): 

1225 return ret 

1226 ret = self._ssl 

1227 if isinstance(ret, Fingerprint): 

1228 return ret 

1229 return None 

1230 

1231 async def _wrap_create_connection( 

1232 self, 

1233 *args: Any, 

1234 addr_infos: list[AddrInfoType], 

1235 req: ClientRequestBase, 

1236 timeout: "ClientTimeout", 

1237 client_error: type[Exception] = ClientConnectorError, 

1238 **kwargs: Any, 

1239 ) -> tuple[asyncio.Transport, ResponseHandler]: 

1240 try: 

1241 async with ceil_timeout( 

1242 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1243 ): 

1244 sock = await aiohappyeyeballs.start_connection( 

1245 addr_infos=addr_infos, 

1246 local_addr_infos=self._local_addr_infos, 

1247 happy_eyeballs_delay=self._happy_eyeballs_delay, 

1248 interleave=self._interleave, 

1249 loop=self._loop, 

1250 socket_factory=self._socket_factory, 

1251 ) 

1252 # Add ssl_shutdown_timeout for Python 3.11+ when SSL is used 

1253 if ( 

1254 kwargs.get("ssl") 

1255 and self._ssl_shutdown_timeout 

1256 and sys.version_info >= (3, 11) 

1257 ): 

1258 kwargs["ssl_shutdown_timeout"] = self._ssl_shutdown_timeout 

1259 return await self._loop.create_connection(*args, **kwargs, sock=sock) 

1260 except cert_errors as exc: 

1261 raise ClientConnectorCertificateError(req.connection_key, exc) from exc 

1262 except ssl_errors as exc: 

1263 raise ClientConnectorSSLError(req.connection_key, exc) from exc 

1264 except OSError as exc: 

1265 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1266 raise 

1267 raise client_error(req.connection_key, exc) from exc 

1268 

1269 def _warn_about_tls_in_tls( 

1270 self, 

1271 underlying_transport: asyncio.Transport, 

1272 req: ClientRequest, 

1273 ) -> None: 

1274 """Issue a warning if the requested URL has HTTPS scheme.""" 

1275 if req.url.scheme != "https": 

1276 return 

1277 

1278 # TLS-in-TLS only applies when the proxy itself is HTTPS. 

1279 # When the proxy is HTTP, start_tls upgrades a plain TCP connection, 

1280 # which is standard TLS and works on all event loops and Python versions. 

1281 if req.proxy is None or req.proxy.scheme != "https": 

1282 return 

1283 

1284 # Check if uvloop is being used, which supports TLS in TLS, 

1285 # otherwise assume that asyncio's native transport is being used. 

1286 if type(underlying_transport).__module__.startswith("uvloop"): 

1287 return 

1288 

1289 # Support in asyncio was added in Python 3.11 (bpo-44011) 

1290 asyncio_supports_tls_in_tls = sys.version_info >= (3, 11) or getattr( 

1291 underlying_transport, 

1292 "_start_tls_compatible", 

1293 False, 

1294 ) 

1295 

1296 if asyncio_supports_tls_in_tls: 

1297 return 

1298 

1299 warnings.warn( 

1300 "An HTTPS request is being sent through an HTTPS proxy. " 

1301 "This support for TLS in TLS is known to be disabled " 

1302 "in the stdlib asyncio. This is why you'll probably see " 

1303 "an error in the log below.\n\n" 

1304 "It is possible to enable it via monkeypatching. " 

1305 "For more details, see:\n" 

1306 "* https://bugs.python.org/issue37179\n" 

1307 "* https://github.com/python/cpython/pull/28073\n\n" 

1308 "You can temporarily patch this as follows:\n" 

1309 "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n" 

1310 "* https://github.com/aio-libs/aiohttp/discussions/6044\n", 

1311 RuntimeWarning, 

1312 source=self, 

1313 # Why `4`? At least 3 of the calls in the stack originate 

1314 # from the methods in this class. 

1315 stacklevel=3, 

1316 ) 

1317 

1318 async def _start_tls_connection( 

1319 self, 

1320 underlying_transport: asyncio.Transport, 

1321 req: ClientRequest, 

1322 timeout: "ClientTimeout", 

1323 client_error: type[Exception] = ClientConnectorError, 

1324 ) -> tuple[asyncio.BaseTransport, ResponseHandler]: 

1325 """Wrap the raw TCP transport with TLS.""" 

1326 tls_proto = self._factory() # Create a brand new proto for TLS 

1327 sslcontext = self._get_ssl_context(req) 

1328 if TYPE_CHECKING: 

1329 # _start_tls_connection is unreachable in the current code path 

1330 # if sslcontext is None. 

1331 assert sslcontext is not None 

1332 

1333 try: 

1334 async with ceil_timeout( 

1335 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1336 ): 

1337 try: 

1338 # ssl_shutdown_timeout is only available in Python 3.11+ 

1339 if sys.version_info >= (3, 11) and self._ssl_shutdown_timeout: 

1340 tls_transport = await self._loop.start_tls( 

1341 underlying_transport, 

1342 tls_proto, 

1343 sslcontext, 

1344 server_hostname=req.server_hostname or req.url.raw_host, 

1345 ssl_handshake_timeout=timeout.total, 

1346 ssl_shutdown_timeout=self._ssl_shutdown_timeout, 

1347 ) 

1348 else: 

1349 tls_transport = await self._loop.start_tls( 

1350 underlying_transport, 

1351 tls_proto, 

1352 sslcontext, 

1353 server_hostname=req.server_hostname or req.url.raw_host, 

1354 ssl_handshake_timeout=timeout.total, 

1355 ) 

1356 except BaseException: 

1357 # We need to close the underlying transport since 

1358 # `start_tls()` probably failed before it had a 

1359 # chance to do this: 

1360 if self._ssl_shutdown_timeout == 0: 

1361 underlying_transport.abort() 

1362 else: 

1363 underlying_transport.close() 

1364 raise 

1365 if isinstance(tls_transport, asyncio.Transport): 

1366 fingerprint = self._get_fingerprint(req) 

1367 if fingerprint: 

1368 try: 

1369 fingerprint.check(tls_transport) 

1370 except ServerFingerprintMismatch: 

1371 tls_transport.close() 

1372 if not self._cleanup_closed_disabled: 

1373 self._cleanup_closed_transports.append(tls_transport) 

1374 raise 

1375 except cert_errors as exc: 

1376 raise ClientConnectorCertificateError(req.connection_key, exc) from exc 

1377 except ssl_errors as exc: 

1378 raise ClientConnectorSSLError(req.connection_key, exc) from exc 

1379 except OSError as exc: 

1380 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1381 raise 

1382 raise client_error(req.connection_key, exc) from exc 

1383 except TypeError as type_err: 

1384 # Example cause looks like this: 

1385 # TypeError: transport <asyncio.sslproto._SSLProtocolTransport 

1386 # object at 0x7f760615e460> is not supported by start_tls() 

1387 

1388 raise ClientConnectionError( 

1389 "Cannot initialize a TLS-in-TLS connection to host " 

1390 f"{req.url.host!s}:{req.url.port:d} through an underlying connection " 

1391 f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} " 

1392 f"[{type_err!s}]" 

1393 ) from type_err 

1394 else: 

1395 if tls_transport is None: 

1396 msg = "Failed to start TLS (possibly caused by closing transport)" 

1397 raise client_error(req.connection_key, OSError(msg)) 

1398 tls_proto.connection_made( 

1399 tls_transport 

1400 ) # Kick the state machine of the new TLS protocol 

1401 

1402 return tls_transport, tls_proto 

1403 

1404 def _convert_hosts_to_addr_infos( 

1405 self, hosts: list[ResolveResult] 

1406 ) -> list[AddrInfoType]: 

1407 """Converts the list of hosts to a list of addr_infos. 

1408 

1409 The list of hosts is the result of a DNS lookup. The list of 

1410 addr_infos is the result of a call to `socket.getaddrinfo()`. 

1411 """ 

1412 addr_infos: list[AddrInfoType] = [] 

1413 for hinfo in hosts: 

1414 host = hinfo["host"] 

1415 is_ipv6 = ":" in host 

1416 family = socket.AF_INET6 if is_ipv6 else socket.AF_INET 

1417 if self._family and self._family != family: 

1418 continue 

1419 addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"]) 

1420 addr_infos.append( 

1421 (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr) 

1422 ) 

1423 return addr_infos 

1424 

1425 async def _create_direct_connection( 

1426 self, 

1427 req: ClientRequestBase, 

1428 traces: list["Trace"], 

1429 timeout: "ClientTimeout", 

1430 *, 

1431 client_error: type[Exception] = ClientConnectorError, 

1432 ) -> tuple[asyncio.Transport, ResponseHandler]: 

1433 sslcontext = self._get_ssl_context(req) 

1434 fingerprint = self._get_fingerprint(req) 

1435 

1436 host = req.url.raw_host 

1437 assert host is not None 

1438 # Replace multiple trailing dots with a single one. 

1439 # A trailing dot is only present for fully-qualified domain names. 

1440 # See https://github.com/aio-libs/aiohttp/pull/7364. 

1441 if host.endswith(".."): 

1442 host = host.rstrip(".") + "." 

1443 port = req.url.port 

1444 assert port is not None 

1445 try: 

1446 # Cancelling this lookup should not cancel the underlying lookup 

1447 # or else the cancel event will get broadcast to all the waiters 

1448 # across all connections. 

1449 hosts = await self._resolve_host(host, port, traces=traces) 

1450 except OSError as exc: 

1451 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1452 raise 

1453 # in case of proxy it is not ClientProxyConnectionError 

1454 # it is problem of resolving proxy ip itself 

1455 raise ClientConnectorDNSError(req.connection_key, exc) from exc 

1456 

1457 last_exc: Exception | None = None 

1458 addr_infos = self._convert_hosts_to_addr_infos(hosts) 

1459 while addr_infos: 

1460 # Strip trailing dots, certificates contain FQDN without dots. 

1461 # See https://github.com/aio-libs/aiohttp/issues/3636 

1462 server_hostname = ( 

1463 (req.server_hostname or host).rstrip(".") if sslcontext else None 

1464 ) 

1465 

1466 try: 

1467 transp, proto = await self._wrap_create_connection( 

1468 self._factory, 

1469 timeout=timeout, 

1470 ssl=sslcontext, 

1471 addr_infos=addr_infos, 

1472 server_hostname=server_hostname, 

1473 req=req, 

1474 client_error=client_error, 

1475 ) 

1476 except (ClientConnectorError, asyncio.TimeoutError) as exc: 

1477 last_exc = exc 

1478 aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave) 

1479 continue 

1480 

1481 if req.is_ssl() and fingerprint: 

1482 try: 

1483 fingerprint.check(transp) 

1484 except ServerFingerprintMismatch as exc: 

1485 transp.close() 

1486 if not self._cleanup_closed_disabled: 

1487 self._cleanup_closed_transports.append(transp) 

1488 last_exc = exc 

1489 # Remove the bad peer from the list of addr_infos 

1490 sock: socket.socket = transp.get_extra_info("socket") 

1491 bad_peer = sock.getpeername() 

1492 aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer) 

1493 continue 

1494 

1495 return transp, proto 

1496 assert last_exc is not None 

1497 raise last_exc 

1498 

1499 async def _create_proxy_connection( 

1500 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout" 

1501 ) -> tuple[asyncio.BaseTransport, ResponseHandler]: 

1502 proxy_req = self._update_proxy_auth_header_and_build_proxy_req(req) 

1503 

1504 # create connection to proxy server 

1505 transport, proto = await self._create_direct_connection( 

1506 proxy_req, [], timeout, client_error=ClientProxyConnectionError 

1507 ) 

1508 

1509 if req.is_ssl(): 

1510 self._warn_about_tls_in_tls(transport, req) 

1511 

1512 # For HTTPS requests over HTTP proxy 

1513 # we must notify proxy to tunnel connection 

1514 # so we send CONNECT command: 

1515 # CONNECT www.python.org:443 HTTP/1.1 

1516 # Host: www.python.org 

1517 # 

1518 # next we must do TLS handshake and so on 

1519 # to do this we must wrap raw socket into secure one 

1520 # asyncio handles this perfectly 

1521 proxy_req.method = hdrs.METH_CONNECT 

1522 proxy_req.url = req.url 

1523 key = req.connection_key._replace( 

1524 proxy=None, proxy_auth=None, proxy_headers_hash=None 

1525 ) 

1526 conn = _ConnectTunnelConnection(self, key, proto, self._loop) 

1527 proxy_resp = await proxy_req._send(conn) 

1528 try: 

1529 protocol = conn._protocol 

1530 assert protocol is not None 

1531 

1532 # read_until_eof=True will ensure the connection isn't closed 

1533 # once the response is received and processed allowing 

1534 # START_TLS to work on the connection below. 

1535 protocol.set_response_params( 

1536 read_until_eof=True, 

1537 timeout_ceil_threshold=self._timeout_ceil_threshold, 

1538 ) 

1539 resp = await proxy_resp.start(conn) 

1540 except BaseException: 

1541 proxy_resp.close() 

1542 conn.close() 

1543 raise 

1544 else: 

1545 conn._protocol = None 

1546 try: 

1547 if resp.status != 200: 

1548 message = resp.reason 

1549 if message is None: 

1550 message = HTTPStatus(resp.status).phrase 

1551 raise ClientHttpProxyError( 

1552 proxy_resp.request_info, 

1553 resp.history, 

1554 status=resp.status, 

1555 message=message, 

1556 headers=resp.headers, 

1557 ) 

1558 except BaseException: 

1559 # It shouldn't be closed in `finally` because it's fed to 

1560 # `loop.start_tls()` and the docs say not to touch it after 

1561 # passing there. 

1562 transport.close() 

1563 raise 

1564 

1565 return await self._start_tls_connection( 

1566 # Access the old transport for the last time before it's 

1567 # closed and forgotten forever: 

1568 transport, 

1569 req=req, 

1570 timeout=timeout, 

1571 ) 

1572 finally: 

1573 proxy_resp.close() 

1574 

1575 return transport, proto 

1576 

1577 

1578class UnixConnector(BaseConnector): 

1579 """Unix socket connector. 

1580 

1581 path - Unix socket path. 

1582 keepalive_timeout - (optional) Keep-alive timeout. 

1583 force_close - Set to True to force close and do reconnect 

1584 after each request (and between redirects). 

1585 limit - The total number of simultaneous connections. 

1586 limit_per_host - Number of simultaneous connections to one host. 

1587 loop - Optional event loop. 

1588 """ 

1589 

1590 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"}) 

1591 

1592 def __init__( 

1593 self, 

1594 path: str, 

1595 force_close: bool = False, 

1596 keepalive_timeout: _SENTINEL | float | None = sentinel, 

1597 limit: int = 100, 

1598 limit_per_host: int = 0, 

1599 ) -> None: 

1600 super().__init__( 

1601 force_close=force_close, 

1602 keepalive_timeout=keepalive_timeout, 

1603 limit=limit, 

1604 limit_per_host=limit_per_host, 

1605 ) 

1606 self._path = path 

1607 

1608 @property 

1609 def path(self) -> str: 

1610 """Path to unix socket.""" 

1611 return self._path 

1612 

1613 async def _create_connection( 

1614 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout" 

1615 ) -> ResponseHandler: 

1616 try: 

1617 async with ceil_timeout( 

1618 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1619 ): 

1620 _, proto = await self._loop.create_unix_connection( 

1621 self._factory, self._path 

1622 ) 

1623 except OSError as exc: 

1624 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1625 raise 

1626 raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc 

1627 

1628 return proto 

1629 

1630 

1631class NamedPipeConnector(BaseConnector): 

1632 """Named pipe connector. 

1633 

1634 Only supported by the proactor event loop. 

1635 See also: https://docs.python.org/3/library/asyncio-eventloop.html 

1636 

1637 path - Windows named pipe path. 

1638 keepalive_timeout - (optional) Keep-alive timeout. 

1639 force_close - Set to True to force close and do reconnect 

1640 after each request (and between redirects). 

1641 limit - The total number of simultaneous connections. 

1642 limit_per_host - Number of simultaneous connections to one host. 

1643 loop - Optional event loop. 

1644 """ 

1645 

1646 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"}) 

1647 

1648 def __init__( 

1649 self, 

1650 path: str, 

1651 force_close: bool = False, 

1652 keepalive_timeout: _SENTINEL | float | None = sentinel, 

1653 limit: int = 100, 

1654 limit_per_host: int = 0, 

1655 ) -> None: 

1656 super().__init__( 

1657 force_close=force_close, 

1658 keepalive_timeout=keepalive_timeout, 

1659 limit=limit, 

1660 limit_per_host=limit_per_host, 

1661 ) 

1662 if not isinstance( 

1663 self._loop, 

1664 asyncio.ProactorEventLoop, # type: ignore[attr-defined] 

1665 ): 

1666 raise RuntimeError( 

1667 "Named Pipes only available in proactor loop under windows" 

1668 ) 

1669 self._path = path 

1670 

1671 @property 

1672 def path(self) -> str: 

1673 """Path to the named pipe.""" 

1674 return self._path 

1675 

1676 async def _create_connection( 

1677 self, req: ClientRequest, traces: list["Trace"], timeout: "ClientTimeout" 

1678 ) -> ResponseHandler: 

1679 try: 

1680 async with ceil_timeout( 

1681 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1682 ): 

1683 _, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined] 

1684 self._factory, self._path 

1685 ) 

1686 # the drain is required so that the connection_made is called 

1687 # and transport is set otherwise it is not set before the 

1688 # `assert conn.transport is not None` 

1689 # in client.py's _request method 

1690 await asyncio.sleep(0) 

1691 # other option is to manually set transport like 

1692 # `proto.transport = trans` 

1693 except OSError as exc: 

1694 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1695 raise 

1696 raise ClientConnectorError(req.connection_key, exc) from exc 

1697 

1698 return cast(ResponseHandler, proto)