Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/connector.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

734 statements  

1import asyncio 

2import functools 

3import logging 

4import random 

5import socket 

6import sys 

7import traceback 

8import warnings 

9from collections import OrderedDict, defaultdict, deque 

10from contextlib import suppress 

11from http import HTTPStatus 

12from itertools import chain, cycle, islice 

13from time import monotonic 

14from types import TracebackType 

15from typing import ( 

16 TYPE_CHECKING, 

17 Any, 

18 Awaitable, 

19 Callable, 

20 DefaultDict, 

21 Deque, 

22 Dict, 

23 Iterator, 

24 List, 

25 Literal, 

26 Optional, 

27 Sequence, 

28 Set, 

29 Tuple, 

30 Type, 

31 Union, 

32 cast, 

33) 

34 

35import aiohappyeyeballs 

36from aiohappyeyeballs import AddrInfoType, SocketFactoryType 

37 

38from . import hdrs, helpers 

39from .abc import AbstractResolver, ResolveResult 

40from .client_exceptions import ( 

41 ClientConnectionError, 

42 ClientConnectorCertificateError, 

43 ClientConnectorDNSError, 

44 ClientConnectorError, 

45 ClientConnectorSSLError, 

46 ClientHttpProxyError, 

47 ClientProxyConnectionError, 

48 ServerFingerprintMismatch, 

49 UnixClientConnectorError, 

50 cert_errors, 

51 ssl_errors, 

52) 

53from .client_proto import ResponseHandler 

54from .client_reqrep import SSL_ALLOWED_TYPES, ClientRequest, Fingerprint 

55from .helpers import ( 

56 _SENTINEL, 

57 ceil_timeout, 

58 is_ip_address, 

59 sentinel, 

60 set_exception, 

61 set_result, 

62) 

63from .resolver import DefaultResolver 

64 

65if sys.version_info >= (3, 12): 

66 from collections.abc import Buffer 

67else: 

68 Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"] 

69 

70if TYPE_CHECKING: 

71 import ssl 

72 

73 SSLContext = ssl.SSLContext 

74else: 

75 try: 

76 import ssl 

77 

78 SSLContext = ssl.SSLContext 

79 except ImportError: # pragma: no cover 

80 ssl = None # type: ignore[assignment] 

81 SSLContext = object # type: ignore[misc,assignment] 

82 

83EMPTY_SCHEMA_SET = frozenset({""}) 

84HTTP_SCHEMA_SET = frozenset({"http", "https"}) 

85WS_SCHEMA_SET = frozenset({"ws", "wss"}) 

86 

87HTTP_AND_EMPTY_SCHEMA_SET = HTTP_SCHEMA_SET | EMPTY_SCHEMA_SET 

88HIGH_LEVEL_SCHEMA_SET = HTTP_AND_EMPTY_SCHEMA_SET | WS_SCHEMA_SET 

89 

90NEEDS_CLEANUP_CLOSED = (3, 13, 0) <= sys.version_info < ( 

91 3, 

92 13, 

93 1, 

94) or sys.version_info < (3, 12, 7) 

95# Cleanup closed is no longer needed after https://github.com/python/cpython/pull/118960 

96# which first appeared in Python 3.12.7 and 3.13.1 

97 

98 

99__all__ = ( 

100 "BaseConnector", 

101 "TCPConnector", 

102 "UnixConnector", 

103 "NamedPipeConnector", 

104 "AddrInfoType", 

105 "SocketFactoryType", 

106) 

107 

108 

109if TYPE_CHECKING: 

110 from .client import ClientTimeout 

111 from .client_reqrep import ConnectionKey 

112 from .tracing import Trace 

113 

114 

115class Connection: 

116 """Represents a single connection.""" 

117 

118 __slots__ = ( 

119 "_key", 

120 "_connector", 

121 "_loop", 

122 "_protocol", 

123 "_callbacks", 

124 "_source_traceback", 

125 ) 

126 

127 def __init__( 

128 self, 

129 connector: "BaseConnector", 

130 key: "ConnectionKey", 

131 protocol: ResponseHandler, 

132 loop: asyncio.AbstractEventLoop, 

133 ) -> None: 

134 self._key = key 

135 self._connector = connector 

136 self._loop = loop 

137 self._protocol: Optional[ResponseHandler] = protocol 

138 self._callbacks: List[Callable[[], None]] = [] 

139 self._source_traceback = ( 

140 traceback.extract_stack(sys._getframe(1)) if loop.get_debug() else None 

141 ) 

142 

143 def __repr__(self) -> str: 

144 return f"Connection<{self._key}>" 

145 

146 def __del__(self, _warnings: Any = warnings) -> None: 

147 if self._protocol is not None: 

148 _warnings.warn( 

149 f"Unclosed connection {self!r}", ResourceWarning, source=self 

150 ) 

151 if self._loop.is_closed(): 

152 return 

153 

154 self._connector._release(self._key, self._protocol, should_close=True) 

155 

156 context = {"client_connection": self, "message": "Unclosed connection"} 

157 if self._source_traceback is not None: 

158 context["source_traceback"] = self._source_traceback 

159 self._loop.call_exception_handler(context) 

160 

161 def __bool__(self) -> Literal[True]: 

162 """Force subclasses to not be falsy, to make checks simpler.""" 

163 return True 

164 

165 @property 

166 def transport(self) -> Optional[asyncio.Transport]: 

167 if self._protocol is None: 

168 return None 

169 return self._protocol.transport 

170 

171 @property 

172 def protocol(self) -> Optional[ResponseHandler]: 

173 return self._protocol 

174 

175 def add_callback(self, callback: Callable[[], None]) -> None: 

176 if callback is not None: 

177 self._callbacks.append(callback) 

178 

179 def _notify_release(self) -> None: 

180 callbacks, self._callbacks = self._callbacks[:], [] 

181 

182 for cb in callbacks: 

183 with suppress(Exception): 

184 cb() 

185 

186 def close(self) -> None: 

187 self._notify_release() 

188 

189 if self._protocol is not None: 

190 self._connector._release(self._key, self._protocol, should_close=True) 

191 self._protocol = None 

192 

193 def release(self) -> None: 

194 self._notify_release() 

195 

196 if self._protocol is not None: 

197 self._connector._release(self._key, self._protocol) 

198 self._protocol = None 

199 

200 @property 

201 def closed(self) -> bool: 

202 return self._protocol is None or not self._protocol.is_connected() 

203 

204 

205class _TransportPlaceholder: 

206 """placeholder for BaseConnector.connect function""" 

207 

208 __slots__ = ("closed",) 

209 

210 def __init__(self, closed_future: asyncio.Future[Optional[Exception]]) -> None: 

211 """Initialize a placeholder for a transport.""" 

212 self.closed = closed_future 

213 

214 def close(self) -> None: 

215 """Close the placeholder.""" 

216 

217 

218class BaseConnector: 

