Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/aiohttp/connector.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

832 statements  

1import asyncio 

2import functools 

3import random 

4import socket 

5import sys 

6import traceback 

7import warnings 

8from collections import OrderedDict, defaultdict, deque 

9from contextlib import suppress 

10from http import HTTPStatus 

11from itertools import chain, cycle, islice 

12from time import monotonic 

13from types import TracebackType 

14from typing import ( 

15 TYPE_CHECKING, 

16 Any, 

17 Awaitable, 

18 Callable, 

19 DefaultDict, 

20 Deque, 

21 Dict, 

22 Iterator, 

23 List, 

24 Literal, 

25 Optional, 

26 Sequence, 

27 Set, 

28 Tuple, 

29 Type, 

30 Union, 

31 cast, 

32) 

33 

34import aiohappyeyeballs 

35from aiohappyeyeballs import AddrInfoType, SocketFactoryType 

36 

37from . import hdrs, helpers 

38from .abc import AbstractResolver, ResolveResult 

39from .client_exceptions import ( 

40 ClientConnectionError, 

41 ClientConnectorCertificateError, 

42 ClientConnectorDNSError, 

43 ClientConnectorError, 

44 ClientConnectorSSLError, 

45 ClientHttpProxyError, 

46 ClientProxyConnectionError, 

47 ServerFingerprintMismatch, 

48 UnixClientConnectorError, 

49 cert_errors, 

50 ssl_errors, 

51) 

52from .client_proto import ResponseHandler 

53from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params 

54from .helpers import ( 

55 _SENTINEL, 

56 ceil_timeout, 

57 is_ip_address, 

58 noop, 

59 sentinel, 

60 set_exception, 

61 set_result, 

62) 

63from .log import client_logger 

64from .resolver import DefaultResolver 

65 

66if sys.version_info >= (3, 12): 

67 from collections.abc import Buffer 

68else: 

69 Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"] 

70 

71if TYPE_CHECKING: 

72 import ssl 

73 

74 SSLContext = ssl.SSLContext 

75else: 

76 try: 

77 import ssl 

78 

79 SSLContext = ssl.SSLContext 

80 except ImportError: # pragma: no cover 

81 ssl = None # type: ignore[assignment] 

82 SSLContext = object # type: ignore[misc,assignment] 

83 

# Basic URL scheme sets; the combined sets below are plain unions of these.
EMPTY_SCHEMA_SET = frozenset(("",))
HTTP_SCHEMA_SET = frozenset(("http", "https"))
WS_SCHEMA_SET = frozenset(("ws", "wss"))

HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET.union(EMPTY_SCHEMA_SET)
HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET.union(WS_SCHEMA_SET)

# Cleanup closed is no longer needed after https://github.com/python/cpython/pull/118960
# which first appeared in Python 3.12.7 and 3.13.1, so the flag is only true
# for interpreters older than 3.12.7 or for exactly the 3.13.0 series.
NEEDS_CLEANUP_CLOSED = sys.version_info < (3, 12, 7) or (
    (3, 13, 0) <= sys.version_info < (3, 13, 1)
)

98 

99 

# Public names exported by a star-import of this module.
__all__ = (
    "BaseConnector", "TCPConnector", "UnixConnector", "NamedPipeConnector",
    "AddrInfoType", "SocketFactoryType",
)

108 

109 

110if TYPE_CHECKING: 

111 from .client import ClientTimeout 

112 from .client_reqrep import ConnectionKey 

113 from .tracing import Trace 

114 

115 

class _DeprecationWaiter:
    """Awaitable wrapper that warns at finalization if it was never awaited."""

    __slots__ = ("_awaitable", "_awaited")

    def __init__(self, awaitable: Awaitable[Any]) -> None:
        # Nothing has been awaited yet; ``__await__`` flips the flag.
        self._awaited = False
        self._awaitable = awaitable

    def __await__(self) -> Any:
        # Record the await first, then delegate to the wrapped awaitable.
        self._awaited = True
        return self._awaitable.__await__()

    def __del__(self) -> None:
        if self._awaited:
            return
        # The wrapper is being discarded without ever having been awaited.
        warnings.warn(
            "Connector.close() is a coroutine, "
            "please use await connector.close()",
            DeprecationWarning,
        )

134 

135 

async def _wait_for_close(waiters: List[Awaitable[object]]) -> None:
    """Await every waiter and debug-log any that finished with an Exception.

    Exceptions are collected by ``asyncio.gather`` rather than raised, so one
    failing waiter does not prevent the others from being awaited.
    """
    outcomes = await asyncio.gather(*waiters, return_exceptions=True)
    failures = (item for item in outcomes if isinstance(item, Exception))
    for failure in failures:
        client_logger.debug("Error while closing connector: %r", failure)

142 

143 

class Connection:
    """A protocol checked out of a connector under a given connection key.

    ``release()`` and ``close()`` hand the protocol back to the connector via
    ``connector._release``; afterwards the connection holds no protocol.
    """

    _source_traceback = None

    def __init__(
        self,
        connector: "BaseConnector",
        key: "ConnectionKey",
        protocol: ResponseHandler,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._connector = connector
        self._key = key
        self._protocol: Optional[ResponseHandler] = protocol
        self._loop = loop
        self._callbacks: List[Callable[[], None]] = []

        # Remember the creation site only when the loop runs in debug mode.
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __repr__(self) -> str:
        return "Connection<{}>".format(self._key)

    def __del__(self, _warnings: Any = warnings) -> None:
        # A connection that still owns a protocol was never released/closed.
        if self._protocol is None:
            return

        _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, source=self)
        if self._loop.is_closed():
            return

        self._connector._release(self._key, self._protocol, should_close=True)

        context = {"client_connection": self, "message": "Unclosed connection"}
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __bool__(self) -> Literal[True]:
        """Force subclasses to not be falsy, to make checks simpler."""
        return True

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """Return the event loop; emits a DeprecationWarning on access."""
        warnings.warn(
            "connector.loop property is deprecated", DeprecationWarning, stacklevel=2
        )
        return self._loop

    @property
    def transport(self) -> Optional[asyncio.Transport]:
        """Return the protocol's transport, or None once the protocol is gone."""
        protocol = self._protocol
        return None if protocol is None else protocol.transport

    @property
    def protocol(self) -> Optional[ResponseHandler]:
        """Return the held protocol, or None after release/close."""
        return self._protocol

    def add_callback(self, callback: Callable[[], None]) -> None:
        """Register a callback to run on release/close; None is ignored."""
        if callback is None:
            return
        self._callbacks.append(callback)

    def _notify_release(self) -> None:
        """Run and forget all registered callbacks, swallowing their Exceptions."""
        # Detach the list first so callbacks registered meanwhile are kept
        # for a later notification rather than run now.
        pending = self._callbacks[:]
        self._callbacks = []

        for callback in pending:
            with suppress(Exception):
                callback()

    def close(self) -> None:
        """Notify callbacks and return the protocol asking for it to be closed."""
        self._notify_release()

        if self._protocol is None:
            return
        self._connector._release(self._key, self._protocol, should_close=True)
        self._protocol = None

    def release(self) -> None:
        """Notify callbacks and return the protocol to the connector."""
        self._notify_release()

        if self._protocol is None:
            return
        self._connector._release(self._key, self._protocol)
        self._protocol = None

    @property
    def closed(self) -> bool:
        """True when no protocol is held or the protocol is not connected."""
        protocol = self._protocol
        if protocol is None:
            return True
        return not protocol.is_connected()

230 

231 

class _TransportPlaceholder:
    """placeholder for BaseConnector.connect function"""

    __slots__ = ("closed", "transport")

    def __init__(self, closed_future: asyncio.Future[Optional[Exception]]) -> None:
        """Store the given future as ``closed``; there is never a transport."""
        self.transport = None
        self.closed = closed_future

    def close(self) -> None:
        """Close the placeholder (does nothing)."""
        return None

    def abort(self) -> None:
        """Abort the placeholder (does nothing)."""
        return None

