Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/connector.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

847 statements  

1import asyncio 

2import functools 

3import random 

4import socket 

5import sys 

6import traceback 

7import warnings 

8from collections import OrderedDict, defaultdict, deque 

9from contextlib import suppress 

10from http import HTTPStatus 

11from itertools import chain, cycle, islice 

12from time import monotonic 

13from types import TracebackType 

14from typing import ( 

15 TYPE_CHECKING, 

16 Any, 

17 Awaitable, 

18 Callable, 

19 DefaultDict, 

20 Deque, 

21 Dict, 

22 Iterator, 

23 List, 

24 Literal, 

25 Optional, 

26 Sequence, 

27 Set, 

28 Tuple, 

29 Type, 

30 Union, 

31 cast, 

32) 

33 

34import aiohappyeyeballs 

35from aiohappyeyeballs import AddrInfoType, SocketFactoryType 

36 

37from . import hdrs, helpers 

38from .abc import AbstractResolver, ResolveResult 

39from .client_exceptions import ( 

40 ClientConnectionError, 

41 ClientConnectorCertificateError, 

42 ClientConnectorDNSError, 

43 ClientConnectorError, 

44 ClientConnectorSSLError, 

45 ClientHttpProxyError, 

46 ClientProxyConnectionError, 

47 ServerFingerprintMismatch, 

48 UnixClientConnectorError, 

49 cert_errors, 

50 ssl_errors, 

51) 

52from .client_proto import ResponseHandler 

53from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params 

54from .helpers import ( 

55 _SENTINEL, 

56 ceil_timeout, 

57 is_ip_address, 

58 noop, 

59 sentinel, 

60 set_exception, 

61 set_result, 

62) 

63from .log import client_logger 

64from .resolver import DefaultResolver 

65 

66if sys.version_info >= (3, 12): 

67 from collections.abc import Buffer 

68else: 

69 Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"] 

70 

71if TYPE_CHECKING: 

72 import ssl 

73 

74 SSLContext = ssl.SSLContext 

75else: 

76 try: 

77 import ssl 

78 

79 SSLContext = ssl.SSLContext 

80 except ImportError: # pragma: no cover 

81 ssl = None # type: ignore[assignment] 

82 SSLContext = object # type: ignore[misc,assignment] 

83 

# URL schemes, grouped by the kind of traffic they describe.
EMPTY_SCHEMA_SET = frozenset(("",))
HTTP_SCHEMA_SET = frozenset(("http", "https"))
WS_SCHEMA_SET = frozenset(("ws", "wss"))

# Derived groups built from the basic sets above.
HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET.union(EMPTY_SCHEMA_SET)
HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET.union(WS_SCHEMA_SET)

# Cleanup closed is no longer needed after
# https://github.com/python/cpython/pull/118960
# which first appeared in Python 3.12.8 and 3.13.1, so it is only required
# on interpreters older than 3.12.8 and on the 3.13.0 release line.
NEEDS_CLEANUP_CLOSED = sys.version_info < (3, 12, 8) or (
    (3, 13, 0) <= sys.version_info < (3, 13, 1)
)

98 

99 

# Names exported by a star-import of this module: the three connector classes
# defined or referenced here plus the two type aliases re-exported from
# aiohappyeyeballs.
__all__ = (
    "BaseConnector",
    "TCPConnector",
    "UnixConnector",
    "NamedPipeConnector",
    "AddrInfoType",
    "SocketFactoryType",
)

108 

109 

110if TYPE_CHECKING: 

111 from .client import ClientTimeout 

112 from .client_reqrep import ConnectionKey 

113 from .tracing import Trace 

114 

115 

class _DeprecationWaiter:
    """Awaitable wrapper that complains if it is discarded without being awaited.

    The wrapped awaitable is delegated to unchanged when awaited. If the
    wrapper is finalized before anyone awaited it, a ``DeprecationWarning``
    is emitted.
    """

    __slots__ = ("_awaitable", "_awaited")

    def __init__(self, awaitable: Awaitable[Any]) -> None:
        self._awaited = False
        self._awaitable = awaitable

    def __await__(self) -> Any:
        # Record the await before delegating so the finalizer stays silent.
        self._awaited = True
        inner = self._awaitable
        return inner.__await__()

    def __del__(self) -> None:
        if self._awaited:
            return
        warnings.warn(
            "Connector.close() is a coroutine, "
            "please use await connector.close()",
            DeprecationWarning,
        )

134 

135 

async def _wait_for_close(waiters: List[Awaitable[object]]) -> None:
    """Await every close waiter, logging (not raising) any exception results."""
    outcomes = await asyncio.gather(*waiters, return_exceptions=True)
    failures = (outcome for outcome in outcomes if isinstance(outcome, Exception))
    for failure in failures:
        client_logger.debug("Error while closing connector: %r", failure)

142 

143 

class Connection:
    """Handle on one acquired protocol, tied to the connector that issued it.

    The handle owns the protocol until ``close()`` or ``release()`` hands it
    back to the connector; afterwards ``protocol`` and ``transport`` are None.
    """

    _source_traceback = None

    def __init__(
        self,
        connector: "BaseConnector",
        key: "ConnectionKey",
        protocol: ResponseHandler,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._key = key
        self._connector = connector
        self._loop = loop
        self._protocol: Optional[ResponseHandler] = protocol
        self._callbacks: List[Callable[[], None]] = []

        # Remember where the connection was created when the loop is in
        # debug mode, so an "Unclosed connection" report can point at it.
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __repr__(self) -> str:
        return f"Connection<{self._key}>"

    def __del__(self, _warnings: Any = warnings) -> None:
        # Nothing to report once the protocol has been handed back.
        if self._protocol is None:
            return

        _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, source=self)
        if self._loop.is_closed():
            return

        self._connector._release(self._key, self._protocol, should_close=True)

        context = {"client_connection": self, "message": "Unclosed connection"}
        source_traceback = self._source_traceback
        if source_traceback is not None:
            context["source_traceback"] = source_traceback
        self._loop.call_exception_handler(context)

    def __bool__(self) -> Literal[True]:
        """Force subclasses to not be falsy, to make checks simpler."""
        return True

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """Event loop of this connection (deprecated accessor)."""
        warnings.warn(
            "connector.loop property is deprecated", DeprecationWarning, stacklevel=2
        )
        return self._loop

    @property
    def transport(self) -> Optional[asyncio.Transport]:
        """Transport of the held protocol, or None once released/closed."""
        proto = self._protocol
        return None if proto is None else proto.transport

    @property
    def protocol(self) -> Optional[ResponseHandler]:
        """The held protocol, or None once released/closed."""
        return self._protocol

    def add_callback(self, callback: Callable[[], None]) -> None:
        """Register a callable to run when the connection is released or closed."""
        if callback is None:
            return
        self._callbacks.append(callback)

    def _notify_release(self) -> None:
        """Run and forget every registered callback, ignoring their exceptions."""
        pending = self._callbacks
        self._callbacks = []

        for notify in pending:
            with suppress(Exception):
                notify()

    def close(self) -> None:
        """Notify callbacks, then give the protocol back to be closed."""
        self._notify_release()

        proto = self._protocol
        if proto is None:
            return
        self._connector._release(self._key, proto, should_close=True)
        self._protocol = None

    def release(self) -> None:
        """Notify callbacks, then give the protocol back to the connector."""
        self._notify_release()

        proto = self._protocol
        if proto is None:
            return
        self._connector._release(self._key, proto)
        self._protocol = None

    @property
    def closed(self) -> bool:
        """True when no protocol is held or the held protocol is disconnected."""
        proto = self._protocol
        if proto is None:
            return True
        return not proto.is_connected()