219 """Base connector class. 

220 

221 keepalive_timeout - (optional) Keep-alive timeout. 

222 force_close - Set to True to force close and do reconnect 

223 after each request (and between redirects). 

224 limit - The total number of simultaneous connections. 

225 limit_per_host - Number of simultaneous connections to one host. 

226 enable_cleanup_closed - Enables clean-up closed ssl transports. 

227 Disabled by default. 

228 timeout_ceil_threshold - Trigger ceiling of timeout values when 

229 it's above timeout_ceil_threshold. 

230 loop - Optional event loop. 

231 """ 

232 

233 _closed = True # prevent AttributeError in __del__ if ctor was failed 

234 _source_traceback = None 

235 

236 # abort transport after 2 seconds (cleanup broken connections) 

237 _cleanup_closed_period = 2.0 

238 

239 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET 

240 

241 def __init__( 

242 self, 

243 *, 

244 keepalive_timeout: Union[_SENTINEL, None, float] = sentinel, 

245 force_close: bool = False, 

246 limit: int = 100, 

247 limit_per_host: int = 0, 

248 enable_cleanup_closed: bool = False, 

249 timeout_ceil_threshold: float = 5, 

250 ) -> None: 

251 if force_close: 

252 if keepalive_timeout is not None and keepalive_timeout is not sentinel: 

253 raise ValueError( 

254 "keepalive_timeout cannot be set if force_close is True" 

255 ) 

256 else: 

257 if keepalive_timeout is sentinel: 

258 keepalive_timeout = 15.0 

259 

260 self._timeout_ceil_threshold = timeout_ceil_threshold 

261 

262 loop = asyncio.get_running_loop() 

263 

264 self._closed = False 

265 if loop.get_debug(): 

266 self._source_traceback = traceback.extract_stack(sys._getframe(1)) 

267 

268 # Connection pool of reusable connections. 

269 # We use a deque to store connections because it has O(1) popleft() 

270 # and O(1) append() operations to implement a FIFO queue. 

271 self._conns: DefaultDict[ 

272 ConnectionKey, Deque[Tuple[ResponseHandler, float]] 

273 ] = defaultdict(deque) 

274 self._limit = limit 

275 self._limit_per_host = limit_per_host 

276 self._acquired: Set[ResponseHandler] = set() 

277 self._acquired_per_host: DefaultDict[ConnectionKey, Set[ResponseHandler]] = ( 

278 defaultdict(set) 

279 ) 

280 self._keepalive_timeout = cast(float, keepalive_timeout) 

281 self._force_close = force_close 

282 

283 # {host_key: FIFO list of waiters} 

284 # The FIFO is implemented with an OrderedDict with None keys because 

285 # python does not have an ordered set. 

286 self._waiters: DefaultDict[ 

287 ConnectionKey, OrderedDict[asyncio.Future[None], None] 

288 ] = defaultdict(OrderedDict) 

289 

290 self._loop = loop 

291 self._factory = functools.partial(ResponseHandler, loop=loop) 

292 

293 # start keep-alive connection cleanup task 

294 self._cleanup_handle: Optional[asyncio.TimerHandle] = None 

295 

296 # start cleanup closed transports task 

297 self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None 

298 

299 if enable_cleanup_closed and not NEEDS_CLEANUP_CLOSED: 

300 warnings.warn( 

301 "enable_cleanup_closed ignored because " 

302 "https://github.com/python/cpython/pull/118960 is fixed " 

303 f"in Python version {sys.version_info}", 

304 DeprecationWarning, 

305 stacklevel=2, 

306 ) 

307 enable_cleanup_closed = False 

308 

309 self._cleanup_closed_disabled = not enable_cleanup_closed 

310 self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = [] 

311 

312 self._placeholder_future: asyncio.Future[Optional[Exception]] = ( 

313 loop.create_future() 

314 ) 

315 self._placeholder_future.set_result(None) 

316 self._cleanup_closed() 

317 

318 def __del__(self, _warnings: Any = warnings) -> None: 

319 if self._closed: 

320 return 

321 if not self._conns: 

322 return 

323 

324 conns = [repr(c) for c in self._conns.values()] 

325 

326 self._close_immediately() 

327 

328 _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, source=self) 

329 context = { 

330 "connector": self, 

331 "connections": conns, 

332 "message": "Unclosed connector", 

333 } 

334 if self._source_traceback is not None: 

335 context["source_traceback"] = self._source_traceback 

336 self._loop.call_exception_handler(context) 

337 

338 async def __aenter__(self) -> "BaseConnector": 

339 return self 

340 

341 async def __aexit__( 

342 self, 

343 exc_type: Optional[Type[BaseException]] = None, 

344 exc_value: Optional[BaseException] = None, 

345 exc_traceback: Optional[TracebackType] = None, 

346 ) -> None: 

347 await self.close() 

348 

349 @property 

350 def force_close(self) -> bool: 

351 """Ultimately close connection on releasing if True.""" 

352 return self._force_close 

353 

354 @property 

355 def limit(self) -> int: 

356 """The total number for simultaneous connections. 

357 

358 If limit is 0 the connector has no limit. 

359 The default limit size is 100. 

360 """ 

361 return self._limit 

362 

363 @property 

364 def limit_per_host(self) -> int: 

365 """The limit for simultaneous connections to the same endpoint. 

366 

367 Endpoints are the same if they are have equal 

368 (host, port, is_ssl) triple. 

369 """ 

370 return self._limit_per_host 

371 

372 def _cleanup(self) -> None: 

373 """Cleanup unused transports.""" 

374 if self._cleanup_handle: 

375 self._cleanup_handle.cancel() 

376 # _cleanup_handle should be unset, otherwise _release() will not 

377 # recreate it ever! 

378 self._cleanup_handle = None 

379 

380 now = monotonic() 

381 timeout = self._keepalive_timeout 

382 

383 if self._conns: 

384 connections = defaultdict(deque) 

385 deadline = now - timeout 

386 for key, conns in self._conns.items(): 

387 alive: Deque[Tuple[ResponseHandler, float]] = deque() 

388 for proto, use_time in conns: 

389 if proto.is_connected() and use_time - deadline >= 0: 

390 alive.append((proto, use_time)) 

391 continue 

392 transport = proto.transport 

393 proto.close() 

394 if not self._cleanup_closed_disabled and key.is_ssl: 

395 self._cleanup_closed_transports.append(transport) 

396 

397 if alive: 

398 connections[key] = alive 

399 

400 self._conns = connections 

401 

402 if self._conns: 

403 self._cleanup_handle = helpers.weakref_handle( 

404 self, 

405 "_cleanup", 

406 timeout, 

407 self._loop, 

408 timeout_ceil_threshold=self._timeout_ceil_threshold, 

409 ) 

410 

411 def _cleanup_closed(self) -> None: 

412 """Double confirmation for transport close. 

413 

414 Some broken ssl servers may leave socket open without proper close. 

415 """ 

416 if self._cleanup_closed_handle: 

417 self._cleanup_closed_handle.cancel() 

418 

419 for transport in self._cleanup_closed_transports: 

420 if transport is not None: 

421 transport.abort() 

422 

423 self._cleanup_closed_transports = [] 

424 

425 if not self._cleanup_closed_disabled: 

