Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/connector.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

834 statements  

1import asyncio 

2import functools 

3import random 

4import socket 

5import sys 

6import traceback 

7import warnings 

8from collections import OrderedDict, defaultdict, deque 

9from contextlib import suppress 

10from http import HTTPStatus 

11from itertools import chain, cycle, islice 

12from time import monotonic 

13from types import TracebackType 

14from typing import ( 

15 TYPE_CHECKING, 

16 Any, 

17 Awaitable, 

18 Callable, 

19 DefaultDict, 

20 Deque, 

21 Dict, 

22 Iterator, 

23 List, 

24 Literal, 

25 Optional, 

26 Sequence, 

27 Set, 

28 Tuple, 

29 Type, 

30 Union, 

31 cast, 

32) 

33 

34import aiohappyeyeballs 

35from aiohappyeyeballs import AddrInfoType, SocketFactoryType 

36 

37from . import hdrs, helpers 

38from .abc import AbstractResolver, ResolveResult 

39from .client_exceptions import ( 

40 ClientConnectionError, 

41 ClientConnectorCertificateError, 

42 ClientConnectorDNSError, 

43 ClientConnectorError, 

44 ClientConnectorSSLError, 

45 ClientHttpProxyError, 

46 ClientProxyConnectionError, 

47 ServerFingerprintMismatch, 

48 UnixClientConnectorError, 

49 cert_errors, 

50 ssl_errors, 

51) 

52from .client_proto import ResponseHandler 

53from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params 

54from .helpers import ( 

55 _SENTINEL, 

56 ceil_timeout, 

57 is_ip_address, 

58 noop, 

59 sentinel, 

60 set_exception, 

61 set_result, 

62) 

63from .log import client_logger 

64from .resolver import DefaultResolver 

65 

# `Buffer` protocol: available in the stdlib from 3.12; otherwise approximate
# it with a union of the common bytes-like types (typing-only fallback).
if sys.version_info >= (3, 12):
    from collections.abc import Buffer
else:
    Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]

# `ssl` is optional at runtime: fall back to harmless placeholders when the
# interpreter was built without SSL support.  Under TYPE_CHECKING the import
# is unconditional so annotations resolve.
if TYPE_CHECKING:
    import ssl

    SSLContext = ssl.SSLContext
else:
    try:
        import ssl

        SSLContext = ssl.SSLContext
    except ImportError:  # pragma: no cover
        ssl = None  # type: ignore[assignment]
        SSLContext = object  # type: ignore[misc,assignment]

83 

# URL-scheme groups used by connectors to validate which request schemes
# they are allowed to handle (see `allowed_protocol_schema_set`).
EMPTY_SCHEMA_SET = frozenset({""})
HTTP_SCHEMA_SET = frozenset({"http", "https"})
WS_SCHEMA_SET = frozenset({"ws", "wss"})

HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET | EMPTY_SCHEMA_SET
HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET | WS_SCHEMA_SET

# Cleanup closed is no longer needed after https://github.com/python/cpython/pull/118960
# which first appeared in Python 3.12.7 and 3.13.1; True only on interpreter
# versions that still lack that fix.
NEEDS_CLEANUP_CLOSED = (3, 13, 0) <= sys.version_info < (
    3,
    13,
    1,
) or sys.version_info < (3, 12, 7)


__all__ = (
    "BaseConnector",
    "TCPConnector",
    "UnixConnector",
    "NamedPipeConnector",
    "AddrInfoType",
    "SocketFactoryType",
)

108 

109 

110if TYPE_CHECKING: 

111 from .client import ClientTimeout 

112 from .client_reqrep import ConnectionKey 

113 from .tracing import Trace 

114 

115 

116class _DeprecationWaiter: 

117 __slots__ = ("_awaitable", "_awaited") 

118 

119 def __init__(self, awaitable: Awaitable[Any]) -> None: 

120 self._awaitable = awaitable 

121 self._awaited = False 

122 

123 def __await__(self) -> Any: 

124 self._awaited = True 

125 return self._awaitable.__await__() 

126 

127 def __del__(self) -> None: 

128 if not self._awaited: 

129 warnings.warn( 

130 "Connector.close() is a coroutine, " 

131 "please use await connector.close()", 

132 DeprecationWarning, 

133 ) 

134 

135 

136async def _wait_for_close(waiters: List[Awaitable[object]]) -> None: 

137 """Wait for all waiters to finish closing.""" 

138 results = await asyncio.gather(*waiters, return_exceptions=True) 

139 for res in results: 

140 if isinstance(res, Exception): 

141 client_logger.debug("Error while closing connector: %r", res) 

142 

143 

class Connection:
    """A single acquired connection handed out by a :class:`BaseConnector`.

    Binds a :class:`ResponseHandler` protocol to its pool ``key``; calling
    :meth:`release` returns the protocol to the pool, :meth:`close` discards
    it.  An instance garbage-collected while still holding a protocol emits a
    ResourceWarning and force-closes it back into the connector.
    """

    # Filled with a stack trace in loop debug mode to help locate leaks.
    _source_traceback = None

    def __init__(
        self,
        connector: "BaseConnector",
        key: "ConnectionKey",
        protocol: ResponseHandler,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._key = key
        self._connector = connector
        self._loop = loop
        # Set to None once the connection has been released or closed.
        self._protocol: Optional[ResponseHandler] = protocol
        self._callbacks: List[Callable[[], None]] = []

        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __repr__(self) -> str:
        return f"Connection<{self._key}>"

    def __del__(self, _warnings: Any = warnings) -> None:
        # Safety net: if the connection was never released, warn, force-close
        # it back into the connector, and report via the loop's handler.
        if self._protocol is not None:
            kwargs = {"source": self}
            _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, **kwargs)
            if self._loop.is_closed():
                return

            self._connector._release(self._key, self._protocol, should_close=True)

            context = {"client_connection": self, "message": "Unclosed connection"}
            if self._source_traceback is not None:
                context["source_traceback"] = self._source_traceback
            self._loop.call_exception_handler(context)

    def __bool__(self) -> Literal[True]:
        """Force subclasses to not be falsy, to make checks simpler."""
        return True

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """Deprecated accessor for the event loop this connection uses."""
        warnings.warn(
            "connector.loop property is deprecated", DeprecationWarning, stacklevel=2
        )
        return self._loop

    @property
    def transport(self) -> Optional[asyncio.Transport]:
        """The underlying transport, or None once released/closed."""
        if self._protocol is None:
            return None
        return self._protocol.transport

    @property
    def protocol(self) -> Optional[ResponseHandler]:
        """The wrapped protocol, or None once released/closed."""
        return self._protocol

    def add_callback(self, callback: Callable[[], None]) -> None:
        # Callbacks run exactly once, on release or close (_notify_release).
        if callback is not None:
            self._callbacks.append(callback)

    def _notify_release(self) -> None:
        # Swap the list out first so a callback registering new callbacks
        # cannot be invoked in this pass.
        callbacks, self._callbacks = self._callbacks[:], []

        for cb in callbacks:
            with suppress(Exception):
                cb()

    def close(self) -> None:
        """Release the protocol to the connector, forcing it closed."""
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(self._key, self._protocol, should_close=True)
            self._protocol = None

    def release(self) -> None:
        """Return the protocol to the connector pool for reuse."""
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(self._key, self._protocol)
            self._protocol = None

    @property
    def closed(self) -> bool:
        """True once released/closed or the transport has dropped."""
        return self._protocol is None or not self._protocol.is_connected()