230 

231 

class _ConnectTunnelConnection(Connection):
    """Connection wrapper used while a CONNECT tunnel is being set up.

    It wraps the proxy connection that is about to be upgraded with TLS and
    must never end up in the pool, because:

    1. Its 'closed' future will never complete, causing session.close() to hang
    2. It represents an intermediate state, not a reusable connection
    3. The real connection (with TLS) will be created separately
    """

    def release(self) -> None:
        """Intentionally a no-op: neither pool nor close the connection.

        The tunnel connection only exists during CONNECT setup and is cleaned
        up naturally after the TLS upgrade. Pooling it would leave a
        connection that is never properly closed, and session.close() would
        wait forever for its 'closed' future.
        """
        return None

250 

251 

class _TransportPlaceholder:
    """Stand-in object used by BaseConnector.connect while a slot is reserved."""

    __slots__ = ("closed", "transport")

    def __init__(self, closed_future: asyncio.Future[Optional[Exception]]) -> None:
        """Store the given future as ``closed``; there is no real transport."""
        self.transport = None
        self.closed = closed_future

    def close(self) -> None:
        """Close the placeholder (nothing to do)."""
        return None

    def abort(self) -> None:
        """Abort the placeholder (nothing to do)."""
        return None

267 

268 

class BaseConnector:
    """Base connector class.

    Keeps a pool of idle protocols per connection key, tracks the protocols
    that are currently acquired, and queues callers that have to wait for a
    free slot. Subclasses provide ``_create_connection``.

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
                            Disabled by default.
    timeout_ceil_threshold - Trigger ceiling of timeout values when
                             it's above timeout_ceil_threshold.
    loop - Optional event loop.
    """

    _closed = True  # prevent AttributeError in __del__ if ctor was failed
    _source_traceback = None

    # abort transport after 2 seconds (cleanup broken connections)
    _cleanup_closed_period = 2.0

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET

    def __init__(
        self,
        *,
        keepalive_timeout: Union[object, None, float] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        timeout_ceil_threshold: float = 5,
    ) -> None:
        """Set up the pool bookkeeping.

        Raises ValueError when ``force_close`` is combined with an explicit
        ``keepalive_timeout``. When ``loop`` is not given the running loop is
        used, so the constructor must then be called from within one.
        """

        if force_close:
            if keepalive_timeout is not None and keepalive_timeout is not sentinel:
                raise ValueError(
                    "keepalive_timeout cannot be set if force_close is True"
                )
        else:
            # Default keep-alive of 15 seconds when the caller did not choose.
            if keepalive_timeout is sentinel:
                keepalive_timeout = 15.0

        loop = loop or asyncio.get_running_loop()
        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._closed = False
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # Connection pool of reusable connections.
        # We use a deque to store connections because it has O(1) popleft()
        # and O(1) append() operations to implement a FIFO queue.
        # Each entry is (protocol, monotonic time it was released).
        self._conns: DefaultDict[
            ConnectionKey, Deque[Tuple[ResponseHandler, float]]
        ] = defaultdict(deque)
        self._limit = limit
        self._limit_per_host = limit_per_host
        self._acquired: Set[ResponseHandler] = set()
        self._acquired_per_host: DefaultDict[ConnectionKey, Set[ResponseHandler]] = (
            defaultdict(set)
        )
        self._keepalive_timeout = cast(float, keepalive_timeout)
        self._force_close = force_close

        # {host_key: FIFO list of waiters}
        # The FIFO is implemented with an OrderedDict with None keys because
        # python does not have an ordered set.
        self._waiters: DefaultDict[
            ConnectionKey, OrderedDict[asyncio.Future[None], None]
        ] = defaultdict(OrderedDict)

        self._loop = loop
        self._factory = functools.partial(ResponseHandler, loop=loop)

        # Timer handle of the keep-alive cleanup; it is created lazily by
        # _release() / _cleanup() once there is something in the pool.
        self._cleanup_handle: Optional[asyncio.TimerHandle] = None

        # Timer handle of the closed-transports cleanup; armed by
        # _cleanup_closed() below when that feature is enabled.
        self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None

        if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED:
            warnings.warn(
                "enable_cleanup_closed ignored because "
                "https://github.com/python/cpython/pull/118960 is fixed "
                f"in Python version {sys.version_info}",
                DeprecationWarning,
                stacklevel=2,
            )
            enable_cleanup_closed = False

        self._cleanup_closed_disabled = not enable_cleanup_closed
        self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []
        # One already-resolved future shared by every _TransportPlaceholder.
        self._placeholder_future: asyncio.Future[Optional[Exception]] = (
            loop.create_future()
        )
        self._placeholder_future.set_result(None)
        self._cleanup_closed()

    def __del__(self, _warnings: Any = warnings) -> None:
        """Close leftover pooled connections and report the unclosed connector."""
        if self._closed:
            return
        if not self._conns:
            return

        # Capture the description before _close() empties the pool.
        conns = [repr(c) for c in self._conns.values()]

        self._close()

        kwargs = {"source": self}
        _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)
        context = {
            "connector": self,
            "connections": conns,
            "message": "Unclosed connector",
        }
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __enter__(self) -> "BaseConnector":
        """Deprecated synchronous context-manager entry; warns and returns self."""
        warnings.warn(
            '"with Connector():" is deprecated, '
            'use "async with Connector():" instead',
            DeprecationWarning,
        )
        return self

    def __exit__(self, *exc: Any) -> None:
        # Synchronous exit cannot await, so the close waiters are dropped.
        self._close()

    async def __aenter__(self) -> "BaseConnector":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        exc_traceback: Optional[TracebackType] = None,
    ) -> None:
        await self.close()

    @property
    def force_close(self) -> bool:
        """Ultimately close connection on releasing if True."""
        return self._force_close

    @property
    def limit(self) -> int:
        """The total number for simultaneous connections.

        If limit is 0 the connector has no limit.
        The default limit size is 100.
        """
        return self._limit

    @property
    def limit_per_host(self) -> int:
        """The limit for simultaneous connections to the same endpoint.

        Endpoints are the same if they have an equal
        (host, port, is_ssl) triple.
        """
        return self._limit_per_host

    def _cleanup(self) -> None:
        """Cleanup unused transports.

        Drops pooled protocols that are disconnected or idle for longer than
        the keep-alive timeout, and re-arms the timer while the pool is
        non-empty.
        """
        if self._cleanup_handle:
            self._cleanup_handle.cancel()
            # _cleanup_handle should be unset, otherwise _release() will not
            # recreate it ever!
            self._cleanup_handle = None

        now = monotonic()
        timeout = self._keepalive_timeout

        if self._conns:
            # Build a fresh mapping that only holds the survivors.
            connections = defaultdict(deque)
            deadline = now - timeout
            for key, conns in self._conns.items():
                alive: Deque[Tuple[ResponseHandler, float]] = deque()
                for proto, use_time in conns:
                    if proto.is_connected() and use_time - deadline >= 0:
                        alive.append((proto, use_time))
                        continue
                    # Grab the transport before close() for the later abort.
                    transport = proto.transport
                    proto.close()
                    if not self._cleanup_closed_disabled and key.is_ssl:
                        self._cleanup_closed_transports.append(transport)

                if alive:
                    connections[key] = alive

            self._conns = connections

        if self._conns:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def _cleanup_closed(self) -> None:
        """Double confirmation for transport close.

        Some broken ssl servers may leave socket open without proper close.
        Every transport queued for cleanup is aborted, then the timer is
        re-armed unless the feature is disabled.
        """
        if self._cleanup_closed_handle:
            self._cleanup_closed_handle.cancel()

        for transport in self._cleanup_closed_transports:
            if transport is not None:
                transport.abort()

        self._cleanup_closed_transports = []

        if not self._cleanup_closed_disabled:
            self._cleanup_closed_handle = helpers.weakref_handle(
                self,
                "_cleanup_closed",
                self._cleanup_closed_period,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def close(self, *, abort_ssl: bool = False) -> Awaitable[None]:
        """Close all opened transports.

        :param abort_ssl: If True, SSL connections will be aborted immediately
            without performing the shutdown handshake. This provides
            faster cleanup at the cost of less graceful disconnection.

        The connector is closed synchronously by this call; the returned
        awaitable only waits for the individual protocols to finish closing.
        """
        if not (waiters := self._close(abort_ssl=abort_ssl)):
            # If there are no connections to close, we can return a noop
            # awaitable to avoid scheduling a task on the event loop.
            return _DeprecationWaiter(noop())
        coro = _wait_for_close(waiters)
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to close connections
            # immediately to avoid having to schedule the task on the event loop.
            task = asyncio.Task(coro, loop=self._loop, eager_start=True)
        else:
            task = self._loop.create_task(coro)
        return _DeprecationWaiter(task)

    def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
        """Mark the connector closed and close every pooled/acquired protocol.

        Returns the truthy ``closed`` attributes of those protocols so the
        caller can wait on them; the list is empty when the connector was
        already closed or its loop is closed.
        """
        waiters: List[Awaitable[object]] = []

        if self._closed:
            return waiters

        self._closed = True

        try:
            if self._loop.is_closed():
                return waiters

            # cancel cleanup task
            if self._cleanup_handle:
                self._cleanup_handle.cancel()

            # cancel cleanup close task
            if self._cleanup_closed_handle:
                self._cleanup_closed_handle.cancel()

            # Idle protocols sitting in the pool.
            for data in self._conns.values():
                for proto, _ in data:
                    if (
                        abort_ssl
                        and proto.transport
                        and proto.transport.get_extra_info("sslcontext") is not None
                    ):
                        proto.abort()
                    else:
                        proto.close()
                    if closed := proto.closed:
                        waiters.append(closed)

            # Protocols (and placeholders) currently handed out to callers.
            for proto in self._acquired:
                if (
                    abort_ssl
                    and proto.transport
                    and proto.transport.get_extra_info("sslcontext") is not None
                ):
                    proto.abort()
                else:
                    proto.close()
                if closed := proto.closed:
                    waiters.append(closed)

            for transport in self._cleanup_closed_transports:
                if transport is not None:
                    transport.abort()

            return waiters

        finally:
            # Runs on every exit path above, including the early return for a
            # closed loop: drop all bookkeeping and cancel queued waiters.
            self._conns.clear()
            self._acquired.clear()
            for keyed_waiters in self._waiters.values():
                for keyed_waiter in keyed_waiters:
                    keyed_waiter.cancel()
            self._waiters.clear()
            self._cleanup_handle = None
            self._cleanup_closed_transports.clear()
            self._cleanup_closed_handle = None

    @property
    def closed(self) -> bool:
        """Is connector closed.

        A readonly property.
        """
        return self._closed

    def _available_connections(self, key: "ConnectionKey") -> int:
        """
        Return number of available connections.

        The limit, limit_per_host and the connection key are taken into account.

        If it returns less than 1 means that there are no connections
        available.
        """
        # check total available connections
        # If there are no limits, this will always return 1
        total_remain = 1

        if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0:
            return total_remain

        # check limit per host
        if host_remain := self._limit_per_host:
            if acquired := self._acquired_per_host.get(key):
                host_remain -= len(acquired)
            # The tighter of the two budgets wins.
            if total_remain > host_remain:
                return host_remain

        return total_remain

    def _update_proxy_auth_header_and_build_proxy_req(
        self, req: ClientRequest
    ) -> ClientRequest:
        """Set Proxy-Authorization header for non-SSL proxy requests and builds the proxy request for SSL proxy requests."""
        url = req.proxy
        assert url is not None
        headers: Dict[str, str] = {}
        if req.proxy_headers is not None:
            # NOTE: this is the request's own mapping, not a copy, so the
            # Host assignment below is visible through req.proxy_headers.
            headers = req.proxy_headers  # type: ignore[assignment]
        headers[hdrs.HOST] = req.headers[hdrs.HOST]
        proxy_req = ClientRequest(
            hdrs.METH_GET,
            url,
            headers=headers,
            auth=req.proxy_auth,
            loop=self._loop,
            ssl=req.ssl,
        )
        # Move whatever Authorization header the proxy request ended up with
        # into Proxy-Authorization on the appropriate request.
        auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
        if auth is not None:
            if not req.is_ssl():
                req.headers[hdrs.PROXY_AUTHORIZATION] = auth
            else:
                proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth
        return proxy_req

    async def connect(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Connection:
        """Get from pool or create new connection.

        Raises ClientConnectionError when the connector was closed while the
        new connection was being created.
        """
        key = req.connection_key
        if (conn := await self._get(key, traces)) is not None:
            # If we do not have to wait and we can get a connection from the pool
            # we can avoid the timeout ceil logic and directly return the connection
            if req.proxy:
                self._update_proxy_auth_header_and_build_proxy_req(req)
            return conn

        async with ceil_timeout(timeout.connect, timeout.ceil_threshold):
            if self._available_connections(key) <= 0:
                await self._wait_for_available_connection(key, traces)
                # A pooled connection may have appeared while we were queued.
                if (conn := await self._get(key, traces)) is not None:
                    if req.proxy:
                        self._update_proxy_auth_header_and_build_proxy_req(req)
                    return conn

            # Reserve the slot with a placeholder so the limits account for
            # the connection that is about to be created.
            placeholder = cast(
                ResponseHandler, _TransportPlaceholder(self._placeholder_future)
            )
            self._acquired.add(placeholder)
            if self._limit_per_host:
                self._acquired_per_host[key].add(placeholder)

            try:
                # Traces are done inside the try block to ensure that the
                # placeholder is still cleaned up if an exception
                # is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_start()
                proto = await self._create_connection(req, traces, timeout)
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_end()
            except BaseException:
                self._release_acquired(key, placeholder)
                raise
            else:
                if self._closed:
                    proto.close()
                    raise ClientConnectionError("Connector is closed.")

        # The connection was successfully created, drop the placeholder
        # and add the real connection to the acquired set. There should
        # be no awaits after the proto is added to the acquired set
        # to ensure that the connection is not left in the acquired set
        # on cancellation.
        self._acquired.remove(placeholder)
        self._acquired.add(proto)
        if self._limit_per_host:
            acquired_per_host = self._acquired_per_host[key]
            acquired_per_host.remove(placeholder)
            acquired_per_host.add(proto)
        return Connection(self, key, proto, self._loop)

    async def _wait_for_available_connection(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> None:
        """Wait for an available connection slot."""
        # We loop here because there is a race between
        # the connection limit check and the connection
        # being acquired. If the connection is acquired
        # between the check and the await statement, we
        # need to loop again to check if the connection
        # slot is still available.
        attempts = 0
        while True:
            fut: asyncio.Future[None] = self._loop.create_future()
            keyed_waiters = self._waiters[key]
            keyed_waiters[fut] = None
            if attempts:
                # If we have waited before, we need to move the waiter
                # to the front of the queue as otherwise we might get
                # starved and hit the timeout.
                keyed_waiters.move_to_end(fut, last=False)

            try:
                # Traces happen in the try block to ensure that
                # the waiter is still cleaned up if an exception is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_start()
                await fut
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_end()
            finally:
                # pop the waiter from the queue if its still
                # there and not already removed by _release_waiter
                keyed_waiters.pop(fut, None)
                # Drop the per-key queue once it is empty; the True default
                # skips the delete when the key is already gone.
                if not self._waiters.get(key, True):
                    del self._waiters[key]

            if self._available_connections(key) > 0:
                break
            attempts += 1

    async def _get(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> Optional[Connection]:
        """Get next reusable connection for the key or None.

        The connection will be marked as acquired.
        """
        if (conns := self._conns.get(key)) is None:
            return None

        t1 = monotonic()
        while conns:
            proto, t0 = conns.popleft()
            # We will reuse the connection if its connected and
            # the keepalive timeout has not been exceeded
            if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
                if not conns:
                    # The very last connection was reclaimed: drop the key
                    del self._conns[key]
                self._acquired.add(proto)
                if self._limit_per_host:
                    self._acquired_per_host[key].add(proto)
                if traces:
                    for trace in traces:
                        try:
                            await trace.send_connection_reuseconn()
                        except BaseException:
                            # Undo the acquisition if a trace callback fails
                            # or the task is cancelled while it runs.
                            self._release_acquired(key, proto)
                            raise
                return Connection(self, key, proto, self._loop)

            # Connection cannot be reused, close it
            transport = proto.transport
            proto.close()
            # only for SSL transports
            if not self._cleanup_closed_disabled and key.is_ssl:
                self._cleanup_closed_transports.append(transport)

        # No more connections: drop the key
        del self._conns[key]
        return None

    def _release_waiter(self) -> None:
        """
        Iterates over all waiters until one to be released is found.

        The one to be released is not finished and
        belongs to a host that has available connections.
        """
        if not self._waiters:
            return

        # Visit the keys in random order so that the same key is not
        # always examined first on every call.
        queues = list(self._waiters)
        random.shuffle(queues)

        for key in queues:
            if self._available_connections(key) < 1:
                continue

            waiters = self._waiters[key]
            while waiters:
                # Oldest waiter first; finished (e.g. cancelled) ones are
                # simply discarded.
                waiter, _ = waiters.popitem(last=False)
                if not waiter.done():
                    waiter.set_result(None)
                    return

    def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
        """Release acquired connection."""
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._acquired.discard(proto)
        if self._limit_per_host and (conns := self._acquired_per_host.get(key)):
            conns.discard(proto)
            if not conns:
                del self._acquired_per_host[key]
        # A slot was freed, so one queued waiter may proceed.
        self._release_waiter()

    def _release(
        self,
        key: "ConnectionKey",
        protocol: ResponseHandler,
        *,
        should_close: bool = False,
    ) -> None:
        """Take a protocol back from a caller and either close it or pool it.

        The protocol is closed when the connector forces closing, when
        ``should_close`` is set, or when the protocol itself asks for it;
        otherwise it is appended to the pool with the current time.
        """
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._release_acquired(key, protocol)

        if self._force_close or should_close or protocol.should_close:
            transport = protocol.transport
            protocol.close()

            if key.is_ssl and not self._cleanup_closed_disabled:
                self._cleanup_closed_transports.append(transport)
            return

        self._conns[key].append((protocol, monotonic()))

        # Arm the keep-alive cleanup timer if it is not already pending.
        if self._cleanup_handle is None:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                self._keepalive_timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Create a new protocol for ``req``; must be provided by subclasses."""
        raise NotImplementedError()

856 

857 

class _DNSCacheTable:
    """Size-bounded DNS result cache with round-robin address rotation.

    Entries are kept in least-recently-used order: adding or reading a key
    moves it to the end, and once more than ``max_size`` keys are stored the
    entry at the front is evicted. When ``ttl`` is not None the insertion
    time of every key is recorded so that ``expired()`` can be answered.
    """

    def __init__(self, ttl: Optional[float] = None, max_size: int = 1000) -> None:
        # key -> (endless iterator over the addresses, number of addresses)
        self._addrs_rr: OrderedDict[
            Tuple[str, int], Tuple[Iterator[ResolveResult], int]
        ] = OrderedDict()
        # key -> monotonic() time of the last add(); only filled with a ttl
        self._timestamps: Dict[Tuple[str, int], float] = {}
        self._ttl = ttl
        self._max_size = max_size

    def __contains__(self, host: object) -> bool:
        return host in self._addrs_rr

    def add(self, key: Tuple[str, int], addrs: List["ResolveResult"]) -> None:
        """Store (or replace) the addresses for ``key`` as the newest entry."""
        self._addrs_rr[key] = (cycle(addrs), len(addrs))
        # Re-assigning an existing key keeps its old position, so move it
        # explicitly; for a brand new key this is a no-op.
        self._addrs_rr.move_to_end(key)

        if self._ttl is not None:
            self._timestamps[key] = monotonic()

        if len(self._addrs_rr) > self._max_size:
            oldest_key, _ = self._addrs_rr.popitem(last=False)
            self._timestamps.pop(oldest_key, None)

    def remove(self, key: Tuple[str, int]) -> None:
        """Forget ``key``; unknown keys are ignored."""
        self._addrs_rr.pop(key, None)
        self._timestamps.pop(key, None)

    def clear(self) -> None:
        """Forget every cached entry."""
        self._addrs_rr.clear()
        self._timestamps.clear()

    def next_addrs(self, key: Tuple[str, int]) -> List["ResolveResult"]:
        """Return the addresses for ``key``, rotated by one on each call.

        Raises KeyError when ``key`` is not cached. An entry that was cached
        with an empty address list yields an empty list.
        """
        loop, length = self._addrs_rr[key]
        addrs = list(islice(loop, length))
        if length:
            # Consume one more element to shift internal state of `cycle`.
            # Skipped for empty entries: next() on an empty cycle would
            # raise StopIteration out of this method.
            next(loop)
        self._addrs_rr.move_to_end(key)
        return addrs

    def expired(self, key: Tuple[str, int]) -> bool:
        """Tell whether the entry for ``key`` is older than the ttl.

        Always False without a ttl. Raises KeyError when a ttl is set and
        ``key`` has no recorded timestamp.
        """
        if self._ttl is None:
            return False

        return self._timestamps[key] + self._ttl < monotonic()

904 

905 

def _make_ssl_context(verified: bool) -> SSLContext:
    """Build the client SSL context used for HTTPS connections.

    With ``verified`` true the standard library default context is used;
    otherwise a context with hostname checking and certificate verification
    switched off is built. In both cases ALPN is set to ``http/1.1``.
    Returns None when the ``ssl`` module is unavailable.

    This function is not async-friendly and should be called from a thread
    because it will load certificates from disk and do other blocking I/O.
    """
    # No ssl support
    if ssl is None:
        return None

    if not verified:
        ctx = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        ctx.options |= ssl.OP_NO_SSLv2
        ctx.options |= ssl.OP_NO_SSLv3
        # check_hostname has to be cleared before verify_mode may be relaxed.
        ctx.check_hostname = False
        ctx.verify_mode = ssl.CERT_NONE
        ctx.options |= ssl.OP_NO_COMPRESSION
        ctx.set_default_verify_paths()
    else:
        ctx = ssl.create_default_context()

    ctx.set_alpn_protocols(("http/1.1",))
    return ctx

927 

928 

# Building an SSLContext performs blocking I/O (certificates are loaded from
# disk), so both default contexts are prepared once, at import time. Imports
# should always happen before the event loop starts, or in a thread.
_SSL_CONTEXT_VERIFIED = _make_ssl_context(verified=True)
_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(verified=False)

935 

936 

class TCPConnector(BaseConnector):
    """TCP connector.

    verify_ssl - Set to True to check SSL certificates.
    fingerprint - Pass the binary sha256
        digest of the expected certificate in DER format to verify
        that the certificate the server presents matches. See also
        https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning
    ssl - True, False, a Fingerprint or an SSLContext. It is combined
        with verify_ssl, ssl_context and fingerprint into one setting.
    ssl_context - SSLContext to use; combined with ssl (see above).
    resolver - Enable DNS lookups and use this
        resolver
    use_dns_cache - Use memory cache for DNS lookups.
    ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
    dns_cache_max_size - Passed to the DNS cache table as its max_size.
    family - socket address family
    local_addr - local tuple of (host, port) to bind socket to

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
                            Disabled by default.
    timeout_ceil_threshold - Forwarded unchanged to BaseConnector.
    happy_eyeballs_delay - This is the “Connection Attempt Delay”
                           as defined in RFC 8305. To disable
                           the happy eyeballs algorithm, set to None.
    interleave - “First Address Family Count” as defined in RFC 8305
    loop - Optional event loop.
    socket_factory - A SocketFactoryType function that, if supplied,
                     will be used to create sockets given an
                     AddrInfoType.
    ssl_shutdown_timeout - DEPRECATED. Will be removed in aiohttp 4.0.
                           Grace period for SSL shutdown handshake on TLS
                           connections. Default is 0 seconds (immediate abort).
                           This parameter allowed for a clean SSL shutdown by
                           notifying the remote peer of connection closure,
                           while avoiding excessive delays during connector cleanup.
                           Note: Only takes effect on Python 3.11+.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"})

    def __init__(
        self,
        *,
        verify_ssl: bool = True,
        fingerprint: Optional[bytes] = None,
        use_dns_cache: bool = True,
        ttl_dns_cache: Optional[int] = 10,
        dns_cache_max_size: int = 1000,
        family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC,
        ssl_context: Optional[SSLContext] = None,
        ssl: Union[bool, Fingerprint, SSLContext] = True,
        local_addr: Optional[Tuple[str, int]] = None,
        resolver: Optional[AbstractResolver] = None,
        keepalive_timeout: Union[None, float, object] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        timeout_ceil_threshold: float = 5,
        happy_eyeballs_delay: Optional[float] = 0.25,
        interleave: Optional[int] = None,
        socket_factory: Optional[SocketFactoryType] = None,
        ssl_shutdown_timeout: Union[_SENTINEL, None, float] = sentinel,
    ) -> None:
        """Initialize the connector; see the class docstring for the parameters."""
        super().__init__(
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
            limit=limit,
            limit_per_host=limit_per_host,
            enable_cleanup_closed=enable_cleanup_closed,
            loop=loop,
            timeout_ceil_threshold=timeout_ceil_threshold,
        )

        # Collapse the four SSL related arguments into a single setting.
        self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)

        self._resolver: AbstractResolver
        if resolver is None:
            # We created the resolver, so close() is responsible for closing it.
            self._resolver = DefaultResolver(loop=self._loop)
            self._resolver_owner = True
        else:
            self._resolver = resolver
            self._resolver_owner = False

        self._use_dns_cache = use_dns_cache
        self._cached_hosts = _DNSCacheTable(
            ttl=ttl_dns_cache, max_size=dns_cache_max_size
        )
        # (host, port) -> futures of the callers waiting for an in-flight lookup.
        self._throttle_dns_futures: Dict[
            Tuple[str, int], Set["asyncio.Future[None]"]
        ] = {}
        self._family = family
        self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
        self._happy_eyeballs_delay = happy_eyeballs_delay
        self._interleave = interleave
        # Pending shielded lookup tasks; cancelled and awaited on close.
        self._resolve_host_tasks: Set["asyncio.Task[List[ResolveResult]]"] = set()
        self._socket_factory = socket_factory
        self._ssl_shutdown_timeout: Optional[float]
        # Handle ssl_shutdown_timeout with warning for Python < 3.11
        if ssl_shutdown_timeout is sentinel:
            self._ssl_shutdown_timeout = 0
        else:
            # Deprecation warning for ssl_shutdown_timeout parameter
            warnings.warn(
                "The ssl_shutdown_timeout parameter is deprecated and will be removed in aiohttp 4.0",
                DeprecationWarning,
                stacklevel=2,
            )
            if (
                sys.version_info < (3, 11)
                and ssl_shutdown_timeout is not None
                and ssl_shutdown_timeout != 0
            ):
                warnings.warn(
                    f"ssl_shutdown_timeout={ssl_shutdown_timeout} is ignored on Python < 3.11; "
                    "only ssl_shutdown_timeout=0 is supported. The timeout will be ignored.",
                    RuntimeWarning,
                    stacklevel=2,
                )
            self._ssl_shutdown_timeout = ssl_shutdown_timeout

    def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
        """Cancel all ongoing DNS calls and close the base connector.

        Returns the awaitables produced by the base class plus every
        cancelled resolve task, so the caller can wait for them to finish.
        """
        for fut in chain.from_iterable(self._throttle_dns_futures.values()):
            fut.cancel()

        waiters = super()._close(abort_ssl=abort_ssl)

        for t in self._resolve_host_tasks:
            t.cancel()
            waiters.append(t)

        return waiters

    async def close(self, *, abort_ssl: bool = False) -> None:
        """
        Close all opened transports.

        :param abort_ssl: If True, SSL connections will be aborted immediately
            without performing the shutdown handshake. If False (default),
            the behavior is determined by ssl_shutdown_timeout:
            - If ssl_shutdown_timeout=0: connections are aborted
            - If ssl_shutdown_timeout>0: graceful shutdown is performed
        """
        # Only close the resolver when this connector created it.
        if self._resolver_owner:
            await self._resolver.close()
        # Use abort_ssl param if explicitly set, otherwise use ssl_shutdown_timeout default
        await super().close(abort_ssl=abort_ssl or self._ssl_shutdown_timeout == 0)

    @property
    def family(self) -> int:
        """Socket family like AF_INET."""
        return self._family

    @property
    def use_dns_cache(self) -> bool:
        """True if local DNS caching is enabled."""
        return self._use_dns_cache

    def clear_dns_cache(
        self, host: Optional[str] = None, port: Optional[int] = None
    ) -> None:
        """Remove specified host/port or clear all dns local cache.

        Raises ValueError when only one of host and port is given.
        """
        if host is not None and port is not None:
            self._cached_hosts.remove((host, port))
        elif host is not None or port is not None:
            raise ValueError("either both host and port or none of them are allowed")
        else:
            self._cached_hosts.clear()

    async def _resolve_host(
        self, host: str, port: int, traces: Optional[Sequence["Trace"]] = None
    ) -> List[ResolveResult]:
        """Resolve host and return list of addresses.

        IP address literals are returned directly without a lookup. With
        the DNS cache disabled the resolver is called on every request.
        Otherwise a fresh cache entry is used when available, and concurrent
        lookups of the same (host, port) share a single resolver call.
        """
        if is_ip_address(host):
            return [
                {
                    "hostname": host,
                    "host": host,
                    "port": port,
                    "family": self._family,
                    "proto": 0,
                    "flags": 0,
                }
            ]

        if not self._use_dns_cache:

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            res = await self._resolver.resolve(host, port, family=self._family)

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            return res

        key = (host, port)
        if key in self._cached_hosts and not self._cached_hosts.expired(key):
            # get result early, before any await (#4014)
            result = self._cached_hosts.next_addrs(key)

            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            return result

        futures: Set["asyncio.Future[None]"]
        #
        # If multiple callers of this connector are resolving the same host,
        # we wait for the first one to resolve and then use the result for
        # all of them. We use a throttle to ensure that we only resolve the
        # host once and then use the result for all the waiters.
        #
        if key in self._throttle_dns_futures:
            # get futures early, before any await (#4014)
            futures = self._throttle_dns_futures[key]
            future: asyncio.Future[None] = self._loop.create_future()
            futures.add(future)
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            try:
                await future
            finally:
                futures.discard(future)
            return self._cached_hosts.next_addrs(key)

        # update dict early, before any await (#4014)
        self._throttle_dns_futures[key] = futures = set()
        # In this case we need to create a task to ensure that we can shield
        # the task from cancellation as cancelling this lookup should not cancel
        # the underlying lookup or else the cancel event will get broadcast to
        # all the waiters across all connections.
        #
        coro = self._resolve_host_with_throttle(key, host, port, futures, traces)
        loop = asyncio.get_running_loop()
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to send immediately
            resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True)
        else:
            resolved_host_task = loop.create_task(coro)

        # An eagerly started task may already be finished; only track
        # tasks that are still pending.
        if not resolved_host_task.done():
            self._resolve_host_tasks.add(resolved_host_task)
            resolved_host_task.add_done_callback(self._resolve_host_tasks.discard)

        try:
            return await asyncio.shield(resolved_host_task)
        except asyncio.CancelledError:

            def drop_exception(fut: "asyncio.Future[List[ResolveResult]]") -> None:
                # Retrieve the outcome of the abandoned task and discard it.
                with suppress(Exception, asyncio.CancelledError):
                    fut.result()

            resolved_host_task.add_done_callback(drop_exception)
            raise

    async def _resolve_host_with_throttle(
        self,
        key: Tuple[str, int],
        host: str,
        port: int,
        futures: Set["asyncio.Future[None]"],
        traces: Optional[Sequence["Trace"]],
    ) -> List[ResolveResult]:
        """Resolve host and set result for all waiters.

        This method must be run in a task and shielded from cancellation
        to avoid cancelling the underlying lookup.
        """
        try:
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_miss(host)

                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            addrs = await self._resolver.resolve(host, port, family=self._family)
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            self._cached_hosts.add(key, addrs)
            for fut in futures:
                set_result(fut, None)
        except BaseException as e:
            # any DNS exception is set for the waiters to raise the same exception.
            # This coro is always run in task that is shielded from cancellation so
            # we should never be propagating cancellation here.
            for fut in futures:
                set_exception(fut, e)
            raise
        finally:
            # The lookup is over either way; later callers start a new one
            # or hit the cache.
            self._throttle_dns_futures.pop(key)

        return self._cached_hosts.next_addrs(key)

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Create a connection for *req* and return its protocol.

        The connection goes through the proxy when ``req.proxy`` is set
        and directly to the target host otherwise.
        """
        if req.proxy:
            _, proto = await self._create_proxy_connection(req, traces, timeout)
        else:
            _, proto = await self._create_direct_connection(req, traces, timeout)

        return proto

    def _get_ssl_context(self, req: ClientRequest) -> Optional[SSLContext]:
        """Return the SSL context to use for *req*.

        0. if ``req.is_ssl()`` is false, return None
        1. look at the request setting (``req.ssl``) first and, only when
           it is ``True``, at the connector setting (``self._ssl``):

           a. an ``SSLContext`` instance is returned as is
           b. any other value that is not ``True`` (verification turned
              off, or a fingerprint is checked instead) selects the shared
              unverified default context
        2. if both settings are ``True``, return the shared verified
           default context
        """
        if not req.is_ssl():
            return None

        if ssl is None:  # pragma: no cover
            raise RuntimeError("SSL is not supported.")
        # The request setting takes precedence over the connector setting.
        for sslcontext in (req.ssl, self._ssl):
            if isinstance(sslcontext, ssl.SSLContext):
                return sslcontext
            if sslcontext is not True:
                # not verified or fingerprinted
                return _SSL_CONTEXT_UNVERIFIED
        return _SSL_CONTEXT_VERIFIED

    def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]:
        """Return the fingerprint to check, preferring the request's over the connector's."""
        for ret in (req.ssl, self._ssl):
            if isinstance(ret, Fingerprint):
                return ret
        return None

    async def _wrap_create_connection(
        self,
        *args: Any,
        addr_infos: List[AddrInfoType],
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
        **kwargs: Any,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Connect a socket to one of *addr_infos* and wrap it in a transport.

        Extra positional and keyword arguments are forwarded to
        ``loop.create_connection()``. Certificate, SSL and OS level errors
        are translated into the matching client exceptions; a timeout
        (an OSError without errno that is an asyncio.TimeoutError) is
        re-raised unchanged.
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                sock = await aiohappyeyeballs.start_connection(
                    addr_infos=addr_infos,
                    local_addr_infos=self._local_addr_infos,
                    happy_eyeballs_delay=self._happy_eyeballs_delay,
                    interleave=self._interleave,
                    loop=self._loop,
                    socket_factory=self._socket_factory,
                )
                # Add ssl_shutdown_timeout for Python 3.11+ when SSL is used
                if (
                    kwargs.get("ssl")
                    and self._ssl_shutdown_timeout
                    and sys.version_info >= (3, 11)
                ):
                    kwargs["ssl_shutdown_timeout"] = self._ssl_shutdown_timeout
                return await self._loop.create_connection(*args, **kwargs, sock=sock)
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc

    async def _wrap_existing_connection(
        self,
        *args: Any,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
        **kwargs: Any,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Call ``loop.create_connection()`` with the caller supplied arguments.

        Unlike ``_wrap_create_connection`` no new socket is opened here.
        Errors are translated in the same way.
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                return await self._loop.create_connection(*args, **kwargs)
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc

    def _fail_on_no_start_tls(self, req: "ClientRequest") -> None:
        """Raise a :py:exc:`RuntimeError` on missing ``start_tls()``.

        It is necessary for TLS-in-TLS so that it is possible to
        send HTTPS queries through HTTPS proxies.

        This doesn't affect regular HTTP requests, though.
        """
        if not req.is_ssl():
            return

        proxy_url = req.proxy
        assert proxy_url is not None
        if proxy_url.scheme != "https":
            return

        self._check_loop_for_start_tls()

    def _check_loop_for_start_tls(self) -> None:
        """Raise RuntimeError when the event loop has no ``start_tls`` attribute."""
        try:
            self._loop.start_tls
        except AttributeError as attr_exc:
            raise RuntimeError(
                "An HTTPS request is being sent through an HTTPS proxy. "
                "This needs support for TLS in TLS but it is not implemented "
                "in your runtime for the stdlib asyncio.\n\n"
                "Please upgrade to Python 3.11 or higher. For more details, "
                "please see:\n"
                "* https://bugs.python.org/issue37179\n"
                "* https://github.com/python/cpython/pull/28073\n"
                "* https://docs.aiohttp.org/en/stable/"
                "client_advanced.html#proxy-support\n"
                "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
            ) from attr_exc

    def _loop_supports_start_tls(self) -> bool:
        """Return True if the event loop provides ``start_tls()``."""
        try:
            self._check_loop_for_start_tls()
        except RuntimeError:
            return False
        else:
            return True

    def _warn_about_tls_in_tls(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
    ) -> None:
        """Issue a warning if the requested URL has HTTPS scheme.

        Nothing is emitted for uvloop transports, on Python 3.11+, or when
        the transport is flagged with ``_start_tls_compatible``.
        """
        if req.request_info.url.scheme != "https":
            return

        # Check if uvloop is being used, which supports TLS in TLS,
        # otherwise assume that asyncio's native transport is being used.
        if type(underlying_transport).__module__.startswith("uvloop"):
            return

        # Support in asyncio was added in Python 3.11 (bpo-44011)
        asyncio_supports_tls_in_tls = sys.version_info >= (3, 11) or getattr(
            underlying_transport,
            "_start_tls_compatible",
            False,
        )

        if asyncio_supports_tls_in_tls:
            return

        warnings.warn(
            "An HTTPS request is being sent through an HTTPS proxy. "
            "This support for TLS in TLS is known to be disabled "
            "in the stdlib asyncio (Python <3.11). This is why you'll probably see "
            "an error in the log below.\n\n"
            "It is possible to enable it via monkeypatching. "
            "For more details, see:\n"
            "* https://bugs.python.org/issue37179\n"
            "* https://github.com/python/cpython/pull/28073\n\n"
            "You can temporarily patch this as follows:\n"
            "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
            "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
            RuntimeWarning,
            source=self,
            # Why `3`? The innermost frames belong to methods of this class,
            # so the warning is attributed past this method and its caller.
            # NOTE(review): this comment used to say `4` while the value
            # was already 3 - confirm which level is intended.
            stacklevel=3,
        )

    async def _start_tls_connection(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Wrap the raw TCP transport with TLS.

        Returns the TLS transport together with a brand new protocol that
        is attached to it. Certificate, SSL and OS level errors are
        translated into client exceptions; a TypeError from
        ``start_tls()`` becomes a ClientConnectionError.
        """
        tls_proto = self._factory()  # Create a brand new proto for TLS
        sslcontext = self._get_ssl_context(req)
        if TYPE_CHECKING:
            # _start_tls_connection is unreachable in the current code path
            # if sslcontext is None.
            assert sslcontext is not None

        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                try:
                    start_tls_kwargs: Dict[str, Any] = {}
                    # ssl_shutdown_timeout is only available in Python 3.11+
                    if sys.version_info >= (3, 11) and self._ssl_shutdown_timeout:
                        start_tls_kwargs["ssl_shutdown_timeout"] = (
                            self._ssl_shutdown_timeout
                        )
                    tls_transport = await self._loop.start_tls(
                        underlying_transport,
                        tls_proto,
                        sslcontext,
                        server_hostname=req.server_hostname or req.host,
                        ssl_handshake_timeout=timeout.total or None,
                        **start_tls_kwargs,
                    )
                except BaseException:
                    # We need to close the underlying transport since
                    # `start_tls()` probably failed before it had a
                    # chance to do this:
                    if self._ssl_shutdown_timeout == 0:
                        underlying_transport.abort()
                    else:
                        underlying_transport.close()
                    raise
                if isinstance(tls_transport, asyncio.Transport):
                    fingerprint = self._get_fingerprint(req)
                    if fingerprint:
                        try:
                            fingerprint.check(tls_transport)
                        except ServerFingerprintMismatch:
                            tls_transport.close()
                            if not self._cleanup_closed_disabled:
                                self._cleanup_closed_transports.append(tls_transport)
                            raise
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
        except TypeError as type_err:
            # Example cause looks like this:
            # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
            # object at 0x7f760615e460> is not supported by start_tls()

            raise ClientConnectionError(
                "Cannot initialize a TLS-in-TLS connection to host "
                f"{req.host!s}:{req.port:d} through an underlying connection "
                f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
                f"[{type_err!s}]"
            ) from type_err
        else:
            if tls_transport is None:
                msg = "Failed to start TLS (possibly caused by closing transport)"
                raise client_error(req.connection_key, OSError(msg))
            tls_proto.connection_made(
                tls_transport
            )  # Kick the state machine of the new TLS protocol

        return tls_transport, tls_proto

    def _convert_hosts_to_addr_infos(
        self, hosts: List[ResolveResult]
    ) -> List[AddrInfoType]:
        """Converts the list of hosts to a list of addr_infos.

        The list of hosts is the result of a DNS lookup. The list of
        addr_infos is the result of a call to `socket.getaddrinfo()`.

        Hosts whose address family differs from the family this connector
        was configured with are skipped, so the result may be empty.
        """
        addr_infos: List[AddrInfoType] = []
        for hinfo in hosts:
            host = hinfo["host"]
            # The family is derived from the address text itself.
            is_ipv6 = ":" in host
            family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
            if self._family and self._family != family:
                continue
            addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"])
            addr_infos.append(
                (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)
            )
        return addr_infos

    async def _create_direct_connection(
        self,
        req: ClientRequest,
        traces: List["Trace"],
        timeout: "ClientTimeout",
        *,
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Resolve the request host and connect to one of its addresses.

        Addresses that fail to connect, or whose certificate does not match
        the expected fingerprint, are dropped and the remaining ones are
        tried. Raises ClientConnectorDNSError when the lookup fails, the
        last connection error when every address failed, and
        ``client_error`` when there was no usable address to begin with.
        """
        sslcontext = self._get_ssl_context(req)
        fingerprint = self._get_fingerprint(req)

        host = req.url.raw_host
        assert host is not None
        # Replace multiple trailing dots with a single one.
        # A trailing dot is only present for fully-qualified domain names.
        # See https://github.com/aio-libs/aiohttp/pull/7364.
        if host.endswith(".."):
            host = host.rstrip(".") + "."
        port = req.port
        assert port is not None
        try:
            # Cancelling this lookup should not cancel the underlying lookup
            # or else the cancel event will get broadcast to all the waiters
            # across all connections.
            hosts = await self._resolve_host(host, port, traces=traces)
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            # in case of proxy it is not ClientProxyConnectionError
            # it is problem of resolving proxy ip itself
            raise ClientConnectorDNSError(req.connection_key, exc) from exc

        # Strip trailing dots, certificates contain FQDN without dots.
        # See https://github.com/aio-libs/aiohttp/issues/3636
        # The value does not change between attempts, so compute it once.
        server_hostname = (
            (req.server_hostname or host).rstrip(".") if sslcontext else None
        )

        last_exc: Optional[Exception] = None
        addr_infos = self._convert_hosts_to_addr_infos(hosts)
        while addr_infos:
            try:
                transp, proto = await self._wrap_create_connection(
                    self._factory,
                    timeout=timeout,
                    ssl=sslcontext,
                    addr_infos=addr_infos,
                    server_hostname=server_hostname,
                    req=req,
                    client_error=client_error,
                )
            except (ClientConnectorError, asyncio.TimeoutError) as exc:
                last_exc = exc
                aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave)
                continue

            if req.is_ssl() and fingerprint:
                try:
                    fingerprint.check(transp)
                except ServerFingerprintMismatch as exc:
                    transp.close()
                    if not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transp)
                    last_exc = exc
                    # Remove the bad peer from the list of addr_infos
                    sock: socket.socket = transp.get_extra_info("socket")
                    bad_peer = sock.getpeername()
                    aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer)
                    continue

            return transp, proto

        if last_exc is None:
            # The loop never ran: every resolved address was filtered out
            # (for example an address of another family than the one this
            # connector is restricted to) or the resolver returned nothing.
            # Report it as a connection error rather than tripping an
            # assertion, which under ``python -O`` would be ``raise None``.
            msg = f"No usable address found for host {host!r}"
            raise client_error(req.connection_key, OSError(msg))
        raise last_exc

    async def _create_proxy_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Connect to the proxy and, for HTTPS requests, open a tunnel.

        For non-SSL requests the connection to the proxy is returned as is.
        For SSL requests a CONNECT request is sent to the proxy first; once
        it answers with status 200 the connection is upgraded to TLS, either
        with ``start_tls()`` or, when the loop lacks it, by wrapping a
        duplicate of the raw socket. Raises ClientHttpProxyError when the
        proxy answers CONNECT with another status.
        """
        self._fail_on_no_start_tls(req)
        runtime_has_start_tls = self._loop_supports_start_tls()
        proxy_req = self._update_proxy_auth_header_and_build_proxy_req(req)

        # create connection to proxy server
        transport, proto = await self._create_direct_connection(
            proxy_req, [], timeout, client_error=ClientProxyConnectionError
        )

        if req.is_ssl():
            if runtime_has_start_tls:
                self._warn_about_tls_in_tls(transport, req)

            # For HTTPS requests over HTTP proxy
            # we must notify proxy to tunnel connection
            # so we send CONNECT command:
            #   CONNECT www.python.org:443 HTTP/1.1
            #   Host: www.python.org
            #
            # next we must do TLS handshake and so on
            # to do this we must wrap raw socket into secure one
            # asyncio handles this perfectly
            proxy_req.method = hdrs.METH_CONNECT
            proxy_req.url = req.url
            key = req.connection_key._replace(
                proxy=None, proxy_auth=None, proxy_headers_hash=None
            )
            conn = _ConnectTunnelConnection(self, key, proto, self._loop)
            proxy_resp = await proxy_req.send(conn)
            try:
                protocol = conn._protocol
                assert protocol is not None

                # read_until_eof=True will ensure the connection isn't closed
                # once the response is received and processed allowing
                # START_TLS to work on the connection below.
                protocol.set_response_params(
                    read_until_eof=runtime_has_start_tls,
                    timeout_ceil_threshold=self._timeout_ceil_threshold,
                )
                resp = await proxy_resp.start(conn)
            except BaseException:
                proxy_resp.close()
                conn.close()
                raise
            else:
                # Detach the protocol from the tunnel connection object.
                conn._protocol = None
                try:
                    if resp.status != 200:
                        message = resp.reason
                        if message is None:
                            message = HTTPStatus(resp.status).phrase
                        raise ClientHttpProxyError(
                            proxy_resp.request_info,
                            resp.history,
                            status=resp.status,
                            message=message,
                            headers=resp.headers,
                        )
                    if not runtime_has_start_tls:
                        rawsock = transport.get_extra_info("socket", default=None)
                        if rawsock is None:
                            raise RuntimeError(
                                "Transport does not expose socket instance"
                            )
                        # Duplicate the socket, so now we can close proxy transport
                        rawsock = rawsock.dup()
                except BaseException:
                    # It shouldn't be closed in `finally` because it's fed to
                    # `loop.start_tls()` and the docs say not to touch it after
                    # passing there.
                    transport.close()
                    raise
                finally:
                    if not runtime_has_start_tls:
                        transport.close()

                if not runtime_has_start_tls:
                    # HTTP proxy with support for upgrade to HTTPS
                    sslcontext = self._get_ssl_context(req)
                    return await self._wrap_existing_connection(
                        self._factory,
                        timeout=timeout,
                        ssl=sslcontext,
                        sock=rawsock,
                        server_hostname=req.host,
                        req=req,
                    )

                return await self._start_tls_connection(
                    # Access the old transport for the last time before it's
                    # closed and forgotten forever:
                    transport,
                    req=req,
                    timeout=timeout,
                )
            finally:
                proxy_resp.close()

        return transport, proto

1728 

1729 

class UnixConnector(BaseConnector):
    """Connector that reaches the server through a Unix domain socket.

    path - Unix socket path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Set up the base connector and remember the socket *path*."""
        super().__init__(
            loop=loop,
            limit=limit,
            limit_per_host=limit_per_host,
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
        )
        self._path = path

    @property
    def path(self) -> str:
        """Path to unix socket."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Open a connection to the Unix socket and return its protocol.

        OS level failures are re-raised as UnixClientConnectorError; a
        timeout (an OSError without errno that is an asyncio.TimeoutError)
        propagates unchanged.
        """
        connect_timeout = ceil_timeout(
            timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
        )
        try:
            async with connect_timeout:
                _transport, protocol = await self._loop.create_unix_connection(
                    self._factory, self._path
                )
        except OSError as exc:
            timed_out = exc.errno is None and isinstance(exc, asyncio.TimeoutError)
            if not timed_out:
                raise UnixClientConnectorError(
                    self.path, req.connection_key, exc
                ) from exc
            raise

        return protocol

1783 

1784 

class NamedPipeConnector(BaseConnector):
    """Named pipe connector.

    Only supported by the proactor event loop.
    See also: https://docs.python.org/3/library/asyncio-eventloop.html

    path - Windows named pipe path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Set up the base connector and remember the pipe *path*.

        Raises RuntimeError when the event loop is not a proactor loop,
        which includes every platform where asyncio has no such loop.
        """
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        # ``asyncio.ProactorEventLoop`` is only defined on Windows. Looking it
        # up with getattr() keeps the intended RuntimeError on other platforms
        # instead of failing with an AttributeError on the attribute access.
        proactor_loop_cls = getattr(asyncio, "ProactorEventLoop", None)
        if proactor_loop_cls is None or not isinstance(self._loop, proactor_loop_cls):
            raise RuntimeError(
                "Named Pipes only available in proactor loop under windows"
            )
        self._path = path

    @property
    def path(self) -> str:
        """Path to the named pipe."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Open a connection to the named pipe and return its protocol.

        OS level failures are re-raised as ClientConnectorError; a timeout
        (an OSError without errno that is an asyncio.TimeoutError)
        propagates unchanged.
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _, proto = await self._loop.create_pipe_connection(  # type: ignore[attr-defined]
                    self._factory, self._path
                )
                # the drain is required so that the connection_made is called
                # and transport is set otherwise it is not set before the
                # `assert conn.transport is not None`
                # in client.py's _request method
                await asyncio.sleep(0)
                # other option is to manually set transport like
                # `proto.transport = trans`
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise ClientConnectorError(req.connection_key, exc) from exc

        return cast(ResponseHandler, proto)