426 self._cleanup_closed_handle = helpers.weakref_handle( 

427 self, 

428 "_cleanup_closed", 

429 self._cleanup_closed_period, 

430 self._loop, 

431 timeout_ceil_threshold=self._timeout_ceil_threshold, 

432 ) 

433 

434 async def close(self) -> None: 

435 """Close all opened transports.""" 

436 waiters = self._close_immediately() 

437 if waiters: 

438 results = await asyncio.gather(*waiters, return_exceptions=True) 

439 for res in results: 

440 if isinstance(res, Exception): 

441 err_msg = "Error while closing connector: " + repr(res) 

442 logging.error(err_msg) 

443 

444 def _close_immediately(self) -> List[Awaitable[object]]: 

445 waiters: List[Awaitable[object]] = [] 

446 

447 if self._closed: 

448 return waiters 

449 

450 self._closed = True 

451 

452 try: 

453 if self._loop.is_closed(): 

454 return waiters 

455 

456 # cancel cleanup task 

457 if self._cleanup_handle: 

458 self._cleanup_handle.cancel() 

459 

460 # cancel cleanup close task 

461 if self._cleanup_closed_handle: 

462 self._cleanup_closed_handle.cancel() 

463 

464 for data in self._conns.values(): 

465 for proto, t0 in data: 

466 proto.close() 

467 waiters.append(proto.closed) 

468 

469 for proto in self._acquired: 

470 proto.close() 

471 waiters.append(proto.closed) 

472 

473 # TODO (A.Yushovskiy, 24-May-2019) collect transp. closing futures 

474 for transport in self._cleanup_closed_transports: 

475 if transport is not None: 

476 transport.abort() 

477 

478 return waiters 

479 

480 finally: 

481 self._conns.clear() 

482 self._acquired.clear() 

483 for keyed_waiters in self._waiters.values(): 

484 for keyed_waiter in keyed_waiters: 

485 keyed_waiter.cancel() 

486 self._waiters.clear() 

487 self._cleanup_handle = None 

488 self._cleanup_closed_transports.clear() 

489 self._cleanup_closed_handle = None 

490 

491 @property 

492 def closed(self) -> bool: 

493 """Is connector closed. 

494 

495 A readonly property. 

496 """ 

497 return self._closed 

498 

499 def _available_connections(self, key: "ConnectionKey") -> int: 

500 """ 

501 Return number of available connections. 

502 

503 The limit, limit_per_host and the connection key are taken into account. 

504 

505 If it returns less than 1 means that there are no connections 

506 available. 

507 """ 

508 # check total available connections 

509 # If there are no limits, this will always return 1 

510 total_remain = 1 

511 

512 if self._limit and (total_remain := self._limit - len(self._acquired)) <= 0: 

513 return total_remain 

514 

515 # check limit per host 

516 if host_remain := self._limit_per_host: 

517 if acquired := self._acquired_per_host.get(key): 

518 host_remain -= len(acquired) 

519 if total_remain > host_remain: 

520 return host_remain 

521 

522 return total_remain 

523 

524 async def connect( 

525 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout" 

526 ) -> Connection: 

527 """Get from pool or create new connection.""" 

528 key = req.connection_key 

529 if (conn := await self._get(key, traces)) is not None: 

530 # If we do not have to wait and we can get a connection from the pool 

531 # we can avoid the timeout ceil logic and directly return the connection 

532 return conn 

533 

534 async with ceil_timeout(timeout.connect, timeout.ceil_threshold): 

535 if self._available_connections(key) <= 0: 

536 await self._wait_for_available_connection(key, traces) 

537 if (conn := await self._get(key, traces)) is not None: 

538 return conn 

539 

540 placeholder = cast( 

541 ResponseHandler, _TransportPlaceholder(self._placeholder_future) 

542 ) 

543 self._acquired.add(placeholder) 

544 if self._limit_per_host: 

545 self._acquired_per_host[key].add(placeholder) 

546 

547 try: 

548 # Traces are done inside the try block to ensure that the 

549 # that the placeholder is still cleaned up if an exception 

550 # is raised. 

551 if traces: 

552 for trace in traces: 

553 await trace.send_connection_create_start() 

554 proto = await self._create_connection(req, traces, timeout) 

555 if traces: 

556 for trace in traces: 

557 await trace.send_connection_create_end() 

558 except BaseException: 

559 self._release_acquired(key, placeholder) 

560 raise 

561 else: 

562 if self._closed: 

563 proto.close() 

564 raise ClientConnectionError("Connector is closed.") 

565 

566 # The connection was successfully created, drop the placeholder 

567 # and add the real connection to the acquired set. There should 

568 # be no awaits after the proto is added to the acquired set 

569 # to ensure that the connection is not left in the acquired set 

570 # on cancellation. 

571 self._acquired.remove(placeholder) 

572 self._acquired.add(proto) 

573 if self._limit_per_host: 

574 acquired_per_host = self._acquired_per_host[key] 

575 acquired_per_host.remove(placeholder) 

576 acquired_per_host.add(proto) 

577 return Connection(self, key, proto, self._loop) 

578 

579 async def _wait_for_available_connection( 

580 self, key: "ConnectionKey", traces: List["Trace"] 

581 ) -> None: 

582 """Wait for an available connection slot.""" 

583 # We loop here because there is a race between 

584 # the connection limit check and the connection 

585 # being acquired. If the connection is acquired 

586 # between the check and the await statement, we 

587 # need to loop again to check if the connection 

588 # slot is still available. 

589 attempts = 0 

590 while True: 

591 fut: asyncio.Future[None] = self._loop.create_future() 

592 keyed_waiters = self._waiters[key] 

593 keyed_waiters[fut] = None 

594 if attempts: 

595 # If we have waited before, we need to move the waiter 

596 # to the front of the queue as otherwise we might get 

597 # starved and hit the timeout. 

598 keyed_waiters.move_to_end(fut, last=False) 

599 

600 try: 

601 # Traces happen in the try block to ensure that the 

602 # the waiter is still cleaned up if an exception is raised. 

603 if traces: 

604 for trace in traces: 

605 await trace.send_connection_queued_start() 

606 await fut 

607 if traces: 

608 for trace in traces: 

609 await trace.send_connection_queued_end() 

610 finally: 

611 # pop the waiter from the queue if its still 

612 # there and not already removed by _release_waiter 

613 keyed_waiters.pop(fut, None) 

614 if not self._waiters.get(key, True): 

615 del self._waiters[key] 

616 

617 if self._available_connections(key) > 0: 

618 break 

619 attempts += 1 

620 

621 async def _get( 

622 self, key: "ConnectionKey", traces: List["Trace"] 

623 ) -> Optional[Connection]: 

624 """Get next reusable connection for the key or None. 

625 

626 The connection will be marked as acquired. 

627 """ 

628 if (conns := self._conns.get(key)) is None: 

629 return None 

630 

631 t1 = monotonic() 

632 while conns: 

633 proto, t0 = conns.popleft() 

634 # We will we reuse the connection if its connected and 

635 # the keepalive timeout has not been exceeded 

