Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/connector.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

841 statements  

1import asyncio 

2import functools 

3import random 

4import socket 

5import sys 

6import traceback 

7import warnings 

8from collections import OrderedDict, defaultdict, deque 

9from contextlib import suppress 

10from http import HTTPStatus 

11from itertools import chain, cycle, islice 

12from time import monotonic 

13from types import TracebackType 

14from typing import ( 

15 TYPE_CHECKING, 

16 Any, 

17 Awaitable, 

18 Callable, 

19 DefaultDict, 

20 Deque, 

21 Dict, 

22 Iterator, 

23 List, 

24 Literal, 

25 Optional, 

26 Sequence, 

27 Set, 

28 Tuple, 

29 Type, 

30 Union, 

31 cast, 

32) 

33 

34import aiohappyeyeballs 

35from aiohappyeyeballs import AddrInfoType, SocketFactoryType 

36 

37from . import hdrs, helpers 

38from .abc import AbstractResolver, ResolveResult 

39from .client_exceptions import ( 

40 ClientConnectionError, 

41 ClientConnectorCertificateError, 

42 ClientConnectorDNSError, 

43 ClientConnectorError, 

44 ClientConnectorSSLError, 

45 ClientHttpProxyError, 

46 ClientProxyConnectionError, 

47 ServerFingerprintMismatch, 

48 UnixClientConnectorError, 

49 cert_errors, 

50 ssl_errors, 

51) 

52from .client_proto import ResponseHandler 

53from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params 

54from .helpers import ( 

55 _SENTINEL, 

56 ceil_timeout, 

57 is_ip_address, 

58 noop, 

59 sentinel, 

60 set_exception, 

61 set_result, 

62) 

63from .log import client_logger 

64from .resolver import DefaultResolver 

65 

66if sys.version_info >= (3, 12): 

67 from collections.abc import Buffer 

68else: 

69 Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"] 

70 

71if TYPE_CHECKING: 

72 import ssl 

73 

74 SSLContext = ssl.SSLContext 

75else: 

76 try: 

77 import ssl 

78 

79 SSLContext = ssl.SSLContext 

80 except ImportError: # pragma: no cover 

81 ssl = None # type: ignore[assignment] 

82 SSLContext = object # type: ignore[misc,assignment] 

83 

# URL schemes grouped by the kind of request they describe.
EMPTY_SCHEMA_SET = frozenset(("",))
HTTP_SCHEMA_SET = frozenset(("http", "https"))
WS_SCHEMA_SET = frozenset(("ws", "wss"))

# Unions of the groups above.
HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET.union(EMPTY_SCHEMA_SET)
HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET.union(WS_SCHEMA_SET)

# Cleanup closed is no longer needed after
# https://github.com/python/cpython/pull/118960, which first appeared in
# Python 3.12.7 and 3.13.1. The flag is therefore true on anything older
# than 3.12.7 and on the 3.13.0 release line.
NEEDS_CLEANUP_CLOSED = sys.version_info < (3, 12, 7) or (
    (3, 13, 0) <= sys.version_info < (3, 13, 1)
)

98 

99 

100__all__ = ( 

101 "BaseConnector", 

102 "TCPConnector", 

103 "UnixConnector", 

104 "NamedPipeConnector", 

105 "AddrInfoType", 

106 "SocketFactoryType", 

107) 

108 

109 

110if TYPE_CHECKING: 

111 from .client import ClientTimeout 

112 from .client_reqrep import ConnectionKey 

113 from .tracing import Trace 

114 

115 

class _DeprecationWaiter:
    """Awaitable wrapper that warns if it is dropped without being awaited.

    Awaiting the wrapper delegates to the wrapped awaitable. If the wrapper
    is finalized without ``__await__`` ever having been called, a
    ``DeprecationWarning`` asks the caller to await ``connector.close()``.
    """

    __slots__ = ("_awaitable", "_awaited")

    def __init__(self, awaitable: Awaitable[Any]) -> None:
        self._awaited = False
        self._awaitable = awaitable

    def __await__(self) -> Any:
        # Record the await before delegating so __del__ stays silent.
        self._awaited = True
        return self._awaitable.__await__()

    def __del__(self) -> None:
        if self._awaited:
            return
        warnings.warn(
            "Connector.close() is a coroutine, "
            "please use await connector.close()",
            DeprecationWarning,
        )

134 

135 

async def _wait_for_close(waiters: List[Awaitable[object]]) -> None:
    """Await every close waiter, logging failures instead of raising them.

    All waiters are gathered with ``return_exceptions=True``; each result
    that is an ``Exception`` instance is reported at debug level.
    """
    outcomes = await asyncio.gather(*waiters, return_exceptions=True)
    failures = (outcome for outcome in outcomes if isinstance(outcome, Exception))
    for failure in failures:
        client_logger.debug("Error while closing connector: %r", failure)

142 

143 