247 

248 

class BaseConnector:
    """Base connector class.

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
                            Disabled by default.
    timeout_ceil_threshold - Trigger ceiling of timeout values when
                             it's above timeout_ceil_threshold.
    loop - Optional event loop.
    """

    _closed = True  # prevent AttributeError in __del__ if ctor was failed
    _source_traceback = None

    # abort transport after 2 seconds (cleanup broken connections)
    _cleanup_closed_period = 2.0

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET

    def __init__(
        self,
        *,
        keepalive_timeout: Union[object, None, float] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
        timeout_ceil_threshold: float = 5,
    ) -> None:
        """Set up the pool bookkeeping.

        Raises ValueError when ``force_close`` is combined with an explicit
        ``keepalive_timeout``. Without an explicit ``loop`` the running loop
        is used.
        """
        if force_close:
            if keepalive_timeout is not None and keepalive_timeout is not sentinel:
                raise ValueError(
                    "keepalive_timeout cannot be set if force_close is True"
                )
        else:
            if keepalive_timeout is sentinel:
                keepalive_timeout = 15.0

        loop = loop or asyncio.get_running_loop()
        self._timeout_ceil_threshold = timeout_ceil_threshold

        self._closed = False
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # Connection pool of reusable connections.
        # We use a deque to store connections because it has O(1) popleft()
        # and O(1) append() operations to implement a FIFO queue.
        self._conns: DefaultDict[
            ConnectionKey, Deque[Tuple[ResponseHandler, float]]
        ] = defaultdict(deque)
        self._limit = limit
        self._limit_per_host = limit_per_host
        self._acquired: Set[ResponseHandler] = set()
        self._acquired_per_host: DefaultDict[ConnectionKey, Set[ResponseHandler]] = (
            defaultdict(set)
        )
        self._keepalive_timeout = cast(float, keepalive_timeout)
        self._force_close = force_close

        # {host_key: FIFO list of waiters}
        # The FIFO is implemented with an OrderedDict with None keys because
        # python does not have an ordered set.
        self._waiters: DefaultDict[
            ConnectionKey, OrderedDict[asyncio.Future[None], None]
        ] = defaultdict(OrderedDict)

        self._loop = loop
        self._factory = functools.partial(ResponseHandler, loop=loop)

        # Handle of the keep-alive cleanup timer; created lazily by _release().
        self._cleanup_handle: Optional[asyncio.TimerHandle] = None

        # Handle of the closed-transports cleanup timer.
        self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None

        if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED:
            warnings.warn(
                "enable_cleanup_closed ignored because "
                "https://github.com/python/cpython/pull/118960 is fixed "
                f"in Python version {sys.version_info}",
                DeprecationWarning,
                stacklevel=2,
            )
            enable_cleanup_closed = False

        self._cleanup_closed_disabled = not enable_cleanup_closed
        self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []
        # One already-resolved future shared by every _TransportPlaceholder.
        self._placeholder_future: asyncio.Future[Optional[Exception]] = (
            loop.create_future()
        )
        self._placeholder_future.set_result(None)
        self._cleanup_closed()

    def __del__(self, _warnings: Any = warnings) -> None:
        """Close and report a connector that still pools connections."""
        if self._closed:
            return
        if not self._conns:
            return

        # Capture the reprs before _close() empties the pool.
        conns = [repr(c) for c in self._conns.values()]

        self._close()

        kwargs = {"source": self}
        _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)
        context = {
            "connector": self,
            "connections": conns,
            "message": "Unclosed connector",
        }
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __enter__(self) -> "BaseConnector":
        warnings.warn(
            '"with Connector():" is deprecated, '
            'use "async with Connector():" instead',
            DeprecationWarning,
        )
        return self

    def __exit__(self, *exc: Any) -> None:
        self._close()

    async def __aenter__(self) -> "BaseConnector":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        exc_traceback: Optional[TracebackType] = None,
    ) -> None:
        await self.close()

    @property
    def force_close(self) -> bool:
        """Ultimately close connection on releasing if True."""
        return self._force_close

    @property
    def limit(self) -> int:
        """The total number for simultaneous connections.

        If limit is 0 the connector has no limit.
        The default limit size is 100.
        """
        return self._limit

    @property
    def limit_per_host(self) -> int:
        """The limit for simultaneous connections to the same endpoint.

        Endpoints are the same if they have equal
        (host, port, is_ssl) triple.
        """
        return self._limit_per_host

    def _cleanup(self) -> None:
        """Cleanup unused transports."""
        if self._cleanup_handle:
            self._cleanup_handle.cancel()
            # _cleanup_handle should be unset, otherwise _release() will not
            # recreate it ever!
            self._cleanup_handle = None

        now = monotonic()
        timeout = self._keepalive_timeout

        if self._conns:
            connections = defaultdict(deque)
            deadline = now - timeout
            for key, conns in self._conns.items():
                alive: Deque[Tuple[ResponseHandler, float]] = deque()
                for proto, use_time in conns:
                    # Keep connections that are still connected and were
                    # last used no earlier than the deadline.
                    if proto.is_connected() and use_time - deadline >= 0:
                        alive.append((proto, use_time))
                        continue
                    transport = proto.transport
                    proto.close()
                    if not self._cleanup_closed_disabled and key.is_ssl:
                        self._cleanup_closed_transports.append(transport)

                if alive:
                    connections[key] = alive

            self._conns = connections

        # Re-arm the timer only while something is left in the pool.
        if self._conns:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def _cleanup_closed(self) -> None:
        """Double confirmation for transport close.

        Some broken ssl servers may leave socket open without proper close.
        """
        if self._cleanup_closed_handle:
            self._cleanup_closed_handle.cancel()

        for transport in self._cleanup_closed_transports:
            if transport is not None:
                transport.abort()

        self._cleanup_closed_transports = []

        if not self._cleanup_closed_disabled:
            self._cleanup_closed_handle = helpers.weakref_handle(
                self,
                "_cleanup_closed",
                self._cleanup_closed_period,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def close(self, *, abort_ssl: bool = False) -> Awaitable[None]:
        """Close all opened transports.

        :param abort_ssl: If True, SSL connections will be aborted immediately
            without performing the shutdown handshake. This provides
            faster cleanup at the cost of less graceful disconnection.
        """
        if not (waiters := self._close(abort_ssl=abort_ssl)):
            # If there are no connections to close, we can return a noop
            # awaitable to avoid scheduling a task on the event loop.
            return _DeprecationWaiter(noop())
        coro = _wait_for_close(waiters)
        if sys.version_info >= (3, 12):
            # Optimization for Python 3.12, try to close connections
            # immediately to avoid having to schedule the task on the event loop.
            task = asyncio.Task(coro, loop=self._loop, eager_start=True)
        else:
            task = self._loop.create_task(coro)
        return _DeprecationWaiter(task)

    def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
        """Close every pooled and acquired protocol and reset all bookkeeping.

        Returns the truthy ``closed`` attributes of the protocols so the
        caller can wait for them; the list is empty if the connector was
        already closed or its loop is closed.
        """
        waiters: List[Awaitable[object]] = []

        if self._closed:
            return waiters

        self._closed = True

        try:
            if self._loop.is_closed():
                return waiters

            # cancel cleanup task
            if self._cleanup_handle:
                self._cleanup_handle.cancel()

            # cancel cleanup close task
            if self._cleanup_closed_handle:
                self._cleanup_closed_handle.cancel()

            for data in self._conns.values():
                for proto, _ in data:
                    if (
                        abort_ssl
                        and proto.transport
                        and proto.transport.get_extra_info("sslcontext") is not None
                    ):
                        proto.abort()
                    else:
                        proto.close()
                    if closed := proto.closed:
                        waiters.append(closed)

            for proto in self._acquired:
                if (
                    abort_ssl
                    and proto.transport
                    and proto.transport.get_extra_info("sslcontext") is not None
                ):
                    proto.abort()
                else:
                    proto.close()
                if closed := proto.closed:
                    waiters.append(closed)

            for transport in self._cleanup_closed_transports:
                if transport is not None:
                    transport.abort()

            return waiters

        finally:
            self._conns.clear()
            self._acquired.clear()
            # _release_acquired() is a no-op once the connector is closed, so
            # the per-host sets must be dropped here as well; otherwise they
            # keep protocol references alive and leave stale per-host counts.
            self._acquired_per_host.clear()
            for keyed_waiters in self._waiters.values():
                for keyed_waiter in keyed_waiters:
                    keyed_waiter.cancel()
            self._waiters.clear()
            self._cleanup_handle = None
            self._cleanup_closed_transports.clear()
            self._cleanup_closed_handle = None

    @property
    def closed(self) -> bool:
        """Is connector closed.

        A readonly property.
        """
        return self._closed

    def _available_connections(self, key: "ConnectionKey") -> int:
        """
        Return number of available connections.

        The limit, limit_per_host and the connection key are taken into account.

        If it returns less than 1 means that there are no connections
        available.
        """
        # check total available connections
        # If there are no limits, this will always return 1
        total_remain = 1

        if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0:
            return total_remain

        # check limit per host
        if host_remain := self._limit_per_host:
            if acquired := self._acquired_per_host.get(key):
                host_remain -= len(acquired)
            if total_remain > host_remain:
                return host_remain

        return total_remain

    async def connect(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Connection:
        """Get from pool or create new connection."""
        key = req.connection_key
        if (conn := await self._get(key, traces)) is not None:
            # If we do not have to wait and we can get a connection from the pool
            # we can avoid the timeout ceil logic and directly return the connection
            return conn

        async with ceil_timeout(timeout.connect, timeout.ceil_threshold):
            if self._available_connections(key) <= 0:
                await self._wait_for_available_connection(key, traces)
                if (conn := await self._get(key, traces)) is not None:
                    return conn

            # Reserve a slot with a placeholder while the real connection
            # is being created.
            placeholder = cast(
                ResponseHandler, _TransportPlaceholder(self._placeholder_future)
            )
            self._acquired.add(placeholder)
            if self._limit_per_host:
                self._acquired_per_host[key].add(placeholder)

            try:
                # Traces are done inside the try block to ensure that
                # the placeholder is still cleaned up if an exception
                # is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_start()
                proto = await self._create_connection(req, traces, timeout)
                if traces:
                    for trace in traces:
                        await trace.send_connection_create_end()
            except BaseException:
                self._release_acquired(key, placeholder)
                raise
            else:
                if self._closed:
                    proto.close()
                    raise ClientConnectionError("Connector is closed.")

        # The connection was successfully created, drop the placeholder
        # and add the real connection to the acquired set. There should
        # be no awaits after the proto is added to the acquired set
        # to ensure that the connection is not left in the acquired set
        # on cancellation.
        self._acquired.remove(placeholder)
        self._acquired.add(proto)
        if self._limit_per_host:
            acquired_per_host = self._acquired_per_host[key]
            acquired_per_host.remove(placeholder)
            acquired_per_host.add(proto)
        return Connection(self, key, proto, self._loop)

    async def _wait_for_available_connection(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> None:
        """Wait for an available connection slot."""
        # We loop here because there is a race between
        # the connection limit check and the connection
        # being acquired. If the connection is acquired
        # between the check and the await statement, we
        # need to loop again to check if the connection
        # slot is still available.
        attempts = 0
        while True:
            fut: asyncio.Future[None] = self._loop.create_future()
            keyed_waiters = self._waiters[key]
            keyed_waiters[fut] = None
            if attempts:
                # If we have waited before, we need to move the waiter
                # to the front of the queue as otherwise we might get
                # starved and hit the timeout.
                keyed_waiters.move_to_end(fut, last=False)

            try:
                # Traces happen in the try block to ensure that
                # the waiter is still cleaned up if an exception is raised.
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_start()
                await fut
                if traces:
                    for trace in traces:
                        await trace.send_connection_queued_end()
            finally:
                # pop the waiter from the queue if it's still
                # there and not already removed by _release_waiter
                keyed_waiters.pop(fut, None)
                if not self._waiters.get(key, True):
                    del self._waiters[key]

            if self._available_connections(key) > 0:
                break
            attempts += 1

    async def _get(
        self, key: "ConnectionKey", traces: List["Trace"]
    ) -> Optional[Connection]:
        """Get next reusable connection for the key or None.

        The connection will be marked as acquired.
        """
        if (conns := self._conns.get(key)) is None:
            return None

        t1 = monotonic()
        while conns:
            proto, t0 = conns.popleft()
            # We will reuse the connection if it's connected and
            # the keepalive timeout has not been exceeded
            if proto.is_connected() and t1 - t0 <= self._keepalive_timeout:
                if not conns:
                    # The very last connection was reclaimed: drop the key
                    del self._conns[key]
                self._acquired.add(proto)
                if self._limit_per_host:
                    self._acquired_per_host[key].add(proto)
                if traces:
                    for trace in traces:
                        try:
                            await trace.send_connection_reuseconn()
                        except BaseException:
                            self._release_acquired(key, proto)
                            raise
                return Connection(self, key, proto, self._loop)

            # Connection cannot be reused, close it
            transport = proto.transport
            proto.close()
            # only for SSL transports
            if not self._cleanup_closed_disabled and key.is_ssl:
                self._cleanup_closed_transports.append(transport)

        # No more connections: drop the key
        del self._conns[key]
        return None

    def _release_waiter(self) -> None:
        """
        Iterates over all waiters until one to be released is found.

        The one to be released is not finished and
        belongs to a host that has available connections.
        """
        if not self._waiters:
            return

        # Shuffle the keys so that hosts are not visited
        # in the same order on every call.
        queues = list(self._waiters)
        random.shuffle(queues)

        for key in queues:
            if self._available_connections(key) < 1:
                continue

            waiters = self._waiters[key]
            while waiters:
                waiter, _ = waiters.popitem(last=False)
                if not waiter.done():
                    waiter.set_result(None)
                    return

    def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
        """Release acquired connection."""
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._acquired.discard(proto)
        if self._limit_per_host and (conns := self._acquired_per_host.get(key)):
            conns.discard(proto)
            if not conns:
                del self._acquired_per_host[key]
        self._release_waiter()

    def _release(
        self,
        key: "ConnectionKey",
        protocol: ResponseHandler,
        *,
        should_close: bool = False,
    ) -> None:
        """Return a protocol: close it, or put it back into the pool for reuse."""
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._release_acquired(key, protocol)

        if self._force_close or should_close or protocol.should_close:
            transport = protocol.transport
            protocol.close()

            if key.is_ssl and not self._cleanup_closed_disabled:
                self._cleanup_closed_transports.append(transport)
            return

        # Pool the protocol together with the time it was returned.
        self._conns[key].append((protocol, monotonic()))

        if self._cleanup_handle is None:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                self._keepalive_timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Create a new connection; this base implementation always raises."""
        raise NotImplementedError()

806 

807 

class _DNSCacheTable:
    """Cache of resolved addresses per ``(host, port)`` key with optional TTL.

    Each entry is handed out in rotating order: every ``next_addrs`` call
    returns the full list starting one position later than the previous call.
    """

    def __init__(self, ttl: Optional[float] = None) -> None:
        # key -> (endless iterator over the addresses, number of addresses)
        self._addrs_rr: Dict[
            Tuple[str, int], Tuple[Iterator["ResolveResult"], int]
        ] = {}
        # key -> monotonic() time of insertion; only filled when a TTL is set
        self._timestamps: Dict[Tuple[str, int], float] = {}
        self._ttl = ttl

    def __contains__(self, host: object) -> bool:
        return host in self._addrs_rr

    def add(self, key: Tuple[str, int], addrs: List["ResolveResult"]) -> None:
        """Store (or replace) the addresses for ``key``."""
        self._addrs_rr[key] = (cycle(addrs), len(addrs))

        if self._ttl is not None:
            self._timestamps[key] = monotonic()

    def remove(self, key: Tuple[str, int]) -> None:
        """Forget ``key``; missing keys are ignored."""
        self._addrs_rr.pop(key, None)

        if self._ttl is not None:
            self._timestamps.pop(key, None)

    def clear(self) -> None:
        """Drop every cached entry and timestamp."""
        self._addrs_rr.clear()
        self._timestamps.clear()

    def next_addrs(self, key: Tuple[str, int]) -> List["ResolveResult"]:
        """Return all addresses for ``key`` and rotate the start by one.

        Raises KeyError if ``key`` is not cached. An entry that was added
        with an empty list yields an empty list.
        """
        loop, length = self._addrs_rr[key]
        addrs = list(islice(loop, length))
        if length:
            # Consume one more element to shift internal state of `cycle`.
            # Skipped for empty entries: next() on an empty cycle raises
            # StopIteration, which must not leak out of this method.
            next(loop)
        return addrs

    def expired(self, key: Tuple[str, int]) -> bool:
        """Return True when a TTL is set and the entry is older than it."""
        if self._ttl is None:
            return False

        return self._timestamps[key] + self._ttl < monotonic()

845 

846 

def _make_ssl_context(verified: bool) -> SSLContext:
    """Build an SSL context for client connections.

    With ``verified`` true the stdlib default context is used; otherwise a
    TLS client context with hostname checking and certificate verification
    switched off is built. Both advertise ``http/1.1`` via ALPN. Returns
    None when the ``ssl`` module is unavailable.

    This method is not async-friendly and should be called from a thread
    because it will load certificates from disk and do other blocking I/O.
    """
    if ssl is None:
        # No ssl support
        return None
    if not verified:
        context = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
        context.options |= ssl.OP_NO_SSLv2
        context.options |= ssl.OP_NO_SSLv3
        # check_hostname has to be cleared before verify_mode may be relaxed.
        context.check_hostname = False
        context.verify_mode = ssl.CERT_NONE
        context.options |= ssl.OP_NO_COMPRESSION
        context.set_default_verify_paths()
    else:
        context = ssl.create_default_context()
    context.set_alpn_protocols(("http/1.1",))
    return context


# Both default contexts are built once, when the module is imported:
# building them loads certificates from disk (blocking I/O), and imports
# should always be done before the event loop starts or in a thread.
_SSL_CONTEXT_VERIFIED = _make_ssl_context(True)
_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False)

876 

877 

878class TCPConnector(BaseConnector): 

879 """TCP connector. 

880 

881 verify_ssl - Set to True to check ssl certifications. 

882 fingerprint - Pass the binary sha256 

883 digest of the expected certificate in DER format to verify 

884 that the certificate the server presents matches. See also 

885 https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning 

886 resolver - Enable DNS lookups and use this 

887 resolver 

888 use_dns_cache - Use memory cache for DNS lookups. 

889 ttl_dns_cache - Max seconds having cached a DNS entry, None forever. 

890 family - socket address family 

891 local_addr - local tuple of (host, port) to bind socket to 

892 

893 keepalive_timeout - (optional) Keep-alive timeout. 

894 force_close - Set to True to force close and do reconnect 

895 after each request (and between redirects). 

896 limit - The total number of simultaneous connections. 

897 limit_per_host - Number of simultaneous connections to one host. 

898 enable_cleanup_closed - Enables clean-up closed ssl transports. 

899 Disabled by default. 

900 happy_eyeballs_delay - This is the “Connection Attempt Delay” 

901 as defined in RFC 8305. To disable 

902 the happy eyeballs algorithm, set to None. 

903 interleave - “First Address Family Count” as defined in RFC 8305 

904 loop - Optional event loop. 

905 socket_factory - A SocketFactoryType function that, if supplied, 

906 will be used to create sockets given an 

907 AddrInfoType. 

908 ssl_shutdown_timeout - DEPRECATED. Will be removed in aiohttp 4.0. 

909 Grace period for SSL shutdown handshake on TLS 

910 connections. Default is 0 seconds (immediate abort). 

911 This parameter allowed for a clean SSL shutdown by 

912 notifying the remote peer of connection closure, 

913 while avoiding excessive delays during connector cleanup. 

914 Note: Only takes effect on Python 3.11+. 

915 """ 

916 

917 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"}) 

918 

def __init__(
    self,
    *,
    verify_ssl: bool = True,
    fingerprint: Optional[bytes] = None,
    use_dns_cache: bool = True,
    ttl_dns_cache: Optional[int] = 10,
    family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC,
    ssl_context: Optional[SSLContext] = None,
    ssl: Union[bool, Fingerprint, SSLContext] = True,
    local_addr: Optional[Tuple[str, int]] = None,
    resolver: Optional[AbstractResolver] = None,
    keepalive_timeout: Union[None, float, object] = sentinel,
    force_close: bool = False,
    limit: int = 100,
    limit_per_host: int = 0,
    enable_cleanup_closed: bool = False,
    loop: Optional[asyncio.AbstractEventLoop] = None,
    timeout_ceil_threshold: float = 5,
    happy_eyeballs_delay: Optional[float] = 0.25,
    interleave: Optional[int] = None,
    socket_factory: Optional[SocketFactoryType] = None,
    ssl_shutdown_timeout: Union[_SENTINEL, None, float] = sentinel,
):
    """Set up pooling, SSL, DNS and socket options for the connector.

    Pool related arguments (``keepalive_timeout``, ``force_close``,
    ``limit``, ``limit_per_host``, ``enable_cleanup_closed``, ``loop`` and
    ``timeout_ceil_threshold``) are forwarded to the base class unchanged.

    ``ssl``, ``verify_ssl``, ``ssl_context`` and ``fingerprint`` are merged
    into a single setting by ``_merge_ssl_params``.

    When ``resolver`` is ``None`` a ``DefaultResolver`` is created and the
    connector records that it owns it, so ``close()`` closes it as well.

    Passing ``ssl_shutdown_timeout`` explicitly emits a
    ``DeprecationWarning``; on Python < 3.11 a non-zero value additionally
    emits a ``RuntimeWarning``.
    """
    super().__init__(
        keepalive_timeout=keepalive_timeout,
        force_close=force_close,
        limit=limit,
        limit_per_host=limit_per_host,
        enable_cleanup_closed=enable_cleanup_closed,
        loop=loop,
        timeout_ceil_threshold=timeout_ceil_threshold,
    )

    self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)

    # A resolver created here is owned (and later closed) by the connector;
    # a caller supplied resolver is left for the caller to close.
    self._resolver: AbstractResolver = (
        DefaultResolver(loop=self._loop) if resolver is None else resolver
    )
    self._resolver_owner = resolver is None

    # DNS caching state.
    self._use_dns_cache = use_dns_cache
    self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
    self._throttle_dns_futures: Dict[
        Tuple[str, int], Set["asyncio.Future[None]"]
    ] = {}

    # Socket level options.
    self._family = family
    self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
    self._happy_eyeballs_delay = happy_eyeballs_delay
    self._interleave = interleave
    self._resolve_host_tasks: Set["asyncio.Task[List[ResolveResult]]"] = set()
    self._socket_factory = socket_factory

    self._ssl_shutdown_timeout: Optional[float]
    if ssl_shutdown_timeout is not sentinel:
        # The parameter was given explicitly: it is deprecated.
        # Both warnings are issued from this frame so stacklevel=2 points
        # at the code constructing the connector.
        warnings.warn(
            "The ssl_shutdown_timeout parameter is deprecated and will be removed in aiohttp 4.0",
            DeprecationWarning,
            stacklevel=2,
        )
        timeout_unsupported = (
            sys.version_info < (3, 11)
            and ssl_shutdown_timeout is not None
            and ssl_shutdown_timeout != 0
        )
        if timeout_unsupported:
            warnings.warn(
                f"ssl_shutdown_timeout={ssl_shutdown_timeout} is ignored on Python < 3.11; "
                "only ssl_shutdown_timeout=0 is supported. The timeout will be ignored.",
                RuntimeWarning,
                stacklevel=2,
            )
        self._ssl_shutdown_timeout = ssl_shutdown_timeout
    else:
        # Not supplied: default to 0.
        self._ssl_shutdown_timeout = 0

997 

def _close(self, *, abort_ssl: bool = False) -> List[Awaitable[object]]:
    """Cancel in-flight DNS work and return the awaitables to wait on.

    Every future waiting on a throttled lookup is cancelled, the base
    class ``_close`` is invoked, and finally each outstanding resolve task
    is cancelled and added to the list returned by the base class.
    """
    for pending_waiters in self._throttle_dns_futures.values():
        for waiter in pending_waiters:
            waiter.cancel()

    to_await = super()._close(abort_ssl=abort_ssl)

    for resolve_task in self._resolve_host_tasks:
        resolve_task.cancel()
        to_await.append(resolve_task)

    return to_await

1010 

async def close(self, *, abort_ssl: bool = False) -> None:
    """
    Close all opened transports.

    The resolver is closed first when it is owned by this connector. The
    base-class close runs even if closing the resolver raises or is
    cancelled, so pooled transports are not leaked on that error path;
    the resolver's exception still propagates afterwards.

    :param abort_ssl: If True, SSL connections will be aborted immediately
        without performing the shutdown handshake. If False (default),
        the behavior is determined by ssl_shutdown_timeout:
        - If ssl_shutdown_timeout=0: connections are aborted
        - If ssl_shutdown_timeout>0: graceful shutdown is performed
    """
    try:
        if self._resolver_owner:
            await self._resolver.close()
    finally:
        # Use abort_ssl param if explicitly set, otherwise use
        # ssl_shutdown_timeout default.
        await super().close(abort_ssl=abort_ssl or self._ssl_shutdown_timeout == 0)

1025 

@property
def family(self) -> int:
    """Address family the connector was configured with (e.g. AF_INET)."""
    configured_family = self._family
    return configured_family

1030 

@property
def use_dns_cache(self) -> bool:
    """Whether resolved addresses are cached locally by this connector."""
    caching_enabled = self._use_dns_cache
    return caching_enabled

1035 

def clear_dns_cache(
    self, host: Optional[str] = None, port: Optional[int] = None
) -> None:
    """Drop one cached ``(host, port)`` entry, or the whole DNS cache.

    With neither argument the entire cache is cleared; with both, only the
    matching entry is removed.

    Raises ValueError when exactly one of ``host`` / ``port`` is given.
    """
    if host is None and port is None:
        self._cached_hosts.clear()
        return

    if host is None or port is None:
        raise ValueError("either both host and port or none of them are allowed")

    self._cached_hosts.remove((host, port))

1046 

async def _resolve_host(
    self, host: str, port: int, traces: Optional[Sequence["Trace"]] = None
) -> List[ResolveResult]:
    """Resolve host and return list of addresses.

    Four paths are taken, in this order:

    1. ``host`` is already an IP address: a single synthetic result is
       returned without touching the resolver.
    2. DNS caching is disabled: the resolver is called directly.
    3. A non-expired cache entry exists: it is returned.
    4. Otherwise the lookup is throttled per ``(host, port)``: the first
       caller starts a shielded task that resolves and fills the cache,
       later callers wait on a future that the task completes.

    ``traces``, when given, receive the matching DNS trace events.
    """
    if is_ip_address(host):
        # Nothing to resolve; hand back the literal address as-is.
        return [
            {
                "hostname": host,
                "host": host,
                "port": port,
                "family": self._family,
                "proto": 0,
                "flags": 0,
            }
        ]

    if not self._use_dns_cache:

        if traces:
            for trace in traces:
                await trace.send_dns_resolvehost_start(host)

        res = await self._resolver.resolve(host, port, family=self._family)

        if traces:
            for trace in traces:
                await trace.send_dns_resolvehost_end(host)

        return res

    key = (host, port)
    if key in self._cached_hosts and not self._cached_hosts.expired(key):
        # get result early, before any await (#4014)
        result = self._cached_hosts.next_addrs(key)

        if traces:
            for trace in traces:
                await trace.send_dns_cache_hit(host)
        return result

    futures: Set["asyncio.Future[None]"]
    #
    # If multiple connectors are resolving the same host, we wait
    # for the first one to resolve and then use the result for all of them.
    # We use a throttle to ensure that we only resolve the host once
    # and then use the result for all the waiters.
    #
    if key in self._throttle_dns_futures:
        # A lookup for this key is already running: register a waiter.
        # get futures early, before any await (#4014)
        futures = self._throttle_dns_futures[key]
        future: asyncio.Future[None] = self._loop.create_future()
        futures.add(future)
        # Waiters report a cache hit to the traces, as they do not start
        # a lookup of their own.
        if traces:
            for trace in traces:
                await trace.send_dns_cache_hit(host)
        try:
            await future
        finally:
            # Always unregister, including on cancellation or error.
            futures.discard(future)
        return self._cached_hosts.next_addrs(key)

    # update dict early, before any await (#4014)
    self._throttle_dns_futures[key] = futures = set()
    # In this case we need to create a task to ensure that we can shield
    # the task from cancellation as cancelling this lookup should not cancel
    # the underlying lookup or else the cancel event will get broadcast to
    # all the waiters across all connections.
    #
    coro = self._resolve_host_with_throttle(key, host, port, futures, traces)
    loop = asyncio.get_running_loop()
    if sys.version_info >= (3, 12):
        # Optimization for Python 3.12, try to send immediately
        resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True)
    else:
        resolved_host_task = loop.create_task(coro)

    # Only track the task if it is still running (an eagerly started task
    # may already be done); the done-callback untracks it again.
    if not resolved_host_task.done():
        self._resolve_host_tasks.add(resolved_host_task)
        resolved_host_task.add_done_callback(self._resolve_host_tasks.discard)

    try:
        return await asyncio.shield(resolved_host_task)
    except asyncio.CancelledError:

        def drop_exception(fut: "asyncio.Future[List[ResolveResult]]") -> None:
            # Fetch the result and discard any error from the abandoned
            # task; presumably this avoids an "exception was never
            # retrieved" report - TODO confirm.
            with suppress(Exception, asyncio.CancelledError):
                fut.result()

        resolved_host_task.add_done_callback(drop_exception)
        raise

1137 

async def _resolve_host_with_throttle(
    self,
    key: Tuple[str, int],
    host: str,
    port: int,
    futures: Set["asyncio.Future[None]"],
    traces: Optional[Sequence["Trace"]],
) -> List[ResolveResult]:
    """Resolve host and set result for all waiters.

    This method must be run in a task and shielded from cancellation
    to avoid cancelling the underlying lookup.

    ``key`` is the ``(host, port)`` cache/throttle key and ``futures`` is
    the set of waiter futures registered under that key. On success the
    result is stored in the DNS cache and every waiter is completed with
    ``None``; on failure every waiter receives the same exception, which
    is then re-raised. The throttle entry for ``key`` is removed in all
    cases.
    """
    try:
        if traces:
            for trace in traces:
                await trace.send_dns_cache_miss(host)

            for trace in traces:
                await trace.send_dns_resolvehost_start(host)

        addrs = await self._resolver.resolve(host, port, family=self._family)
        if traces:
            for trace in traces:
                await trace.send_dns_resolvehost_end(host)

        # Fill the cache before waking the waiters: they read the cache
        # as soon as their future completes.
        self._cached_hosts.add(key, addrs)
        for fut in futures:
            set_result(fut, None)
    except BaseException as e:
        # any DNS exception is set for the waiters to raise the same exception.
        # This coro is always run in task that is shielded from cancellation so
        # we should never be propagating cancellation here.
        for fut in futures:
            set_exception(fut, e)
        raise
    finally:
        # Later callers must start a fresh lookup (or hit the cache).
        self._throttle_dns_futures.pop(key)

    return self._cached_hosts.next_addrs(key)

1178 

async def _create_connection(
    self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
) -> ResponseHandler:
    """Open a connection for ``req`` and return its protocol.

    Requests with a proxy configured go through the proxy connection
    path, all others connect directly. The transport returned by either
    path is discarded; only the protocol is handed back.
    """
    connect = (
        self._create_proxy_connection
        if req.proxy
        else self._create_direct_connection
    )
    _transport, handler = await connect(req, traces, timeout)
    return handler

1192 

def _get_ssl_context(self, req: ClientRequest) -> Optional[SSLContext]:
    """Pick the SSL context to use for ``req``.

    0. If the request is not an SSL request, return None.
    1. Inspect ``req.ssl`` first and the connector-level setting second.
       For each of them, in that order:

       - an ``ssl.SSLContext`` instance is returned as-is;
       - any value other than ``True`` (not verified or fingerprinted)
         selects the shared unverified context.
    2. If both are ``True``, return the shared verified context.

    Raises RuntimeError when the ``ssl`` module is unavailable.
    """
    if not req.is_ssl():
        return None

    if ssl is None:  # pragma: no cover
        raise RuntimeError("SSL is not supported.")

    # The request setting takes precedence over the connector default.
    for setting in (req.ssl, self._ssl):
        if isinstance(setting, ssl.SSLContext):
            return setting
        if setting is not True:
            # not verified or fingerprinted
            return _SSL_CONTEXT_UNVERIFIED
    return _SSL_CONTEXT_VERIFIED

1225 

def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]:
    """Return the fingerprint to verify for ``req``, if any.

    The request's own ``ssl`` setting is checked first, then the
    connector-level one; ``None`` is returned when neither is a
    ``Fingerprint``.
    """
    for setting in (req.ssl, self._ssl):
        if isinstance(setting, Fingerprint):
            return setting
    return None

1234 

async def _wrap_create_connection(
    self,
    *args: Any,
    addr_infos: List[AddrInfoType],
    req: ClientRequest,
    timeout: "ClientTimeout",
    client_error: Type[Exception] = ClientConnectorError,
    **kwargs: Any,
) -> Tuple[asyncio.Transport, ResponseHandler]:
    """Connect a socket to one of ``addr_infos`` and wrap it in a transport.

    Socket connection and ``loop.create_connection`` both run under the
    ``sock_connect`` timeout. ``*args`` / ``**kwargs`` are passed on to
    ``loop.create_connection``.

    Certificate and SSL errors are translated to the matching client
    exceptions; any other ``OSError`` becomes ``client_error``, except a
    timeout without an errno, which is re-raised unchanged.
    """
    try:
        async with ceil_timeout(
            timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
        ):
            connected_sock = await aiohappyeyeballs.start_connection(
                addr_infos=addr_infos,
                local_addr_infos=self._local_addr_infos,
                happy_eyeballs_delay=self._happy_eyeballs_delay,
                interleave=self._interleave,
                loop=self._loop,
                socket_factory=self._socket_factory,
            )
            # Add ssl_shutdown_timeout for Python 3.11+ when SSL is used
            shutdown_timeout = self._ssl_shutdown_timeout
            if (
                sys.version_info >= (3, 11)
                and kwargs.get("ssl")
                and shutdown_timeout
            ):
                kwargs["ssl_shutdown_timeout"] = shutdown_timeout
            return await self._loop.create_connection(
                *args, **kwargs, sock=connected_sock
            )
    except cert_errors as cert_exc:
        raise ClientConnectorCertificateError(
            req.connection_key, cert_exc
        ) from cert_exc
    except ssl_errors as tls_exc:
        raise ClientConnectorSSLError(req.connection_key, tls_exc) from tls_exc
    except OSError as os_exc:
        # A timeout without an errno must reach the caller untouched.
        if os_exc.errno is not None or not isinstance(
            os_exc, asyncio.TimeoutError
        ):
            raise client_error(req.connection_key, os_exc) from os_exc
        raise

1272 

async def _wrap_existing_connection(
    self,
    *args: Any,
    req: ClientRequest,
    timeout: "ClientTimeout",
    client_error: Type[Exception] = ClientConnectorError,
    **kwargs: Any,
) -> Tuple[asyncio.Transport, ResponseHandler]:
    """Run ``loop.create_connection`` with the given arguments.

    The call is bounded by the ``sock_connect`` timeout. Certificate and
    SSL errors are translated to the matching client exceptions; any other
    ``OSError`` becomes ``client_error``, except a timeout without an
    errno, which is re-raised unchanged.
    """
    try:
        async with ceil_timeout(
            timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
        ):
            connection = await self._loop.create_connection(*args, **kwargs)
            return connection
    except cert_errors as cert_exc:
        raise ClientConnectorCertificateError(
            req.connection_key, cert_exc
        ) from cert_exc
    except ssl_errors as tls_exc:
        raise ClientConnectorSSLError(req.connection_key, tls_exc) from tls_exc
    except OSError as os_exc:
        # A timeout without an errno must reach the caller untouched.
        if os_exc.errno is not None or not isinstance(
            os_exc, asyncio.TimeoutError
        ):
            raise client_error(req.connection_key, os_exc) from os_exc
        raise

1294 

1295 def _fail_on_no_start_tls(self, req: "ClientRequest") -> None: 

1296 """Raise a :py:exc:`RuntimeError` on missing ``start_tls()``. 

1297 

1298 It is necessary for TLS-in-TLS so that it is possible to 

1299 send HTTPS queries through HTTPS proxies. 

1300 

1301 This doesn't affect regular HTTP requests, though. 

1302 """ 

1303 if not req.is_ssl(): 

1304 return 

1305 

1306 proxy_url = req.proxy 

1307 assert proxy_url is not None 

1308 if proxy_url.scheme != "https": 

1309 return 

1310 

1311 self._check_loop_for_start_tls() 

1312 

1313 def _check_loop_for_start_tls(self) -> None: 

1314 try: 

1315 self._loop.start_tls 

1316 except AttributeError as attr_exc: 

1317 raise RuntimeError( 

1318 "An HTTPS request is being sent through an HTTPS proxy. " 

1319 "This needs support for TLS in TLS but it is not implemented " 

1320 "in your runtime for the stdlib asyncio.\n\n" 

1321 "Please upgrade to Python 3.11 or higher. For more details, " 

1322 "please see:\n" 

1323 "* https://bugs.python.org/issue37179\n" 

1324 "* https://github.com/python/cpython/pull/28073\n" 

1325 "* https://docs.aiohttp.org/en/stable/" 

1326 "client_advanced.html#proxy-support\n" 

1327 "* https://github.com/aio-libs/aiohttp/discussions/6044\n", 

1328 ) from attr_exc 

1329 

1330 def _loop_supports_start_tls(self) -> bool: 

1331 try: 

1332 self._check_loop_for_start_tls() 

1333 except RuntimeError: 

1334 return False 

1335 else: 

1336 return True 

1337 

def _warn_about_tls_in_tls(
    self,
    underlying_transport: asyncio.Transport,
    req: ClientRequest,
) -> None:
    """Issue a warning if the requested URL has HTTPS scheme.

    No warning is emitted when the transport comes from uvloop, when
    running on Python 3.11 or newer, or when the transport carries a
    truthy ``_start_tls_compatible`` attribute.
    """
    if req.request_info.url.scheme != "https":
        return

    # Check if uvloop is being used, which supports TLS in TLS,
    # otherwise assume that asyncio's native transport is being used.
    transport_module = type(underlying_transport).__module__
    if transport_module.startswith("uvloop"):
        return

    # Support in asyncio was added in Python 3.11 (bpo-44011)
    if sys.version_info >= (3, 11) or getattr(
        underlying_transport, "_start_tls_compatible", False
    ):
        return

    warning_text = (
        "An HTTPS request is being sent through an HTTPS proxy. "
        "This support for TLS in TLS is known to be disabled "
        "in the stdlib asyncio (Python <3.11). This is why you'll probably see "
        "an error in the log below.\n\n"
        "It is possible to enable it via monkeypatching. "
        "For more details, see:\n"
        "* https://bugs.python.org/issue37179\n"
        "* https://github.com/python/cpython/pull/28073\n\n"
        "You can temporarily patch this as follows:\n"
        "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
        "* https://github.com/aio-libs/aiohttp/discussions/6044\n"
    )
    # NOTE(review): an earlier comment here justified a stacklevel of 4,
    # while the code passes 3. The value is kept as-is; confirm which
    # frame the warning is meant to be attributed to.
    warnings.warn(
        warning_text,
        RuntimeWarning,
        source=self,
        stacklevel=3,
    )

1380 

async def _start_tls_connection(
    self,
    underlying_transport: asyncio.Transport,
    req: ClientRequest,
    timeout: "ClientTimeout",
    client_error: Type[Exception] = ClientConnectorError,
) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
    """Wrap the raw TCP transport with TLS.

    A new protocol is created for the TLS layer and ``loop.start_tls`` is
    run under the ``sock_connect`` timeout. If a fingerprint applies to
    the request it is checked against the new transport.

    Certificate and SSL errors are translated to the matching client
    exceptions, other ``OSError`` instances to ``client_error`` (a timeout
    without an errno is re-raised unchanged), and a ``TypeError`` from
    ``start_tls`` to ``ClientConnectionError``.

    Returns the TLS transport together with the new protocol.
    """
    tls_proto = self._factory()  # Create a brand new proto for TLS
    sslcontext = self._get_ssl_context(req)
    if TYPE_CHECKING:
        # _start_tls_connection is unreachable in the current code path
        # if sslcontext is None.
        assert sslcontext is not None

    try:
        async with ceil_timeout(
            timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
        ):
            try:
                # ssl_shutdown_timeout is only available in Python 3.11+
                if sys.version_info >= (3, 11) and self._ssl_shutdown_timeout:
                    tls_transport = await self._loop.start_tls(
                        underlying_transport,
                        tls_proto,
                        sslcontext,
                        server_hostname=req.server_hostname or req.host,
                        ssl_handshake_timeout=timeout.total,
                        ssl_shutdown_timeout=self._ssl_shutdown_timeout,
                    )
                else:
                    tls_transport = await self._loop.start_tls(
                        underlying_transport,
                        tls_proto,
                        sslcontext,
                        server_hostname=req.server_hostname or req.host,
                        ssl_handshake_timeout=timeout.total,
                    )
            except BaseException:
                # We need to close the underlying transport since
                # `start_tls()` probably failed before it had a
                # chance to do this:
                # (a shutdown timeout of 0 means abort instead of close)
                if self._ssl_shutdown_timeout == 0:
                    underlying_transport.abort()
                else:
                    underlying_transport.close()
                raise
            if isinstance(tls_transport, asyncio.Transport):
                fingerprint = self._get_fingerprint(req)
                if fingerprint:
                    try:
                        fingerprint.check(tls_transport)
                    except ServerFingerprintMismatch:
                        # Close the mismatching transport and, unless
                        # cleanup of closed transports is disabled,
                        # queue it for that cleanup.
                        tls_transport.close()
                        if not self._cleanup_closed_disabled:
                            self._cleanup_closed_transports.append(tls_transport)
                        raise
    except cert_errors as exc:
        raise ClientConnectorCertificateError(req.connection_key, exc) from exc
    except ssl_errors as exc:
        raise ClientConnectorSSLError(req.connection_key, exc) from exc
    except OSError as exc:
        # A timeout without an errno is propagated unchanged.
        if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
            raise
        raise client_error(req.connection_key, exc) from exc
    except TypeError as type_err:
        # Example cause looks like this:
        # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
        # object at 0x7f760615e460> is not supported by start_tls()

        raise ClientConnectionError(
            "Cannot initialize a TLS-in-TLS connection to host "
            f"{req.host!s}:{req.port:d} through an underlying connection "
            f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
            f"[{type_err!s}]"
        ) from type_err
    else:
        # start_tls() returned without raising but yielded no transport.
        if tls_transport is None:
            msg = "Failed to start TLS (possibly caused by closing transport)"
            raise client_error(req.connection_key, OSError(msg))
        tls_proto.connection_made(
            tls_transport
        )  # Kick the state machine of the new TLS protocol

    return tls_transport, tls_proto

1466 

1467 def _convert_hosts_to_addr_infos( 

1468 self, hosts: List[ResolveResult] 

1469 ) -> List[AddrInfoType]: 

1470 """Converts the list of hosts to a list of addr_infos. 

1471 

1472 The list of hosts is the result of a DNS lookup. The list of 

1473 addr_infos is the result of a call to `socket.getaddrinfo()`. 

1474 """ 

1475 addr_infos: List[AddrInfoType] = [] 

1476 for hinfo in hosts: 

1477 host = hinfo["host"] 

1478 is_ipv6 = ":" in host 

1479 family = socket.AF_INET6 if is_ipv6 else socket.AF_INET 

1480 if self._family and self._family != family: 

1481 continue 

1482 addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"]) 

1483 addr_infos.append( 

1484 (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr) 

1485 ) 

1486 return addr_infos 

1487 

async def _create_direct_connection(
    self,
    req: ClientRequest,
    traces: List["Trace"],
    timeout: "ClientTimeout",
    *,
    client_error: Type[Exception] = ClientConnectorError,
) -> Tuple[asyncio.Transport, ResponseHandler]:
    """Resolve the request's host and connect to one of its addresses.

    The resolved addresses are tried until a connection succeeds. After a
    connection failure or timeout the list is advanced via
    ``pop_addr_infos_interleave``; after a fingerprint mismatch the
    offending peer is removed from the list.

    Raises ``ClientConnectorDNSError`` when resolving fails with an
    ``OSError`` (a timeout without an errno is re-raised unchanged), the
    last connection error when every address failed, and ``client_error``
    when there was no usable address to try at all.
    """
    sslcontext = self._get_ssl_context(req)
    fingerprint = self._get_fingerprint(req)

    host = req.url.raw_host
    assert host is not None
    # Replace multiple trailing dots with a single one.
    # A trailing dot is only present for fully-qualified domain names.
    # See https://github.com/aio-libs/aiohttp/pull/7364.
    if host.endswith(".."):
        host = host.rstrip(".") + "."
    port = req.port
    assert port is not None
    try:
        # Cancelling this lookup should not cancel the underlying lookup
        # or else the cancel event will get broadcast to all the waiters
        # across all connections.
        hosts = await self._resolve_host(host, port, traces=traces)
    except OSError as exc:
        if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
            raise
        # in case of proxy it is not ClientProxyConnectionError
        # it is problem of resolving proxy ip itself
        raise ClientConnectorDNSError(req.connection_key, exc) from exc

    # Strip trailing dots, certificates contain FQDN without dots.
    # See https://github.com/aio-libs/aiohttp/issues/3636
    # (computed once: none of its inputs change between attempts)
    server_hostname = (
        (req.server_hostname or host).rstrip(".") if sslcontext else None
    )

    last_exc: Optional[Exception] = None
    addr_infos = self._convert_hosts_to_addr_infos(hosts)
    while addr_infos:
        try:
            transp, proto = await self._wrap_create_connection(
                self._factory,
                timeout=timeout,
                ssl=sslcontext,
                addr_infos=addr_infos,
                server_hostname=server_hostname,
                req=req,
                client_error=client_error,
            )
        except (ClientConnectorError, asyncio.TimeoutError) as exc:
            last_exc = exc
            aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave)
            continue

        if req.is_ssl() and fingerprint:
            try:
                fingerprint.check(transp)
            except ServerFingerprintMismatch as exc:
                transp.close()
                if not self._cleanup_closed_disabled:
                    self._cleanup_closed_transports.append(transp)
                last_exc = exc
                # Remove the bad peer from the list of addr_infos
                sock: socket.socket = transp.get_extra_info("socket")
                bad_peer = sock.getpeername()
                aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer)
                continue

        return transp, proto

    if last_exc is None:
        # The loop never ran: no address was usable, e.g. every resolved
        # address was dropped by the family filter. Report a connection
        # error instead of relying on an assert, which would surface as
        # AssertionError (or `raise None` under -O).
        raise client_error(
            req.connection_key,
            OSError(None, "No usable addresses found for the host"),
        )
    raise last_exc

1562 

async def _create_proxy_connection(
    self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
    """Connect to the proxy configured on ``req``.

    A direct connection to the proxy is opened first. For non-SSL
    requests that transport and protocol are returned as-is (with any
    proxy credentials moved to the request's ``Proxy-Authorization``
    header). For SSL requests a ``CONNECT`` request is sent through the
    proxy and, on a 200 response, the connection is upgraded to TLS:
    via ``start_tls`` when the loop supports it, otherwise by duplicating
    the raw socket and wrapping it in a new connection.

    Raises ``ClientHttpProxyError`` when the proxy answers ``CONNECT``
    with a status other than 200, and ``RuntimeError`` when the socket is
    needed but the transport does not expose it.
    """
    self._fail_on_no_start_tls(req)
    runtime_has_start_tls = self._loop_supports_start_tls()

    headers: Dict[str, str] = {}
    if req.proxy_headers is not None:
        # NOTE(review): this aliases req.proxy_headers, so the Host
        # assignment below writes into the request's own mapping.
        headers = req.proxy_headers  # type: ignore[assignment]
    headers[hdrs.HOST] = req.headers[hdrs.HOST]

    url = req.proxy
    assert url is not None
    proxy_req = ClientRequest(
        hdrs.METH_GET,
        url,
        headers=headers,
        auth=req.proxy_auth,
        loop=self._loop,
        ssl=req.ssl,
    )

    # create connection to proxy server
    transport, proto = await self._create_direct_connection(
        proxy_req, [], timeout, client_error=ClientProxyConnectionError
    )

    # Move the credentials from Authorization to Proxy-Authorization:
    # on the real request for plain HTTP, on the proxy request for HTTPS.
    auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
    if auth is not None:
        if not req.is_ssl():
            req.headers[hdrs.PROXY_AUTHORIZATION] = auth
        else:
            proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth

    if req.is_ssl():
        if runtime_has_start_tls:
            self._warn_about_tls_in_tls(transport, req)

        # For HTTPS requests over HTTP proxy
        # we must notify proxy to tunnel connection
        # so we send CONNECT command:
        #   CONNECT www.python.org:443 HTTP/1.1
        #   Host: www.python.org
        #
        # next we must do TLS handshake and so on
        # to do this we must wrap raw socket into secure one
        # asyncio handles this perfectly
        proxy_req.method = hdrs.METH_CONNECT
        proxy_req.url = req.url
        key = req.connection_key._replace(
            proxy=None, proxy_auth=None, proxy_headers_hash=None
        )
        conn = Connection(self, key, proto, self._loop)
        proxy_resp = await proxy_req.send(conn)
        try:
            protocol = conn._protocol
            assert protocol is not None

            # read_until_eof=True will ensure the connection isn't closed
            # once the response is received and processed allowing
            # START_TLS to work on the connection below.
            protocol.set_response_params(
                read_until_eof=runtime_has_start_tls,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )
            resp = await proxy_resp.start(conn)
        except BaseException:
            proxy_resp.close()
            conn.close()
            raise
        else:
            # Detach the protocol from the Connection object.
            conn._protocol = None
            try:
                if resp.status != 200:
                    message = resp.reason
                    if message is None:
                        message = HTTPStatus(resp.status).phrase
                    raise ClientHttpProxyError(
                        proxy_resp.request_info,
                        resp.history,
                        status=resp.status,
                        message=message,
                        headers=resp.headers,
                    )
                if not runtime_has_start_tls:
                    rawsock = transport.get_extra_info("socket", default=None)
                    if rawsock is None:
                        raise RuntimeError(
                            "Transport does not expose socket instance"
                        )
                    # Duplicate the socket, so now we can close proxy transport
                    rawsock = rawsock.dup()
            except BaseException:
                # It shouldn't be closed in `finally` because it's fed to
                # `loop.start_tls()` and the docs say not to touch it after
                # passing there.
                transport.close()
                raise
            finally:
                # Without start_tls the duplicated socket is used instead,
                # so the proxy transport is closed here.
                if not runtime_has_start_tls:
                    transport.close()

            if not runtime_has_start_tls:
                # HTTP proxy with support for upgrade to HTTPS
                sslcontext = self._get_ssl_context(req)
                return await self._wrap_existing_connection(
                    self._factory,
                    timeout=timeout,
                    ssl=sslcontext,
                    sock=rawsock,
                    server_hostname=req.host,
                    req=req,
                )

            return await self._start_tls_connection(
                # Access the old transport for the last time before it's
                # closed and forgotten forever:
                transport,
                req=req,
                timeout=timeout,
            )
        finally:
            proxy_resp.close()

    return transport, proto

1688 

1689 

class UnixConnector(BaseConnector):
    """Connector that talks to a server over a Unix domain socket.

    path - Unix socket path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Forward the pool options to the base class and store ``path``."""
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        self._path = path

    @property
    def path(self) -> str:
        """Filesystem path of the Unix socket."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Connect to the Unix socket and return the new protocol.

        The connection attempt is bounded by the ``sock_connect`` timeout.
        An ``OSError`` is re-raised as ``UnixClientConnectorError``, except
        a timeout without an errno, which propagates unchanged.
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _transport, handler = await self._loop.create_unix_connection(
                    self._factory, self._path
                )
        except OSError as os_exc:
            if os_exc.errno is not None or not isinstance(
                os_exc, asyncio.TimeoutError
            ):
                raise UnixClientConnectorError(
                    self.path, req.connection_key, os_exc
                ) from os_exc
            raise

        return handler

1743 

1744 

class NamedPipeConnector(BaseConnector):
    """Connector that talks to a server over a Windows named pipe.

    Only supported by the proactor event loop.
    See also: https://docs.python.org/3/library/asyncio-eventloop.html

    path - Windows named pipe path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"})

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Forward the pool options to the base class and store ``path``.

        Raises RuntimeError when the event loop is not a
        ``ProactorEventLoop``.
        """
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        loop_is_proactor = isinstance(
            self._loop,
            asyncio.ProactorEventLoop,  # type: ignore[attr-defined]
        )
        if not loop_is_proactor:
            raise RuntimeError(
                "Named Pipes only available in proactor loop under windows"
            )
        self._path = path

    @property
    def path(self) -> str:
        """Path of the named pipe this connector connects to."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Connect to the named pipe and return the new protocol.

        The connection attempt is bounded by the ``sock_connect`` timeout.
        An ``OSError`` is re-raised as ``ClientConnectorError``, except a
        timeout without an errno, which propagates unchanged.
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _transport, pipe_proto = await self._loop.create_pipe_connection(  # type: ignore[attr-defined]
                    self._factory, self._path
                )
                # the drain is required so that the connection_made is called
                # and transport is set otherwise it is not set before the
                # `assert conn.transport is not None`
                # in client.py's _request method
                await asyncio.sleep(0)
                # other option is to manually set transport like
                # `proto.transport = trans`
        except OSError as os_exc:
            if os_exc.errno is not None or not isinstance(
                os_exc, asyncio.TimeoutError
            ):
                raise ClientConnectorError(req.connection_key, os_exc) from os_exc
            raise

        return cast(ResponseHandler, pipe_proto)