636 if proto.is_connected() and t1 - t0 <= self._keepalive_timeout: 

637 if not conns: 

638 # The very last connection was reclaimed: drop the key 

639 del self._conns[key] 

640 self._acquired.add(proto) 

641 if self._limit_per_host: 

642 self._acquired_per_host[key].add(proto) 

643 if traces: 

644 for trace in traces: 

645 try: 

646 await trace.send_connection_reuseconn() 

647 except BaseException: 

648 self._release_acquired(key, proto) 

649 raise 

650 return Connection(self, key, proto, self._loop) 

651 

652 # Connection cannot be reused, close it 

653 transport = proto.transport 

654 proto.close() 

655 # only for SSL transports 

656 if not self._cleanup_closed_disabled and key.is_ssl: 

657 self._cleanup_closed_transports.append(transport) 

658 

659 # No more connections: drop the key 

660 del self._conns[key] 

661 return None 

662 

663 def _release_waiter(self) -> None: 

664 """ 

665 Iterates over all waiters until one to be released is found. 

666 

667 The one to be released is not finished and 

668 belongs to a host that has available connections. 

669 """ 

670 if not self._waiters: 

671 return 

672 

673 # Having the dict keys ordered this avoids to iterate 

674 # at the same order at each call. 

675 queues = list(self._waiters) 

676 random.shuffle(queues) 

677 

678 for key in queues: 

679 if self._available_connections(key) < 1: 

680 continue 

681 

682 waiters = self._waiters[key] 

683 while waiters: 

684 waiter, _ = waiters.popitem(last=False) 

685 if not waiter.done(): 

686 waiter.set_result(None) 

687 return 

688 

689 def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None: 

690 """Release acquired connection.""" 

691 if self._closed: 

692 # acquired connection is already released on connector closing 

693 return 

694 

695 self._acquired.discard(proto) 

696 if self._limit_per_host and (conns := self._acquired_per_host.get(key)): 

697 conns.discard(proto) 

698 if not conns: 

699 del self._acquired_per_host[key] 

700 self._release_waiter() 

701 

702 def _release( 

703 self, 

704 key: "ConnectionKey", 

705 protocol: ResponseHandler, 

706 *, 

707 should_close: bool = False, 

708 ) -> None: 

709 if self._closed: 

710 # acquired connection is already released on connector closing 

711 return 

712 

713 self._release_acquired(key, protocol) 

714 

715 if self._force_close or should_close or protocol.should_close: 

716 transport = protocol.transport 

717 protocol.close() 

718 if key.is_ssl and not self._cleanup_closed_disabled: 

719 self._cleanup_closed_transports.append(transport) 

720 return 

721 

722 self._conns[key].append((protocol, monotonic())) 

723 

724 if self._cleanup_handle is None: 

725 self._cleanup_handle = helpers.weakref_handle( 

726 self, 

727 "_cleanup", 

728 self._keepalive_timeout, 

729 self._loop, 

730 timeout_ceil_threshold=self._timeout_ceil_threshold, 

731 ) 

732 

733 async def _create_connection( 

734 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout" 

735 ) -> ResponseHandler: 

736 raise NotImplementedError() 

737 

738 

739class _DNSCacheTable: 

740 def __init__(self, ttl: Optional[float] = None) -> None: 

741 self._addrs_rr: Dict[Tuple[str, int], Tuple[Iterator[ResolveResult], int]] = {} 

742 self._timestamps: Dict[Tuple[str, int], float] = {} 

743 self._ttl = ttl 

744 

745 def __contains__(self, host: object) -> bool: 

746 return host in self._addrs_rr 

747 

748 def add(self, key: Tuple[str, int], addrs: List[ResolveResult]) -> None: 

749 self._addrs_rr[key] = (cycle(addrs), len(addrs)) 

750 

751 if self._ttl is not None: 

752 self._timestamps[key] = monotonic() 

753 

754 def remove(self, key: Tuple[str, int]) -> None: 

755 self._addrs_rr.pop(key, None) 

756 

757 if self._ttl is not None: 

758 self._timestamps.pop(key, None) 

759 

760 def clear(self) -> None: 

761 self._addrs_rr.clear() 

762 self._timestamps.clear() 

763 

764 def next_addrs(self, key: Tuple[str, int]) -> List[ResolveResult]: 

765 loop, length = self._addrs_rr[key] 

766 addrs = list(islice(loop, length)) 

767 # Consume one more element to shift internal state of `cycle` 

768 next(loop) 

769 return addrs 

770 

771 def expired(self, key: Tuple[str, int]) -> bool: 

772 if self._ttl is None: 

773 return False 

774 

775 return self._timestamps[key] + self._ttl < monotonic() 

776 

777 

778def _make_ssl_context(verified: bool) -> SSLContext: 

779 """Create SSL context. 

780 

781 This method is not async-friendly and should be called from a thread 

782 because it will load certificates from disk and do other blocking I/O. 

783 """ 

784 if ssl is None: 

785 # No ssl support 

786 return None # type: ignore[unreachable] 

787 if verified: 

788 sslcontext = ssl.create_default_context() 

789 else: 

790 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 

791 sslcontext.options |= ssl.OP_NO_SSLv2 

792 sslcontext.options |= ssl.OP_NO_SSLv3 

793 sslcontext.check_hostname = False 

794 sslcontext.verify_mode = ssl.CERT_NONE 

795 sslcontext.options |= ssl.OP_NO_COMPRESSION 

796 sslcontext.set_default_verify_paths() 

797 sslcontext.set_alpn_protocols(("http/1.1",)) 

798 return sslcontext 

799 

800 

801# The default SSLContext objects are created at import time 

802# since they do blocking I/O to load certificates from disk, 

803# and imports should always be done before the event loop starts 

804# or in a thread. 

805_SSL_CONTEXT_VERIFIED = _make_ssl_context(True) 

806_SSL_CONTEXT_UNVERIFIED = _make_ssl_context(False) 

807 

808 

809class TCPConnector(BaseConnector): 

810 """TCP connector. 

811 

812 verify_ssl - Set to True to check ssl certifications. 

813 fingerprint - Pass the binary sha256 

814 digest of the expected certificate in DER format to verify 

815 that the certificate the server presents matches. See also 

816 https://en.wikipedia.org/wiki/HTTP_Public_Key_Pinning 

817 resolver - Enable DNS lookups and use this 

818 resolver 

819 use_dns_cache - Use memory cache for DNS lookups. 

820 ttl_dns_cache - Max seconds having cached a DNS entry, None forever. 

821 family - socket address family 

822 local_addr - local tuple of (host, port) to bind socket to 

823 

824 keepalive_timeout - (optional) Keep-alive timeout. 

825 force_close - Set to True to force close and do reconnect 

826 after each request (and between redirects). 

827 limit - The total number of simultaneous connections. 

828 limit_per_host - Number of simultaneous connections to one host. 

829 enable_cleanup_closed - Enables clean-up closed ssl transports. 

830 Disabled by default. 

831 happy_eyeballs_delay - This is the “Connection Attempt Delay” 

832 as defined in RFC 8305. To disable 

833 the happy eyeballs algorithm, set to None. 

834 interleave - “First Address Family Count” as defined in RFC 8305 

835 loop - Optional event loop. 

836 socket_factory - A SocketFactoryType function that, if supplied, 

837 will be used to create sockets given an 

838 AddrInfoType. 

839 """ 