230 

231 

class _ConnectTunnelConnection(Connection):
    """Special connection wrapper for CONNECT tunnels that must never be pooled.

    This connection wraps the proxy connection that will be upgraded with TLS.
    It must never be released to the pool because:
    1. Its 'closed' future will never complete, causing session.close() to hang
    2. It represents an intermediate state, not a reusable connection
    3. The real connection (with TLS) will be created separately
    """

    def release(self) -> None:
        """Do nothing - don't pool or close the connection.

        These connections are an intermediate state during the CONNECT tunnel
        setup and will be cleaned up naturally after the TLS upgrade. If they
        were to be pooled, they would never be properly closed, causing
        session.close() to wait forever for their 'closed' future.
        """
        # Intentionally a no-op: overrides Connection.release(), which would
        # otherwise hand the protocol back to the connector pool.

250 

251 

252class _TransportPlaceholder: 

253 """placeholder for BaseConnector.connect function""" 

254 

255 __slots__ = ("closed", "transport") 

256 

257 def __init__(self, closed_future: asyncio.Future[Optional[Exception]]) -> None: 

258 """Initialize a placeholder for a transport.""" 

259 self.closed = closed_future 

260 self.transport = None 

261 

262 def close(self) -> None: 

263 """Close the placeholder.""" 

264 

265 def abort(self) -> None: 

266 """Abort the placeholder (does nothing).""" 

267 

268 

