Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/connector.py: 33%

710 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:56 +0000

1import asyncio 

2import functools 

3import random 

4import sys 

5import traceback 

6import warnings 

7from collections import defaultdict, deque 

8from contextlib import suppress 

9from http.cookies import SimpleCookie 

10from itertools import cycle, islice 

11from time import monotonic 

12from types import TracebackType 

13from typing import ( 

14 TYPE_CHECKING, 

15 Any, 

16 Awaitable, 

17 Callable, 

18 DefaultDict, 

19 Dict, 

20 Iterator, 

21 List, 

22 Optional, 

23 Set, 

24 Tuple, 

25 Type, 

26 Union, 

27 cast, 

28) 

29 

30import attr 

31 

32from . import hdrs, helpers 

33from .abc import AbstractResolver 

34from .client_exceptions import ( 

35 ClientConnectionError, 

36 ClientConnectorCertificateError, 

37 ClientConnectorError, 

38 ClientConnectorSSLError, 

39 ClientHttpProxyError, 

40 ClientProxyConnectionError, 

41 ServerFingerprintMismatch, 

42 UnixClientConnectorError, 

43 cert_errors, 

44 ssl_errors, 

45) 

46from .client_proto import ResponseHandler 

47from .client_reqrep import ClientRequest, Fingerprint, _merge_ssl_params 

48from .helpers import ( 

49 PY_36, 

50 ceil_timeout, 

51 get_running_loop, 

52 is_ip_address, 

53 noop, 

54 sentinel, 

55) 

56from .http import RESPONSES 

57from .locks import EventResultOrError 

58from .resolver import DefaultResolver 

59 

60try: 

61 import ssl 

62 

63 SSLContext = ssl.SSLContext 

64except ImportError: # pragma: no cover 

65 ssl = None # type: ignore[assignment] 

66 SSLContext = object # type: ignore[misc,assignment] 

67 

68 

69__all__ = ("BaseConnector", "TCPConnector", "UnixConnector", "NamedPipeConnector") 

70 

71 

72if TYPE_CHECKING: # pragma: no cover 

73 from .client import ClientTimeout 

74 from .client_reqrep import ConnectionKey 

75 from .tracing import Trace 

76 

77 

78class _DeprecationWaiter: 

79 __slots__ = ("_awaitable", "_awaited") 

80 

81 def __init__(self, awaitable: Awaitable[Any]) -> None: 

82 self._awaitable = awaitable 

83 self._awaited = False 

84 

85 def __await__(self) -> Any: 

86 self._awaited = True 

87 return self._awaitable.__await__() 

88 

89 def __del__(self) -> None: 

90 if not self._awaited: 

91 warnings.warn( 

92 "Connector.close() is a coroutine, " 

93 "please use await connector.close()", 

94 DeprecationWarning, 

95 ) 

96 

97 

class Connection:
    """Handle pairing an acquired protocol with the connector that owns it.

    The handle gives the protocol back to the connector through ``close()``
    or ``release()``. If it is garbage-collected while still holding a
    protocol, a ``ResourceWarning`` is emitted and the protocol is released
    with ``should_close=True``.
    """

    _source_traceback = None
    _transport = None

    def __init__(
        self,
        connector: "BaseConnector",
        key: "ConnectionKey",
        protocol: ResponseHandler,
        loop: asyncio.AbstractEventLoop,
    ) -> None:
        self._connector = connector
        self._key = key
        self._protocol: Optional[ResponseHandler] = protocol
        self._loop = loop
        self._callbacks: List[Callable[[], None]] = []

        # In debug mode remember where the connection was created so the
        # "Unclosed connection" report can include it.
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

    def __repr__(self) -> str:
        return f"Connection<{self._key}>"

    def __del__(self, _warnings: Any = warnings) -> None:
        # ``_warnings`` is bound as a default argument so the module is still
        # reachable when the finalizer runs.
        if self._protocol is None:
            return

        kwargs = {"source": self} if PY_36 else {}
        _warnings.warn(f"Unclosed connection {self!r}", ResourceWarning, **kwargs)
        if self._loop.is_closed():
            return

        self._connector._release(self._key, self._protocol, should_close=True)

        context = {"client_connection": self, "message": "Unclosed connection"}
        source_tb = self._source_traceback
        if source_tb is not None:
            context["source_traceback"] = source_tb
        self._loop.call_exception_handler(context)

    @property
    def loop(self) -> asyncio.AbstractEventLoop:
        """Event loop given at construction (deprecated accessor)."""
        warnings.warn(
            "connector.loop property is deprecated", DeprecationWarning, stacklevel=2
        )
        return self._loop

    @property
    def transport(self) -> Optional[asyncio.Transport]:
        """Transport of the held protocol, or ``None`` once it was given back."""
        proto = self._protocol
        return None if proto is None else proto.transport

    @property
    def protocol(self) -> Optional[ResponseHandler]:
        """The held protocol, or ``None`` once it was given back."""
        return self._protocol

    def add_callback(self, callback: Callable[[], None]) -> None:
        """Register a callable to run on ``close()``/``release()``; ``None`` is ignored."""
        if callback is None:
            return
        self._callbacks.append(callback)

    def _notify_release(self) -> None:
        """Run and forget every registered callback, ignoring their exceptions."""
        pending = self._callbacks[:]
        self._callbacks = []

        for callback in pending:
            with suppress(Exception):
                callback()

    def close(self) -> None:
        """Fire the callbacks and hand the protocol back asking for it to be closed."""
        self._notify_release()

        proto = self._protocol
        if proto is None:
            return
        self._connector._release(self._key, proto, should_close=True)
        self._protocol = None

    def release(self) -> None:
        """Fire the callbacks and hand the protocol back, closing only if it asks to."""
        self._notify_release()

        proto = self._protocol
        if proto is None:
            return
        self._connector._release(
            self._key, proto, should_close=proto.should_close
        )
        self._protocol = None

    @property
    def closed(self) -> bool:
        """True when no protocol is held or the held protocol is not connected."""
        proto = self._protocol
        if proto is None:
            return True
        return not proto.is_connected()

186 

187 

188class _TransportPlaceholder: 

189 """placeholder for BaseConnector.connect function""" 

190 

191 def close(self) -> None: 

192 pass 

193 

194 