840 

841 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"tcp"}) 

842 

843 def __init__( 

844 self, 

845 *, 

846 use_dns_cache: bool = True, 

847 ttl_dns_cache: Optional[int] = 10, 

848 family: socket.AddressFamily = socket.AddressFamily.AF_UNSPEC, 

849 ssl: Union[bool, Fingerprint, SSLContext] = True, 

850 local_addr: Optional[Tuple[str, int]] = None, 

851 resolver: Optional[AbstractResolver] = None, 

852 keepalive_timeout: Union[None, float, _SENTINEL] = sentinel, 

853 force_close: bool = False, 

854 limit: int = 100, 

855 limit_per_host: int = 0, 

856 enable_cleanup_closed: bool = False, 

857 timeout_ceil_threshold: float = 5, 

858 happy_eyeballs_delay: Optional[float] = 0.25, 

859 interleave: Optional[int] = None, 

860 socket_factory: Optional[SocketFactoryType] = None, 

861 ): 

862 super().__init__( 

863 keepalive_timeout=keepalive_timeout, 

864 force_close=force_close, 

865 limit=limit, 

866 limit_per_host=limit_per_host, 

867 enable_cleanup_closed=enable_cleanup_closed, 

868 timeout_ceil_threshold=timeout_ceil_threshold, 

869 ) 

870 

871 if not isinstance(ssl, SSL_ALLOWED_TYPES): 

872 raise TypeError( 

873 "ssl should be SSLContext, Fingerprint, or bool, " 

874 "got {!r} instead.".format(ssl) 

875 ) 

876 self._ssl = ssl 

877 if resolver is None: 

878 resolver = DefaultResolver() 

879 self._resolver: AbstractResolver = resolver 

880 

881 self._use_dns_cache = use_dns_cache 

882 self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache) 

883 self._throttle_dns_futures: Dict[Tuple[str, int], Set[asyncio.Future[None]]] = ( 

884 {} 

885 ) 

886 self._family = family 

887 self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr) 

888 self._happy_eyeballs_delay = happy_eyeballs_delay 

889 self._interleave = interleave 

890 self._resolve_host_tasks: Set["asyncio.Task[List[ResolveResult]]"] = set() 

891 self._socket_factory = socket_factory 

892 

893 def _close_immediately(self) -> List[Awaitable[object]]: 

894 for fut in chain.from_iterable(self._throttle_dns_futures.values()): 

895 fut.cancel() 

896 

897 waiters = super()._close_immediately() 

898 

899 for t in self._resolve_host_tasks: 

900 t.cancel() 

901 waiters.append(t) 

902 

903 return waiters 

904 

905 @property 

906 def family(self) -> int: 

907 """Socket family like AF_INET.""" 

908 return self._family 

909 

910 @property 

911 def use_dns_cache(self) -> bool: 

912 """True if local DNS caching is enabled.""" 

913 return self._use_dns_cache 

914 

915 def clear_dns_cache( 

916 self, host: Optional[str] = None, port: Optional[int] = None 

917 ) -> None: 

918 """Remove specified host/port or clear all dns local cache.""" 

919 if host is not None and port is not None: 

920 self._cached_hosts.remove((host, port)) 

921 elif host is not None or port is not None: 

922 raise ValueError("either both host and port or none of them are allowed") 

923 else: 

924 self._cached_hosts.clear() 

925 

926 async def _resolve_host( 

927 self, host: str, port: int, traces: Optional[Sequence["Trace"]] = None 

928 ) -> List[ResolveResult]: 

929 """Resolve host and return list of addresses.""" 

930 if is_ip_address(host): 

931 return [ 

932 { 

933 "hostname": host, 

934 "host": host, 

935 "port": port, 

936 "family": self._family, 

937 "proto": 0, 

938 "flags": 0, 

939 } 

940 ] 

941 

942 if not self._use_dns_cache: 

943 if traces: 

944 for trace in traces: 

945 await trace.send_dns_resolvehost_start(host) 

946 

947 res = await self._resolver.resolve(host, port, family=self._family) 

948 

949 if traces: 

950 for trace in traces: 

951 await trace.send_dns_resolvehost_end(host) 

952 

953 return res 

954 

955 key = (host, port) 

956 if key in self._cached_hosts and not self._cached_hosts.expired(key): 

957 # get result early, before any await (#4014) 

958 result = self._cached_hosts.next_addrs(key) 

959 

960 if traces: 

961 for trace in traces: 

962 await trace.send_dns_cache_hit(host) 

963 return result 

964 

965 futures: Set[asyncio.Future[None]] 

966 # 

967 # If multiple connectors are resolving the same host, we wait 

968 # for the first one to resolve and then use the result for all of them. 

969 # We use a throttle to ensure that we only resolve the host once 

970 # and then use the result for all the waiters. 

971 # 

972 if key in self._throttle_dns_futures: 

973 # get futures early, before any await (#4014) 

974 futures = self._throttle_dns_futures[key] 

975 future: asyncio.Future[None] = self._loop.create_future() 

976 futures.add(future) 

977 if traces: 

978 for trace in traces: 

979 await trace.send_dns_cache_hit(host) 

980 try: 

981 await future 

982 finally: 

983 futures.discard(future) 

984 return self._cached_hosts.next_addrs(key) 

985 

986 # update dict early, before any await (#4014) 

987 self._throttle_dns_futures[key] = futures = set() 

988 # In this case we need to create a task to ensure that we can shield 

989 # the task from cancellation as cancelling this lookup should not cancel 

990 # the underlying lookup or else the cancel event will get broadcast to 

991 # all the waiters across all connections. 

992 # 

993 coro = self._resolve_host_with_throttle(key, host, port, futures, traces) 

994 loop = asyncio.get_running_loop() 

995 if sys.version_info >= (3, 12): 

996 # Optimization for Python 3.12, try to send immediately 

997 resolved_host_task = asyncio.Task(coro, loop=loop, eager_start=True) 

998 else: 

999 resolved_host_task = loop.create_task(coro) 

1000 

1001 if not resolved_host_task.done(): 

1002 self._resolve_host_tasks.add(resolved_host_task) 

1003 resolved_host_task.add_done_callback(self._resolve_host_tasks.discard) 

1004 

1005 try: 

1006 return await asyncio.shield(resolved_host_task) 

1007 except asyncio.CancelledError: 

1008 

1009 def drop_exception(fut: "asyncio.Future[List[ResolveResult]]") -> None: 

1010 with suppress(Exception, asyncio.CancelledError): 

1011 fut.result() 

1012 

1013 resolved_host_task.add_done_callback(drop_exception) 

1014 raise 

1015 