class BaseConnector:
    """Base connector class.

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
                            Disabled by default.
    timeout_ceil_threshold - Trigger ceiling of timeout values when
                             it's above timeout_ceil_threshold.
    loop - Optional event loop.
    """

    _closed = True  # prevent AttributeError in __del__ if ctor was failed
    _source_traceback = None

    # abort transport after 2 seconds (cleanup broken connections)
    _cleanup_closed_period = 2.0

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET

    def __init__(
        self,
        *,
        keepalive_timeout: Union[object, None, float] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        timeout_ceil_threshold: float = 5,
    ) -> None:

        if force_close:
            # Keep-alive is meaningless when every connection is closed eagerly.
            if keepalive_timeout is not None and keepalive_timeout is not sentinel:
                raise ValueError(
                    "keepalive_timeout cannot be set if force_close is True"
                )
        else:
            if keepalive_timeout is sentinel:
                keepalive_timeout = 15.0

        loop = loop or asyncio.get_running_loop()
        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._closed = False
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # Connection pool of reusable connections.
        # We use a deque to store connections because it has O(1) popleft()
        # and O(1) append() operations to implement a FIFO queue.
        self._conns: DefaultDict[
            ConnectionKey, Deque[Tuple[ResponseHandler, float]]
        ] = defaultdict(deque)
        self._limit = limit
        self._limit_per_host = limit_per_host
        self._acquired: Set[ResponseHandler] = set()
        self._acquired_per_host: DefaultDict[ConnectionKey, Set[ResponseHandler]] = (
            defaultdict(set)
        )
        self._keepalive_timeout = cast(float, keepalive_timeout)
        self._force_close = force_close

        # {host_key: FIFO list of waiters}
        # The FIFO is implemented with an OrderedDict with None keys because
        # python does not have an ordered set.
        self._waiters: DefaultDict[
            ConnectionKey, OrderedDict[asyncio.Future[None], None]
        ] = defaultdict(OrderedDict)

        self._loop = loop
        self._factory = functools.partial(ResponseHandler, loop=loop)

        # start keep-alive connection cleanup task
        self._cleanup_handle: Optional[asyncio.TimerHandle] = None

        # start cleanup closed transports task
        self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None

        if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED:
            warnings.warn(
                "enable_cleanup_closed ignored because "
                "https://github.com/python/cpython/pull/118960 is fixed "
                f"in Python version {sys.version_info}",
                DeprecationWarning,
                stacklevel=2,
            )
            enable_cleanup_closed = False

        self._cleanup_closed_disabled = not enable_cleanup_closed
        self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []
        # Shared pre-completed future reused by every _TransportPlaceholder.
        self._placeholder_future: asyncio.Future[Optional[Exception]] = (
            loop.create_future()
        )
        self._placeholder_future.set_result(None)
        self._cleanup_closed()

    def __del__(self, _warnings: Any = warnings) -> None:
        if self._closed:
            return
        if not self._conns:
            return

        conns = [repr(c) for c in self._conns.values()]

        self._close()

        kwargs = {"source": self}
        _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)
        context = {
            "connector": self,
            "connections": conns,
            "message": "Unclosed connector",
        }
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __enter__(self) -> "BaseConnector":
        warnings.warn(
            '"with Connector():" is deprecated, '
            'use "async with Connector():" instead',
            DeprecationWarning,
        )
        return self

    def __exit__(self, *exc: Any) -> None:
        self._close()

    async def __aenter__(self) -> "BaseConnector":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        exc_traceback: Optional[TracebackType] = None,
    ) -> None:
        await self.close()

    @property
    def force_close(self) -> bool:
        """Ultimately close connection on releasing if True."""
        return self._force_close

    @property
    def limit(self) -> int:
        """The total number for simultaneous connections.

        If limit is 0 the connector has no limit.
        The default limit size is 100.
        """
        return self._limit

    @property
    def limit_per_host(self) -> int:
        """The limit for simultaneous connections to the same endpoint.

        Endpoints are the same if they are have equal
        (host, port, is_ssl) triple.
        """
        return self._limit_per_host

    def _cleanup(self) -> None:
        """Cleanup unused transports."""
        if self._cleanup_handle:
            self._cleanup_handle.cancel()
            # _cleanup_handle should be unset, otherwise _release() will not
            # recreate it ever!
            self._cleanup_handle = None

        now = monotonic()
        timeout = self._keepalive_timeout

        if self._conns:
            connections = defaultdict(deque)
            deadline = now - timeout
            for key, conns in self._conns.items():
                alive: Deque[Tuple[ResponseHandler, float]] = deque()
                for proto, use_time in conns:
                    # Keep connections that are still open and fresher than
                    # the keep-alive deadline; close everything else.
                    if proto.is_connected() and use_time - deadline >= 0:
                        alive.append((proto, use_time))
                        continue
                    transport = proto.transport
                    proto.close()
                    if not self._cleanup_closed_disabled and key.is_ssl:
                        self._cleanup_closed_transports.append(transport)

                if alive:
                    connections[key] = alive

            self._conns = connections

        if self._conns:
            # Reschedule only while there is something left to expire.
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def _cleanup_closed(self) -> None:
        """Double confirmation for transport close.

        Some broken ssl servers may leave socket open without proper close.
        """
        if self._cleanup_closed_handle:
            self._cleanup_closed_handle.cancel()

        for transport in self._cleanup_closed_transports:
            if transport is not None:
                transport.abort()

        self._cleanup_closed_transports = []

        if not self._cleanup_closed_disabled:
            self._cleanup_closed_handle = helpers.weakref_handle(
                self,
                "_cleanup_closed",
                self._cleanup_closed_period,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def close(self, *, abort_ssl: bool = False) -> Awaitable[None]:
        """Close all opened transports.

        :param abort_ssl: If True, SSL connections will be aborted immediately
            without performing the shutdown handshake. This provides
            faster cleanup at the cost of less graceful disconnection.
        """
        if not (waiters := self._close(abort_ssl=abort_ssl)):
            # If there are no connections to close, we can return a noop
            # awaitable to avoid scheduling a task on the event loop.
            return _DeprecationWaiter(noop())
        coro = _wait_for_close(waiters)
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to close connections
            # immediately to avoid having to schedule the task on the event loop.
            task = asyncio.Task(coro, loop=self._loop, eager_start=True)
        else:
            task = self._loop.create_task(coro)
        return _DeprecationWaiter(task)

    def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
        # Synchronous part of close(); returns the waiters the caller must
        # await for transports to finish closing.
        waiters: List[Awaitable[object]] = []

        if self._closed:
            return waiters

        self._closed = True

        try:
            if self._loop.is_closed():
                return waiters

            # cancel cleanup task
            if self._cleanup_handle:
                self._cleanup_handle.cancel()

            # cancel cleanup close task
            if self._cleanup_closed_handle:
                self._cleanup_closed_handle.cancel()

            for data in self._conns.values():
                for proto, _ in data:
                    if (
                        abort_ssl
                        and proto.transport
                        and proto.transport.get_extra_info("sslcontext") is not None
                    ):
                        proto.abort()
                    else:
                        proto.close()
                    if closed := proto.closed:
                        waiters.append(closed)

            for proto in self._acquired:
                if (
                    abort_ssl
                    and proto.transport
                    and proto.transport.get_extra_info("sslcontext") is not None
                ):
                    proto.abort()
                else:
                    proto.close()
                if closed := proto.closed:
                    waiters.append(closed)

            for transport in self._cleanup_closed_transports:
                if transport is not None:
                    transport.abort()

            return waiters

        finally:
            # Always wipe bookkeeping and wake any pending waiters, even if
            # closing a transport raised above.
            self._conns.clear()
            self._acquired.clear()
            for keyed_waiters in self._waiters.values():
                for keyed_waiter in keyed_waiters:
                    keyed_waiter.cancel()
            self._waiters.clear()
            self._cleanup_handle = None
            self._cleanup_closed_transports.clear()
            self._cleanup_closed_handle = None

    @property
    def closed(self) -> bool:
        """Is connector closed.

        A readonly property.
        """
        return self._closed

    def _available_connections(self, key: "ConnectionKey") -> int:
        """
        Return number of available connections.

        The limit, limit_per_host and the connection key are taken into account.

        If it returns less than 1 means that there are no connections
        available.
        """
        # check total available connections
        # If there are no limits, this will always return 1
        total_remain = 1

        if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0:
            return total_remain

        # check limit per host
        if host_remain := self._limit_per_host:
            if acquired := self._acquired_per_host.get(key):
                host_remain -= len(acquired)
            if total_remain > host_remain:
                return host_remain

        return total_remain

    async def connect(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Connection:
        """Get from pool or create new connection."""
        key = req.connection_key
        if (conn := await self._get(key, traces)) is not None:
            # If we do not have to wait and we can get a connection from the pool
            # we can avoid the timeout ceil logic and directly return the connection
            return conn

        async with ceil_timeout(timeout.connect, timeout.ceil_threshold):
            if self._available_connections(key) <= 0:
                await self._wait_for_available_connection(key, traces)
                if (conn := await self._get(key, traces)) is not None:
                    return conn

            # Reserve a slot with a placeholder so the limits account for the
            # in-flight connection attempt.
            placeholder = cast(
                ResponseHandler, _TransportPlaceholder(self._placeholder_future)
            )
            self._acquired.add(placeholder)
            if self._limit_per_host:
                self._acquired_per_host[key].add(placeholder)

            try:
                # Traces are done inside the try block to ensure that
                # the placeholder is still cleaned up if an exception
                # is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_start()
                proto = await self._create_connection(req, traces, timeout)
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_end()
            except BaseException:
                self._release_acquired(key, placeholder)
                raise
            else:
                if self._closed:
                    proto.close()
                    raise ClientConnectionError("Connector is closed.")

        # The connection was successfully created, drop the placeholder
        # and add the real connection to the acquired set. There should
        # be no awaits after the proto is added to the acquired set
        # to ensure that the connection is not left in the acquired set
        # on cancellation.
        self._acquired.remove(placeholder)
        self._acquired.add(proto)
        if self._limit_per_host:
            acquired_per_host = self._acquired_per_host[key]
            acquired_per_host.remove(placeholder)
            acquired_per_host.add(proto)
        return Connection(self, key, proto, self._loop)

    async def _wait_for_available_connection(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> None:
        """Wait for an available connection slot."""
        # We loop here because there is a race between
        # the connection limit check and the connection
        # being acquired. If the connection is acquired
        # between the check and the await statement, we
        # need to loop again to check if the connection
        # slot is still available.
        attempts = 0
        while True:
            fut: asyncio.Future[None] = self._loop.create_future()
            keyed_waiters = self._waiters[key]
            keyed_waiters[fut] = None
            if attempts:
                # If we have waited before, we need to move the waiter
                # to the front of the queue as otherwise we might get
                # starved and hit the timeout.
                keyed_waiters.move_to_end(fut, last=False)

            try:
                # Traces happen in the try block to ensure that the
                # waiter is still cleaned up if an exception is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_start()
                await fut
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_end()
            finally:
                # pop the waiter from the queue if its still
                # there and not already removed by _release_waiter
                keyed_waiters.pop(fut, None)
                if not self._waiters.get(key, True):
                    del self._waiters[key]

            if self._available_connections(key) > 0:
                break
            attempts += 1

    async def _get(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> Optional[Connection]:
        """Get next reusable connection for the key or None.

        The connection will be marked as acquired.
        """
        if (conns := self._conns.get(key)) is None:
            return None

        t1 = monotonic()
        while conns:
            proto, t0 = conns.popleft()
            # Reuse the connection if it is connected and
            # the keepalive timeout has not been exceeded
            if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
                if not conns:
                    # The very last connection was reclaimed: drop the key
                    del self._conns[key]
                self._acquired.add(proto)
                if self._limit_per_host:
                    self._acquired_per_host[key].add(proto)
                if traces:
                    for trace in traces:
                        try:
                            await trace.send_connection_reuseconn()
                        except BaseException:
                            self._release_acquired(key, proto)
                            raise
                return Connection(self, key, proto, self._loop)

            # Connection cannot be reused, close it
            transport = proto.transport
            proto.close()
            # only for SSL transports
            if not self._cleanup_closed_disabled and key.is_ssl:
                self._cleanup_closed_transports.append(transport)

        # No more connections: drop the key
        del self._conns[key]
        return None

    def _release_waiter(self) -> None:
        """
        Iterates over all waiters until one to be released is found.

        The one to be released is not finished and
        belongs to a host that has available connections.
        """
        if not self._waiters:
            return

        # Having the dict keys ordered this avoids to iterate
        # at the same order at each call.
        queues = list(self._waiters)
        random.shuffle(queues)

        for key in queues:
            if self._available_connections(key) < 1:
                continue

            waiters = self._waiters[key]
            while waiters:
                waiter, _ = waiters.popitem(last=False)
                if not waiter.done():
                    waiter.set_result(None)
                    return

    def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
        """Release acquired connection."""
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._acquired.discard(proto)
        if self._limit_per_host and (conns := self._acquired_per_host.get(key)):
            conns.discard(proto)
            if not conns:
                del self._acquired_per_host[key]
        self._release_waiter()

    def _release(
        self,
        key: "ConnectionKey",
        protocol: ResponseHandler,
        *,
        should_close: bool = False,
    ) -> None:
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._release_acquired(key, protocol)

        if self._force_close or should_close or protocol.should_close:
            transport = protocol.transport
            protocol.close()

            if key.is_ssl and not self._cleanup_closed_disabled:
                self._cleanup_closed_transports.append(transport)
            return

        # Return the healthy protocol to the pool, timestamped for keep-alive.
        self._conns[key].append((protocol, monotonic()))

        if self._cleanup_handle is None:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                self._keepalive_timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        # Subclasses implement the actual transport/protocol creation.
        raise NotImplementedError()

826 

827 

class _DNSCacheTable:
    """Round-robin cache of resolved addresses with optional TTL expiry."""

    def __init__(self, ttl: Optional[float] = None) -> None:
        # key -> (endless round-robin iterator, number of distinct addresses)
        self._addrs_rr: Dict[Tuple[str, int], Tuple[Iterator[ResolveResult], int]] = {}
        self._timestamps: Dict[Tuple[str, int], float] = {}
        self._ttl = ttl

    def __contains__(self, host: object) -> bool:
        return host in self._addrs_rr

    def add(self, key: Tuple[str, int], addrs: List[ResolveResult]) -> None:
        """Store (or replace) the resolved addresses for *key*."""
        self._addrs_rr[key] = (cycle(addrs), len(addrs))
        if self._ttl is not None:
            self._timestamps[key] = monotonic()

    def remove(self, key: Tuple[str, int]) -> None:
        """Forget *key* if it is cached; silently ignore unknown keys."""
        self._addrs_rr.pop(key, None)
        if self._ttl is not None:
            self._timestamps.pop(key, None)

    def clear(self) -> None:
        """Drop every cached entry and timestamp."""
        self._addrs_rr.clear()
        self._timestamps.clear()

    def next_addrs(self, key: Tuple[str, int]) -> List[ResolveResult]:
        """Return all addresses for *key*, rotated one step per call."""
        rr_iter, count = self._addrs_rr[key]
        addrs = list(islice(rr_iter, count))
        # Consume one more element to shift internal state of `cycle`
        next(rr_iter)
        return addrs

    def expired(self, key: Tuple[str, int]) -> bool:
        """True once *key*'s entry is older than the configured TTL."""
        if self._ttl is None:
            return False
        return monotonic() > self._timestamps[key] + self._ttl

865 

866 

867def _make_ssl_context(verified: bool) -> SSLContext: 

868 """Create SSL context. 

869 

870 This method is not async-friendly and should be called from a thread 

871 because it will load certificates from disk and do other blocking I/O. 

872 """ 

873 if ssl is None: 

874 # No ssl support 

875 return None 

876 if verified: 

877 sslcontext = ssl.create_default_context() 

878 else: 

879 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 

880 sslcontext.options |= ssl.OP_NO_SSLv2 

881 sslcontext.options |= ssl.OP_NO_SSLv3 

882 sslcontext.check_hostname = False 

883 sslcontext.verify_mode = ssl.CERT_NONE 

884 sslcontext.options |= ssl.OP_NO_COMPRESSION 

885 sslcontext.set_default_verify_paths() 

886 sslcontext.set_alpn_protocols(("http/1.1",)) 

887 return sslcontext 

888 

889 

# The default SSLContext objects are created at import time
# since they do blocking I/O to load certificates from disk,
# and imports should always be done before the event loop starts
# or in a thread.
# (One verifying and one permissive context are kept as module singletons.)
_SSL_CONTEXT_VERIFIED = _make_ssl_context(True)
_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False)

896 

897 

898class TCPConnector(BaseConnector): 

899 """TCP connector. 

900 

901 verify_ssl - Set to True to check ssl certifications. 

902 fingerprint - Pass the binary sha256 

903 digest of the expected certificate in DER format to verify 

904 that the certificate the server presents matches. See also 

905 https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning 

906 resolver - Enable DNS lookups and use this 

907 resolver 

908 use_dns_cache - Use memory cache for DNS lookups. 

909 ttl_dns_cache - Max seconds having cached a DNS entry, None forever. 

910 family - socket address family 

911 local_addr - local tuple of (host, port) to bind socket to 

912 

913 keepalive_timeout - (optional) Keep-alive timeout. 

914 force_close - Set to True to force close and do reconnect 

915 after each request (and between redirects). 

916 limit - The total number of simultaneous connections. 

917 limit_per_host - Number of simultaneous connections to one host. 

918 enable_cleanup_closed - Enables clean-up closed ssl transports. 

919 Disabled by default. 

920 happy_eyeballs_delay - This is the “Connection Attempt Delay” 

921 as defined in RFC 8305. To disable 

922 the happy eyeballs algorithm, set to None. 

923 interleave - “First Address Family Count” as defined in RFC 8305 

924 loop - Optional event loop. 

925 socket_factory - A SocketFactoryType function that, if supplied, 

926 will be used to create sockets given an 

927 AddrInfoType. 

928 ssl_shutdown_timeout - DEPRECATED. Will be removed in aiohttp 4.0. 

929 Grace period for SSL shutdown handshake on TLS 

930 connections. Default is 0 seconds (immediate abort). 

931 This parameter allowed for a clean SSL shutdown by 

932 notifying the remote peer of connection closure, 

933 while avoiding excessive delays during connector cleanup. 

934 Note: Only takes effect on Python 3.11+. 

935 """ 

936 

937 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"}) 

938 

    def __init__(
        self,
        *,
        verify_ssl: bool = True,
        fingerprint: Optional[bytes] = None,
        use_dns_cache: bool = True,
        ttl_dns_cache: Optional[int] = 10,
        family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC,
        ssl_context: Optional[SSLContext] = None,
        ssl: Union[bool, Fingerprint, SSLContext] = True,
        local_addr: Optional[Tuple[str, int]] = None,
        resolver: Optional[AbstractResolver] = None,
        keepalive_timeout: Union[None, float, object] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        timeout_ceil_threshold: float = 5,
        happy_eyeballs_delay: Optional[float] = 0.25,
        interleave: Optional[int] = None,
        socket_factory: Optional[SocketFactoryType] = None,
        ssl_shutdown_timeout: Union[_SENTINEL, None, float] = sentinel,
    ):
        """Initialize the TCP connector (parameters documented on the class)."""
        super().__init__(
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
            limit=limit,
            limit_per_host=limit_per_host,
            enable_cleanup_closed=enable_cleanup_closed,
            loop=loop,
            timeout_ceil_threshold=timeout_ceil_threshold,
        )

        # Collapse the legacy verify_ssl/ssl_context/fingerprint parameters
        # together with ``ssl`` into a single value.
        self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)

        self._resolver: AbstractResolver
        if resolver is None:
            # We created the resolver, so close() must also close it.
            self._resolver = DefaultResolver(loop=self._loop)
            self._resolver_owner = True
        else:
            self._resolver = resolver
            self._resolver_owner = False

        self._use_dns_cache = use_dns_cache
        self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
        # (host, port) -> futures of waiters piggybacking on an in-flight
        # DNS lookup for the same key (see _resolve_host).
        self._throttle_dns_futures: Dict[
            Tuple[str, int], Set["asyncio.Future[None]"]
        ] = {}
        self._family = family
        self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
        self._happy_eyeballs_delay = happy_eyeballs_delay
        self._interleave = interleave
        # In-flight shielded resolve tasks; cancelled on _close().
        self._resolve_host_tasks: Set["asyncio.Task[List[ResolveResult]]"] = set()
        self._socket_factory = socket_factory
        self._ssl_shutdown_timeout: Optional[float]
        # Handle ssl_shutdown_timeout with warning for Python < 3.11
        if ssl_shutdown_timeout is sentinel:
            # Default: abort TLS connections immediately on close (no
            # shutdown handshake).
            self._ssl_shutdown_timeout = 0
        else:
            # Deprecation warning for ssl_shutdown_timeout parameter
            warnings.warn(
                "The ssl_shutdown_timeout parameter is deprecated and will be removed in aiohttp 4.0",
                DeprecationWarning,
                stacklevel=2,
            )
            if (
                sys.version_info < (3, 11)
                and ssl_shutdown_timeout is not None
                and ssl_shutdown_timeout != 0
            ):
                # loop.start_tls()/create_connection() accept
                # ssl_shutdown_timeout only on Python 3.11+.
                warnings.warn(
                    f"ssl_shutdown_timeout={ssl_shutdown_timeout} is ignored on Python < 3.11; "
                    "only ssl_shutdown_timeout=0 is supported. The timeout will be ignored.",
                    RuntimeWarning,
                    stacklevel=2,
                )
            self._ssl_shutdown_timeout = ssl_shutdown_timeout

1017 

1018 def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]: 

1019 """Close all ongoing DNS calls.""" 

1020 for fut in chain.from_iterable(self._throttle_dns_futures.values()): 

1021 fut.cancel() 

1022 

1023 waiters = super()._close(abort_ssl=abort_ssl) 

1024 

1025 for t in self._resolve_host_tasks: 

1026 t.cancel() 

1027 waiters.append(t) 

1028 

1029 return waiters 

1030 

1031 async def close(self, *, abort_ssl: bool = False) -> None: 

1032 """ 

1033 Close all opened transports. 

1034 

1035 :param abort_ssl: If True, SSL connections will be aborted immediately 

1036 without performing the shutdown handshake. If False (default), 

1037 the behavior is determined by ssl_shutdown_timeout: 

1038 - If ssl_shutdown_timeout=0: connections are aborted 

1039 - If ssl_shutdown_timeout>0: graceful shutdown is performed 

1040 """ 

1041 if self._resolver_owner: 

1042 await self._resolver.close() 

1043 # Use abort_ssl param if explicitly set, otherwise use ssl_shutdown_timeout default 

1044 await super().close(abort_ssl=abort_ssl or self._ssl_shutdown_timeout == 0) 

1045 

1046 @property 

1047 def family(self) -> int: 

1048 """Socket family like AF_INET.""" 

1049 return self._family 

1050 

1051 @property 

1052 def use_dns_cache(self) -> bool: 

1053 """True if local DNS caching is enabled.""" 

1054 return self._use_dns_cache 

1055 

1056 def clear_dns_cache( 

1057 self, host: Optional[str] = None, port: Optional[int] = None 

1058 ) -> None: 

1059 """Remove specified host/port or clear all dns local cache.""" 

1060 if host is not None and port is not None: 

1061 self._cached_hosts.remove((host, port)) 

1062 elif host is not None or port is not None: 

1063 raise ValueError("either both host and port or none of them are allowed") 

1064 else: 

1065 self._cached_hosts.clear() 

1066 

    async def _resolve_host(
        self, host: str, port: int, traces: Optional[Sequence["Trace"]] = None
    ) -> List[ResolveResult]:
        """Resolve host and return list of addresses.

        Uses the DNS cache when enabled, and throttles concurrent lookups of
        the same (host, port) so a single resolver call serves all waiters.
        """
        if is_ip_address(host):
            # Literal IP address: no DNS needed, synthesize one result.
            return [
                {
                    "hostname": host,
                    "host": host,
                    "port": port,
                    "family": self._family,
                    "proto": 0,
                    "flags": 0,
                }
            ]

        if not self._use_dns_cache:

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            res = await self._resolver.resolve(host, port, family=self._family)

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            return res

        key = (host, port)
        if key in self._cached_hosts and not self._cached_hosts.expired(key):
            # get result early, before any await (#4014)
            result = self._cached_hosts.next_addrs(key)

            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            return result

        futures: Set["asyncio.Future[None]"]
        #
        # If multiple connectors are resolving the same host, we wait
        # for the first one to resolve and then use the result for all of them.
        # We use a throttle to ensure that we only resolve the host once
        # and then use the result for all the waiters.
        #
        if key in self._throttle_dns_futures:
            # get futures early, before any await (#4014)
            futures = self._throttle_dns_futures[key]
            future: asyncio.Future[None] = self._loop.create_future()
            futures.add(future)
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            try:
                # Wait for the in-flight lookup to publish its result.
                await future
            finally:
                futures.discard(future)
            return self._cached_hosts.next_addrs(key)

        # update dict early, before any await (#4014)
        self._throttle_dns_futures[key] = futures = set()
        # In this case we need to create a task to ensure that we can shield
        # the task from cancellation as cancelling this lookup should not cancel
        # the underlying lookup or else the cancel event will get broadcast to
        # all the waiters across all connections.
        #
        coro = self._resolve_host_with_throttle(key, host, port, futures, traces)
        loop = asyncio.get_running_loop()
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to send immediately
            resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True)
        else:
            resolved_host_task = loop.create_task(coro)

        if not resolved_host_task.done():
            # Track the task so _close() can cancel it; drop it from the set
            # automatically when it finishes.
            self._resolve_host_tasks.add(resolved_host_task)
            resolved_host_task.add_done_callback(self._resolve_host_tasks.discard)

        try:
            # Shield so that cancelling this caller does not cancel the
            # shared lookup other waiters depend on.
            return await asyncio.shield(resolved_host_task)
        except asyncio.CancelledError:

            def drop_exception(fut: "asyncio.Future[List[ResolveResult]]") -> None:
                # Consume the eventual result/exception so it is not
                # reported as an unretrieved task exception.
                with suppress(Exception, asyncio.CancelledError):
                    fut.result()

            resolved_host_task.add_done_callback(drop_exception)
            raise

