Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/connector.py: 17%

701 statements  

« prev     ^ index     » next       coverage.py v7.4.0, created at 2024-01-26 06:16 +0000

1import asyncio 

2import dataclasses 

3import functools 

4import logging 

5import random 

6import socket 

7import sys 

8import traceback 

9import warnings 

10from collections import defaultdict, deque 

11from contextlib import suppress 

12from http import HTTPStatus 

13from http.cookies import SimpleCookie 

14from itertools import cycle, islice 

15from time import monotonic 

16from types import TracebackType 

17from typing import ( # noqa 

18 TYPE_CHECKING, 

19 Any, 

20 Awaitable, 

21 Callable, 

22 DefaultDict, 

23 Dict, 

24 Iterator, 

25 List, 

26 Literal, 

27 Optional, 

28 Set, 

29 Tuple, 

30 Type, 

31 Union, 

32 cast, 

33) 

34 

35import aiohappyeyeballs 

36 

37from . import hdrs, helpers 

38from .abc import AbstractResolver 

39from .client_exceptions import ( 

40 ClientConnectionError, 

41 ClientConnectorCertificateError, 

42 ClientConnectorError, 

43 ClientConnectorSSLError, 

44 ClientHttpProxyError, 

45 ClientProxyConnectionError, 

46 ServerFingerprintMismatch, 

47 UnixClientConnectorError, 

48 cert_errors, 

49 ssl_errors, 

50) 

51from .client_proto import ResponseHandler 

52from .client_reqrep import SSL_ALLOWED_TYPES, ClientRequest, Fingerprint 

53from .helpers import _SENTINEL, ceil_timeout, is_ip_address, sentinel, set_result 

54from .locks import EventResultOrError 

55from .resolver import DefaultResolver 

56 

57try: 

58 import ssl 

59 

60 SSLContext = ssl.SSLContext 

61except ImportError: # pragma: no cover 

62 ssl = None # type: ignore[assignment] 

63 SSLContext = object # type: ignore[misc,assignment] 

64 

65 

# Public API of this module.
__all__ = ("BaseConnector", "TCPConnector", "UnixConnector", "NamedPipeConnector")


if TYPE_CHECKING:
    # Imported only for static type checking to avoid runtime import cycles.
    from .client import ClientTimeout
    from .client_reqrep import ConnectionKey
    from .tracing import Trace

73 

74 