1016 async def _resolve_host_with_throttle( 

1017 self, 

1018 key: Tuple[str, int], 

1019 host: str, 

1020 port: int, 

1021 futures: Set[asyncio.Future[None]], 

1022 traces: Optional[Sequence["Trace"]], 

1023 ) -> List[ResolveResult]: 

1024 """Resolve host and set result for all waiters. 

1025 

1026 This method must be run in a task and shielded from cancellation 

1027 to avoid cancelling the underlying lookup. 

1028 """ 

1029 try: 

1030 if traces: 

1031 for trace in traces: 

1032 await trace.send_dns_cache_miss(host) 

1033 

1034 for trace in traces: 

1035 await trace.send_dns_resolvehost_start(host) 

1036 

1037 addrs = await self._resolver.resolve(host, port, family=self._family) 

1038 if traces: 

1039 for trace in traces: 

1040 await trace.send_dns_resolvehost_end(host) 

1041 

1042 self._cached_hosts.add(key, addrs) 

1043 for fut in futures: 

1044 set_result(fut, None) 

1045 except BaseException as e: 

1046 # any DNS exception is set for the waiters to raise the same exception. 

1047 # This coro is always run in task that is shielded from cancellation so 

1048 # we should never be propagating cancellation here. 

1049 for fut in futures: 

1050 set_exception(fut, e) 

1051 raise 

1052 finally: 

1053 self._throttle_dns_futures.pop(key) 

1054 

1055 return self._cached_hosts.next_addrs(key) 

1056 

1057 async def _create_connection( 

1058 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout" 

1059 ) -> ResponseHandler: 

1060 """Create connection. 

1061 

1062 Has same keyword arguments as BaseEventLoop.create_connection. 

1063 """ 

1064 if req.proxy: 

1065 _, proto = await self._create_proxy_connection(req, traces, timeout) 

1066 else: 

1067 _, proto = await self._create_direct_connection(req, traces, timeout) 

1068 

1069 return proto 

1070 

1071 def _get_ssl_context(self, req: ClientRequest) -> Optional[SSLContext]: 

1072 """Logic to get the correct SSL context 

1073 

1074 0. if req.ssl is false, return None 

1075 

1076 1. if ssl_context is specified in req, use it 

1077 2. if _ssl_context is specified in self, use it 

1078 3. otherwise: 

1079 1. if verify_ssl is not specified in req, use self.ssl_context 

1080 (will generate a default context according to self.verify_ssl) 

1081 2. if verify_ssl is True in req, generate a default SSL context 

1082 3. if verify_ssl is False in req, generate a SSL context that 

1083 won't verify 

1084 """ 

1085 if not req.is_ssl(): 

1086 return None 

1087 

1088 if ssl is None: # pragma: no cover 

1089 raise RuntimeError("SSL is not supported.") 

1090 sslcontext = req.ssl 

1091 if isinstance(sslcontext, ssl.SSLContext): 

1092 return sslcontext 

1093 if sslcontext is not True: 

1094 # not verified or fingerprinted 

1095 return _SSL_CONTEXT_UNVERIFIED 

1096 sslcontext = self._ssl 

1097 if isinstance(sslcontext, ssl.SSLContext): 

1098 return sslcontext 

1099 if sslcontext is not True: 

1100 # not verified or fingerprinted 

1101 return _SSL_CONTEXT_UNVERIFIED 

1102 return _SSL_CONTEXT_VERIFIED 

1103 

1104 def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]: 

1105 ret = req.ssl 

1106 if isinstance(ret, Fingerprint): 

1107 return ret 

1108 ret = self._ssl 

1109 if isinstance(ret, Fingerprint): 

1110 return ret 

1111 return None 

1112 

1113 async def _wrap_create_connection( 

1114 self, 

1115 *args: Any, 

1116 addr_infos: List[AddrInfoType], 

1117 req: ClientRequest, 

1118 timeout: "ClientTimeout", 

1119 client_error: Type[Exception] = ClientConnectorError, 

1120 **kwargs: Any, 

1121 ) -> Tuple[asyncio.Transport, ResponseHandler]: 

1122 try: 

1123 async with ceil_timeout( 

1124 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1125 ): 

1126 sock = await aiohappyeyeballs.start_connection( 

1127 addr_infos=addr_infos, 

1128 local_addr_infos=self._local_addr_infos, 

1129 happy_eyeballs_delay=self._happy_eyeballs_delay, 

1130 interleave=self._interleave, 

1131 loop=self._loop, 

1132 socket_factory=self._socket_factory, 

1133 ) 

1134 return await self._loop.create_connection(*args, **kwargs, sock=sock) 

1135 except cert_errors as exc: 

1136 raise ClientConnectorCertificateError(req.connection_key, exc) from exc 

1137 except ssl_errors as exc: 

1138 raise ClientConnectorSSLError(req.connection_key, exc) from exc 

1139 except OSError as exc: 

1140 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1141 raise 

1142 raise client_error(req.connection_key, exc) from exc 

1143 

1144 def _warn_about_tls_in_tls( 

1145 self, 

1146 underlying_transport: asyncio.Transport, 

1147 req: ClientRequest, 

1148 ) -> None: 

1149 """Issue a warning if the requested URL has HTTPS scheme.""" 

1150 if req.request_info.url.scheme != "https": 

1151 return 

1152 

1153 # Check if uvloop is being used, which supports TLS in TLS, 

1154 # otherwise assume that asyncio's native transport is being used. 

1155 if type(underlying_transport).__module__.startswith("uvloop"): 

1156 return 

1157 

1158 # Support in asyncio was added in Python 3.11 (bpo-44011) 

1159 asyncio_supports_tls_in_tls = sys.version_info >= (3, 11) or getattr( 

1160 underlying_transport, 

1161 "_start_tls_compatible", 

1162 False, 

1163 ) 

1164 

1165 if asyncio_supports_tls_in_tls: 

1166 return 

1167 

1168 warnings.warn( 

1169 "An HTTPS request is being sent through an HTTPS proxy. " 

1170 "This support for TLS in TLS is known to be disabled " 

1171 "in the stdlib asyncio. This is why you'll probably see " 

1172 "an error in the log below.\n\n" 

1173 "It is possible to enable it via monkeypatching. " 

1174 "For more details, see:\n" 

1175 "* https://bugs.python.org/issue37179\n" 

1176 "* https://github.com/python/cpython/pull/28073\n\n" 

1177 "You can temporarily patch this as follows:\n" 

1178 "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n" 

1179 "* https://github.com/aio-libs/aiohttp/discussions/6044\n", 

1180 RuntimeWarning, 

1181 source=self, 

1182 # Why `4`? At least 3 of the calls in the stack originate 

1183 # from the methods in this class. 

1184 stacklevel=3, 

1185 ) 

1186 

1187 async def _start_tls_connection( 

1188 self, 

1189 underlying_transport: asyncio.Transport, 

1190 req: ClientRequest, 

1191 timeout: "ClientTimeout", 

1192 client_error: Type[Exception] = ClientConnectorError, 

1193 ) -> Tuple[asyncio.BaseTransport, ResponseHandler]: 

1194 """Wrap the raw TCP transport with TLS.""" 

1195 tls_proto = self._factory() # Create a brand new proto for TLS 