1157 

    async def _resolve_host_with_throttle(
        self,
        key: Tuple[str, int],
        host: str,
        port: int,
        futures: Set["asyncio.Future[None]"],
        traces: Optional[Sequence["Trace"]],
    ) -> List[ResolveResult]:
        """Resolve host and set result for all waiters.

        This method must be run in a task and shielded from cancellation
        to avoid cancelling the underlying lookup.
        """
        try:
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_miss(host)

                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            addrs = await self._resolver.resolve(host, port, family=self._family)
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            # Publish the result: cache the addresses, then wake every
            # waiter that piggybacked on this lookup.
            self._cached_hosts.add(key, addrs)
            for fut in futures:
                set_result(fut, None)
        except BaseException as e:
            # any DNS exception is set for the waiters to raise the same exception.
            # This coro is always run in task that is shielded from cancellation so
            # we should never be propagating cancellation here.
            for fut in futures:
                set_exception(fut, e)
            raise
        finally:
            # Lookup finished (success or failure): stop throttling this key.
            self._throttle_dns_futures.pop(key)

        return self._cached_hosts.next_addrs(key)

1198 

1199 async def _create_connection( 

1200 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout" 

1201 ) -> ResponseHandler: 

1202 """Create connection. 

1203 

1204 Has same keyword arguments as BaseEventLoop.create_connection. 

1205 """ 