class Connection:
    """Handle tying an acquired protocol to the connector that issued it.

    The handle keeps the connector, the connection key and the protocol.
    ``release()`` and ``close()`` hand the protocol back through
    ``connector._release`` and then forget it, so a released handle reports
    ``protocol``/``transport`` as ``None`` and ``closed`` as ``True``.
    """

    _source_traceback = None

    def __init__(
        self,
        connector: "BaseConnector",
        key: "ConnectionKey",
        protocol: ResponseHandler,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._connector = connector
        self._key = key
        self._protocol: Optional[ResponseHandler] = protocol
        self._loop = loop
        self._callbacks: List[Callable[[], None]] = []

        # Only record the creation stack when the loop runs in debug mode.
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __repr__(self) -> str:
        return f"Connection<{self._key}>"

    def __del__(self, _warnings: Any = warnings) -> None:
        """Warn about, and force-close, a handle dropped while still open."""
        if self._protocol is None:
            # Already released or closed: nothing to report.
            return

        _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, source=self)
        if self._loop.is_closed():
            return

        self._connector._release(self._key, self._protocol, should_close=True)

        context = dict(client_connection=self, message="Unclosed connection")
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __bool__(self) -> Literal[True]:
        """Always truthy, so subclasses cannot make instances falsy."""
        return True

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """Event loop given at construction (deprecated accessor)."""
        warnings.warn(
            "connector.loop property is deprecated", DeprecationWarning, stacklevel=2
        )
        return self._loop

    @property
    def transport(self) -> Optional[asyncio.Transport]:
        """Transport of the held protocol, or ``None`` once it is gone."""
        return None if self._protocol is None else self._protocol.transport

    @property
    def protocol(self) -> Optional[ResponseHandler]:
        """The held protocol, or ``None`` after release/close."""
        return self._protocol

    def add_callback(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run on the next release/close.

        ``None`` is ignored.
        """
        if callback is None:
            return
        self._callbacks.append(callback)

    def _notify_release(self) -> None:
        """Run and discard the registered callbacks.

        The callback list is swapped for a fresh one before any callback
        runs; exceptions raised by a callback are suppressed.
        """
        pending = list(self._callbacks)
        self._callbacks = []

        for callback in pending:
            with suppress(Exception):
                callback()

    def close(self) -> None:
        """Notify callbacks, then ask the connector to close the protocol."""
        self._notify_release()

        proto = self._protocol
        if proto is None:
            return
        self._connector._release(self._key, proto, should_close=True)
        self._protocol = None

    def release(self) -> None:
        """Notify callbacks, then hand the protocol back to the connector."""
        self._notify_release()

        proto = self._protocol
        if proto is None:
            return
        self._connector._release(self._key, proto)
        self._protocol = None

    @property
    def closed(self) -> bool:
        """True when no protocol is held or the protocol is not connected."""
        proto = self._protocol
        if proto is None:
            return True
        return not proto.is_connected()

230 

231 

class _ConnectTunnelConnection(Connection):
    """Connection wrapper for CONNECT tunnels, which must never be pooled.

    It wraps the proxy connection that is about to be upgraded with TLS.
    Releasing it to the pool is forbidden because:

    1. Its 'closed' future will never complete, causing session.close()
       to hang
    2. It is an intermediate state rather than a reusable connection
    3. The real (TLS) connection is created separately
    """

    def release(self) -> None:
        """Intentionally a no-op: neither pool nor close the connection.

        The wrapped connection only exists while the CONNECT tunnel is being
        set up and is cleaned up naturally after the TLS upgrade. Pooling it
        would leave it never properly closed, so session.close() would wait
        forever on its 'closed' future.
        """
        return None

250 

251 

class _TransportPlaceholder:
    """Inert stand-in used by ``BaseConnector.connect``.

    It exposes the ``closed``, ``transport``, ``close()`` and ``abort()``
    members; ``transport`` is always ``None`` and both methods do nothing.
    """

    __slots__ = ("closed", "transport")

    def __init__(self, closed_future: asyncio.Future[Optional[Exception]]) -> None:
        """Store *closed_future* as ``closed``; there is no transport."""
        self.transport = None
        self.closed = closed_future

    def close(self) -> None:
        """Close the placeholder (does nothing)."""
        return None

    def abort(self) -> None:
        """Abort the placeholder (does nothing)."""
        return None

267 

268 

class BaseConnector:
    """Base connector class.

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
                            Disabled by default.
    timeout_ceil_threshold - Trigger ceiling of timeout values when
                             it's above timeout_ceil_threshold.
    loop - Optional event loop.
    """

    _closed = True  # prevent AttributeError in __del__ if ctor was failed
    _source_traceback = None

    # abort transport after 2 seconds (cleanup broken connections)
    _cleanup_closed_period = 2.0

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET

    def __init__(
        self,
        *,
        keepalive_timeout: Union[object, None, float] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        timeout_ceil_threshold: float = 5,
    ) -> None:
        """Set up the (initially empty) pool and its bookkeeping.

        When ``force_close`` is false and no ``keepalive_timeout`` is given,
        the timeout defaults to 15 seconds. When ``loop`` is falsy the
        running event loop is used.

        :raises ValueError: if ``force_close`` is true and an explicit
            (non-``None``) ``keepalive_timeout`` was passed.
        """
        if force_close:
            if keepalive_timeout is not None and keepalive_timeout is not sentinel:
                raise ValueError(
                    "keepalive_timeout cannot be set if force_close is True"
                )
        else:
            if keepalive_timeout is sentinel:
                keepalive_timeout = 15.0

        loop = loop or asyncio.get_running_loop()
        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._closed = False
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # Connection pool of reusable connections.
        # We use a deque to store connections because it has O(1) popleft()
        # and O(1) append() operations to implement a FIFO queue.
        self._conns: DefaultDict[
            ConnectionKey, Deque[Tuple[ResponseHandler, float]]
        ] = defaultdict(deque)
        self._limit = limit
        self._limit_per_host = limit_per_host
        self._acquired: Set[ResponseHandler] = set()
        self._acquired_per_host: DefaultDict[ConnectionKey, Set[ResponseHandler]] = (
            defaultdict(set)
        )
        self._keepalive_timeout = cast(float, keepalive_timeout)
        self._force_close = force_close

        # {host_key: FIFO list of waiters}
        # The FIFO is implemented with an OrderedDict with None keys because
        # python does not have an ordered set.
        self._waiters: DefaultDict[
            ConnectionKey, OrderedDict[asyncio.Future[None], None]
        ] = defaultdict(OrderedDict)

        self._loop = loop
        self._factory = functools.partial(ResponseHandler, loop=loop)

        # handle of the keep-alive connection cleanup task (armed by _release)
        self._cleanup_handle: Optional[asyncio.TimerHandle] = None

        # handle of the closed-transports cleanup task (armed by _cleanup_closed)
        self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None

        if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED:
            warnings.warn(
                "enable_cleanup_closed ignored because "
                "https://github.com/python/cpython/pull/118960 is fixed "
                f"in Python version {sys.version_info}",
                DeprecationWarning,
                stacklevel=2,
            )
            enable_cleanup_closed = False

        self._cleanup_closed_disabled = not enable_cleanup_closed
        self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []
        # Already-resolved future shared by every _TransportPlaceholder.
        self._placeholder_future: asyncio.Future[Optional[Exception]] = (
            loop.create_future()
        )
        self._placeholder_future.set_result(None)
        self._cleanup_closed()

    def __del__(self, _warnings: Any = warnings) -> None:
        """Close a connector that was dropped with pooled connections.

        Does nothing if the connector is already closed or the pool is empty;
        otherwise closes it, emits a ``ResourceWarning`` and reports the
        situation to the loop's exception handler.
        """
        if self._closed:
            return
        if not self._conns:
            return

        conns = [repr(c) for c in self._conns.values()]

        self._close()

        kwargs = {"source": self}
        _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)
        context = {
            "connector": self,
            "connections": conns,
            "message": "Unclosed connector",
        }
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __enter__(self) -> "BaseConnector":
        """Deprecated synchronous context-manager entry."""
        # stacklevel=2 attributes the warning to the ``with`` statement in
        # the caller, matching the other deprecation warnings in this module.
        warnings.warn(
            '"with Connector():" is deprecated, '
            'use "async with Connector():" instead',
            DeprecationWarning,
            stacklevel=2,
        )
        return self

    def __exit__(self, *exc: Any) -> None:
        """Close the connector without waiting for transports to finish."""
        self._close()

    async def __aenter__(self) -> "BaseConnector":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        exc_traceback: Optional[TracebackType] = None,
    ) -> None:
        """Close the connector and wait for the close to complete."""
        await self.close()

    @property
    def force_close(self) -> bool:
        """Ultimately close connection on releasing if True."""
        return self._force_close

    @property
    def limit(self) -> int:
        """The total number for simultaneous connections.

        If limit is 0 the connector has no limit.
        The default limit size is 100.
        """
        return self._limit

    @property
    def limit_per_host(self) -> int:
        """The limit for simultaneous connections to the same endpoint.

        Endpoints are the same if they have an equal
        (host, port, is_ssl) triple.
        """
        return self._limit_per_host

    def _cleanup(self) -> None:
        """Cleanup unused transports.

        Pooled protocols that are disconnected or older than the keep-alive
        timeout are closed and dropped; the cleanup handle is re-armed only
        while connections remain pooled.
        """
        if self._cleanup_handle:
            self._cleanup_handle.cancel()
            # _cleanup_handle should be unset, otherwise _release() will not
            # recreate it ever!
            self._cleanup_handle = None

        now = monotonic()
        timeout = self._keepalive_timeout

        if self._conns:
            connections = defaultdict(deque)
            deadline = now - timeout
            for key, conns in self._conns.items():
                alive: Deque[Tuple[ResponseHandler, float]] = deque()
                for proto, use_time in conns:
                    if proto.is_connected() and use_time - deadline >= 0:
                        alive.append((proto, use_time))
                        continue
                    # Grab the transport before close() so it can still be
                    # queued for the closed-transports cleanup.
                    transport = proto.transport
                    proto.close()
                    if not self._cleanup_closed_disabled and key.is_ssl:
                        self._cleanup_closed_transports.append(transport)

                if alive:
                    connections[key] = alive

            self._conns = connections

        if self._conns:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def _cleanup_closed(self) -> None:
        """Double confirmation for transport close.

        Some broken ssl servers may leave socket open without proper close.
        """
        if self._cleanup_closed_handle:
            self._cleanup_closed_handle.cancel()

        for transport in self._cleanup_closed_transports:
            if transport is not None:
                transport.abort()

        self._cleanup_closed_transports = []

        if not self._cleanup_closed_disabled:
            self._cleanup_closed_handle = helpers.weakref_handle(
                self,
                "_cleanup_closed",
                self._cleanup_closed_period,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def close(self, *, abort_ssl: bool = False) -> Awaitable[None]:
        """Close all opened transports.

        :param abort_ssl: If True, SSL connections will be aborted immediately
            without performing the shutdown handshake. This provides
            faster cleanup at the cost of less graceful disconnection.
        :return: an awaitable that completes once the closed transports have
            finished closing; it warns if it is never awaited.
        """
        if not (waiters := self._close(abort_ssl=abort_ssl)):
            # If there are no connections to close, we can return a noop
            # awaitable to avoid scheduling a task on the event loop.
            return _DeprecationWaiter(noop())
        coro = _wait_for_close(waiters)
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to close connections
            # immediately to avoid having to schedule the task on the event loop.
            task = asyncio.Task(coro, loop=self._loop, eager_start=True)
        else:
            task = self._loop.create_task(coro)
        return _DeprecationWaiter(task)

    def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
        """Close pooled and acquired protocols and reset all bookkeeping.

        :param abort_ssl: abort (instead of close) protocols whose transport
            reports an ``sslcontext``.
        :return: the truthy ``closed`` attributes of the protocols that were
            closed; empty if the connector was already closed or its loop is
            closed.
        """
        waiters: List[Awaitable[object]] = []

        if self._closed:
            return waiters

        self._closed = True

        try:
            if self._loop.is_closed():
                return waiters

            # cancel cleanup task
            if self._cleanup_handle:
                self._cleanup_handle.cancel()

            # cancel cleanup close task
            if self._cleanup_closed_handle:
                self._cleanup_closed_handle.cancel()

            for data in self._conns.values():
                for proto, _ in data:
                    if (
                        abort_ssl
                        and proto.transport
                        and proto.transport.get_extra_info("sslcontext") is not None
                    ):
                        proto.abort()
                    else:
                        proto.close()
                    if closed := proto.closed:
                        waiters.append(closed)

            for proto in self._acquired:
                if (
                    abort_ssl
                    and proto.transport
                    and proto.transport.get_extra_info("sslcontext") is not None
                ):
                    proto.abort()
                else:
                    proto.close()
                if closed := proto.closed:
                    waiters.append(closed)

            for transport in self._cleanup_closed_transports:
                if transport is not None:
                    transport.abort()

            return waiters

        finally:
            self._conns.clear()
            self._acquired.clear()
            # Keep the per-host accounting in sync with _acquired. Once the
            # connector is closed _release_acquired() returns early, so stale
            # entries here would never be removed: they would pin protocol
            # objects and could make _available_connections() report a host
            # as saturated, parking a later connect() on a waiter that nobody
            # will ever release.
            self._acquired_per_host.clear()
            for keyed_waiters in self._waiters.values():
                for keyed_waiter in keyed_waiters:
                    keyed_waiter.cancel()
            self._waiters.clear()
            self._cleanup_handle = None
            self._cleanup_closed_transports.clear()
            self._cleanup_closed_handle = None

    @property
    def closed(self) -> bool:
        """Is connector closed.

        A readonly property.
        """
        return self._closed

    def _available_connections(self, key: "ConnectionKey") -> int:
        """
        Return number of available connections.

        The limit, limit_per_host and the connection key are taken into account.

        If it returns less than 1 means that there are no connections
        available.
        """
        # check total available connections
        # If there are no limits, this will always return 1
        total_remain = 1

        if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0:
            return total_remain

        # check limit per host
        if host_remain := self._limit_per_host:
            if acquired := self._acquired_per_host.get(key):
                host_remain -= len(acquired)
            if total_remain > host_remain:
                return host_remain

        return total_remain

    def _update_proxy_auth_header_and_build_proxy_req(
        self, req: ClientRequest
    ) -> ClientRequest:
        """Set Proxy-Authorization header for non-SSL proxy requests and builds the proxy request for SSL proxy requests."""
        url = req.proxy
        assert url is not None
        headers: Dict[str, str] = {}
        if req.proxy_headers is not None:
            # NOTE: this aliases req.proxy_headers, so the Host header set
            # below is written into that mapping as well.
            headers = req.proxy_headers  # type: ignore[assignment]
        headers[hdrs.HOST] = req.headers[hdrs.HOST]
        proxy_req = ClientRequest(
            hdrs.METH_GET,
            url,
            headers=headers,
            auth=req.proxy_auth,
            loop=self._loop,
            ssl=req.ssl,
        )
        # Move the Authorization header computed for the proxy request into
        # a Proxy-Authorization header on whichever request carries it.
        auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
        if auth is not None:
            if not req.is_ssl():
                req.headers[hdrs.PROXY_AUTHORIZATION] = auth
            else:
                proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth
        return proxy_req

    async def connect(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Connection:
        """Get from pool or create new connection.

        :raises ClientConnectionError: if the connector was closed while the
            new connection was being created.
        """
        key = req.connection_key
        if (conn := await self._get(key, traces)) is not None:
            # If we do not have to wait and we can get a connection from the pool
            # we can avoid the timeout ceil logic and directly return the connection
            if req.proxy:
                self._update_proxy_auth_header_and_build_proxy_req(req)
            return conn

        async with ceil_timeout(timeout.connect, timeout.ceil_threshold):
            if self._available_connections(key) <= 0:
                await self._wait_for_available_connection(key, traces)
                if (conn := await self._get(key, traces)) is not None:
                    if req.proxy:
                        self._update_proxy_auth_header_and_build_proxy_req(req)
                    return conn

            # Reserve a slot while the real connection is being created.
            placeholder = cast(
                ResponseHandler, _TransportPlaceholder(self._placeholder_future)
            )
            self._acquired.add(placeholder)
            if self._limit_per_host:
                self._acquired_per_host[key].add(placeholder)

            try:
                # Traces are done inside the try block to ensure that
                # the placeholder is still cleaned up if an exception
                # is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_start()
                proto = await self._create_connection(req, traces, timeout)
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_end()
            except BaseException:
                self._release_acquired(key, placeholder)
                raise
            else:
                if self._closed:
                    proto.close()
                    raise ClientConnectionError("Connector is closed.")

        # The connection was successfully created, drop the placeholder
        # and add the real connection to the acquired set. There should
        # be no awaits after the proto is added to the acquired set
        # to ensure that the connection is not left in the acquired set
        # on cancellation.
        self._acquired.remove(placeholder)
        self._acquired.add(proto)
        if self._limit_per_host:
            acquired_per_host = self._acquired_per_host[key]
            acquired_per_host.remove(placeholder)
            acquired_per_host.add(proto)
        return Connection(self, key, proto, self._loop)

    async def _wait_for_available_connection(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> None:
        """Wait for an available connection slot."""
        # We loop here because there is a race between
        # the connection limit check and the connection
        # being acquired. If the connection is acquired
        # between the check and the await statement, we
        # need to loop again to check if the connection
        # slot is still available.
        attempts = 0
        while True:
            fut: asyncio.Future[None] = self._loop.create_future()
            keyed_waiters = self._waiters[key]
            keyed_waiters[fut] = None
            if attempts:
                # If we have waited before, we need to move the waiter
                # to the front of the queue as otherwise we might get
                # starved and hit the timeout.
                keyed_waiters.move_to_end(fut, last=False)

            try:
                # Traces happen in the try block to ensure that
                # the waiter is still cleaned up if an exception is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_start()
                await fut
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_end()
            finally:
                # pop the waiter from the queue if it's still
                # there and not already removed by _release_waiter
                keyed_waiters.pop(fut, None)
                if not self._waiters.get(key, True):
                    del self._waiters[key]

            if self._available_connections(key) > 0:
                break
            attempts += 1

    async def _get(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> Optional[Connection]:
        """Get next reusable connection for the key or None.

        The connection will be marked as acquired.
        """
        if (conns := self._conns.get(key)) is None:
            return None

        t1 = monotonic()
        while conns:
            proto, t0 = conns.popleft()
            # We will reuse the connection if it's connected and
            # the keepalive timeout has not been exceeded
            if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
                if not conns:
                    # The very last connection was reclaimed: drop the key
                    del self._conns[key]
                self._acquired.add(proto)
                if self._limit_per_host:
                    self._acquired_per_host[key].add(proto)
                if traces:
                    for trace in traces:
                        try:
                            await trace.send_connection_reuseconn()
                        except BaseException:
                            # Undo the acquisition if a trace hook fails.
                            self._release_acquired(key, proto)
                            raise
                return Connection(self, key, proto, self._loop)

            # Connection cannot be reused, close it
            transport = proto.transport
            proto.close()
            # only for SSL transports
            if not self._cleanup_closed_disabled and key.is_ssl:
                self._cleanup_closed_transports.append(transport)

        # No more connections: drop the key
        del self._conns[key]
        return None

    def _release_waiter(self) -> None:
        """
        Iterates over all waiters until one to be released is found.

        The one to be released is not finished and
        belongs to a host that has available connections.
        """
        if not self._waiters:
            return

        # Shuffle the keys so that hosts are not visited in the
        # same order on every call.
        queues = list(self._waiters)
        random.shuffle(queues)

        for key in queues:
            if self._available_connections(key) < 1:
                continue

            waiters = self._waiters[key]
            while waiters:
                waiter, _ = waiters.popitem(last=False)
                if not waiter.done():
                    waiter.set_result(None)
                    return

    def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
        """Release acquired connection."""
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._acquired.discard(proto)
        if self._limit_per_host and (conns := self._acquired_per_host.get(key)):
            conns.discard(proto)
            if not conns:
                del self._acquired_per_host[key]
        self._release_waiter()

    def _release(
        self,
        key: "ConnectionKey",
        protocol: ResponseHandler,
        *,
        should_close: bool = False,
    ) -> None:
        """Return *protocol* to the connector.

        The protocol is closed when the connector force-closes, when
        ``should_close`` is true or when the protocol itself asks for it;
        otherwise it is appended to the pool for *key* and the keep-alive
        cleanup handle is armed if it is not already.
        """
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._release_acquired(key, protocol)

        if self._force_close or should_close or protocol.should_close:
            transport = protocol.transport
            protocol.close()

            if key.is_ssl and not self._cleanup_closed_disabled:
                self._cleanup_closed_transports.append(transport)
            return

        self._conns[key].append((protocol, monotonic()))

        if self._cleanup_handle is None:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                self._keepalive_timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Create a new protocol for *req*; the base class does not implement it."""
        raise NotImplementedError()

856 

857 

class _DNSCacheTable:
    """In-memory DNS cache that hands out addresses in round-robin order.

    Each key maps to an endless ``cycle`` over its address list plus the
    list length. When a TTL is configured, the insertion time of every key
    is recorded so that ``expired()`` can be answered.
    """

    def __init__(self, ttl: Optional[float] = None) -> None:
        """Create an empty table.

        :param ttl: entry lifetime in seconds; ``None`` means entries never
            expire.
        """
        self._addrs_rr: Dict[
            Tuple[str, int], Tuple[Iterator["ResolveResult"], int]
        ] = {}
        self._timestamps: Dict[Tuple[str, int], float] = {}
        self._ttl = ttl

    def __contains__(self, host: object) -> bool:
        return host in self._addrs_rr

    def add(self, key: Tuple[str, int], addrs: List["ResolveResult"]) -> None:
        """Store (or replace) the addresses for *key*."""
        self._addrs_rr[key] = (cycle(addrs), len(addrs))

        if self._ttl is not None:
            self._timestamps[key] = monotonic()

    def remove(self, key: Tuple[str, int]) -> None:
        """Forget *key*; unknown keys are ignored."""
        self._addrs_rr.pop(key, None)

        if self._ttl is not None:
            self._timestamps.pop(key, None)

    def clear(self) -> None:
        """Drop every cached entry."""
        self._addrs_rr.clear()
        self._timestamps.clear()

    def next_addrs(self, key: Tuple[str, int]) -> List["ResolveResult"]:
        """Return all addresses for *key*, rotated by one on every call.

        An entry stored with an empty address list yields ``[]``.

        :raises KeyError: if *key* is not in the table.
        """
        loop, length = self._addrs_rr[key]
        addrs = list(islice(loop, length))
        # Consume one more element to shift internal state of `cycle`.
        # A default is supplied because cycling over an empty address list
        # is exhausted immediately; a bare next() would raise StopIteration,
        # which surfaces as RuntimeError inside a coroutine.
        next(loop, None)
        return addrs

    def expired(self, key: Tuple[str, int]) -> bool:
        """Tell whether the entry for *key* has outlived the TTL.

        Always ``False`` when no TTL is configured.
        """
        if self._ttl is None:
            return False

        return self._timestamps[key] + self._ttl < monotonic()

895 

896 

def _make_ssl_context(verified: bool) -> SSLContext:
    """Build the SSL context used for client connections.

    With ``verified`` true this is ``ssl.create_default_context()``;
    otherwise a ``PROTOCOL_TLS_CLIENT`` context with hostname checking and
    certificate verification switched off. Either way ALPN is set to
    ``http/1.1``. Returns ``None`` when the ``ssl`` module is unavailable.

    This function is not async-friendly and should be called from a thread
    because it loads certificates from disk and does other blocking I/O.
    """
    if ssl is None:
        # No ssl support
        return None

    if not verified:
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.options |= ssl.OP_NO_SSLv2
        context.options |= ssl.OP_NO_SSLv3
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        context.options |= ssl.OP_NO_COMPRESSION
        context.set_default_verify_paths()
    else:
        context = ssl.create_default_context()

    context.set_alpn_protocols(("http/1.1",))
    return context


# Both default SSLContext objects are built while the module is imported:
# creating them does blocking I/O to load certificates from disk, and
# imports should always happen before the event loop starts or in a thread.
_SSL_CONTEXT_VERIFIED = _make_ssl_context(True)
_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False)