1196 sslcontext = self._get_ssl_context(req) 

1197 if TYPE_CHECKING: 

1198 # _start_tls_connection is unreachable in the current code path 

1199 # if sslcontext is None. 

1200 assert sslcontext is not None 

1201 

1202 try: 

1203 async with ceil_timeout( 

1204 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1205 ): 

1206 try: 

1207 tls_transport = await self._loop.start_tls( 

1208 underlying_transport, 

1209 tls_proto, 

1210 sslcontext, 

1211 server_hostname=req.server_hostname or req.host, 

1212 ssl_handshake_timeout=timeout.total, 

1213 ) 

1214 except BaseException: 

1215 # We need to close the underlying transport since 

1216 # `start_tls()` probably failed before it had a 

1217 # chance to do this: 

1218 underlying_transport.close() 

1219 raise 

1220 if isinstance(tls_transport, asyncio.Transport): 

1221 fingerprint = self._get_fingerprint(req) 

1222 if fingerprint: 

1223 try: 

1224 fingerprint.check(tls_transport) 

1225 except ServerFingerprintMismatch: 

1226 tls_transport.close() 

1227 if not self._cleanup_closed_disabled: 

1228 self._cleanup_closed_transports.append(tls_transport) 

1229 raise 

1230 except cert_errors as exc: 

1231 raise ClientConnectorCertificateError(req.connection_key, exc) from exc 

1232 except ssl_errors as exc: 

1233 raise ClientConnectorSSLError(req.connection_key, exc) from exc 

1234 except OSError as exc: 

1235 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1236 raise 

1237 raise client_error(req.connection_key, exc) from exc 

1238 except TypeError as type_err: 

1239 # Example cause looks like this: 

1240 # TypeError: transport <asyncio.sslproto._SSLProtocolTransport 

1241 # object at 0x7f760615e460> is not supported by start_tls() 

1242 

1243 raise ClientConnectionError( 

1244 "Cannot initialize a TLS-in-TLS connection to host " 

1245 f"{req.host!s}:{req.port:d} through an underlying connection " 

1246 f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} " 

1247 f"[{type_err!s}]" 

1248 ) from type_err 

1249 else: 

1250 if tls_transport is None: 

1251 msg = "Failed to start TLS (possibly caused by closing transport)" 

1252 raise client_error(req.connection_key, OSError(msg)) 

1253 tls_proto.connection_made( 

1254 tls_transport 

1255 ) # Kick the state machine of the new TLS protocol 

1256 

1257 return tls_transport, tls_proto 

1258 

1259 def _convert_hosts_to_addr_infos( 

1260 self, hosts: List[ResolveResult] 

1261 ) -> List[AddrInfoType]: 

1262 """Converts the list of hosts to a list of addr_infos. 

1263 

1264 The list of hosts is the result of a DNS lookup. The list of 

1265 addr_infos is the result of a call to `socket.getaddrinfo()`. 

1266 """ 

1267 addr_infos: List[AddrInfoType] = [] 

1268 for hinfo in hosts: 

1269 host = hinfo["host"] 

1270 is_ipv6 = ":" in host 

1271 family = socket.AF_INET6 if is_ipv6 else socket.AF_INET 

1272 if self._family and self._family != family: 

1273 continue 

1274 addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"]) 

1275 addr_infos.append( 

1276 (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr) 

1277 ) 

1278 return addr_infos 

1279 

1280 async def _create_direct_connection( 

1281 self, 

1282 req: ClientRequest, 

1283 traces: List["Trace"], 

1284 timeout: "ClientTimeout", 

1285 *, 

1286 client_error: Type[Exception] = ClientConnectorError, 

1287 ) -> Tuple[asyncio.Transport, ResponseHandler]: 

1288 sslcontext = self._get_ssl_context(req) 

1289 fingerprint = self._get_fingerprint(req) 

1290 

1291 host = req.url.raw_host 

1292 assert host is not None 

1293 # Replace multiple trailing dots with a single one. 

1294 # A trailing dot is only present for fully-qualified domain names. 

1295 # See https://github.com/aio-libs/aiohttp/pull/7364. 

1296 if host.endswith(".."): 

1297 host = host.rstrip(".") + "." 

1298 port = req.port 

1299 assert port is not None 

1300 try: 

1301 # Cancelling this lookup should not cancel the underlying lookup 

1302 # or else the cancel event will get broadcast to all the waiters 

1303 # across all connections. 

1304 hosts = await self._resolve_host(host, port, traces=traces) 

1305 except OSError as exc: 

1306 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1307 raise 

1308 # in case of proxy it is not ClientProxyConnectionError 

1309 # it is problem of resolving proxy ip itself 

1310 raise ClientConnectorDNSError(req.connection_key, exc) from exc 

1311 

1312 last_exc: Optional[Exception] = None 

1313 addr_infos = self._convert_hosts_to_addr_infos(hosts) 

1314 while addr_infos: 

1315 # Strip trailing dots, certificates contain FQDN without dots. 

1316 # See https://github.com/aio-libs/aiohttp/issues/3636 

1317 server_hostname = ( 

1318 (req.server_hostname or host).rstrip(".") if sslcontext else None 

1319 ) 

1320 

1321 try: 

1322 transp, proto = await self._wrap_create_connection( 

1323 self._factory, 

1324 timeout=timeout, 

1325 ssl=sslcontext, 

1326 addr_infos=addr_infos, 

1327 server_hostname=server_hostname, 

1328 req=req, 

1329 client_error=client_error, 

1330 ) 

1331 except (ClientConnectorError, asyncio.TimeoutError) as exc: 

1332 last_exc = exc 

1333 aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave) 

1334 continue 

1335 

1336 if req.is_ssl() and fingerprint: 

1337 try: 

1338 fingerprint.check(transp) 

1339 except ServerFingerprintMismatch as exc: 

1340 transp.close() 

1341 if not self._cleanup_closed_disabled: 

1342 self._cleanup_closed_transports.append(transp) 

1343 last_exc = exc 

1344 # Remove the bad peer from the list of addr_infos 

1345 sock: socket.socket = transp.get_extra_info("socket") 

1346 bad_peer = sock.getpeername() 

1347 aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer) 

1348 continue 

1349 

1350 return transp, proto 

1351 assert last_exc is not None 

1352 raise last_exc 

1353 

1354 async def _create_proxy_connection( 

1355 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout" 

1356 ) -> Tuple[asyncio.BaseTransport, ResponseHandler]: 

1357 headers: Dict[str, str] = {} 

1358 if req.proxy_headers is not None: 

1359 headers = req.proxy_headers # type: ignore[assignment] 

1360 headers[hdrs.HOST] = req.headers[hdrs.HOST] 

1361 

1362 url = req.proxy 

1363 assert url is not None 

1364 proxy_req = ClientRequest( 

1365 hdrs.METH_GET, 

1366 url, 

1367 headers=headers, 

1368 auth=req.proxy_auth, 

1369 loop=self._loop, 

1370 ssl=req.ssl, 

1371 ) 

1372 