1206 if req.proxy: 

1207 _, proto = await self._create_proxy_connection(req, traces, timeout) 

1208 else: 

1209 _, proto = await self._create_direct_connection(req, traces, timeout) 

1210 

1211 return proto 

1212 

1213 def _get_ssl_context(self, req: ClientRequest) -> Optional[SSLContext]: 

1214 """Logic to get the correct SSL context 

1215 

1216 0. if req.ssl is false, return None 

1217 

1218 1. if ssl_context is specified in req, use it 

1219 2. if _ssl_context is specified in self, use it 

1220 3. otherwise: 

1221 1. if verify_ssl is not specified in req, use self.ssl_context 

1222 (will generate a default context according to self.verify_ssl) 

1223 2. if verify_ssl is True in req, generate a default SSL context 

1224 3. if verify_ssl is False in req, generate a SSL context that 

1225 won't verify 

1226 """ 

1227 if not req.is_ssl(): 

1228 return None 

1229 

1230 if ssl is None: # pragma: no cover 

1231 raise RuntimeError("SSL is not supported.") 

1232 sslcontext = req.ssl 

1233 if isinstance(sslcontext, ssl.SSLContext): 

1234 return sslcontext 

1235 if sslcontext is not True: 

1236 # not verified or fingerprinted 

1237 return _SSL_CONTEXT_UNVERIFIED 