926 

927 

class TCPConnector(BaseConnector):
    """TCP connector.

    Resolves host names (optionally through a per-connector DNS cache with
    request throttling), opens sockets with aiohappyeyeballs, and hands them
    to the event loop, adding TLS directly or through a proxy tunnel when the
    request requires it.

    verify_ssl - Set to True to check ssl certifications.
    fingerprint - Pass the binary sha256
        digest of the expected certificate in DER format to verify
        that the certificate the server presents matches. See also
        https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning
    ssl_context - Optional SSLContext; merged with ssl, verify_ssl and
        fingerprint by _merge_ssl_params.
    ssl - True, a Fingerprint or an SSLContext; merged with verify_ssl,
        ssl_context and fingerprint by _merge_ssl_params.
    resolver - Enable DNS lookups and use this
        resolver
    use_dns_cache - Use memory cache for DNS lookups.
    ttl_dns_cache - Max seconds having cached a DNS entry, None forever.
    family - socket address family
    local_addr - local tuple of (host, port) to bind socket to

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
                            Disabled by default.
    timeout_ceil_threshold - Forwarded unchanged to BaseConnector.
    happy_eyeballs_delay - This is the “Connection Attempt Delay”
                           as defined in RFC 8305. To disable
                           the happy eyeballs algorithm, set to None.
    interleave - “First Address Family Count” as defined in RFC 8305
    loop - Optional event loop.
    socket_factory - A SocketFactoryType function that, if supplied,
                     will be used to create sockets given an
                     AddrInfoType.
    ssl_shutdown_timeout - DEPRECATED. Will be removed in aiohttp 4.0.
                           Grace period for SSL shutdown handshake on TLS
                           connections. Default is 0 seconds (immediate abort).
                           This parameter allowed for a clean SSL shutdown by
                           notifying the remote peer of connection closure,
                           while avoiding excessive delays during connector cleanup.
                           Note: Only takes effect on Python 3.11+.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"})

    def __init__(
        self,
        *,
        verify_ssl: bool = True,
        fingerprint: Optional[bytes] = None,
        use_dns_cache: bool = True,
        ttl_dns_cache: Optional[int] = 10,
        family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC,
        ssl_context: Optional[SSLContext] = None,
        ssl: Union[bool, Fingerprint, SSLContext] = True,
        local_addr: Optional[Tuple[str, int]] = None,
        resolver: Optional[AbstractResolver] = None,
        keepalive_timeout: Union[None, float, object] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        timeout_ceil_threshold: float = 5,
        happy_eyeballs_delay: Optional[float] = 0.25,
        interleave: Optional[int] = None,
        socket_factory: Optional[SocketFactoryType] = None,
        ssl_shutdown_timeout: Union[_SENTINEL, None, float] = sentinel,
    ) -> None:
        super().__init__(
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
            limit=limit,
            limit_per_host=limit_per_host,
            enable_cleanup_closed=enable_cleanup_closed,
            loop=loop,
            timeout_ceil_threshold=timeout_ceil_threshold,
        )

        # Collapse the four SSL-related arguments into one effective setting.
        self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)

        self._resolver: AbstractResolver
        if resolver is None:
            # The connector created this resolver, so close() must close it.
            self._resolver = DefaultResolver(loop=self._loop)
            self._resolver_owner = True
        else:
            # A caller-supplied resolver is never closed by this connector.
            self._resolver = resolver
            self._resolver_owner = False

        self._use_dns_cache = use_dns_cache
        self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
        # (host, port) -> futures of callers waiting on an in-flight lookup.
        self._throttle_dns_futures: Dict[
            Tuple[str, int], Set["asyncio.Future[None]"]
        ] = {}
        self._family = family
        self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
        self._happy_eyeballs_delay = happy_eyeballs_delay
        self._interleave = interleave
        # Lookup tasks still running; cancelled and awaited on close.
        self._resolve_host_tasks: Set["asyncio.Task[List[ResolveResult]]"] = set()
        self._socket_factory = socket_factory
        self._ssl_shutdown_timeout: Optional[float]
        # Handle ssl_shutdown_timeout with warning for Python < 3.11
        if ssl_shutdown_timeout is sentinel:
            # Argument not passed: default to 0 and emit no warning.
            self._ssl_shutdown_timeout = 0
        else:
            # Deprecation warning for ssl_shutdown_timeout parameter
            warnings.warn(
                "The ssl_shutdown_timeout parameter is deprecated and will be removed in aiohttp 4.0",
                DeprecationWarning,
                stacklevel=2,
            )
            if (
                sys.version_info < (3, 11)
                and ssl_shutdown_timeout is not None
                and ssl_shutdown_timeout != 0
            ):
                warnings.warn(
                    f"ssl_shutdown_timeout={ssl_shutdown_timeout} is ignored on Python < 3.11; "
                    "only ssl_shutdown_timeout=0 is supported. The timeout will be ignored.",
                    RuntimeWarning,
                    stacklevel=2,
                )
            self._ssl_shutdown_timeout = ssl_shutdown_timeout

    def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
        """Cancel outstanding DNS work and return the awaitables to wait on.

        Cancels every throttled DNS waiter future, delegates to the base
        class ``_close`` and then cancels each in-flight resolve task,
        appending it to the list returned by the base class.
        """
        for fut in chain.from_iterable(self._throttle_dns_futures.values()):
            fut.cancel()

        waiters = super()._close(abort_ssl=abort_ssl)

        for t in self._resolve_host_tasks:
            t.cancel()
            waiters.append(t)

        return waiters

    async def close(self, *, abort_ssl: bool = False) -> None:
        """
        Close all opened transports.

        The resolver is closed first, but only when this connector created it.

        :param abort_ssl: If True, SSL connections will be aborted immediately
            without performing the shutdown handshake. If False (default),
            the behavior is determined by ssl_shutdown_timeout:
            - If ssl_shutdown_timeout=0: connections are aborted
            - If ssl_shutdown_timeout>0: graceful shutdown is performed
        """
        if self._resolver_owner:
            await self._resolver.close()
        # Use abort_ssl param if explicitly set, otherwise use ssl_shutdown_timeout default
        await super().close(abort_ssl=abort_ssl or self._ssl_shutdown_timeout == 0)

    @property
    def family(self) -> int:
        """Socket family like AF_INET."""
        return self._family

    @property
    def use_dns_cache(self) -> bool:
        """True if local DNS caching is enabled."""
        return self._use_dns_cache

    def clear_dns_cache(
        self, host: Optional[str] = None, port: Optional[int] = None
    ) -> None:
        """Remove specified host/port or clear all dns local cache.

        Raises ValueError when only one of host and port is given.
        """
        if host is not None and port is not None:
            self._cached_hosts.remove((host, port))
        elif host is not None or port is not None:
            raise ValueError("either both host and port or none of them are allowed")
        else:
            self._cached_hosts.clear()

    async def _resolve_host(
        self, host: str, port: int, traces: Optional[Sequence["Trace"]] = None
    ) -> List[ResolveResult]:
        """Resolve host and return list of addresses.

        IP literals are returned as a single synthetic result without using
        the resolver. With the DNS cache disabled the resolver is called
        directly. Otherwise a fresh cache entry is served, a caller that
        finds a lookup for the same (host, port) already running waits for
        it, and the first caller starts the lookup in a shielded task.
        """
        if is_ip_address(host):
            return [
                {
                    "hostname": host,
                    "host": host,
                    "port": port,
                    "family": self._family,
                    "proto": 0,
                    "flags": 0,
                }
            ]

        if not self._use_dns_cache:

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            res = await self._resolver.resolve(host, port, family=self._family)

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            return res

        key = (host, port)
        if key in self._cached_hosts and not self._cached_hosts.expired(key):
            # get result early, before any await (#4014)
            result = self._cached_hosts.next_addrs(key)

            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            return result

        futures: Set["asyncio.Future[None]"]
        #
        # If multiple callers on this connector are resolving the same host,
        # we wait for the first one to resolve and then use the result for
        # all of them.
        # We use a throttle to ensure that we only resolve the host once
        # and then use the result for all the waiters.
        #
        if key in self._throttle_dns_futures:
            # get futures early, before any await (#4014)
            futures = self._throttle_dns_futures[key]
            future: asyncio.Future[None] = self._loop.create_future()
            futures.add(future)
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            try:
                await future
            finally:
                # Runs on success, failure and cancellation of this waiter.
                futures.discard(future)
            return self._cached_hosts.next_addrs(key)

        # update dict early, before any await (#4014)
        self._throttle_dns_futures[key] = futures = set()
        # In this case we need to create a task to ensure that we can shield
        # the task from cancellation as cancelling this lookup should not cancel
        # the underlying lookup or else the cancel event will get broadcast to
        # all the waiters across all connections.
        #
        coro = self._resolve_host_with_throttle(key, host, port, futures, traces)
        loop = asyncio.get_running_loop()
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to send immediately
            resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True)
        else:
            resolved_host_task = loop.create_task(coro)

        # An eagerly started task may already be finished; only track it
        # (for cancellation in _close) while it is still running.
        if not resolved_host_task.done():
            self._resolve_host_tasks.add(resolved_host_task)
            resolved_host_task.add_done_callback(self._resolve_host_tasks.discard)

        try:
            return await asyncio.shield(resolved_host_task)
        except asyncio.CancelledError:

            def drop_exception(fut: "asyncio.Future[List[ResolveResult]]") -> None:
                # Retrieve the task's outcome so a later failure of the
                # still-running lookup is consumed rather than left unread.
                with suppress(Exception, asyncio.CancelledError):
                    fut.result()

            resolved_host_task.add_done_callback(drop_exception)
            raise

    async def _resolve_host_with_throttle(
        self,
        key: Tuple[str, int],
        host: str,
        port: int,
        futures: Set["asyncio.Future[None]"],
        traces: Optional[Sequence["Trace"]],
    ) -> List[ResolveResult]:
        """Resolve host and set result for all waiters.

        This method must be run in a task and shielded from cancellation
        to avoid cancelling the underlying lookup.

        On success the addresses are stored in the DNS cache and every
        waiter future is completed; on any exception the same exception is
        set on every waiter and re-raised. The throttle entry for ``key``
        is removed in both cases.
        """
        try:
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_miss(host)

                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            addrs = await self._resolver.resolve(host, port, family=self._family)
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            self._cached_hosts.add(key, addrs)
            for fut in futures:
                set_result(fut, None)
        except BaseException as e:
            # any DNS exception is set for the waiters to raise the same exception.
            # This coro is always run in task that is shielded from cancellation so
            # we should never be propagating cancellation here.
            for fut in futures:
                set_exception(fut, e)
            raise
        finally:
            self._throttle_dns_futures.pop(key)

        return self._cached_hosts.next_addrs(key)

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Create connection.

        Goes through the proxy path when ``req.proxy`` is set and the direct
        path otherwise; only the protocol of the resulting pair is returned.
        """
        if req.proxy:
            _, proto = await self._create_proxy_connection(req, traces, timeout)
        else:
            _, proto = await self._create_direct_connection(req, traces, timeout)

        return proto

    def _get_ssl_context(self, req: ClientRequest) -> Optional[SSLContext]:
        """Logic to get the correct SSL context

        0. if req.is_ssl() is false, return None

        1. if req.ssl is an SSLContext, use it
        2. if req.ssl is anything other than True (verification disabled
           or a Fingerprint), use the shared unverified context
        3. otherwise fall back to the connector-level setting self._ssl:
            1. if it is an SSLContext, use it
            2. if it is anything other than True, use the shared
               unverified context
            3. otherwise use the shared verified context

        Raises RuntimeError when the module-level ``ssl`` name is None.
        """
        if not req.is_ssl():
            return None

        if ssl is None:  # pragma: no cover
            raise RuntimeError("SSL is not supported.")
        sslcontext = req.ssl
        if isinstance(sslcontext, ssl.SSLContext):
            return sslcontext
        if sslcontext is not True:
            # not verified or fingerprinted
            return _SSL_CONTEXT_UNVERIFIED
        sslcontext = self._ssl
        if isinstance(sslcontext, ssl.SSLContext):
            return sslcontext
        if sslcontext is not True:
            # not verified or fingerprinted
            return _SSL_CONTEXT_UNVERIFIED
        return _SSL_CONTEXT_VERIFIED

    def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]:
        """Return the Fingerprint to check, preferring the request's over the connector's.

        Returns None when neither ``req.ssl`` nor ``self._ssl`` is a Fingerprint.
        """
        ret = req.ssl
        if isinstance(ret, Fingerprint):
            return ret
        ret = self._ssl
        if isinstance(ret, Fingerprint):
            return ret
        return None

    async def _wrap_create_connection(
        self,
        *args: Any,
        addr_infos: List[AddrInfoType],
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
        **kwargs: Any,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Open a socket to one of ``addr_infos`` and build a transport on it.

        The socket comes from ``aiohappyeyeballs.start_connection`` and is
        passed to ``loop.create_connection`` together with ``*args`` and
        ``**kwargs``; both steps run under the ``timeout.sock_connect`` limit.

        Certificate errors are re-raised as ClientConnectorCertificateError,
        SSL errors as ClientConnectorSSLError and other OSErrors as
        ``client_error``; a TimeoutError with no errno propagates unchanged.
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                sock = await aiohappyeyeballs.start_connection(
                    addr_infos=addr_infos,
                    local_addr_infos=self._local_addr_infos,
                    happy_eyeballs_delay=self._happy_eyeballs_delay,
                    interleave=self._interleave,
                    loop=self._loop,
                    socket_factory=self._socket_factory,
                )
                # Add ssl_shutdown_timeout for Python 3.11+ when SSL is used
                if (
                    kwargs.get("ssl")
                    and self._ssl_shutdown_timeout
                    and sys.version_info >= (3, 11)
                ):
                    kwargs["ssl_shutdown_timeout"] = self._ssl_shutdown_timeout
                return await self._loop.create_connection(*args, **kwargs, sock=sock)
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc

    async def _wrap_existing_connection(
        self,
        *args: Any,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
        **kwargs: Any,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Call ``loop.create_connection`` with the caller's arguments as given.

        Unlike ``_wrap_create_connection`` no socket is opened here; the
        caller supplies whatever the loop needs through ``**kwargs``. The
        timeout and the exception translation are the same.
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                return await self._loop.create_connection(*args, **kwargs)
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc

    def _fail_on_no_start_tls(self, req: "ClientRequest") -> None:
        """Raise a :py:exc:`RuntimeError` on missing ``start_tls()``.

        It is necessary for TLS-in-TLS so that it is possible to
        send HTTPS queries through HTTPS proxies.

        This doesn't affect regular HTTP requests, though.
        """
        if not req.is_ssl():
            return

        proxy_url = req.proxy
        assert proxy_url is not None
        if proxy_url.scheme != "https":
            return

        self._check_loop_for_start_tls()

    def _check_loop_for_start_tls(self) -> None:
        """Raise RuntimeError when the event loop has no ``start_tls`` attribute."""
        try:
            self._loop.start_tls
        except AttributeError as attr_exc:
            raise RuntimeError(
                "An HTTPS request is being sent through an HTTPS proxy. "
                "This needs support for TLS in TLS but it is not implemented "
                "in your runtime for the stdlib asyncio.\n\n"
                "Please upgrade to Python 3.11 or higher. For more details, "
                "please see:\n"
                "* https://bugs.python.org/issue37179\n"
                "* https://github.com/python/cpython/pull/28073\n"
                "* https://docs.aiohttp.org/en/stable/"
                "client_advanced.html#proxy-support\n"
                "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
            ) from attr_exc

    def _loop_supports_start_tls(self) -> bool:
        """Return True when ``_check_loop_for_start_tls`` does not raise."""
        try:
            self._check_loop_for_start_tls()
        except RuntimeError:
            return False
        else:
            return True

    def _warn_about_tls_in_tls(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
    ) -> None:
        """Issue a warning if the requested URL has HTTPS scheme.

        No warning is emitted for uvloop transports, on Python 3.11+, or
        when the transport sets a truthy ``_start_tls_compatible`` attribute.
        """
        if req.request_info.url.scheme != "https":
            return

        # Check if uvloop is being used, which supports TLS in TLS,
        # otherwise assume that asyncio's native transport is being used.
        if type(underlying_transport).__module__.startswith("uvloop"):
            return

        # Support in asyncio was added in Python 3.11 (bpo-44011)
        asyncio_supports_tls_in_tls = sys.version_info >= (3, 11) or getattr(
            underlying_transport,
            "_start_tls_compatible",
            False,
        )

        if asyncio_supports_tls_in_tls:
            return

        warnings.warn(
            "An HTTPS request is being sent through an HTTPS proxy. "
            "This support for TLS in TLS is known to be disabled "
            "in the stdlib asyncio (Python <3.11). This is why you'll probably see "
            "an error in the log below.\n\n"
            "It is possible to enable it via monkeypatching. "
            "For more details, see:\n"
            "* https://bugs.python.org/issue37179\n"
            "* https://github.com/python/cpython/pull/28073\n\n"
            "You can temporarily patch this as follows:\n"
            "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
            "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
            RuntimeWarning,
            source=self,
            # At least 3 of the calls in the stack originate
            # from the methods in this class.
            # NOTE(review): this comment previously asked "Why `4`?" while the
            # value passed is 3 -- confirm which stack level is intended.
            stacklevel=3,
        )

    async def _start_tls_connection(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Wrap the raw TCP transport with TLS.

        Calls ``loop.start_tls`` on ``underlying_transport`` with a freshly
        created protocol, then checks the certificate fingerprint when one
        is configured. If ``start_tls`` raises, the underlying transport is
        aborted (``ssl_shutdown_timeout == 0``) or closed before re-raising.

        Raises ClientConnectorCertificateError / ClientConnectorSSLError for
        certificate and SSL errors, ``client_error`` for other OSErrors or
        when ``start_tls`` returns None, and ClientConnectionError when
        ``start_tls`` raises TypeError.
        """
        tls_proto = self._factory()  # Create a brand new proto for TLS
        sslcontext = self._get_ssl_context(req)
        if TYPE_CHECKING:
            # _start_tls_connection is unreachable in the current code path
            # if sslcontext is None.
            assert sslcontext is not None

        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                try:
                    # ssl_shutdown_timeout is only available in Python 3.11+
                    if sys.version_info >= (3, 11) and self._ssl_shutdown_timeout:
                        tls_transport = await self._loop.start_tls(
                            underlying_transport,
                            tls_proto,
                            sslcontext,
                            server_hostname=req.server_hostname or req.host,
                            ssl_handshake_timeout=timeout.total,
                            ssl_shutdown_timeout=self._ssl_shutdown_timeout,
                        )
                    else:
                        tls_transport = await self._loop.start_tls(
                            underlying_transport,
                            tls_proto,
                            sslcontext,
                            server_hostname=req.server_hostname or req.host,
                            ssl_handshake_timeout=timeout.total,
                        )
                except BaseException:
                    # We need to close the underlying transport since
                    # `start_tls()` probably failed before it had a
                    # chance to do this:
                    if self._ssl_shutdown_timeout == 0:
                        underlying_transport.abort()
                    else:
                        underlying_transport.close()
                    raise
                if isinstance(tls_transport, asyncio.Transport):
                    fingerprint = self._get_fingerprint(req)
                    if fingerprint:
                        try:
                            fingerprint.check(tls_transport)
                        except ServerFingerprintMismatch:
                            tls_transport.close()
                            if not self._cleanup_closed_disabled:
                                self._cleanup_closed_transports.append(tls_transport)
                            raise
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
        except TypeError as type_err:
            # Example cause looks like this:
            # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
            # object at 0x7f760615e460> is not supported by start_tls()

            raise ClientConnectionError(
                "Cannot initialize a TLS-in-TLS connection to host "
                f"{req.host!s}:{req.port:d} through an underlying connection "
                f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
                f"[{type_err!s}]"
            ) from type_err
        else:
            if tls_transport is None:
                msg = "Failed to start TLS (possibly caused by closing transport)"
                raise client_error(req.connection_key, OSError(msg))
            tls_proto.connection_made(
                tls_transport
            )  # Kick the state machine of the new TLS protocol

        return tls_transport, tls_proto

    def _convert_hosts_to_addr_infos(
        self, hosts: List[ResolveResult]
    ) -> List[AddrInfoType]:
        """Converts the list of hosts to a list of addr_infos.

        The list of hosts is the result of a DNS lookup. The list of
        addr_infos is the result of a call to `socket.getaddrinfo()`.

        A host containing ":" is treated as IPv6, anything else as IPv4.
        Entries whose family differs from a non-zero ``self._family`` are
        skipped.
        """
        addr_infos: List[AddrInfoType] = []
        for hinfo in hosts:
            host = hinfo["host"]
            is_ipv6 = ":" in host
            family = socket.AF_INET6 if is_ipv6 else socket.AF_INET
            if self._family and self._family != family:
                continue
            # IPv6 socket addresses are 4-tuples; the last two fields are 0.
            addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"])
            addr_infos.append(
                (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr)
            )
        return addr_infos

    async def _create_direct_connection(
        self,
        req: ClientRequest,
        traces: List["Trace"],
        timeout: "ClientTimeout",
        *,
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Resolve the request host and connect to one of its addresses.

        Connection attempts are repeated while candidate addresses remain:
        a connect failure drops addresses via
        ``aiohappyeyeballs.pop_addr_infos_interleave`` and a fingerprint
        mismatch removes the offending peer. When the candidates run out
        the last recorded exception is raised. Resolver OSErrors are raised
        as ClientConnectorDNSError.
        """
        sslcontext = self._get_ssl_context(req)
        fingerprint = self._get_fingerprint(req)

        host = req.url.raw_host
        assert host is not None
        # Replace multiple trailing dots with a single one.
        # A trailing dot is only present for fully-qualified domain names.
        # See https://github.com/aio-libs/aiohttp/pull/7364.
        if host.endswith(".."):
            host = host.rstrip(".") + "."
        port = req.port
        assert port is not None
        try:
            # Cancelling this lookup should not cancel the underlying lookup
            # or else the cancel event will get broadcast to all the waiters
            # across all connections.
            hosts = await self._resolve_host(host, port, traces=traces)
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            # in case of proxy it is not ClientProxyConnectionError
            # it is problem of resolving proxy ip itself
            raise ClientConnectorDNSError(req.connection_key, exc) from exc

        last_exc: Optional[Exception] = None
        addr_infos = self._convert_hosts_to_addr_infos(hosts)
        while addr_infos:
            # Strip trailing dots, certificates contain FQDN without dots.
            # See https://github.com/aio-libs/aiohttp/issues/3636
            server_hostname = (
                (req.server_hostname or host).rstrip(".") if sslcontext else None
            )

            try:
                transp, proto = await self._wrap_create_connection(
                    self._factory,
                    timeout=timeout,
                    ssl=sslcontext,
                    addr_infos=addr_infos,
                    server_hostname=server_hostname,
                    req=req,
                    client_error=client_error,
                )
            except (ClientConnectorError, asyncio.TimeoutError) as exc:
                last_exc = exc
                aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave)
                continue

            if req.is_ssl() and fingerprint:
                try:
                    fingerprint.check(transp)
                except ServerFingerprintMismatch as exc:
                    transp.close()
                    if not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transp)
                    last_exc = exc
                    # Remove the bad peer from the list of addr_infos
                    sock: socket.socket = transp.get_extra_info("socket")
                    bad_peer = sock.getpeername()
                    aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer)
                    continue

            return transp, proto
        else:
            # Reached only when the loop ends without returning, i.e. the
            # candidate list is empty.
            assert last_exc is not None
            raise last_exc

    async def _create_proxy_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Connect to the proxy and, for HTTPS requests, tunnel through it.

        For non-SSL requests the proxy connection itself is returned. For
        SSL requests a CONNECT request is sent over it; a non-200 reply
        raises ClientHttpProxyError. The tunnel is then upgraded with
        ``_start_tls_connection`` when the loop has ``start_tls``, otherwise
        the raw socket is duplicated and wrapped by
        ``_wrap_existing_connection``.
        """
        self._fail_on_no_start_tls(req)
        runtime_has_start_tls = self._loop_supports_start_tls()
        proxy_req = self._update_proxy_auth_header_and_build_proxy_req(req)

        # create connection to proxy server
        transport, proto = await self._create_direct_connection(
            proxy_req, [], timeout, client_error=ClientProxyConnectionError
        )

        if req.is_ssl():
            if runtime_has_start_tls:
                self._warn_about_tls_in_tls(transport, req)

            # For HTTPS requests over HTTP proxy
            # we must notify proxy to tunnel connection
            # so we send CONNECT command:
            #   CONNECT www.python.org:443 HTTP/1.1
            #   Host: www.python.org
            #
            # next we must do TLS handshake and so on
            # to do this we must wrap raw socket into secure one
            # asyncio handles this perfectly
            proxy_req.method = hdrs.METH_CONNECT
            proxy_req.url = req.url
            key = req.connection_key._replace(
                proxy=None, proxy_auth=None, proxy_headers_hash=None
            )
            conn = _ConnectTunnelConnection(self, key, proto, self._loop)
            proxy_resp = await proxy_req.send(conn)
            try:
                protocol = conn._protocol
                assert protocol is not None

                # read_until_eof=True will ensure the connection isn't closed
                # once the response is received and processed allowing
                # START_TLS to work on the connection below.
                protocol.set_response_params(
                    read_until_eof=runtime_has_start_tls,
                    timeout_ceil_threshold=self._timeout_ceil_threshold,
                )
                resp = await proxy_resp.start(conn)
            except BaseException:
                proxy_resp.close()
                conn.close()
                raise
            else:
                conn._protocol = None
                try:
                    if resp.status != 200:
                        message = resp.reason
                        if message is None:
                            message = HTTPStatus(resp.status).phrase
                        raise ClientHttpProxyError(
                            proxy_resp.request_info,
                            resp.history,
                            status=resp.status,
                            message=message,
                            headers=resp.headers,
                        )
                    if not runtime_has_start_tls:
                        rawsock = transport.get_extra_info("socket", default=None)
                        if rawsock is None:
                            raise RuntimeError(
                                "Transport does not expose socket instance"
                            )
                        # Duplicate the socket, so now we can close proxy transport
                        rawsock = rawsock.dup()
                except BaseException:
                    # It shouldn't be closed in `finally` because it's fed to
                    # `loop.start_tls()` and the docs say not to touch it after
                    # passing there.
                    transport.close()
                    raise
                finally:
                    if not runtime_has_start_tls:
                        transport.close()

                if not runtime_has_start_tls:
                    # HTTP proxy with support for upgrade to HTTPS
                    sslcontext = self._get_ssl_context(req)
                    return await self._wrap_existing_connection(
                        self._factory,
                        timeout=timeout,
                        ssl=sslcontext,
                        sock=rawsock,
                        server_hostname=req.host,
                        req=req,
                    )

                return await self._start_tls_connection(
                    # Access the old transport for the last time before it's
                    # closed and forgotten forever:
                    transport,
                    req=req,
                    timeout=timeout,
                )
            finally:
                proxy_resp.close()

        return transport, proto

1716 

1717 

class UnixConnector(BaseConnector):
    """Connector that talks to a server over a Unix domain socket.

    path - Unix socket path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        # Everything except the socket path is handled by the base connector.
        super().__init__(
            loop=loop,
            limit=limit,
            limit_per_host=limit_per_host,
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
        )
        self._path = path

    @property
    def path(self) -> str:
        """Path to unix socket."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Open a connection to the configured socket path and return its protocol.

        The attempt is bounded by ``timeout.sock_connect``. OSErrors are
        re-raised as UnixClientConnectorError, except a TimeoutError that
        carries no errno, which propagates unchanged.
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _transport, handler = await self._loop.create_unix_connection(
                    self._factory, self._path
                )
        except OSError as err:
            if err.errno is not None or not isinstance(err, asyncio.TimeoutError):
                raise UnixClientConnectorError(
                    self.path, req.connection_key, err
                ) from err
            # A bare timeout is passed through untouched.
            raise

        return handler

1771 

1772 

class NamedPipeConnector(BaseConnector):
    """Connector that talks to a server over a named pipe.

    Only supported by the proactor event loop.
    See also: https://docs.python.org/3/library/asyncio-eventloop.html

    path - Windows named pipe path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        super().__init__(
            loop=loop,
            limit=limit,
            limit_per_host=limit_per_host,
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
        )
        # The base class has set self._loop by now; reject any loop that is
        # not a proactor loop before storing the pipe path.
        loop_is_proactor = isinstance(
            self._loop,
            asyncio.ProactorEventLoop,  # type: ignore[attr-defined]
        )
        if not loop_is_proactor:
            raise RuntimeError(
                "Named Pipes only available in proactor loop under windows"
            )
        self._path = path

    @property
    def path(self) -> str:
        """Path to the named pipe."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Open a connection to the configured pipe path and return its protocol.

        The attempt is bounded by ``timeout.sock_connect``. OSErrors are
        re-raised as ClientConnectorError, except a TimeoutError that
        carries no errno, which propagates unchanged.
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _transport, pipe_proto = await self._loop.create_pipe_connection(  # type: ignore[attr-defined]
                    self._factory, self._path
                )
                # Yield to the event loop once so that connection_made runs
                # and the transport is set before client.py's _request
                # method reaches `assert conn.transport is not None`.
                # (Setting `proto.transport = trans` by hand would be the
                # alternative.)
                await asyncio.sleep(0)
        except OSError as err:
            if err.errno is not None or not isinstance(err, asyncio.TimeoutError):
                raise ClientConnectorError(req.connection_key, err) from err
            # A bare timeout is passed through untouched.
            raise

        return cast(ResponseHandler, pipe_proto)