1373 # create connection to proxy server 

1374 transport, proto = await self._create_direct_connection( 

1375 proxy_req, [], timeout, client_error=ClientProxyConnectionError 

1376 ) 

1377 

1378 auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None) 

1379 if auth is not None: 

1380 if not req.is_ssl(): 

1381 req.headers[hdrs.PROXY_AUTHORIZATION] = auth 

1382 else: 

1383 proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth 

1384 

1385 if req.is_ssl(): 

1386 self._warn_about_tls_in_tls(transport, req) 

1387 

1388 # For HTTPS requests over HTTP proxy 

1389 # we must notify proxy to tunnel connection 

1390 # so we send CONNECT command: 

1391 # CONNECT www.python.org:443 HTTP/1.1 

1392 # Host: www.python.org 

1393 # 

1394 # next we must do TLS handshake and so on 

1395 # to do this we must wrap raw socket into secure one 

1396 # asyncio handles this perfectly 

1397 proxy_req.method = hdrs.METH_CONNECT 

1398 proxy_req.url = req.url 

1399 key = req.connection_key._replace( 

1400 proxy=None, proxy_auth=None, proxy_headers_hash=None 

1401 ) 

1402 conn = Connection(self, key, proto, self._loop) 

1403 proxy_resp = await proxy_req.send(conn) 

1404 try: 

1405 protocol = conn._protocol 

1406 assert protocol is not None 

1407 

1408 # read_until_eof=True will ensure the connection isn't closed 

1409 # once the response is received and processed allowing 

1410 # START_TLS to work on the connection below. 

1411 protocol.set_response_params( 

1412 read_until_eof=True, 

1413 timeout_ceil_threshold=self._timeout_ceil_threshold, 

1414 ) 

1415 resp = await proxy_resp.start(conn) 

1416 except BaseException: 

1417 proxy_resp.close() 

1418 conn.close() 

1419 raise 

1420 else: 

1421 conn._protocol = None 

1422 try: 

1423 if resp.status != 200: 

1424 message = resp.reason 

1425 if message is None: 

1426 message = HTTPStatus(resp.status).phrase 

1427 raise ClientHttpProxyError( 

1428 proxy_resp.request_info, 

1429 resp.history, 

1430 status=resp.status, 

1431 message=message, 

1432 headers=resp.headers, 

1433 ) 

1434 except BaseException: 

1435 # It shouldn't be closed in `finally` because it's fed to 

1436 # `loop.start_tls()` and the docs say not to touch it after 

1437 # passing there. 

1438 transport.close() 

1439 raise 

1440 

1441 return await self._start_tls_connection( 

1442 # Access the old transport for the last time before it's 

1443 # closed and forgotten forever: 

1444 transport, 

1445 req=req, 

1446 timeout=timeout, 

1447 ) 

1448 finally: 

1449 proxy_resp.close() 

1450 

1451 return transport, proto 

1452 

1453 

1454class UnixConnector(BaseConnector): 

1455 """Unix socket connector. 

1456 

1457 path - Unix socket path. 

1458 keepalive_timeout - (optional) Keep-alive timeout. 

1459 force_close - Set to True to force close and do reconnect 

1460 after each request (and between redirects). 

1461 limit - The total number of simultaneous connections. 

1462 limit_per_host - Number of simultaneous connections to one host. 

1463 loop - Optional event loop. 

1464 """ 

1465 

1466 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"unix"}) 

1467 

1468 def __init__( 

1469 self, 

1470 path: str, 

1471 force_close: bool = False, 

1472 keepalive_timeout: Union[_SENTINEL, float, None] = sentinel, 

1473 limit: int = 100, 

1474 limit_per_host: int = 0, 

1475 ) -> None: 

1476 super().__init__( 

1477 force_close=force_close, 

1478 keepalive_timeout=keepalive_timeout, 

1479 limit=limit, 

1480 limit_per_host=limit_per_host, 

1481 ) 

1482 self._path = path 

1483 

1484 @property 

1485 def path(self) -> str: 

1486 """Path to unix socket.""" 

1487 return self._path 

1488 

1489 async def _create_connection( 

1490 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout" 

1491 ) -> ResponseHandler: 

1492 try: 

1493 async with ceil_timeout( 

1494 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1495 ): 

1496 _, proto = await self._loop.create_unix_connection( 

1497 self._factory, self._path 

1498 ) 

1499 except OSError as exc: 

1500 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1501 raise 

1502 raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc 

1503 

1504 return proto 

1505 

1506 

1507class NamedPipeConnector(BaseConnector): 

1508 """Named pipe connector. 

1509 

1510 Only supported by the proactor event loop. 

1511 See also: https://docs.python.org/3/library/asyncio-eventloop.html 

1512 

1513 path - Windows named pipe path. 

1514 keepalive_timeout - (optional) Keep-alive timeout. 

1515 force_close - Set to True to force close and do reconnect 

1516 after each request (and between redirects). 

1517 limit - The total number of simultaneous connections. 

1518 limit_per_host - Number of simultaneous connections to one host. 

1519 loop - Optional event loop. 

1520 """ 

1521 

1522 allowed_protocol_schema_set = HIGH_LEVEL_SCHEMA_SET | frozenset({"npipe"}) 

1523 

1524 def __init__( 

1525 self, 

1526 path: str, 

1527 force_close: bool = False, 

1528 keepalive_timeout: Union[_SENTINEL, float, None] = sentinel, 

1529 limit: int = 100, 

1530 limit_per_host: int = 0, 

1531 ) -> None: 

1532 super().__init__( 

1533 force_close=force_close, 

1534 keepalive_timeout=keepalive_timeout, 

1535 limit=limit, 

1536 limit_per_host=limit_per_host, 

1537 ) 

1538 if not isinstance( 

1539 self._loop, asyncio.ProactorEventLoop # type: ignore[attr-defined] 

1540 ): 

1541 raise RuntimeError( 

1542 "Named Pipes only available in proactor loop under windows" 

1543 ) 

1544 self._path = path 

1545 

1546 @property 

1547 def path(self) -> str: 

1548 """Path to the named pipe.""" 

1549 return self._path 

1550 

1551 async def _create_connection( 

1552 self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout" 

1553 ) -> ResponseHandler: 

1554 try: 

1555 async with ceil_timeout( 

1556 timeout.sock_connect, ceil_threshold=timeout.ceil_threshold 

1557 ): 

1558 _, proto = await self._loop.create_pipe_connection( # type: ignore[attr-defined] 

1559 self._factory, self._path 

1560 ) 

1561 # the drain is required so that the connection_made is called 

1562 # and transport is set otherwise it is not set before the 

1563 # `assert conn.transport is not None` 

1564 # in client.py's _request method 

1565 await asyncio.sleep(0) 

1566 # other option is to manually set transport like 

1567 # `proto.transport = trans` 

1568 except OSError as exc: 

1569 if exc.errno is None and isinstance(exc, asyncio.TimeoutError): 

1570 raise 

1571 raise ClientConnectorError(req.connection_key, exc) from exc 

1572 

1573 return cast(ResponseHandler, proto)