1238 sslcontext = self._ssl 

1239 if isinstance(sslcontext, ssl.SSLContext): 

1240 return sslcontext 

1241 if sslcontext is not True: 

1242 # not verified or fingerprinted 

1243 return _SSL_CONTEXT_UNVERIFIED 

1244 return _SSL_CONTEXT_VERIFIED 

1245 

1246 def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]: 

1247 ret = req.ssl 

1248 if isinstance(ret, Fingerprint): 

1249 return ret 

1250 ret = self._ssl 

1251 if isinstance(ret, Fingerprint): 

1252 return ret 

1253 return None 

1254 

1255 async def _wrap_create_connection( 

1256 self, 

1257 *args: Any, 

1258 addr_infos: List[AddrInfoType], 

1259 req: ClientRequest, 

1260 timeout: "ClientTimeout", 

1261 client_error: Type[Exception] = ClientConnectorError, 

1262 **kwargs: Any, 

1263 ) -> Tuple[asyncio.Transport, ResponseHandler]: 

1264 try: 

1265 async with ceil_timeout( 

1266 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1267 ): 

1268 sock = await aiohappyeyeballs.start_connection( 

1269 addr_infos=addr_infos, 

1270 local_addr_infos=self._local_addr_infos, 

1271 happy_eyeballs_delay=self._happy_eyeballs_delay, 

1272 interleave=self._interleave, 

1273 loop=self._loop, 

1274 socket_factory=self._socket_factory, 

1275 ) 

1276 # Add ssl_shutdown_timeout for Python 3.11+ when SSL is used 

1277 if ( 

1278 kwargs.get("ssl") 

1279 and self._ssl_shutdown_timeout 

1280 and sys.version_info >= (3, 11) 

1281 ): 

1282 kwargs["ssl_shutdown_timeout"] = self._ssl_shutdown_timeout 

1283 return await self._loop.create_connection(*args, **kwargs, sock=sock) 

1284 except cert_errors as exc: 

1285 raise ClientConnectorCertificateError(req.connection_key, exc) from exc 

1286 except ssl_errors as exc: 

1287 raise ClientConnectorSSLError(req.connection_key, exc) from exc 

1288 except OSError as exc: 

1289 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1290 raise 

1291 raise client_error(req.connection_key, exc) from exc 

1292 

1293 async def _wrap_existing_connection( 

1294 self, 

1295 *args: Any, 

1296 req: ClientRequest, 

1297 timeout: "ClientTimeout", 

1298 client_error: Type[Exception] = ClientConnectorError, 

1299 **kwargs: Any, 

1300 ) -> Tuple[asyncio.Transport, ResponseHandler]: 

1301 try: 

1302 async with ceil_timeout( 

1303 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1304 ): 

1305 return await self._loop.create_connection(*args, **kwargs) 

1306 except cert_errors as exc: 

1307 raise ClientConnectorCertificateError(req.connection_key, exc) from exc 

1308 except ssl_errors as exc: 

1309 raise ClientConnectorSSLError(req.connection_key, exc) from exc 

1310 except OSError as exc: 

1311 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1312 raise 

1313 raise client_error(req.connection_key, exc) from exc 

1314 

1315 def _fail_on_no_start_tls(self, req: "ClientRequest") -> None: 

1316 """Raise a :py:exc:`RuntimeError` on missing ``start_tls()``. 

1317 

1318 It is necessary for TLS-in-TLS so that it is possible to 

1319 send HTTPS queries through HTTPS proxies. 

1320 

1321 This doesn't affect regular HTTP requests, though. 

1322 """ 

1323 if not req.is_ssl(): 

1324 return 

1325 

1326 proxy_url = req.proxy 

1327 assert proxy_url is not None 

1328 if proxy_url.scheme != "https": 

1329 return 

1330 

1331 self._check_loop_for_start_tls() 

1332 

1333 def _check_loop_for_start_tls(self) -> None: 

1334 try: 

1335 self._loop.start_tls 

1336 except AttributeError as attr_exc: 

1337 raise RuntimeError( 

1338 "An HTTPS request is being sent through an HTTPS proxy. " 

1339 "This needs support for TLS in TLS but it is not implemented " 

1340 "in your runtime for the stdlib asyncio.\n\n" 

1341 "Please upgrade to Python 3.11 or higher. For more details, " 

1342 "please see:\n" 

1343 "* https://bugs.python.org/issue37179\n" 

1344 "* https://github.com/python/cpython/pull/28073\n" 

1345 "* https://docs.aiohttp.org/en/stable/" 

1346 "client_advanced.html#proxy-support\n" 

1347 "* https://github.com/aio-libs/aiohttp/discussions/6044\n", 

1348 ) from attr_exc 