class Connection:
    """A single pooled connection handed out by a :class:`BaseConnector`.

    Wraps a ``ResponseHandler`` protocol together with the connector it
    came from, so that ``close()``/``release()`` return the underlying
    protocol to the pool (or discard it) via ``connector._release()``.
    """

    _source_traceback = None
    _transport = None

    def __init__(
        self,
        connector: "BaseConnector",
        key: "ConnectionKey",
        protocol: ResponseHandler,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._key = key
        self._connector = connector
        self._loop = loop
        # Set to None once the connection is closed/released.
        self._protocol: Optional[ResponseHandler] = protocol
        self._callbacks: List[Callable[[], None]] = []

        # In debug mode remember where the connection was created so the
        # "Unclosed connection" report in __del__ can point at the culprit.
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __repr__(self) -> str:
        return f"Connection<{self._key}>"

    def __del__(self, _warnings: Any = warnings) -> None:
        # A still-set protocol means the caller never closed/released us:
        # warn, force-close the protocol, and report via the loop's
        # exception handler (unless the loop itself is already closed).
        if self._protocol is not None:
            _warnings.warn(
                f"Unclosed connection {self!r}", ResourceWarning, source=self
            )
            if self._loop.is_closed():
                return

            self._connector._release(self._key, self._protocol, should_close=True)

            context = {"client_connection": self, "message": "Unclosed connection"}
            if self._source_traceback is not None:
                context["source_traceback"] = self._source_traceback
            self._loop.call_exception_handler(context)

    def __bool__(self) -> Literal[True]:
        """Force subclasses to not be falsy, to make checks simpler."""
        return True

    @property
    def transport(self) -> Optional[asyncio.Transport]:
        """Underlying transport, or None once closed/released."""
        if self._protocol is None:
            return None
        return self._protocol.transport

    @property
    def protocol(self) -> Optional[ResponseHandler]:
        """Underlying protocol, or None once closed/released."""
        return self._protocol

    def add_callback(self, callback: Callable[[], None]) -> None:
        """Register *callback* to run when the connection is closed/released."""
        if callback is not None:
            self._callbacks.append(callback)

    def _notify_release(self) -> None:
        # Swap the callback list out first so callbacks registered during
        # notification are not invoked in this round.
        callbacks, self._callbacks = self._callbacks[:], []

        for cb in callbacks:
            # Callbacks are best-effort; a failing one must not prevent
            # the release of the connection.
            with suppress(Exception):
                cb()

    def close(self) -> None:
        """Return the protocol to the connector, forcing it closed."""
        self._notify_release()

        if self._protocol is not None:
            self._connector._release(self._key, self._protocol, should_close=True)
            self._protocol = None

    def release(self) -> None:
        """Return the protocol to the connector for possible keep-alive reuse."""
        self._notify_release()

        if self._protocol is not None:
            # Let the protocol decide (e.g. "Connection: close" responses).
            self._connector._release(
                self._key, self._protocol, should_close=self._protocol.should_close
            )
            self._protocol = None

    @property
    def closed(self) -> bool:
        """True once released/closed or when the transport dropped."""
        return self._protocol is None or not self._protocol.is_connected()

157 

158 

159class _TransportPlaceholder: 

160 """placeholder for BaseConnector.connect function""" 

161 

162 def __init__(self, loop: asyncio.AbstractEventLoop) -> None: 

163 fut = loop.create_future() 

164 fut.set_result(None) 

165 self.closed: asyncio.Future[Optional[Exception]] = fut 

166 

167 def close(self) -> None: 

168 pass 

169 

170 

class BaseConnector:
    """Base connector class.

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
                            Disabled by default.
    timeout_ceil_threshold - Trigger ceiling of timeout values when
                             it's above timeout_ceil_threshold.
    loop - Optional event loop.
    """

    # True by default so __del__ stays a no-op if __init__ raised early.
    _closed = True  # prevent AttributeError in __del__ if ctor was failed
    _source_traceback = None

    # abort transport after 2 seconds (cleanup broken connections)
    _cleanup_closed_period = 2.0

    def __init__(
        self,
        *,
        keepalive_timeout: Union[_SENTINEL, None, float] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        timeout_ceil_threshold: float = 5,
    ) -> None:
        # force_close and an explicit keepalive_timeout are mutually exclusive.
        if force_close:
            if keepalive_timeout is not None and keepalive_timeout is not sentinel:
                raise ValueError(
                    "keepalive_timeout cannot " "be set if force_close is True"
                )
        else:
            if keepalive_timeout is sentinel:
                keepalive_timeout = 15.0

        self._timeout_ceil_threshold = timeout_ceil_threshold

        loop = asyncio.get_running_loop()

        self._closed = False
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # Idle keep-alive connections: key -> [(protocol, last-use time), ...]
        self._conns: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {}
        self._limit = limit
        self._limit_per_host = limit_per_host
        # Protocols currently handed out to callers (includes placeholders).
        self._acquired: Set[ResponseHandler] = set()
        self._acquired_per_host: DefaultDict[
            ConnectionKey, Set[ResponseHandler]
        ] = defaultdict(set)
        self._keepalive_timeout = cast(float, keepalive_timeout)
        self._force_close = force_close

        # {host_key: FIFO list of waiters}
        self._waiters = defaultdict(deque)  # type: ignore[var-annotated]

        self._loop = loop
        self._factory = functools.partial(ResponseHandler, loop=loop)

        self.cookies = SimpleCookie()

        # start keep-alive connection cleanup task
        self._cleanup_handle: Optional[asyncio.TimerHandle] = None

        # start cleanup closed transports task
        self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None
        self._cleanup_closed_disabled = not enable_cleanup_closed
        self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []
        self._cleanup_closed()

    def __del__(self, _warnings: Any = warnings) -> None:
        # Only warn when the connector was neither closed nor empty.
        if self._closed:
            return
        if not self._conns:
            return

        # Capture reprs before _close_immediately() clears self._conns.
        conns = [repr(c) for c in self._conns.values()]

        self._close_immediately()

        _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, source=self)
        context = {
            "connector": self,
            "connections": conns,
            "message": "Unclosed connector",
        }
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    async def __aenter__(self) -> "BaseConnector":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        exc_traceback: Optional[TracebackType] = None,
    ) -> None:
        await self.close()

    @property
    def force_close(self) -> bool:
        """Ultimately close connection on releasing if True."""
        return self._force_close

    @property
    def limit(self) -> int:
        """The total number for simultaneous connections.

        If limit is 0 the connector has no limit.
        The default limit size is 100.
        """
        return self._limit

    @property
    def limit_per_host(self) -> int:
        """The limit for simultaneous connections to the same endpoint.

        Endpoints are the same if they are have equal
        (host, port, is_ssl) triple.
        """
        return self._limit_per_host

    def _cleanup(self) -> None:
        """Cleanup unused transports."""
        if self._cleanup_handle:
            self._cleanup_handle.cancel()
            # _cleanup_handle should be unset, otherwise _release() will not
            # recreate it ever!
            self._cleanup_handle = None

        now = self._loop.time()
        timeout = self._keepalive_timeout

        if self._conns:
            connections = {}
            deadline = now - timeout
            for key, conns in self._conns.items():
                alive = []
                for proto, use_time in conns:
                    if proto.is_connected():
                        if use_time - deadline < 0:
                            # Idle longer than the keep-alive timeout: drop it.
                            transport = proto.transport
                            proto.close()
                            if key.is_ssl and not self._cleanup_closed_disabled:
                                self._cleanup_closed_transports.append(transport)
                        else:
                            alive.append((proto, use_time))
                    else:
                        # Peer already disconnected: discard.
                        transport = proto.transport
                        proto.close()
                        if key.is_ssl and not self._cleanup_closed_disabled:
                            self._cleanup_closed_transports.append(transport)

                if alive:
                    connections[key] = alive

            self._conns = connections

        # Reschedule only while idle connections remain.
        if self._conns:
            self._cleanup_handle = helpers.weakref_handle(
                self,
                "_cleanup",
                timeout,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    def _drop_acquired_per_host(
        self, key: "ConnectionKey", val: ResponseHandler
    ) -> None:
        """Remove *val* from the per-host acquired set, pruning empty keys."""
        acquired_per_host = self._acquired_per_host
        if key not in acquired_per_host:
            return
        conns = acquired_per_host[key]
        conns.remove(val)
        if not conns:
            del self._acquired_per_host[key]

    def _cleanup_closed(self) -> None:
        """Double confirmation for transport close.

        Some broken ssl servers may leave socket open without proper close.
        """
        if self._cleanup_closed_handle:
            self._cleanup_closed_handle.cancel()

        for transport in self._cleanup_closed_transports:
            if transport is not None:
                transport.abort()

        self._cleanup_closed_transports = []

        if not self._cleanup_closed_disabled:
            self._cleanup_closed_handle = helpers.weakref_handle(
                self,
                "_cleanup_closed",
                self._cleanup_closed_period,
                self._loop,
                timeout_ceil_threshold=self._timeout_ceil_threshold,
            )

    async def close(self) -> None:
        """Close all opened transports."""
        waiters = self._close_immediately()
        if waiters:
            results = await asyncio.gather(*waiters, return_exceptions=True)
            for res in results:
                if isinstance(res, Exception):
                    err_msg = "Error while closing connector: " + repr(res)
                    logging.error(err_msg)

    def _close_immediately(self) -> List["asyncio.Future[None]"]:
        """Synchronously start closing everything; return close futures."""
        waiters: List["asyncio.Future[None]"] = []

        if self._closed:
            return waiters

        self._closed = True

        try:
            if self._loop.is_closed():
                return waiters

            # cancel cleanup task
            if self._cleanup_handle:
                self._cleanup_handle.cancel()

            # cancel cleanup close task
            if self._cleanup_closed_handle:
                self._cleanup_closed_handle.cancel()

            for data in self._conns.values():
                for proto, t0 in data:
                    proto.close()
                    waiters.append(proto.closed)

            for proto in self._acquired:
                proto.close()
                waiters.append(proto.closed)

            # TODO (A.Yushovskiy, 24-May-2019) collect transp. closing futures
            for transport in self._cleanup_closed_transports:
                if transport is not None:
                    transport.abort()

            return waiters

        finally:
            # Always drop bookkeeping state, even if the loop was closed.
            self._conns.clear()
            self._acquired.clear()
            self._waiters.clear()
            self._cleanup_handle = None
            self._cleanup_closed_transports.clear()
            self._cleanup_closed_handle = None

    @property
    def closed(self) -> bool:
        """Is connector closed.

        A readonly property.
        """
        return self._closed

    def _available_connections(self, key: "ConnectionKey") -> int:
        """
        Return number of available connections.

        The limit, limit_per_host and the connection key are taken into account.

        If it returns less than 1 means that there are no connections
        available.
        """
        if self._limit:
            # total calc available connections
            available = self._limit - len(self._acquired)

            # check limit per host
            if (
                self._limit_per_host
                and available > 0
                and key in self._acquired_per_host
            ):
                acquired = self._acquired_per_host.get(key)
                assert acquired is not None
                available = self._limit_per_host - len(acquired)

        elif self._limit_per_host and key in self._acquired_per_host:
            # check limit per host
            acquired = self._acquired_per_host.get(key)
            assert acquired is not None
            available = self._limit_per_host - len(acquired)
        else:
            available = 1

        return available

    async def connect(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Connection:
        """Get from pool or create new connection."""
        key = req.connection_key
        available = self._available_connections(key)

        # Wait if there are no available connections or if there are/were
        # waiters (i.e. don't steal connection from a waiter about to wake up)
        if available <= 0 or key in self._waiters:
            fut = self._loop.create_future()

            # This connection will now count towards the limit.
            self._waiters[key].append(fut)

            if traces:
                for trace in traces:
                    await trace.send_connection_queued_start()

            try:
                await fut
            except BaseException as e:
                if key in self._waiters:
                    # remove a waiter even if it was cancelled, normally it's
                    # removed when it's notified
                    try:
                        self._waiters[key].remove(fut)
                    except ValueError:  # fut may no longer be in list
                        pass

                raise e
            finally:
                if key in self._waiters and not self._waiters[key]:
                    del self._waiters[key]

            if traces:
                for trace in traces:
                    await trace.send_connection_queued_end()

        proto = self._get(key)
        if proto is None:
            # No reusable connection: reserve a slot with a placeholder while
            # the real connection is established so limits stay accurate.
            placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop))
            self._acquired.add(placeholder)
            self._acquired_per_host[key].add(placeholder)

            if traces:
                for trace in traces:
                    await trace.send_connection_create_start()

            try:
                proto = await self._create_connection(req, traces, timeout)
                if self._closed:
                    proto.close()
                    raise ClientConnectionError("Connector is closed.")
            except BaseException:
                if not self._closed:
                    self._acquired.remove(placeholder)
                    self._drop_acquired_per_host(key, placeholder)
                    # Wake up another waiter since this slot freed up.
                    self._release_waiter()
                raise
            else:
                if not self._closed:
                    self._acquired.remove(placeholder)
                    self._drop_acquired_per_host(key, placeholder)

            if traces:
                for trace in traces:
                    await trace.send_connection_create_end()
        else:
            if traces:
                # Acquire the connection to prevent race conditions with limits
                placeholder = cast(ResponseHandler, _TransportPlaceholder(self._loop))
                self._acquired.add(placeholder)
                self._acquired_per_host[key].add(placeholder)
                for trace in traces:
                    await trace.send_connection_reuseconn()
                self._acquired.remove(placeholder)
                self._drop_acquired_per_host(key, placeholder)

        self._acquired.add(proto)
        self._acquired_per_host[key].add(proto)
        return Connection(self, key, proto, self._loop)

    def _get(self, key: "ConnectionKey") -> Optional[ResponseHandler]:
        """Pop a live idle connection for *key*, discarding stale ones."""
        try:
            conns = self._conns[key]
        except KeyError:
            return None

        t1 = self._loop.time()
        while conns:
            proto, t0 = conns.pop()
            if proto.is_connected():
                if t1 - t0 > self._keepalive_timeout:
                    # Idle past the keep-alive window: close and keep looking.
                    transport = proto.transport
                    proto.close()
                    # only for SSL transports
                    if key.is_ssl and not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transport)
                else:
                    if not conns:
                        # The very last connection was reclaimed: drop the key
                        del self._conns[key]
                    return proto
            else:
                transport = proto.transport
                proto.close()
                if key.is_ssl and not self._cleanup_closed_disabled:
                    self._cleanup_closed_transports.append(transport)

        # No more connections: drop the key
        del self._conns[key]
        return None

    def _release_waiter(self) -> None:
        """
        Iterates over all waiters until one to be released is found.

        The one to be released is not finished and
        belongs to a host that has available connections.
        """
        if not self._waiters:
            return

        # Having the dict keys ordered this avoids to iterate
        # at the same order at each call.
        queues = list(self._waiters.keys())
        random.shuffle(queues)

        for key in queues:
            if self._available_connections(key) < 1:
                continue

            waiters = self._waiters[key]
            while waiters:
                waiter = waiters.popleft()
                if not waiter.done():
                    waiter.set_result(None)
                    return

    def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
        """Drop *proto* from acquired bookkeeping and wake one waiter."""
        if self._closed:
            # acquired connection is already released on connector closing
            return

        try:
            self._acquired.remove(proto)
            self._drop_acquired_per_host(key, proto)
        except KeyError:  # pragma: no cover
            # this may be result of undetermenistic order of objects
            # finalization due garbage collection.
            pass
        else:
            self._release_waiter()

    def _release(
        self,
        key: "ConnectionKey",
        protocol: ResponseHandler,
        *,
        should_close: bool = False,
    ) -> None:
        """Return *protocol* to the pool, or close it.

        With ``should_close`` (or ``force_close`` on the connector, or the
        protocol's own request) the connection is closed; otherwise it is
        stored for keep-alive reuse and the cleanup timer is armed.
        """
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._release_acquired(key, protocol)

        if self._force_close:
            should_close = True

        if should_close or protocol.should_close:
            transport = protocol.transport
            protocol.close()
            # TODO: Remove once fixed: https://bugs.python.org/issue39951
            # See PR #6321
            set_result(protocol.closed, None)

            if key.is_ssl and not self._cleanup_closed_disabled:
                self._cleanup_closed_transports.append(transport)
        else:
            conns = self._conns.get(key)
            if conns is None:
                conns = self._conns[key] = []
            conns.append((protocol, self._loop.time()))

            if self._cleanup_handle is None:
                self._cleanup_handle = helpers.weakref_handle(
                    self,
                    "_cleanup",
                    self._keepalive_timeout,
                    self._loop,
                    timeout_ceil_threshold=self._timeout_ceil_threshold,
                )

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Establish a new connection; implemented by subclasses."""
        raise NotImplementedError()

673 

674 

675class _DNSCacheTable: 

676 def __init__(self, ttl: Optional[float] = None) -> None: 

677 self._addrs_rr: Dict[Tuple[str, int], Tuple[Iterator[Dict[str, Any]], int]] = {} 

678 self._timestamps: Dict[Tuple[str, int], float] = {} 

679 self._ttl = ttl 

680 

681 def __contains__(self, host: object) -> bool: 

682 return host in self._addrs_rr 

683 

684 def add(self, key: Tuple[str, int], addrs: List[Dict[str, Any]]) -> None: 

685 self._addrs_rr[key] = (cycle(addrs), len(addrs)) 

686 

687 if self._ttl is not None: 

688 self._timestamps[key] = monotonic() 

689 

690 def remove(self, key: Tuple[str, int]) -> None: 

691 self._addrs_rr.pop(key, None) 

692 

693 if self._ttl is not None: 

694 self._timestamps.pop(key, None) 

695 

696 def clear(self) -> None: 

697 self._addrs_rr.clear() 

698 self._timestamps.clear() 

699 

700 def next_addrs(self, key: Tuple[str, int]) -> List[Dict[str, Any]]: 

701 loop, length = self._addrs_rr[key] 

702 addrs = list(islice(loop, length)) 

703 # Consume one more element to shift internal state of `cycle` 

704 next(loop) 

705 return addrs 

706 

707 def expired(self, key: Tuple[str, int]) -> bool: 

708 if self._ttl is None: 

709 return False 

710 

711 return self._timestamps[key] + self._ttl < monotonic() 

712 

713 

714class TCPConnector(BaseConnector): 

715 """TCP connector. 