class BaseConnector:
    """Base connector class.

    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    enable_cleanup_closed - Enables clean-up closed ssl transports.
                            Disabled by default.
    loop - Optional event loop.
    """

    _closed = True  # prevent AttributeError in __del__ if ctor was failed
    _source_traceback = None

    # abort transport after 2 seconds (cleanup broken connections)
    _cleanup_closed_period = 2.0

    def __init__(
        self,
        *,
        keepalive_timeout: Union[object, None, float] = sentinel,
        force_close: bool = False,
        limit: int = 100,
        limit_per_host: int = 0,
        enable_cleanup_closed: bool = False,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        """Validate the keep-alive options and create the empty pool state.

        When ``force_close`` is false and no ``keepalive_timeout`` was given,
        the timeout defaults to 15.0 seconds.

        Raises:
            ValueError: ``force_close`` is true and ``keepalive_timeout`` was
                given a value other than ``None``.
        """
        if force_close:
            if keepalive_timeout is not None and keepalive_timeout is not sentinel:
                raise ValueError(
                    "keepalive_timeout cannot " "be set if force_close is True"
                )
        else:
            if keepalive_timeout is sentinel:
                keepalive_timeout = 15.0

        loop = get_running_loop(loop)

        self._closed = False
        if loop.get_debug():
            self._source_traceback = traceback.extract_stack(sys._getframe(1))

        # {key: [(idle protocol, time it was released), ...]}
        self._conns: Dict[ConnectionKey, List[Tuple[ResponseHandler, float]]] = {}
        self._limit = limit
        self._limit_per_host = limit_per_host
        self._acquired: Set[ResponseHandler] = set()
        self._acquired_per_host: DefaultDict[
            ConnectionKey, Set[ResponseHandler]
        ] = defaultdict(set)
        self._keepalive_timeout = cast(float, keepalive_timeout)
        self._force_close = force_close

        # {host_key: FIFO list of waiters}
        self._waiters = defaultdict(deque)  # type: ignore[var-annotated]

        self._loop = loop
        self._factory = functools.partial(ResponseHandler, loop=loop)

        self.cookies: SimpleCookie[str] = SimpleCookie()

        # handle of the keep-alive cleanup timer; created lazily by _release()
        self._cleanup_handle: Optional[asyncio.TimerHandle] = None

        # start cleanup closed transports task
        self._cleanup_closed_handle: Optional[asyncio.TimerHandle] = None
        self._cleanup_closed_disabled = not enable_cleanup_closed
        self._cleanup_closed_transports: List[Optional[asyncio.Transport]] = []
        self._cleanup_closed()

    def __del__(self, _warnings: Any = warnings) -> None:
        """Close a still-open connector that holds idle connections and report it."""
        if self._closed:
            return
        if not self._conns:
            return

        # Capture the descriptions before _close() empties the pool.
        conns = [repr(c) for c in self._conns.values()]

        self._close()

        if PY_36:
            kwargs = {"source": self}
        else:
            kwargs = {}
        _warnings.warn(f"Unclosed connector {self!r}", ResourceWarning, **kwargs)
        context = {
            "connector": self,
            "connections": conns,
            "message": "Unclosed connector",
        }
        if self._source_traceback is not None:
            context["source_traceback"] = self._source_traceback
        self._loop.call_exception_handler(context)

    def __enter__(self) -> "BaseConnector":
        warnings.warn(
            '"with Connector():" is deprecated, '
            'use "async with Connector():" instead',
            DeprecationWarning,
        )
        return self

    def __exit__(self, *exc: Any) -> None:
        self._close()

    async def __aenter__(self) -> "BaseConnector":
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]] = None,
        exc_value: Optional[BaseException] = None,
        exc_traceback: Optional[TracebackType] = None,
    ) -> None:
        await self.close()

    @property
    def force_close(self) -> bool:
        """Ultimately close connection on releasing if True."""
        return self._force_close

    @property
    def limit(self) -> int:
        """The total number for simultaneous connections.

        If limit is 0 the connector has no limit.
        The default limit size is 100.
        """
        return self._limit

    @property
    def limit_per_host(self) -> int:
        """The limit for simultaneous connections to the same endpoint.

        Endpoints are the same if they have an equal
        (host, port, is_ssl) triple.
        """
        return self._limit_per_host

    def _cleanup(self) -> None:
        """Cleanup unused transports."""
        if self._cleanup_handle:
            self._cleanup_handle.cancel()
            # _cleanup_handle should be unset, otherwise _release() will not
            # recreate it ever!
            self._cleanup_handle = None

        now = self._loop.time()
        timeout = self._keepalive_timeout

        if self._conns:
            connections = {}
            deadline = now - timeout
            for key, conns in self._conns.items():
                alive = []
                for proto, use_time in conns:
                    if proto.is_connected():
                        if use_time - deadline < 0:
                            # idle for longer than the keep-alive timeout
                            transport = proto.transport
                            proto.close()
                            if key.is_ssl and not self._cleanup_closed_disabled:
                                self._cleanup_closed_transports.append(transport)
                        else:
                            alive.append((proto, use_time))
                    else:
                        transport = proto.transport
                        proto.close()
                        if key.is_ssl and not self._cleanup_closed_disabled:
                            self._cleanup_closed_transports.append(transport)

                if alive:
                    connections[key] = alive

            self._conns = connections

        if self._conns:
            # NOTE(review): weakref_handle presumably schedules the named
            # method after `timeout` seconds without keeping self alive.
            self._cleanup_handle = helpers.weakref_handle(
                self, "_cleanup", timeout, self._loop
            )

    def _drop_acquired_per_host(
        self, key: "ConnectionKey", val: ResponseHandler
    ) -> None:
        """Remove ``val`` from the per-host acquired set, dropping an emptied set.

        Does nothing when ``key`` has no set; raises ``KeyError`` when the
        set exists but does not contain ``val``.
        """
        acquired_per_host = self._acquired_per_host
        if key not in acquired_per_host:
            return
        conns = acquired_per_host[key]
        conns.remove(val)
        if not conns:
            del self._acquired_per_host[key]

    def _cleanup_closed(self) -> None:
        """Double confirmation for transport close.

        Some broken ssl servers may leave socket open without proper close.
        """
        if self._cleanup_closed_handle:
            self._cleanup_closed_handle.cancel()

        for transport in self._cleanup_closed_transports:
            if transport is not None:
                transport.abort()

        self._cleanup_closed_transports = []

        if not self._cleanup_closed_disabled:
            self._cleanup_closed_handle = helpers.weakref_handle(
                self, "_cleanup_closed", self._cleanup_closed_period, self._loop
            )

    def close(self) -> Awaitable[None]:
        """Close all opened transports."""
        self._close()
        return _DeprecationWaiter(noop())

    def _close(self) -> None:
        """Synchronously shut the connector down; a second call is a no-op.

        Cancels the cleanup timers and every queued ``connect()`` waiter,
        closes idle and acquired protocols, aborts transports awaiting
        cleanup, and finally empties all bookkeeping containers.
        """
        if self._closed:
            return

        self._closed = True

        try:
            if self._loop.is_closed():
                return

            # Wake callers queued in connect(): clearing the waiter table
            # without completing the futures would leave them suspended
            # forever. Done only while the loop is open, because cancelling
            # a future schedules its callbacks on the loop.
            for waiters in self._waiters.values():
                for waiter in waiters:
                    if not waiter.done():
                        waiter.cancel()

            # cancel cleanup task
            if self._cleanup_handle:
                self._cleanup_handle.cancel()

            # cancel cleanup close task
            if self._cleanup_closed_handle:
                self._cleanup_closed_handle.cancel()

            for data in self._conns.values():
                for proto, _ in data:
                    proto.close()

            for proto in self._acquired:
                proto.close()

            for transport in self._cleanup_closed_transports:
                if transport is not None:
                    transport.abort()

        finally:
            self._conns.clear()
            self._acquired.clear()
            # Drop the per-host view too so no protocol stays referenced.
            self._acquired_per_host.clear()
            self._waiters.clear()
            self._cleanup_handle = None
            self._cleanup_closed_transports.clear()
            self._cleanup_closed_handle = None

    @property
    def closed(self) -> bool:
        """Is connector closed.

        A readonly property.
        """
        return self._closed

    def _available_connections(self, key: "ConnectionKey") -> int:
        """
        Return number of available connections.

        The limit, limit_per_host and the connection key are taken into account.

        If it returns less than 1 means that there are no connections
        available.
        """
        if self._limit:
            # total calc available connections
            available = self._limit - len(self._acquired)

            # check limit per host
            if (
                self._limit_per_host
                and available > 0
                and key in self._acquired_per_host
            ):
                acquired = self._acquired_per_host.get(key)
                assert acquired is not None
                available = self._limit_per_host - len(acquired)

        elif self._limit_per_host and key in self._acquired_per_host:
            # check limit per host
            acquired = self._acquired_per_host.get(key)
            assert acquired is not None
            available = self._limit_per_host - len(acquired)
        else:
            available = 1

        return available

    async def connect(
        self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
    ) -> Connection:
        """Get from pool or create new connection.

        Raises:
            ClientConnectionError: the connector was closed while the
                connection was being created or its reuse was being traced.
        """
        key = req.connection_key
        available = self._available_connections(key)

        # Wait if there are no available connections or if there are/were
        # waiters (i.e. don't steal connection from a waiter about to wake up)
        if available <= 0 or key in self._waiters:
            fut = self._loop.create_future()

            # This connection will now count towards the limit.
            self._waiters[key].append(fut)

            if traces:
                for trace in traces:
                    await trace.send_connection_queued_start()

            try:
                await fut
            except BaseException:
                if key in self._waiters:
                    # remove a waiter even if it was cancelled, normally it's
                    # removed when it's notified
                    try:
                        self._waiters[key].remove(fut)
                    except ValueError:  # fut may no longer be in list
                        pass

                raise
            finally:
                if key in self._waiters and not self._waiters[key]:
                    del self._waiters[key]

            if traces:
                for trace in traces:
                    await trace.send_connection_queued_end()

        proto = self._get(key)
        if proto is None:
            # Reserve a slot while the connection is being created.
            placeholder = cast(ResponseHandler, _TransportPlaceholder())
            self._acquired.add(placeholder)
            self._acquired_per_host[key].add(placeholder)

            if traces:
                for trace in traces:
                    await trace.send_connection_create_start()

            try:
                proto = await self._create_connection(req, traces, timeout)
                if self._closed:
                    proto.close()
                    raise ClientConnectionError("Connector is closed.")
            except BaseException:
                if not self._closed:
                    self._acquired.remove(placeholder)
                    self._drop_acquired_per_host(key, placeholder)
                    self._release_waiter()
                raise
            else:
                if not self._closed:
                    self._acquired.remove(placeholder)
                    self._drop_acquired_per_host(key, placeholder)

            if traces:
                for trace in traces:
                    await trace.send_connection_create_end()
        else:
            if traces:
                # Acquire the connection to prevent race conditions with limits
                placeholder = cast(ResponseHandler, _TransportPlaceholder())
                self._acquired.add(placeholder)
                self._acquired_per_host[key].add(placeholder)
                try:
                    for trace in traces:
                        await trace.send_connection_reuseconn()
                finally:
                    # _close() already emptied the bookkeeping sets, so only
                    # undo the reservation while the connector is still open.
                    if not self._closed:
                        self._acquired.remove(placeholder)
                        self._drop_acquired_per_host(key, placeholder)
                if self._closed:
                    # Same outcome as the create path: never hand out a
                    # connection that belongs to a closed connector.
                    proto.close()
                    raise ClientConnectionError("Connector is closed.")

        self._acquired.add(proto)
        self._acquired_per_host[key].add(proto)
        return Connection(self, key, proto, self._loop)

    def _get(self, key: "ConnectionKey") -> Optional[ResponseHandler]:
        """Pop the most recently released usable protocol for ``key``.

        Disconnected or expired protocols met on the way are closed. Returns
        ``None`` when the pool holds nothing usable for ``key``.
        """
        try:
            conns = self._conns[key]
        except KeyError:
            return None

        t1 = self._loop.time()
        while conns:
            proto, t0 = conns.pop()
            if proto.is_connected():
                if t1 - t0 > self._keepalive_timeout:
                    transport = proto.transport
                    proto.close()
                    # only for SSL transports
                    if key.is_ssl and not self._cleanup_closed_disabled:
                        self._cleanup_closed_transports.append(transport)
                else:
                    if not conns:
                        # The very last connection was reclaimed: drop the key
                        del self._conns[key]
                    return proto
            else:
                transport = proto.transport
                proto.close()
                if key.is_ssl and not self._cleanup_closed_disabled:
                    self._cleanup_closed_transports.append(transport)

        # No more connections: drop the key
        del self._conns[key]
        return None

    def _release_waiter(self) -> None:
        """
        Iterates over all waiters until one to be released is found.

        The one to be released is not finished and
        belongs to a host that has available connections.
        """
        if not self._waiters:
            return

        # Shuffle the keys so that the queues are not always visited in the
        # same order on every call.
        queues = list(self._waiters.keys())
        random.shuffle(queues)

        for key in queues:
            if self._available_connections(key) < 1:
                continue

            waiters = self._waiters[key]
            while waiters:
                waiter = waiters.popleft()
                if not waiter.done():
                    waiter.set_result(None)
                    return

    def _release_acquired(self, key: "ConnectionKey", proto: ResponseHandler) -> None:
        """Forget an acquired protocol and, if it was tracked, wake one waiter."""
        if self._closed:
            # acquired connection is already released on connector closing
            return

        try:
            self._acquired.remove(proto)
            self._drop_acquired_per_host(key, proto)
        except KeyError:  # pragma: no cover
            # this may be the result of a non-deterministic order of object
            # finalization during garbage collection.
            pass
        else:
            self._release_waiter()

    def _release(
        self,
        key: "ConnectionKey",
        protocol: ResponseHandler,
        *,
        should_close: bool = False,
    ) -> None:
        """Take back a protocol: close it or return it to the idle pool.

        The protocol is closed when ``should_close`` is true, when the
        connector was built with ``force_close``, or when the protocol itself
        reports ``should_close``; otherwise it is stored with the current
        loop time and the keep-alive cleanup timer is started if needed.
        """
        if self._closed:
            # acquired connection is already released on connector closing
            return

        self._release_acquired(key, protocol)

        if self._force_close:
            should_close = True

        if should_close or protocol.should_close:
            transport = protocol.transport
            protocol.close()

            if key.is_ssl and not self._cleanup_closed_disabled:
                self._cleanup_closed_transports.append(transport)
        else:
            conns = self._conns.get(key)
            if conns is None:
                conns = self._conns[key] = []
            conns.append((protocol, self._loop.time()))

            if self._cleanup_handle is None:
                self._cleanup_handle = helpers.weakref_handle(
                    self, "_cleanup", self._keepalive_timeout, self._loop
                )

    async def _create_connection(
        self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Create a new connection; subclasses must override this."""
        raise NotImplementedError()

682 

683 

684class _DNSCacheTable: 

685 def __init__(self, ttl: Optional[float] = None) -> None: 

686 self._addrs_rr: Dict[Tuple[str, int], Tuple[Iterator[Dict[str, Any]], int]] = {} 

687 self._timestamps: Dict[Tuple[str, int], float] = {} 

688 self._ttl = ttl 

689 

690 def __contains__(self, host: object) -> bool: 

691 return host in self._addrs_rr 

692 

693 def add(self, key: Tuple[str, int], addrs: List[Dict[str, Any]]) -> None: 

694 self._addrs_rr[key] = (cycle(addrs), len(addrs)) 

695 

696 if self._ttl: 

697 self._timestamps[key] = monotonic() 

698 

699 def remove(self, key: Tuple[str, int]) -> None: 

700 self._addrs_rr.pop(key, None) 

701 

702 if self._ttl: 

703 self._timestamps.pop(key, None) 

704 

705 def clear(self) -> None: 

706 self._addrs_rr.clear() 

707 self._timestamps.clear() 

708 

709 def next_addrs(self, key: Tuple[str, int]) -> List[Dict[str, Any]]: 

710 loop, length = self._addrs_rr[key] 

711 addrs = list(islice(loop, length)) 

712 # Consume one more element to shift internal state of `cycle` 

713 next(loop) 

714 return addrs 

715 

716 def expired(self, key: Tuple[str, int]) -> bool: 

717 if self._ttl is None: 

718 return False 

719 

720 return self._timestamps[key] + self._ttl < monotonic() 

721 

722 

723class TCPConnector(BaseConnector): 

724 """TCP connector. 

725 

726 verify_ssl - Set to True to check ssl certifications. 

727 fingerprint - Pass the binary sha256 

728 digest of the expected certificate in DER format to verify 

729 that the certificate the server presents matches. See also 

730 https://en.wikipedia.org/wiki/Transport_Layer_Security#Certificate_pinning 

731 resolver - Enable DNS lookups and use this 

732 resolver 

733 use_dns_cache - Use memory cache for DNS lookups. 

734 ttl_dns_cache - Max seconds having cached a DNS entry, None forever. 

735 family - socket address family 

736 local_addr - local tuple of (host, port) to bind socket to 

737 

738 keepalive_timeout - (optional) Keep-alive timeout. 

739 force_close - Set to True to force close and do reconnect 

740 after each request (and between redirects). 

741 limit - The total number of simultaneous connections. 

742 limit_per_host - Number of simultaneous connections to one host. 

743 enable_cleanup_closed - Enables clean-up closed ssl transports. 

744 Disabled by default. 

745 loop - Optional event loop. 

746 """ 

747 

def __init__(
    self,
    *,
    verify_ssl: bool = True,
    fingerprint: Optional[bytes] = None,
    use_dns_cache: bool = True,
    ttl_dns_cache: Optional[int] = 10,
    family: int = 0,
    ssl_context: Optional[SSLContext] = None,
    ssl: Union[None, bool, Fingerprint, SSLContext] = None,
    local_addr: Optional[Tuple[str, int]] = None,
    resolver: Optional[AbstractResolver] = None,
    keepalive_timeout: Union[None, float, object] = sentinel,
    force_close: bool = False,
    limit: int = 100,
    limit_per_host: int = 0,
    enable_cleanup_closed: bool = False,
    loop: Optional[asyncio.AbstractEventLoop] = None,
):
    """Initialise the base connector, then the SSL and DNS specific state."""
    # Pool-related options are forwarded untouched to the base class.
    base_options = {
        "keepalive_timeout": keepalive_timeout,
        "force_close": force_close,
        "limit": limit,
        "limit_per_host": limit_per_host,
        "enable_cleanup_closed": enable_cleanup_closed,
        "loop": loop,
    }
    super().__init__(**base_options)

    # The four SSL-related arguments are merged into a single setting.
    self._ssl = _merge_ssl_params(ssl, verify_ssl, ssl_context, fingerprint)

    # A default resolver is only built when the caller did not supply one.
    self._resolver = (
        resolver if resolver is not None else DefaultResolver(loop=self._loop)
    )

    self._family = family
    self._local_addr = local_addr

    self._use_dns_cache = use_dns_cache
    self._cached_hosts = _DNSCacheTable(ttl=ttl_dns_cache)
    self._throttle_dns_events: Dict[Tuple[str, int], EventResultOrError] = {}

786 

def close(self) -> Awaitable[None]:
    """Cancel every pending throttled DNS event, then close the connector."""
    pending_events = self._throttle_dns_events.values()
    for event in pending_events:
        event.cancel()

    closing = super().close()
    return closing

793 

@property
def family(self) -> int:
    """Socket address family (for example AF_INET) given at construction."""
    configured_family = self._family
    return configured_family

798 

@property
def use_dns_cache(self) -> bool:
    """Whether resolved addresses are kept in the local DNS cache."""
    caching_enabled = self._use_dns_cache
    return caching_enabled

803 

def clear_dns_cache(
    self, host: Optional[str] = None, port: Optional[int] = None
) -> None:
    """Drop one cached (host, port) entry, or the whole cache when both are omitted.

    Raises:
        ValueError: exactly one of ``host`` and ``port`` was given.
    """
    given = (host is not None, port is not None)

    if all(given):
        self._cached_hosts.remove((host, port))
        return

    if any(given):
        raise ValueError("either both host and port " "or none of them are allowed")

    self._cached_hosts.clear()

814 

async def _resolve_host(
    self, host: str, port: int, traces: Optional[List["Trace"]] = None
) -> List[Dict[str, Any]]:
    """Resolve ``host``/``port`` to a list of address dictionaries.

    IP literals are returned as a single entry without consulting the
    resolver. With the DNS cache disabled the resolver is called directly.
    Otherwise the cache is consulted first, and concurrent lookups of the
    same (host, port) share one resolver call through a throttle event.

    Args:
        host: host name or IP literal.
        port: port number, part of the cache key.
        traces: optional trace objects notified about each step.

    Returns:
        The address dictionaries for the host.
    """
    if is_ip_address(host):
        return [
            {
                "hostname": host,
                "host": host,
                "port": port,
                "family": self._family,
                "proto": 0,
                "flags": 0,
            }
        ]

    if not self._use_dns_cache:

        if traces:
            for trace in traces:
                await trace.send_dns_resolvehost_start(host)

        res = await self._resolver.resolve(host, port, family=self._family)

        if traces:
            for trace in traces:
                await trace.send_dns_resolvehost_end(host)

        return res

    key = (host, port)

    if (key in self._cached_hosts) and (not self._cached_hosts.expired(key)):
        # get result early, before any await (#4014)
        result = self._cached_hosts.next_addrs(key)

        if traces:
            for trace in traces:
                await trace.send_dns_cache_hit(host)
        return result

    if key in self._throttle_dns_events:
        # get event early, before any await (#4014)
        event = self._throttle_dns_events[key]
        if traces:
            for trace in traces:
                await trace.send_dns_cache_hit(host)
        await event.wait()
    else:
        # update dict early, before any await (#4014)
        self._throttle_dns_events[key] = EventResultOrError(self._loop)
        try:
            # The cache-miss notification is awaited inside the try block:
            # once the throttle event is registered, any exception or
            # cancellation must still set and remove it, otherwise later
            # lookups of this key would wait on an event that never fires.
            if traces:
                for trace in traces:
                    await trace.send_dns_cache_miss(host)

            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_start(host)

            addrs = await self._resolver.resolve(host, port, family=self._family)
            if traces:
                for trace in traces:
                    await trace.send_dns_resolvehost_end(host)

            self._cached_hosts.add(key, addrs)
            self._throttle_dns_events[key].set()
        except BaseException as e:
            # any DNS exception, independently of the implementation
            # is set for the waiters to raise the same exception.
            self._throttle_dns_events[key].set(exc=e)
            raise
        finally:
            self._throttle_dns_events.pop(key)

    return self._cached_hosts.next_addrs(key)

890 

async def _create_connection(
    self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
) -> ResponseHandler:
    """Open a connection for ``req`` and return its protocol.

    Requests that carry a proxy go through ``_create_proxy_connection``;
    all others use ``_create_direct_connection``. The transport returned
    alongside the protocol is discarded.
    """
    connect = (
        self._create_proxy_connection
        if req.proxy
        else self._create_direct_connection
    )
    _transport, handler = await connect(req, traces, timeout)
    return handler

904 

905 @staticmethod 

906 @functools.lru_cache(None) 

907 def _make_ssl_context(verified: bool) -> SSLContext: 

908 if verified: 

909 return ssl.create_default_context() 

910 else: 

911 sslcontext = ssl.SSLContext(ssl.PROTOCOL_TLS_CLIENT) 

912 sslcontext.options |= ssl.OP_NO_SSLv2 

913 sslcontext.options |= ssl.OP_NO_SSLv3 

914 sslcontext.check_hostname = False 

915 sslcontext.verify_mode = ssl.CERT_NONE 

916 try: 

917 sslcontext.options |= ssl.OP_NO_COMPRESSION 

918 except AttributeError as attr_err: 

919 warnings.warn( 

920 "{!s}: The Python interpreter is compiled " 

921 "against OpenSSL < 1.0.0. Ref: " 

922 "https://docs.python.org/3/library/ssl.html" 

923 "#ssl.OP_NO_COMPRESSION".format(attr_err), 

924 ) 

925 sslcontext.set_default_verify_paths() 

926 return sslcontext 

927 

928 def _get_ssl_context(self, req: "ClientRequest") -> Optional[SSLContext]: 

929 """Logic to get the correct SSL context 

930 

931 0. if req.ssl is false, return None 

932 

933 1. if ssl_context is specified in req, use it 

934 2. if _ssl_context is specified in self, use it 

935 3. otherwise: 

936 1. if verify_ssl is not specified in req, use self.ssl_context 

937 (will generate a default context according to self.verify_ssl) 

938 2. if verify_ssl is True in req, generate a default SSL context 

939 3. if verify_ssl is False in req, generate a SSL context that 

940 won't verify 

941 """ 

942 if req.is_ssl(): 

943 if ssl is None: # pragma: no cover 

944 raise RuntimeError("SSL is not supported.") 

945 sslcontext = req.ssl 

946 if isinstance(sslcontext, ssl.SSLContext): 

947 return sslcontext 

948 if sslcontext is not None: 

949 # not verified or fingerprinted 

950 return self._make_ssl_context(False) 

951 sslcontext = self._ssl 

952 if isinstance(sslcontext, ssl.SSLContext): 

953 return sslcontext 

954 if sslcontext is not None: 

955 # not verified or fingerprinted 

956 return self._make_ssl_context(False) 

957 return self._make_ssl_context(True) 

958 else: 

959 return None 

960 

def _get_fingerprint(self, req: "ClientRequest") -> Optional["Fingerprint"]:
    """Return the fingerprint to check: the request's, else the connector's, else None."""
    # The request-level setting is consulted before the connector-level one.
    for candidate in (req.ssl, self._ssl):
        if isinstance(candidate, Fingerprint):
            return candidate
    return None

969 

async def _wrap_create_connection(
    self,
    *args: Any,
    req: "ClientRequest",
    timeout: "ClientTimeout",
    client_error: Type[Exception] = ClientConnectorError,
    **kwargs: Any,
) -> Tuple[asyncio.Transport, ResponseHandler]:
    """Call ``loop.create_connection`` and translate its failures.

    The call runs under the ``timeout.sock_connect`` limit. Certificate
    errors, SSL errors and other ``OSError``s are re-raised as
    ``ClientConnectorCertificateError``, ``ClientConnectorSSLError`` and
    ``client_error`` respectively, each chained to the original exception.
    An ``asyncio.TimeoutError`` without an errno propagates unchanged.
    """
    try:
        async with ceil_timeout(timeout.sock_connect):
            pair = await self._loop.create_connection(*args, **kwargs)
        return pair  # type: ignore[return-value]  # noqa
    except cert_errors as exc:
        raise ClientConnectorCertificateError(req.connection_key, exc) from exc
    except ssl_errors as exc:
        raise ClientConnectorSSLError(req.connection_key, exc) from exc
    except OSError as exc:
        is_plain_timeout = exc.errno is None and isinstance(
            exc, asyncio.TimeoutError
        )
        if not is_plain_timeout:
            raise client_error(req.connection_key, exc) from exc
        raise

989 

990 def _fail_on_no_start_tls(self, req: "ClientRequest") -> None: 

991 """Raise a :py:exc:`RuntimeError` on missing ``start_tls()``. 

992 

993 One case is that :py:meth:`asyncio.loop.start_tls` is not yet 

994 implemented under Python 3.6. It is necessary for TLS-in-TLS so 

995 that it is possible to send HTTPS queries through HTTPS proxies. 

996 

997 This doesn't affect regular HTTP requests, though. 

998 """ 

999 if not req.is_ssl(): 

1000 return 

1001 

1002 proxy_url = req.proxy 

1003 assert proxy_url is not None 

1004 if proxy_url.scheme != "https": 

1005 return 

1006 

1007 self._check_loop_for_start_tls() 

1008 

1009 def _check_loop_for_start_tls(self) -> None: 

1010 try: 

1011 self._loop.start_tls 

1012 except AttributeError as attr_exc: 

1013 raise RuntimeError( 

1014 "An HTTPS request is being sent through an HTTPS proxy. " 

1015 "This needs support for TLS in TLS but it is not implemented " 

1016 "in your runtime for the stdlib asyncio.\n\n" 

1017 "Please upgrade to Python 3.7 or higher. For more details, " 

1018 "please see:\n" 

1019 "* https://bugs.python.org/issue37179\n" 

1020 "* https://github.com/python/cpython/pull/28073\n" 

1021 "* https://docs.aiohttp.org/en/stable/" 

1022 "client_advanced.html#proxy-support\n" 

1023 "* https://github.com/aio-libs/aiohttp/discussions/6044\n", 

1024 ) from attr_exc 

1025 

1026 def _loop_supports_start_tls(self) -> bool: 

1027 try: 

1028 self._check_loop_for_start_tls() 

1029 except RuntimeError: 

1030 return False 

1031 else: 

1032 return True 

1033 

1034 def _warn_about_tls_in_tls( 

1035 self, 

1036 underlying_transport: asyncio.Transport, 

1037 req: "ClientRequest", 

1038 ) -> None: 

1039 """Issue a warning if the requested URL has HTTPS scheme.""" 

1040 if req.request_info.url.scheme != "https": 

1041 return 

1042 

1043 asyncio_supports_tls_in_tls = getattr( 

1044 underlying_transport, 

1045 "_start_tls_compatible", 

1046 False, 

1047 ) 

1048 

1049 if asyncio_supports_tls_in_tls: 

1050 return 

1051 

1052 warnings.warn( 

1053 "An HTTPS request is being sent through an HTTPS proxy. " 

1054 "This support for TLS in TLS is known to be disabled " 

1055 "in the stdlib asyncio. This is why you'll probably see " 

1056 "an error in the log below.\n\n" 

1057 "It is possible to enable it via monkeypatching under " 

1058 "Python 3.7 or higher. For more details, see:\n" 

1059 "* https://bugs.python.org/issue37179\n" 

1060 "* https://github.com/python/cpython/pull/28073\n\n" 

1061 "You can temporarily patch this as follows:\n" 

1062 "* https://docs.aiohttp.org/en/stable/client_advanced.html#proxy-support\n" 

1063 "* https://github.com/aio-libs/aiohttp/discussions/6044\n", 

1064 RuntimeWarning, 

1065 source=self, 

1066 # Why `4`? At least 3 of the calls in the stack originate 

1067 # from the methods in this class. 

1068 stacklevel=3, 

1069 ) 

1070 

async def _start_tls_connection(
    self,
    underlying_transport: asyncio.Transport,
    req: "ClientRequest",
    timeout: "ClientTimeout",
    client_error: Type[Exception] = ClientConnectorError,
) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
    """Wrap the raw TCP transport with TLS.

    A fresh protocol is created through ``self._factory()`` and handed,
    together with ``underlying_transport`` and the SSL context for
    ``req``, to ``self._loop.start_tls()``.  The handshake is bounded by
    ``ceil_timeout(timeout.sock_connect)``.

    Returns the ``(tls_transport, tls_proto)`` pair after calling
    ``tls_proto.connection_made(tls_transport)``.

    Raises (all built from ``req.connection_key`` and chained where an
    original exception exists):

    * ``ClientConnectorCertificateError`` for ``cert_errors``
    * ``ClientConnectorSSLError`` for ``ssl_errors``
    * ``client_error`` for any other ``OSError`` (an
      ``asyncio.TimeoutError`` with ``errno`` of ``None`` is re-raised
      unchanged), and also when ``start_tls()`` yields no transport
    * ``ClientConnectionError`` when ``start_tls()`` raises ``TypeError``

    If ``start_tls()`` raises anything, ``underlying_transport`` is
    closed before the exception is processed further.
    """
    tls_proto = self._factory()  # Create a brand new proto for TLS

    # Safety of the `cast()` call here is based on the fact that
    # internally `_get_ssl_context()` only returns `None` when
    # `req.is_ssl()` evaluates to `False` which is never gonna happen
    # in this code path. Of course, it's rather fragile
    # maintainability-wise but this is to be solved separately.
    sslcontext = cast(ssl.SSLContext, self._get_ssl_context(req))

    try:
        async with ceil_timeout(timeout.sock_connect):
            try:
                tls_transport = await self._loop.start_tls(
                    underlying_transport,
                    tls_proto,
                    sslcontext,
                    server_hostname=req.host,
                    ssl_handshake_timeout=timeout.total,
                )
            except BaseException:
                # We need to close the underlying transport since
                # `start_tls()` probably failed before it had a
                # chance to do this:
                underlying_transport.close()
                raise
    except cert_errors as exc:
        raise ClientConnectorCertificateError(req.connection_key, exc) from exc
    except ssl_errors as exc:
        raise ClientConnectorSSLError(req.connection_key, exc) from exc
    except OSError as exc:
        if exc.errno is None and isinstance(exc, asyncio.TimeoutError):
            raise
        raise client_error(req.connection_key, exc) from exc
    except TypeError as type_err:
        # Example cause looks like this:
        # TypeError: transport <asyncio.sslproto._SSLProtocolTransport
        # object at 0x7f760615e460> is not supported by start_tls()

        raise ClientConnectionError(
            "Cannot initialize a TLS-in-TLS connection to host "
            f"{req.host!s}:{req.port:d} through an underlying connection "
            f"to an HTTPS proxy {req.proxy!s} ssl:{req.ssl or 'default'} "
            f"[{type_err!s}]"
        ) from type_err
    else:
        if tls_transport is None:
            # `start_tls()` can hand back no transport when the connection
            # goes away during the handshake.  Report that as a connection
            # failure instead of feeding `None` to `connection_made()` and
            # returning an unusable pair to the caller.
            msg = "Failed to start TLS (possibly caused by closing transport)"
            raise client_error(req.connection_key, OSError(msg))
        tls_proto.connection_made(
            tls_transport
        )  # Kick the state machine of the new TLS protocol

    return tls_transport, tls_proto

1129 

async def _create_direct_connection(
    self,
    req: "ClientRequest",
    traces: List["Trace"],
    timeout: "ClientTimeout",
    *,
    client_error: Type[Exception] = ClientConnectorError,
) -> Tuple[asyncio.Transport, ResponseHandler]:
    """Resolve the request host and connect to the first address that works.

    The host/port of ``req`` are resolved through ``self._resolve_host()``
    and every returned entry is tried in order via
    ``_wrap_create_connection()``.  For SSL requests with a fingerprint
    configured, the fingerprint is checked on the new transport; a
    mismatch closes that transport and moves on to the next entry.

    Returns the ``(transport, protocol)`` pair of the first successful
    attempt.  If every attempt fails, the exception of the last failed
    attempt is raised.  A resolution failure (``OSError``) is raised as
    ``ClientConnectorError`` regardless of ``client_error``.
    """
    sslcontext = self._get_ssl_context(req)
    fingerprint = self._get_fingerprint(req)

    host = req.url.raw_host
    assert host is not None
    port = req.port
    assert port is not None

    def drop_exception(fut: "asyncio.Future[List[Dict[str, Any]]]") -> None:
        # Retrieve the outcome of the abandoned lookup and discard it.
        with suppress(Exception, asyncio.CancelledError):
            fut.result()

    lookup = asyncio.ensure_future(
        self._resolve_host(host, port, traces=traces), loop=self._loop
    )
    try:
        # Cancelling this lookup should not cancel the underlying lookup
        # or else the cancel event will get broadcast to all the waiters
        # across all connections.
        resolved = await asyncio.shield(lookup)
    except asyncio.CancelledError:
        lookup.add_done_callback(drop_exception)
        raise
    except OSError as resolve_exc:
        bare_timeout = (
            isinstance(resolve_exc, asyncio.TimeoutError)
            and resolve_exc.errno is None
        )
        if bare_timeout:
            raise
        # in case of proxy it is not ClientProxyConnectionError
        # it is problem of resolving proxy ip itself
        raise ClientConnectorError(req.connection_key, resolve_exc) from resolve_exc

    failure: Optional[Exception] = None

    for addr_info in resolved:
        try:
            transport, protocol = await self._wrap_create_connection(
                self._factory,
                addr_info["host"],
                addr_info["port"],
                timeout=timeout,
                ssl=sslcontext,
                family=addr_info["family"],
                proto=addr_info["proto"],
                flags=addr_info["flags"],
                server_hostname=addr_info["hostname"] if sslcontext else None,
                local_addr=self._local_addr,
                req=req,
                client_error=client_error,
            )
        except ClientConnectorError as connect_exc:
            failure = connect_exc
            continue

        if not (req.is_ssl() and fingerprint):
            return transport, protocol

        try:
            fingerprint.check(transport)
        except ServerFingerprintMismatch as mismatch:
            transport.close()
            if not self._cleanup_closed_disabled:
                self._cleanup_closed_transports.append(transport)
            failure = mismatch
        else:
            return transport, protocol

    # Every candidate address failed (or there were none at all).
    assert failure is not None
    raise failure

1207 

async def _create_proxy_connection(
    self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
) -> Tuple[asyncio.BaseTransport, ResponseHandler]:
    """Connect to the proxy given by ``req.proxy``; tunnel through it for SSL.

    A ``ClientRequest`` addressed to the proxy URL is built and used to
    open a direct connection to the proxy (connection failures there use
    ``ClientProxyConnectionError``).  The proxy protocol is put into
    force-close mode.

    * Non-SSL request: the proxy ``(transport, proto)`` pair is returned
      as is.
    * SSL request: a ``CONNECT`` request for ``req.url`` is sent over the
      proxy connection.  A status other than 200 raises
      ``ClientHttpProxyError``.  On success the connection is upgraded
      with ``_start_tls_connection()`` when the loop supports
      ``start_tls``; otherwise the raw socket is duplicated, the proxy
      transport is closed, and the duplicate is wrapped through
      ``_wrap_create_connection()``.

    Raises ``RuntimeError`` from ``_fail_on_no_start_tls()`` and also when
    the socket-duplication path finds no socket on the transport.
    ``traces`` is not used in this method; the connection to the proxy is
    made with an empty trace list.
    """
    self._fail_on_no_start_tls(req)
    runtime_has_start_tls = self._loop_supports_start_tls()

    headers: Dict[str, str] = {}
    if req.proxy_headers is not None:
        # NOTE: `headers` now aliases `req.proxy_headers`, so the HOST
        # assignment below is written into that same object.
        headers = req.proxy_headers  # type: ignore[assignment]
    headers[hdrs.HOST] = req.headers[hdrs.HOST]

    url = req.proxy
    assert url is not None
    proxy_req = ClientRequest(
        hdrs.METH_GET,
        url,
        headers=headers,
        auth=req.proxy_auth,
        loop=self._loop,
        ssl=req.ssl,
    )

    # create connection to proxy server
    transport, proto = await self._create_direct_connection(
        proxy_req, [], timeout, client_error=ClientProxyConnectionError
    )

    # Many HTTP proxies have buggy keepalive support. Let's not
    # reuse the connection but close it after processing every
    # response.
    proto.force_close()

    # Move any `hdrs.AUTHORIZATION` value of the proxy request to
    # `hdrs.PROXY_AUTHORIZATION`: on the real request when it goes through
    # the proxy in the clear, on the proxy (CONNECT) request otherwise.
    # (Presumably the value originates from `auth=req.proxy_auth` above --
    # confirm against ClientRequest.)
    auth = proxy_req.headers.pop(hdrs.AUTHORIZATION, None)
    if auth is not None:
        if not req.is_ssl():
            req.headers[hdrs.PROXY_AUTHORIZATION] = auth
        else:
            proxy_req.headers[hdrs.PROXY_AUTHORIZATION] = auth

    if req.is_ssl():
        if runtime_has_start_tls:
            self._warn_about_tls_in_tls(transport, req)

        # For HTTPS requests over HTTP proxy
        # we must notify proxy to tunnel connection
        # so we send CONNECT command:
        #   CONNECT www.python.org:443 HTTP/1.1
        #   Host: www.python.org
        #
        # next we must do TLS handshake and so on
        # to do this we must wrap raw socket into secure one
        # asyncio handles this perfectly
        proxy_req.method = hdrs.METH_CONNECT
        proxy_req.url = req.url
        # Connection key of the original request with the proxy-related
        # fields cleared.
        key = attr.evolve(
            req.connection_key, proxy=None, proxy_auth=None, proxy_headers_hash=None
        )
        conn = Connection(self, key, proto, self._loop)
        proxy_resp = await proxy_req.send(conn)
        try:
            protocol = conn._protocol
            assert protocol is not None

            # read_until_eof=True will ensure the connection isn't closed
            # once the response is received and processed allowing
            # START_TLS to work on the connection below.
            protocol.set_response_params(read_until_eof=runtime_has_start_tls)
            resp = await proxy_resp.start(conn)
        except BaseException:
            # Reading the CONNECT response failed: drop both the response
            # and the temporary connection wrapper.
            proxy_resp.close()
            conn.close()
            raise
        else:
            # Detach protocol and transport from the temporary wrapper;
            # from here on `transport` is managed explicitly below.
            conn._protocol = None
            conn._transport = None
            try:
                if resp.status != 200:
                    message = resp.reason
                    if message is None:
                        message = RESPONSES[resp.status][0]
                    raise ClientHttpProxyError(
                        proxy_resp.request_info,
                        resp.history,
                        status=resp.status,
                        message=message,
                        headers=resp.headers,
                    )
                if not runtime_has_start_tls:
                    rawsock = transport.get_extra_info("socket", default=None)
                    if rawsock is None:
                        raise RuntimeError(
                            "Transport does not expose socket instance"
                        )
                    # Duplicate the socket, so now we can close proxy transport
                    rawsock = rawsock.dup()
            except BaseException:
                # It shouldn't be closed in `finally` because it's fed to
                # `loop.start_tls()` and the docs say not to touch it after
                # passing there.
                transport.close()
                raise
            finally:
                # Without `start_tls` the duplicated socket is used from
                # here on, so the proxy transport is closed on every path.
                if not runtime_has_start_tls:
                    transport.close()

            if not runtime_has_start_tls:
                # HTTP proxy with support for upgrade to HTTPS
                sslcontext = self._get_ssl_context(req)
                return await self._wrap_create_connection(
                    self._factory,
                    timeout=timeout,
                    ssl=sslcontext,
                    sock=rawsock,
                    server_hostname=req.host,
                    req=req,
                )

            return await self._start_tls_connection(
                # Access the old transport for the last time before it's
                # closed and forgotten forever:
                transport,
                req=req,
                timeout=timeout,
            )
        finally:
            # The CONNECT response object is closed on every exit path.
            proxy_resp.close()

    # Plain (non-SSL) request: talk to the proxy connection directly.
    return transport, proto

1336 

1337 

class UnixConnector(BaseConnector):
    """Connector for servers listening on a Unix domain socket.

    path - Unix socket path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        # Everything except `path` is handled by the base connector.
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        self._path = path

    @property
    def path(self) -> str:
        """Path to unix socket."""
        return self._path

    async def _create_connection(
        self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Open a connection to the socket at ``self._path``.

        The attempt runs inside ``ceil_timeout(timeout.sock_connect)``.
        An ``OSError`` is re-raised as ``UnixClientConnectorError``
        (chained), except for an ``asyncio.TimeoutError`` whose ``errno``
        is ``None``, which propagates unchanged.  ``traces`` is unused.
        """
        try:
            async with ceil_timeout(timeout.sock_connect):
                _transport, protocol = await self._loop.create_unix_connection(
                    self._factory, self._path
                )
        except OSError as os_exc:
            bare_timeout = (
                isinstance(os_exc, asyncio.TimeoutError) and os_exc.errno is None
            )
            if bare_timeout:
                raise
            raise UnixClientConnectorError(
                self.path, req.connection_key, os_exc
            ) from os_exc

        return cast(ResponseHandler, protocol)

1387 

1388 

class NamedPipeConnector(BaseConnector):
    """Connector for servers listening on a Windows named pipe.

    Only supported by the proactor event loop.
    See also: https://docs.python.org/3.7/library/asyncio-eventloop.html

    path - Windows named pipe path.
    keepalive_timeout - (optional) Keep-alive timeout.
    force_close - Set to True to force close and do reconnect
        after each request (and between redirects).
    limit - The total number of simultaneous connections.
    limit_per_host - Number of simultaneous connections to one host.
    loop - Optional event loop.
    """

    def __init__(
        self,
        path: str,
        force_close: bool = False,
        keepalive_timeout: Union[object, float, None] = sentinel,
        limit: int = 100,
        limit_per_host: int = 0,
        loop: Optional[asyncio.AbstractEventLoop] = None,
    ) -> None:
        # Everything except `path` is handled by the base connector.
        super().__init__(
            force_close=force_close,
            keepalive_timeout=keepalive_timeout,
            limit=limit,
            limit_per_host=limit_per_host,
            loop=loop,
        )
        # The loop check happens after base initialisation because it
        # inspects `self._loop`.
        loop_is_proactor = isinstance(
            self._loop, asyncio.ProactorEventLoop  # type: ignore[attr-defined]
        )
        if not loop_is_proactor:
            raise RuntimeError(
                "Named Pipes only available in proactor loop under windows"
            )
        self._path = path

    @property
    def path(self) -> str:
        """Path to the named pipe."""
        return self._path

    async def _create_connection(
        self, req: "ClientRequest", traces: List["Trace"], timeout: "ClientTimeout"
    ) -> ResponseHandler:
        """Open a connection to the pipe at ``self._path``.

        The attempt runs inside ``ceil_timeout(timeout.sock_connect)``.
        An ``OSError`` is re-raised as ``ClientConnectorError`` (chained),
        except for an ``asyncio.TimeoutError`` whose ``errno`` is ``None``,
        which propagates unchanged.  ``traces`` is unused.
        """
        try:
            async with ceil_timeout(timeout.sock_connect):
                _transport, protocol = await self._loop.create_pipe_connection(  # type: ignore[attr-defined] # noqa: E501
                    self._factory, self._path
                )
                # Yield to the event loop once so that connection_made()
                # is called and the transport is set; otherwise it is not
                # set before the `assert conn.transport is not None`
                # in client.py's _request method.
                await asyncio.sleep(0)
                # The other option would be to set the transport manually,
                # like `proto.transport = trans`.
        except OSError as os_exc:
            bare_timeout = (
                isinstance(os_exc, asyncio.TimeoutError) and os_exc.errno is None
            )
            if bare_timeout:
                raise
            raise ClientConnectorError(req.connection_key, os_exc) from os_exc

        return cast(ResponseHandler, protocol)