1349 

1350 def _loop_supports_start_tls(self) -> bool: 

1351 try: 

1352 self._check_loop_for_start_tls() 

1353 except RuntimeError: 

1354 return False 

1355 else: 

1356 return True 

1357 

    def _warn_about_tls_in_tls(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
    ) -> None:
        """Issue a warning if the requested URL has HTTPS scheme.

        No-op when the runtime is known to support TLS in TLS.
        """
        if req.request_info.url.scheme != "https":
            return

        # Check if uvloop is being used, which supports TLS in TLS,
        # otherwise assume that asyncio's native transport is being used.
        if type(underlying_transport).__module__.startswith("uvloop"):
            return

        # Support in asyncio was added in Python 3.11 (bpo-44011)
        asyncio_supports_tls_in_tls = sys.version_info >= (3, 11) or getattr(
            underlying_transport,
            "_start_tls_compatible",
            False,
        )

        if asyncio_supports_tls_in_tls:
            return

        warnings.warn(
            "An HTTPS request is being sent through an HTTPS proxy. "
            "This support for TLS in TLS is known to be disabled "
            "in the stdlib asyncio (Python <3.11). This is why you'll probably see "
            "an error in the log below.\n\n"
            "It is possible to enable it via monkeypatching. "
            "For more details, see:\n"
            "* https://bugs.python.org/issue37179\n"
            "* https://github.com/python/cpython/pull/28073\n\n"
            "You can temporarily patch this as follows:\n"
            "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
            "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
            RuntimeWarning,
            source=self,
            # NOTE(review): the original comment said "Why `4`? At least 3 of
            # the calls in the stack originate from the methods in this
            # class", yet the value below is 3 — confirm the intended depth.
            stacklevel=3,
        )

1400 

    async def _start_tls_connection(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Wrap the raw TCP transport with TLS.

        Used for TLS-in-TLS through an HTTPS proxy; returns the new TLS
        transport together with a freshly created protocol.
        """
        tls_proto = self._factory()  # Create a brand new proto for TLS
        sslcontext = self._get_ssl_context(req)
        if TYPE_CHECKING:
            # _start_tls_connection is unreachable in the current code path
            # if sslcontext is None.
            assert sslcontext is not None

        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                try:
                    # ssl_shutdown_timeout is only available in Python 3.11+
                    if sys.version_info >= (3, 11) and self._ssl_shutdown_timeout:
                        tls_transport = await self._loop.start_tls(
                            underlying_transport,
                            tls_proto,
                            sslcontext,
                            server_hostname=req.server_hostname or req.host,
                            ssl_handshake_timeout=timeout.total,
                            ssl_shutdown_timeout=self._ssl_shutdown_timeout,
                        )
                    else:
                        tls_transport = await self._loop.start_tls(
                            underlying_transport,
                            tls_proto,
                            sslcontext,
                            server_hostname=req.server_hostname or req.host,
                            ssl_handshake_timeout=timeout.total,
                        )
                except BaseException:
                    # We need to close the underlying transport since
                    # `start_tls()` probably failed before it had a
                    # chance to do this:
                    if self._ssl_shutdown_timeout == 0:
                        # Zero shutdown timeout: hard-abort instead of a
                        # graceful close.
                        underlying_transport.abort()
                    else:
                        underlying_transport.close()
                    raise
            if isinstance(tls_transport, asyncio.Transport):
                # Verify the server certificate fingerprint, if one was
                # configured on the request or connector.
                fingerprint = self._get_fingerprint(req)
                if fingerprint:
                    try:
                        fingerprint.check(tls_transport)
                    except ServerFingerprintMismatch:
                        tls_transport.close()
                        if not self._cleanup_closed_disabled:
                            self._cleanup_closed_transports.append(tls_transport)
                        raise
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            # A bare timeout (no errno) propagates unchanged.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
        except TypeError as type_err:
            # Example cause looks like this:
            # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
            # object at 0x7f760615e460> is not supported by start_tls()

            raise ClientConnectionError(
                "Cannot initialize a TLS-in-TLS connection to host "
                f"{req.host!s}:{req.port:d} through an underlying connection "
                f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
                f"[{type_err!s}]"
            ) from type_err
        else:
            if tls_transport is None:
                msg = "Failed to start TLS (possibly caused by closing transport)"
                raise client_error(req.connection_key, OSError(msg))
            tls_proto.connection_made(
                tls_transport
            )  # Kick the state machine of the new TLS protocol

        return tls_transport, tls_proto

1486 

1487 def _convert_hosts_to_addr_infos( 

1488 self, hosts: List[ResolveResult] 

1489 ) -> List[AddrInfoType]: 

1490 """Converts the list of hosts to a list of addr_infos. 

1491 

1492 The list of hosts is the result of a DNS lookup. The list of 

1493 addr_infos is the result of a call to `socket.getaddrinfo()`. 

1494 """ 

1495 addr_infos: List[AddrInfoType] = [] 

1496 for hinfo in hosts: 

1497 host = hinfo["host"] 

1498 is_ipv6 = ":" in host 

1499 family = socket.AF_INET6 if is_ipv6 else socket.AF_INET 

1500 if self._family and self._family != family: 

1501 continue 

1502 addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"]) 

1503 addr_infos.append( 

1504 (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr) 

1505 ) 

1506 return addr_infos 

1507 

    async def _create_direct_connection(
        self,
        req: ClientRequest,
        traces: List["Trace"],
        timeout: "ClientTimeout",
        *,
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Resolve the target and connect, trying addresses until one works.

        Failed candidates are removed from the address list and the most
        recent error is re-raised once every candidate is exhausted.
        """
        sslcontext = self._get_ssl_context(req)
        fingerprint = self._get_fingerprint(req)

        host = req.url.raw_host
        assert host is not None
        # Replace multiple trailing dots with a single one.
        # A trailing dot is only present for fully-qualified domain names.
        # See https://github.com/aio-libs/aiohttp/pull/7364.
        if host.endswith(".."):
            host = host.rstrip(".") + "."
        port = req.port
        assert port is not None
        try:
            # Cancelling this lookup should not cancel the underlying lookup
            # or else the cancel event will get broadcast to all the waiters
            # across all connections.
            hosts = await self._resolve_host(host, port, traces=traces)
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            # in case of proxy it is not ClientProxyConnectionError
            # it is problem of resolving proxy ip itself
            raise ClientConnectorDNSError(req.connection_key, exc) from exc

        last_exc: Optional[Exception] = None
        addr_infos = self._convert_hosts_to_addr_infos(hosts)
        while addr_infos:
            # Strip trailing dots, certificates contain FQDN without dots.
            # See https://github.com/aio-libs/aiohttp/issues/3636
            server_hostname = (
                (req.server_hostname or host).rstrip(".") if sslcontext else None
            )

            try:
                transp, proto = await self._wrap_create_connection(
                    self._factory,
                    timeout=timeout,
                    ssl=sslcontext,
                    addr_infos=addr_infos,
                    server_hostname=server_hostname,
                    req=req,
                    client_error=client_error,
                )
            except (ClientConnectorError, asyncio.TimeoutError) as exc:
                # Remember the failure and retry with the remaining
                # addresses (interleave-aware removal).
                last_exc = exc
                aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave)
                continue

            if req.is_ssl() and fingerprint:
                try:
                    fingerprint.check(transp)
                except ServerFingerprintMismatch as exc:
                    transp.close()
                    if not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transp)
                    last_exc = exc
                    # Remove the bad peer from the list of addr_infos
                    sock: socket.socket = transp.get_extra_info("socket")
                    bad_peer = sock.getpeername()
                    aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer)
                    continue

            return transp, proto
        else:
            # Every candidate failed; surface the most recent error.
            assert last_exc is not None
            raise last_exc