716 

717 verify_ssl - Set to True to check ssl certifications. 

718 fingerprint - Pass the binary sha256 

719 digest of the expected certificate in DER format to verify 

720 that the certificate the server presents matches. See also 

721 https://en.wikipedia.org/wiki/Transport_Layer_Security#Certificate_pinning 

722 resolver - Enable DNS lookups and use this 

723 resolver 

724 use_dns_cache - Use memory cache for DNS lookups. 

725 ttl_dns_cache - Max seconds having cached a DNS entry, None forever. 

726 family - socket address family 

727 local_addr - local tuple of (host, port) to bind socket to 

728 

729 keepalive_timeout - (optional) Keep-alive timeout. 

730 force_close - Set to True to force close and do reconnect 

731 after each request (and between redirects). 

732 limit - The total number of simultaneous connections. 

733 limit_per_host - Number of simultaneous connections to one host. 

734 enable_cleanup_closed - Enables clean-up closed ssl transports. 

735 Disabled by default. 

736 happy_eyeballs_delay - This is the “Connection Attempt Delay” 

737 as defined in RFC 8305. To disable 

738 the happy eyeballs algorithm, set to None. 

739 interleave - “First Address Family Count” as defined in RFC 8305 

740 loop - Optional event loop. 

741 """ 

742 

    def __init__(
        self,
        *,
        use_dns_cache: bool = True,
        ttl_dns_cache: Optional[int] = 10,
        family: int = 0,
        ssl: Union[bool, Fingerprint, SSLContext] = True,
        local_addr: Optional[Tuple[str, int]] = None,
        resolver: Optional[AbstractResolver] = None,
        keepalive_timeout: Union[None, float, _SENTINEL] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        timeout_ceil_threshold: float = 5,
        happy_eyeballs_delay: Optional[float] = 0.25,
        interleave: Optional[int] = None,
    ) -> None:
        # Pool-related options are handled by BaseConnector.
        super().__init__(
            keepalive_timeout=keepalive_timeout,
            force_close=force_close,
            limit=limit,
            limit_per_host=limit_per_host,
            enable_cleanup_closed=enable_cleanup_closed,
            timeout_ceil_threshold=timeout_ceil_threshold,
        )

        # Reject unsupported ssl arguments early with a clear error.
        if not isinstance(ssl, SSL_ALLOWED_TYPES):
            raise TypeError(
                "ssl should be SSLContext, Fingerprint, or bool, "
                "got {!r} instead.".format(ssl)
            )
        self._ssl = ssl
        if resolver is None:
            resolver = DefaultResolver()
        self._resolver: AbstractResolver = resolver

        # DNS caching state; throttle events deduplicate concurrent lookups
        # for the same (host, port).
        self._use_dns_cache = use_dns_cache
        self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
        self._throttle_dns_events: Dict[Tuple[str, int], EventResultOrError] = {}
        self._family = family
        self._local_addr_infos = aiohappyeyeballs.addr_to_addr_infos(local_addr)
        # RFC 8305 (Happy Eyeballs) tuning, forwarded to aiohappyeyeballs.
        self._happy_eyeballs_delay = happy_eyeballs_delay
        self._interleave = interleave

787 

    def _close_immediately(self) -> List["asyncio.Future[None]"]:
        """Cancel pending DNS throttle events, then close like BaseConnector."""
        for ev in self._throttle_dns_events.values():
            ev.cancel()
        return super()._close_immediately()

792 

    @property
    def family(self) -> int:
        """Socket family like AF_INET."""
        # 0 (the default) lets the resolver return any family.
        return self._family

797 

    @property
    def use_dns_cache(self) -> bool:
        """True if local DNS caching is enabled."""
        return self._use_dns_cache

802 

    def clear_dns_cache(
        self, host: Optional[str] = None, port: Optional[int] = None
    ) -> None:
        """Remove specified host/port or clear all dns local cache."""
        if host is not None and port is not None:
            # Drop a single cached (host, port) entry.
            self._cached_hosts.remove((host, port))
        elif host is not None or port is not None:
            # Supplying only one of the pair is ambiguous; refuse it.
            raise ValueError("either both host and port " "or none of them are allowed")
        else:
            self._cached_hosts.clear()

813 

    async def _resolve_host(
        self, host: str, port: int, traces: Optional[List["Trace"]] = None
    ) -> List[Dict[str, Any]]:
        """Resolve *host*/*port* to a list of addr-info dicts.

        Literal IP addresses bypass resolution entirely.  With DNS caching
        enabled, concurrent lookups for the same key are coalesced through
        an EventResultOrError so only one resolver call is in flight.
        """
        if is_ip_address(host):
            # Already an IP literal: no resolver round-trip needed.
            return [
                {
                    "hostname": host,
                    "host": host,
                    "port": port,
                    "family": self._family,
                    "proto": 0,
                    "flags": 0,
                }
            ]

        if not self._use_dns_cache:
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            res = await self._resolver.resolve(host, port, family=self._family)

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            return res

        key = (host, port)

        if (key in self._cached_hosts) and (not self._cached_hosts.expired(key)):
            # get result early, before any await (#4014)
            result = self._cached_hosts.next_addrs(key)

            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            return result

        if key in self._throttle_dns_events:
            # Another task is already resolving this key; wait for it.
            # get event early, before any await (#4014)
            event = self._throttle_dns_events[key]
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_hit(host)
            await event.wait()
        else:
            # update dict early, before any await (#4014)
            self._throttle_dns_events[key] = EventResultOrError(self._loop)
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_miss(host)
            try:
                if traces:
                    for trace in traces:
                        await trace.send_dns_resolvehost_start(host)

                addrs = await self._resolver.resolve(host, port, family=self._family)
                if traces:
                    for trace in traces:
                        await trace.send_dns_resolvehost_end(host)

                self._cached_hosts.add(key, addrs)
                # Wake every task waiting on this lookup.
                self._throttle_dns_events[key].set()
            except BaseException as e:
                # any DNS exception, independently of the implementation
                # is set for the waiters to raise the same exception.
                self._throttle_dns_events[key].set(exc=e)
                raise
            finally:
                self._throttle_dns_events.pop(key)

        return self._cached_hosts.next_addrs(key)

887 

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Create connection.

        Has same keyword arguments as BaseEventLoop.create_connection.
        """
        # Dispatch on whether the request goes through a proxy.
        if req.proxy:
            _, proto = await self._create_proxy_connection(req, traces, timeout)
        else:
            _, proto = await self._create_direct_connection(req, traces, timeout)

        return proto

901 

    @staticmethod
    @functools.lru_cache(None)
    def _make_ssl_context(verified: bool) -> SSLContext:
        """Build (and memoize per *verified* flag) a client SSLContext.

        verified=True returns Python's default verifying context;
        verified=False returns a context with hostname checking and
        certificate verification disabled.
        """
        if verified:
            return ssl.create_default_context()
        else:
            sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT)
            sslcontext.options |= ssl.OP_NO_SSLv2
            sslcontext.options |= ssl.OP_NO_SSLv3
            sslcontext.check_hostname = False
            sslcontext.verify_mode = ssl.CERT_NONE
            try:
                sslcontext.options |= ssl.OP_NO_COMPRESSION
            except AttributeError as attr_err:
                # OP_NO_COMPRESSION is missing on very old OpenSSL builds.
                warnings.warn(
                    "{!s}: The Python interpreter is compiled "
                    "against OpenSSL < 1.0.0. Ref: "
                    "https://docs.python.org/3/library/ssl.html"
                    "#ssl.OP_NO_COMPRESSION".format(attr_err),
                )
            sslcontext.set_default_verify_paths()
            return sslcontext

924 

    def _get_ssl_context(self, req: ClientRequest) -> Optional[SSLContext]:
        """Logic to get the correct SSL context

        0. if req.ssl is false, return None

        1. if ssl_context is specified in req, use it
        2. if _ssl_context is specified in self, use it
        3. otherwise:
            1. if verify_ssl is not specified in req, use self.ssl_context
               (will generate a default context according to self.verify_ssl)
            2. if verify_ssl is True in req, generate a default SSL context
            3. if verify_ssl is False in req, generate a SSL context that
               won't verify
        """
        if req.is_ssl():
            if ssl is None:  # pragma: no cover
                raise RuntimeError("SSL is not supported.")
            # Per-request setting takes precedence over the connector's.
            sslcontext = req.ssl
            if isinstance(sslcontext, ssl.SSLContext):
                return sslcontext
            if sslcontext is not True:
                # not verified or fingerprinted
                return self._make_ssl_context(False)
            sslcontext = self._ssl
            if isinstance(sslcontext, ssl.SSLContext):
                return sslcontext
            if sslcontext is not True:
                # not verified or fingerprinted
                return self._make_ssl_context(False)
            return self._make_ssl_context(True)
        else:
            return None

957 

    def _get_fingerprint(self, req: ClientRequest) -> Optional["Fingerprint"]:
        """Return the certificate fingerprint to verify, if one was given.

        The per-request setting takes precedence over the connector-wide one.
        """
        ret = req.ssl
        if isinstance(ret, Fingerprint):
            return ret
        ret = self._ssl
        if isinstance(ret, Fingerprint):
            return ret
        return None

966 

    async def _wrap_create_connection(
        self,
        *args: Any,
        addr_infos: List[aiohappyeyeballs.AddrInfoType],
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
        **kwargs: Any,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Connect via Happy Eyeballs and translate errors to client exceptions.

        Raises ClientConnectorCertificateError / ClientConnectorSSLError for
        TLS failures and *client_error* for other OS-level failures.
        """
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                # aiohappyeyeballs races the candidate addresses (RFC 8305)
                # and returns the first socket that connects.
                sock = await aiohappyeyeballs.start_connection(
                    addr_infos=addr_infos,
                    local_addr_infos=self._local_addr_infos,
                    happy_eyeballs_delay=self._happy_eyeballs_delay,
                    interleave=self._interleave,
                    loop=self._loop,
                )
                return await self._loop.create_connection(*args, **kwargs, sock=sock)
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            # Let timeout errors (which subclass OSError with errno None)
            # propagate unchanged.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc

996 

    def _warn_about_tls_in_tls(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
    ) -> None:
        """Issue a warning if the requested URL has HTTPS scheme."""
        if req.request_info.url.scheme != "https":
            return

        # Transports patched to support TLS-in-TLS advertise it via this flag.
        asyncio_supports_tls_in_tls = getattr(
            underlying_transport,
            "_start_tls_compatible",
            False,
        )

        if asyncio_supports_tls_in_tls:
            return

        warnings.warn(
            "An HTTPS request is being sent through an HTTPS proxy. "
            "This support for TLS in TLS is known to be disabled "
            "in the stdlib asyncio. This is why you'll probably see "
            "an error in the log below.\n\n"
            "It is possible to enable it via monkeypatching. "
            "For more details, see:\n"
            "* https://bugs.python.org/issue37179\n"
            "* https://github.com/python/cpython/pull/28073\n\n"
            "You can temporarily patch this as follows:\n"
            "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n"
            "* https://github.com/aio-libs/aiohttp/discussions/6044\n",
            RuntimeWarning,
            source=self,
            # Why `4`? At least 3 of the calls in the stack originate
            # from the methods in this class.
            # NOTE(review): the comment above says `4` but the value is 3 —
            # confirm which stack depth is actually intended.
            stacklevel=3,
        )

1033 

    async def _start_tls_connection(
        self,
        underlying_transport: asyncio.Transport,
        req: ClientRequest,
        timeout: "ClientTimeout",
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Wrap the raw TCP transport with TLS.

        Used for TLS-in-TLS when tunnelling HTTPS through an HTTPS proxy:
        ``underlying_transport`` is the already-established connection and a
        fresh protocol instance is created for the TLS layer on top of it.

        Raises ``ClientConnectorCertificateError`` / ``ClientConnectorSSLError``
        for certificate/SSL failures, ``client_error`` for other OS errors,
        and ``ClientConnectionError`` when the event loop's ``start_tls()``
        rejects the transport (TLS-in-TLS unsupported).
        """
        tls_proto = self._factory()  # Create a brand new proto for TLS

        # Safety of the `cast()` call here is based on the fact that
        # internally `_get_ssl_context()` only returns `None` when
        # `req.is_ssl()` evaluates to `False` which is never gonna happen
        # in this code path. Of course, it's rather fragile
        # maintainability-wise but this is to be solved separately.
        sslcontext = cast(ssl.SSLContext, self._get_ssl_context(req))

        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                try:
                    tls_transport = await self._loop.start_tls(
                        underlying_transport,
                        tls_proto,
                        sslcontext,
                        server_hostname=req.server_hostname or req.host,
                        ssl_handshake_timeout=timeout.total,
                    )
                except BaseException:
                    # We need to close the underlying transport since
                    # `start_tls()` probably failed before it had a
                    # chance to do this:
                    underlying_transport.close()
                    raise
        # NOTE: cert_errors must be caught before ssl_errors —
        # ssl.CertificateError is a subclass of ssl.SSLError.
        except cert_errors as exc:
            raise ClientConnectorCertificateError(req.connection_key, exc) from exc
        except ssl_errors as exc:
            raise ClientConnectorSSLError(req.connection_key, exc) from exc
        except OSError as exc:
            # A bare timeout (errno is None) propagates unchanged.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            raise client_error(req.connection_key, exc) from exc
        except TypeError as type_err:
            # Example cause looks like this:
            # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
            # object at 0x7f760615e460> is not supported by start_tls()

            raise ClientConnectionError(
                "Cannot initialize a TLS-in-TLS connection to host "
                f"{req.host!s}:{req.port:d} through an underlying connection "
                f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
                f"[{type_err!s}]"
            ) from type_err
        else:
            # start_tls() can return None when the transport was closed
            # under it; surface that as a connector error.
            if tls_transport is None:
                msg = "Failed to start TLS (possibly caused by closing transport)"
                raise client_error(req.connection_key, OSError(msg))
            tls_proto.connection_made(
                tls_transport
            )  # Kick the state machine of the new TLS protocol

        return tls_transport, tls_proto

1097 

1098 def _convert_hosts_to_addr_infos( 

1099 self, hosts: List[Dict[str, Any]] 

1100 ) -> List[aiohappyeyeballs.AddrInfoType]: 

1101 """Converts the list of hosts to a list of addr_infos. 

1102 

1103 The list of hosts is the result of a DNS lookup. The list of 

1104 addr_infos is the result of a call to `socket.getaddrinfo()`. 

1105 """ 

1106 addr_infos: List[aiohappyeyeballs.AddrInfoType] = [] 

1107 for hinfo in hosts: 

1108 host = hinfo["host"] 

1109 is_ipv6 = ":" in host 

1110 family = socket.AF_INET6 if is_ipv6 else socket.AF_INET 

1111 if self._family and self._family != family: 

1112 continue 

1113 addr = (host, hinfo["port"], 0, 0) if is_ipv6 else (host, hinfo["port"]) 

1114 addr_infos.append( 

1115 (family, socket.SOCK_STREAM, socket.IPPROTO_TCP, "", addr) 

1116 ) 

1117 return addr_infos 

1118 

    async def _create_direct_connection(
        self,
        req: ClientRequest,
        traces: List["Trace"],
        timeout: "ClientTimeout",
        *,
        client_error: Type[Exception] = ClientConnectorError,
    ) -> Tuple[asyncio.Transport, ResponseHandler]:
        """Resolve the target host and open a (possibly TLS) TCP connection.

        Tries every resolved address until one both connects and — when a
        fingerprint is configured — passes verification; if all candidates
        fail, the last connection error is re-raised.
        """
        sslcontext = self._get_ssl_context(req)
        fingerprint = self._get_fingerprint(req)

        host = req.url.raw_host
        assert host is not None
        # Replace multiple trailing dots with a single one.
        # A trailing dot is only present for fully-qualified domain names.
        # See https://github.com/aio-libs/aiohttp/pull/7364.
        if host.endswith(".."):
            host = host.rstrip(".") + "."
        port = req.port
        assert port is not None
        host_resolved = asyncio.ensure_future(
            self._resolve_host(host, port, traces=traces), loop=self._loop
        )
        try:
            # Cancelling this lookup should not cancel the underlying lookup
            # or else the cancel event will get broadcast to all the waiters
            # across all connections.
            hosts = await asyncio.shield(host_resolved)
        except asyncio.CancelledError:

            def drop_exception(fut: "asyncio.Future[List[Dict[str, Any]]]") -> None:
                # Consume the shielded future's outcome so asyncio does not
                # log "exception was never retrieved" for it.
                with suppress(Exception, asyncio.CancelledError):
                    fut.result()

            host_resolved.add_done_callback(drop_exception)
            raise
        except OSError as exc:
            # A bare timeout (errno is None) propagates unchanged.
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                raise
            # in case of proxy it is not ClientProxyConnectionError
            # it is problem of resolving proxy ip itself
            raise ClientConnectorError(req.connection_key, exc) from exc

        last_exc: Optional[Exception] = None
        addr_infos = self._convert_hosts_to_addr_infos(hosts)
        while addr_infos:
            # Strip trailing dots, certificates contain FQDN without dots.
            # See https://github.com/aio-libs/aiohttp/issues/3636
            server_hostname = (
                (req.server_hostname or host).rstrip(".") if sslcontext else None
            )

            try:
                transp, proto = await self._wrap_create_connection(
                    self._factory,
                    timeout=timeout,
                    ssl=sslcontext,
                    addr_infos=addr_infos,
                    server_hostname=server_hostname,
                    req=req,
                    client_error=client_error,
                )
            except ClientConnectorError as exc:
                last_exc = exc
                # Drop the just-attempted address(es) and retry with the rest.
                aiohappyeyeballs.pop_addr_infos_interleave(addr_infos, self._interleave)
                continue

            if req.is_ssl() and fingerprint:
                try:
                    fingerprint.check(transp)
                except ServerFingerprintMismatch as exc:
                    transp.close()
                    if not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transp)
                    last_exc = exc
                    # Remove the bad peer from the list of addr_infos
                    sock: socket.socket = transp.get_extra_info("socket")
                    bad_peer = sock.getpeername()
                    aiohappyeyeballs.remove_addr_infos(addr_infos, bad_peer)
                    continue

            return transp, proto
        # The while loop only exits without returning when every candidate
        # failed, so an exception must have been recorded.
        assert last_exc is not None
        raise last_exc

1203 

    async def _create_proxy_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
        """Connect through an HTTP(S) proxy.

        For a plain-HTTP target the connection to the proxy itself is
        returned.  For an HTTPS target a CONNECT request establishes a
        tunnel first and the raw transport is then upgraded to TLS via
        ``_start_tls_connection``.
        """
        headers: Dict[str, str] = {}
        if req.proxy_headers is not None:
            # NOTE(review): this aliases req.proxy_headers rather than
            # copying it, so the HOST assignment below mutates the caller's
            # mapping — confirm this is intentional.
            headers = req.proxy_headers  # type: ignore[assignment]
        headers[hdrs.HOST] = req.headers[hdrs.HOST]

        url = req.proxy
        assert url is not None
        # The request addressed to the proxy itself (GET for plain HTTP;
        # switched to CONNECT below when tunnelling HTTPS).
        proxy_req = ClientRequest(
            hdrs.METH_GET,
            url,
            headers=headers,
            auth=req.proxy_auth,
            loop=self._loop,
            ssl=req.ssl,
        )

        # create connection to proxy server
        transport, proto = await self._create_direct_connection(
            proxy_req, [], timeout, client_error=ClientProxyConnectionError
        )

        # Many HTTP proxies have buggy keepalive support.  Let's not
        # reuse connection but close it after processing every
        # response.
        proto.force_close()

        # Move the Authorization header generated for the proxy URL to the
        # right place: Proxy-Authorization on the outgoing request for plain
        # HTTP, or on the CONNECT request for HTTPS.
        auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
        if auth is not None:
            if not req.is_ssl():
                req.headers[hdrs.PROXY_AUTHORIZATION] = auth
            else:
                proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth

        if req.is_ssl():
            self._warn_about_tls_in_tls(transport, req)

            # For HTTPS requests over HTTP proxy
            # we must notify proxy to tunnel connection
            # so we send CONNECT command:
            #   CONNECT www.python.org:443 HTTP/1.1
            #   Host: www.python.org
            #
            # next we must do TLS handshake and so on
            # to do this we must wrap raw socket into secure one
            # asyncio handles this perfectly
            proxy_req.method = hdrs.METH_CONNECT
            proxy_req.url = req.url
            # Key the tunnelled connection like a direct one (proxy details
            # stripped) so it pools under the target host.
            key = dataclasses.replace(
                req.connection_key, proxy=None, proxy_auth=None, proxy_headers_hash=None
            )
            conn = Connection(self, key, proto, self._loop)
            proxy_resp = await proxy_req.send(conn)
            try:
                protocol = conn._protocol
                assert protocol is not None

                # read_until_eof=True will ensure the connection isn't closed
                # once the response is received and processed allowing
                # START_TLS to work on the connection below.
                protocol.set_response_params(
                    read_until_eof=True,
                    timeout_ceil_threshold=self._timeout_ceil_threshold,
                )
                resp = await proxy_resp.start(conn)
            except BaseException:
                proxy_resp.close()
                conn.close()
                raise
            else:
                # Detach the raw transport/protocol from the helper
                # Connection so closing it later cannot tear down the tunnel.
                conn._protocol = None
                conn._transport = None
                try:
                    if resp.status != 200:
                        message = resp.reason
                        if message is None:
                            message = HTTPStatus(resp.status).phrase
                        raise ClientHttpProxyError(
                            proxy_resp.request_info,
                            resp.history,
                            status=resp.status,
                            message=message,
                            headers=resp.headers,
                        )
                except BaseException:
                    # It shouldn't be closed in `finally` because it's fed to
                    # `loop.start_tls()` and the docs say not to touch it after
                    # passing there.
                    transport.close()
                    raise

                return await self._start_tls_connection(
                    # Access the old transport for the last time before it's
                    # closed and forgotten forever:
                    transport,
                    req=req,
                    timeout=timeout,
                )
            finally:
                proxy_resp.close()

        # Plain-HTTP target: hand back the connection to the proxy itself.
        return transport, proto

1308 

1309 

class UnixConnector(BaseConnector):
    """Connector that talks HTTP over a Unix domain socket.

    path - Unix socket path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[_SENTINEL, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
    ) -> None:
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
        )
        # Target socket path; exposed read-only via the `path` property.
        self._path = path

    @property
    def path(self) -> str:
        """Path to unix socket."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Connect to the configured Unix socket and return the protocol."""
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                connected = await self._loop.create_unix_connection(
                    self._factory, self._path
                )
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                # A bare timeout propagates unchanged.
                raise
            raise UnixClientConnectorError(self.path, req.connection_key, exc) from exc

        _, proto = connected
        return proto

1359 

1360 

class NamedPipeConnector(BaseConnector):
    """Connector that talks HTTP over a Windows named pipe.

    Only supported by the proactor event loop.
    See also: https://docs.python.org/3/library/asyncio-eventloop.html

    path - Windows named pipe path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[_SENTINEL, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
    ) -> None:
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
        )
        # Named pipes only work on the proactor loop (Windows); fail fast
        # here instead of on the first connection attempt.
        if not isinstance(
            self._loop, asyncio.ProactorEventLoop  # type: ignore[attr-defined]
        ):
            raise RuntimeError(
                "Named Pipes only available in proactor loop under windows"
            )
        self._path = path

    @property
    def path(self) -> str:
        """Path to the named pipe."""
        return self._path

    async def _create_connection(
        self, req: ClientRequest, traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Open the named pipe and return the connected protocol."""
        try:
            async with ceil_timeout(
                timeout.sock_connect, ceil_threshold=timeout.ceil_threshold
            ):
                _, proto = await self._loop.create_pipe_connection(  # type: ignore[attr-defined]
                    self._factory, self._path
                )
                # the drain is required so that the connection_made is called
                # and transport is set otherwise it is not set before the
                # `assert conn.transport is not None`
                # in client.py's _request method
                await asyncio.sleep(0)
                # other option is to manually set transport like
                # `proto.transport = trans`
        except OSError as exc:
            if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
                # A bare timeout propagates unchanged.
                raise
            raise ClientConnectorError(req.connection_key, exc) from exc

        return cast(ResponseHandler, proto)