1582 

    async def _create_proxy_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Connect through the proxy configured on *req*.

        For plain HTTP the proxy connection itself is returned; for HTTPS a
        CONNECT tunnel is established and then upgraded to TLS.
        """
        # Fail fast if TLS-in-TLS is required but start_tls() is missing.
        self._fail_on_no_start_tls(req)
        runtime_has_start_tls = self._loop_supports_start_tls()

        headers: Dict[str, str] = {}
        if req.proxy_headers is not None:
            headers = req.proxy_headers  # type: ignore[assignment]
        headers[hdrs.HOST] = req.headers[hdrs.HOST]

        url = req.proxy
        assert url is not None
        proxy_req = ClientRequest(
            hdrs.METH_GET,
            url,
            headers=headers,
            auth=req.proxy_auth,
            loop=self._loop,
            ssl=req.ssl,
        )

        # create connection to proxy server
        transport, proto = await self._create_direct_connection(
            proxy_req, [], timeout, client_error=ClientProxyConnectionError
        )

        # Route the proxy credentials to the right header: the tunneled
        # request for plain HTTP, the CONNECT request for HTTPS.
        auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
        if auth is not None:
            if not req.is_ssl():
                req.headers[hdrs.PROXY_AUTHORIZATION] = auth
            else:
                proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth

        if req.is_ssl():
            if runtime_has_start_tls:
                self._warn_about_tls_in_tls(transport, req)

            # For HTTPS requests over HTTP proxy
            # we must notify proxy to tunnel connection
            # so we send CONNECT command:
            # CONNECT www.python.org:443 HTTP/1.1
            # Host: www.python.org
            #
            # next we must do TLS handshake and so on
            # to do this we must wrap raw socket into secure one
            # asyncio handles this perfectly
            proxy_req.method = hdrs.METH_CONNECT
            proxy_req.url = req.url
            key = req.connection_key._replace(
                proxy=None, proxy_auth=None, proxy_headers_hash=None
            )
            conn = _ConnectTunnelConnection(self, key, proto, self._loop)
            proxy_resp = await proxy_req.send(conn)
            try:
                protocol = conn._protocol
                assert protocol is not None

                # read_until_eof=True will ensure the connection isn't closed
                # once the response is received and processed allowing
                # START_TLS to work on the connection below.
                protocol.set_response_params(
                    read_until_eof=runtime_has_start_tls,
                    timeout_ceil_threshold=self._timeout_ceil_threshold,
                )
                resp = await proxy_resp.start(conn)
            except BaseException:
                proxy_resp.close()
                conn.close()
                raise
            else:
                # Detach the protocol so closing `conn` later does not tear
                # down the tunnel we are about to upgrade.
                conn._protocol = None
                try:
                    if resp.status != 200:
                        message = resp.reason
                        if message is None:
                            message = HTTPStatus(resp.status).phrase
                        raise ClientHttpProxyError(
                            proxy_resp.request_info,
                            resp.history,
                            status=resp.status,
                            message=message,
                            headers=resp.headers,
                        )
                    if not runtime_has_start_tls:
                        rawsock = transport.get_extra_info("socket", default=None)
                        if rawsock is None:
                            raise RuntimeError(
                                "Transport does not expose socket instance"
                            )
                        # Duplicate the socket, so now we can close proxy transport
                        rawsock = rawsock.dup()
                except BaseException:
                    # It shouldn't be closed in `finally` because it's fed to
                    # `loop.start_tls()` and the docs say not to touch it after
                    # passing there.
                    transport.close()
                    raise
                finally:
                    if not runtime_has_start_tls:
                        transport.close()

                if not runtime_has_start_tls:
                    # HTTP proxy with support for upgrade to HTTPS
                    sslcontext = self._get_ssl_context(req)
                    return await self._wrap_existing_connection(
                        self._factory,
                        timeout=timeout,
                        ssl=sslcontext,
                        sock=rawsock,
                        server_hostname=req.host,
                        req=req,
                    )

                return await self._start_tls_connection(
                    # Access the old transport for the last time before it's
                    # closed and forgotten forever:
                    transport,
                    req=req,
                    timeout=timeout,
                )
            finally:
                proxy_resp.close()

        return transport, proto

1708 

1709 

class UnixConnector(BaseConnector):
    """Connector that speaks HTTP over a local Unix domain socket.

    path - path to the Unix socket.
    keepalive_timeout - (optional) keep-alive timeout.
    force_close - when True, close and reconnect after every request
                  (and between redirects).
    limit - total number of simultaneous connections.
    limit_per_host - number of simultaneous connections to one host.
    loop - optional event loop.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        self._path = path

    @property
    def path(self) -> str:
        """Filesystem path of the Unix socket."""
        return self._path

    async def _create_connection(
        self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
    ) -> "ResponseHandler":
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _, proto = await self._loop.create_unix_connection(
                    self._factory, self._path
                )
        except OSError as os_exc:
            # A bare timeout (OSError subclass with no errno) propagates
            # unchanged instead of being wrapped.
            if os_exc.errno is None and isinstance(os_exc, asyncio.TimeoutError):
                raise
            raise UnixClientConnectorError(
                self.path, req.connection_key, os_exc
            ) from os_exc

        return proto

1763 

1764 

class NamedPipeConnector(BaseConnector):
    """Named pipe connector.

    Only supported by the proactor event loop.
    See also: https://docs.python.org/3/library/asyncio-eventloop.html

    path - Windows named pipe path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        if not isinstance(
            self._loop,
            asyncio.ProactorEventLoop,  # type: ignore[attr-defined]
        ):
            # Named pipes require the Windows proactor loop; fail fast
            # instead of erroring on the first connection attempt.
            raise RuntimeError(
                "Named Pipes only available in proactor loop under windows"
            )
        self._path = path

    @property
    def path(self) -> str:
        """Path to the named pipe."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _, proto = await self._loop.create_pipe_connection(  # type: ignore[attr-defined]
                    self._factory, self._path
                )
                # the drain is required so that the connection_made is called
                # and transport is set otherwise it is not set before the
                # `assert conn.transport is not None`
                # in client.py's _request method
                await asyncio.sleep(0)
                # other option is to manually set transport like
                # `proto.transport = trans`
        except OSError as exc:
            # A bare timeout (no errno) propagates unchanged.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise ClientConnectorError(req.connection_key, exc) from exc

        return cast(ResponseHandler, proto)