Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/_core/_sockets.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

266 statements  

1from __future__ import annotations 

2 

3import errno 

4import os 

5import socket 

6import ssl 

7import stat 

8import sys 

9from collections.abc import Awaitable 

10from dataclasses import dataclass 

11from ipaddress import IPv4Address, IPv6Address, ip_address 

12from os import PathLike, chmod 

13from socket import AddressFamily, SocketKind 

14from typing import TYPE_CHECKING, Any, Literal, cast, overload 

15 

16from .. import ConnectionFailed, to_thread 

17from ..abc import ( 

18 ByteStreamConnectable, 

19 ConnectedUDPSocket, 

20 ConnectedUNIXDatagramSocket, 

21 IPAddressType, 

22 IPSockAddrType, 

23 SocketListener, 

24 SocketStream, 

25 UDPSocket, 

26 UNIXDatagramSocket, 

27 UNIXSocketStream, 

28) 

29from ..streams.stapled import MultiListener 

30from ..streams.tls import TLSConnectable, TLSStream 

31from ._eventloop import get_async_backend 

32from ._resources import aclose_forcefully 

33from ._synchronization import Event 

34from ._tasks import create_task_group, move_on_after 

35 

36if TYPE_CHECKING: 

37 from _typeshed import FileDescriptorLike 

38else: 

39 FileDescriptorLike = object 

40 

41if sys.version_info < (3, 11): 

42 from exceptiongroup import ExceptionGroup 

43 

44if sys.version_info >= (3, 12): 

45 from typing import override 

46else: 

47 from typing_extensions import override 

48 

49if sys.version_info < (3, 13): 

50 from typing_extensions import deprecated 

51else: 

52 from warnings import deprecated 

53 

# socket.IPPROTO_IPV6 may be missing from the socket module (see
# https://bugs.python.org/issue29515), so fall back to the value 41
IPPROTO_IPV6 = getattr(socket, "IPPROTO_IPV6", 41)

# Address families accepted where the family may also be left unspecified
AnyIPAddressFamily = Literal[
    AddressFamily.AF_UNSPEC,
    AddressFamily.AF_INET,
    AddressFamily.AF_INET6,
]
# Concrete IP address families only
IPAddressFamily = Literal[
    AddressFamily.AF_INET,
    AddressFamily.AF_INET6,
]

60 

61 

# tls_hostname given
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    tls_standard_compatible: bool = ...,
    tls_hostname: str,
    happy_eyeballs_delay: float = ...,
) -> TLSStream: ...


# ssl_context given
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    ssl_context: ssl.SSLContext,
    tls_standard_compatible: bool = ...,
    tls_hostname: str | None = ...,
    happy_eyeballs_delay: float = ...,
) -> TLSStream: ...


# tls=True
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    tls: Literal[True],
    ssl_context: ssl.SSLContext | None = ...,
    tls_standard_compatible: bool = ...,
    tls_hostname: str | None = ...,
    happy_eyeballs_delay: float = ...,
) -> TLSStream: ...


# tls=False
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    tls: Literal[False],
    ssl_context: ssl.SSLContext | None = ...,
    tls_standard_compatible: bool = ...,
    tls_hostname: str | None = ...,
    happy_eyeballs_delay: float = ...,
) -> SocketStream: ...


# No TLS arguments
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    happy_eyeballs_delay: float = ...,
) -> SocketStream: ...


async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = None,
    tls: bool = False,
    ssl_context: ssl.SSLContext | None = None,
    tls_standard_compatible: bool = True,
    tls_hostname: str | None = None,
    happy_eyeballs_delay: float = 0.25,
) -> SocketStream | TLSStream:
    """
    Connect to a host using the TCP protocol.

    This function implements the stateless version of the Happy Eyeballs algorithm (RFC
    6555). If ``remote_host`` is a host name that resolves to multiple IP addresses,
    each one is tried until one connection attempt succeeds. If the first attempt does
    not connect within ``happy_eyeballs_delay`` seconds (250 milliseconds by default),
    a second attempt is started using the next address in the list, and so on. An IPv6
    address (if one was resolved) is tried first.

    When the connection has been established, a TLS handshake will be done if either
    ``ssl_context`` or ``tls_hostname`` is not ``None``, or if ``tls`` is ``True``.

    :param remote_host: the IP address or host name to connect to
    :param remote_port: port on the target host to connect to
    :param local_host: the interface address or name to bind the socket to before
        connecting
    :param tls: ``True`` to do a TLS handshake with the connected stream and return a
        :class:`~anyio.streams.tls.TLSStream` instead
    :param ssl_context: the SSL context object to use (if omitted, a default context is
        created)
    :param tls_standard_compatible: If ``True``, performs the TLS shutdown handshake
        before closing the stream and requires that the server does this as well.
        Otherwise, :exc:`~ssl.SSLEOFError` may be raised during reads from the stream.
        Some protocols, such as HTTP, require this option to be ``False``.
        See :meth:`~ssl.SSLContext.wrap_socket` for details.
    :param tls_hostname: host name to check the server certificate against (defaults to
        the value of ``remote_host``)
    :param happy_eyeballs_delay: delay (in seconds) before starting the next connection
        attempt
    :return: a socket stream object if no TLS handshake was done, otherwise a TLS stream
    :raises OSError: if no connection could be established; the individual errors
        (if any) are attached as the cause of the exception

    """
    # Placed here due to https://github.com/python/mypy/issues/7057
    connected_stream: SocketStream | None = None

    async def try_connect(remote_host: str, event: Event) -> None:
        nonlocal connected_stream
        try:
            stream = await asynclib.connect_tcp(remote_host, remote_port, local_address)
        except OSError as exc:
            oserrors.append(exc)
            return
        else:
            if connected_stream is None:
                # First successful attempt wins; stop all the other attempts
                connected_stream = stream
                tg.cancel_scope.cancel()
            else:
                # Another attempt already won, so this connection is surplus
                await stream.aclose()
        finally:
            # Let the dispatch loop start the next attempt without waiting for the
            # full Happy Eyeballs delay
            event.set()

    asynclib = get_async_backend()
    local_address: IPSockAddrType | None = None
    family = socket.AF_UNSPEC
    if local_host:
        gai_res = await getaddrinfo(str(local_host), None)
        family, *_, local_address = gai_res[0]

    target_host = str(remote_host)
    try:
        addr_obj = ip_address(remote_host)
    except ValueError:
        addr_obj = None

    if addr_obj is not None:
        # A literal IP address was given, so there is nothing to resolve
        if isinstance(addr_obj, IPv6Address):
            target_addrs = [(socket.AF_INET6, addr_obj.compressed)]
        else:
            target_addrs = [(socket.AF_INET, addr_obj.compressed)]
    else:
        # getaddrinfo() will raise an exception if name resolution fails
        gai_res = await getaddrinfo(
            target_host, remote_port, family=family, type=socket.SOCK_STREAM
        )

        # Organize the list so that the first address is an IPv6 address (if available)
        # and the second one is an IPv4 address. The rest can be in whatever order.
        v6_found = v4_found = False
        target_addrs = []
        for af, *_, sa in gai_res:
            if af == socket.AF_INET6 and not v6_found:
                v6_found = True
                target_addrs.insert(0, (af, sa[0]))
            elif af == socket.AF_INET and not v4_found and v6_found:
                v4_found = True
                target_addrs.insert(1, (af, sa[0]))
            else:
                target_addrs.append((af, sa[0]))

    oserrors: list[OSError] = []
    try:
        async with create_task_group() as tg:
            for _af, addr in target_addrs:
                event = Event()
                tg.start_soon(try_connect, addr, event)
                with move_on_after(happy_eyeballs_delay):
                    await event.wait()

        if connected_stream is None:
            cause: BaseException | None
            if not oserrors:
                # No attempt was made at all (no usable addresses were resolved).
                # ExceptionGroup rejects an empty sequence, so raise without a cause.
                cause = None
            elif len(oserrors) == 1:
                cause = oserrors[0]
            else:
                cause = ExceptionGroup(
                    "multiple connection attempts failed", oserrors
                )

            raise OSError("All connection attempts failed") from cause
    finally:
        # Drop the references to the collected exceptions
        oserrors.clear()

    if tls or tls_hostname or ssl_context:
        try:
            return await TLSStream.wrap(
                connected_stream,
                server_side=False,
                hostname=tls_hostname or str(remote_host),
                ssl_context=ssl_context,
                standard_compatible=tls_standard_compatible,
            )
        except BaseException:
            # Don't leak the plaintext connection if the handshake fails
            await aclose_forcefully(connected_stream)
            raise

    return connected_stream

266 

267 

async def connect_unix(path: str | bytes | PathLike[Any]) -> UNIXSocketStream:
    """
    Open a stream connection to a UNIX domain socket.

    Not available on Windows.

    :param path: file system path of the socket to connect to
    :return: a socket stream object
    :raises ConnectionFailed: if the connection fails

    """
    # Normalize path-like objects to plain str/bytes before handing over to the backend
    socket_path = os.fspath(path)
    backend = get_async_backend()
    return await backend.connect_unix(socket_path)

281 

282 

async def create_tcp_listener(
    *,
    local_host: IPAddressType | None = None,
    local_port: int = 0,
    family: AnyIPAddressFamily = socket.AddressFamily.AF_UNSPEC,
    backlog: int = 65536,
    reuse_port: bool = False,
) -> MultiListener[SocketStream]:
    """
    Create a TCP socket listener.

    :param local_port: port number to listen on
    :param local_host: IP address of the interface to listen on. If omitted, listen on
        all IPv4 and IPv6 interfaces. To listen on all interfaces on a specific address
        family, use ``0.0.0.0`` for IPv4 or ``::`` for IPv6.
    :param family: address family (used if ``local_host`` was omitted)
    :param backlog: maximum number of queued incoming connections (up to a maximum of
        2**16, or 65536)
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
        address/port (not supported on Windows)
    :return: a multi-listener object containing one or more socket listeners

    """
    asynclib = get_async_backend()
    backlog = min(backlog, 65536)
    local_host = str(local_host) if local_host is not None else None
    gai_res = await getaddrinfo(
        local_host,
        local_port,
        family=family,
        type=socket.SocketKind.SOCK_STREAM if sys.platform == "win32" else 0,
        flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
    )
    listeners: list[SocketListener] = []
    try:
        # The set() is here to work around a glibc bug:
        # https://sourceware.org/bugzilla/show_bug.cgi?id=14969
        sockaddr: tuple[str, int] | tuple[str, int, int, int]
        for fam, kind, *_, sockaddr in sorted(set(gai_res)):
            # Workaround for an uvloop bug where we don't get the correct scope ID for
            # IPv6 link-local addresses when passing type=socket.SOCK_STREAM to
            # getaddrinfo(): https://github.com/MagicStack/uvloop/issues/539
            if sys.platform != "win32" and kind is not SocketKind.SOCK_STREAM:
                continue

            raw_socket = socket.socket(fam)
            try:
                raw_socket.setblocking(False)

                # For Windows, enable exclusive address use. For others, enable address
                # reuse.
                if sys.platform == "win32":
                    raw_socket.setsockopt(
                        socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1
                    )
                else:
                    raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                if reuse_port:
                    raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)

                # Disable dual stack operation on IPv6 sockets
                if fam == socket.AF_INET6:
                    raw_socket.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)

                # Workaround for #554
                if "%" in sockaddr[0]:
                    addr, scope_id = sockaddr[0].split("%", 1)
                    sockaddr = (addr, sockaddr[1], 0, int(scope_id))

                raw_socket.bind(sockaddr)
                raw_socket.listen(backlog)
                listener = asynclib.create_tcp_listener(raw_socket)
            except BaseException:
                # The socket is not yet owned by any listener, so the outer cleanup
                # would never close it
                raw_socket.close()
                raise

            listeners.append(listener)
    except BaseException:
        # Close the listeners that were successfully set up before the failure
        for listener in listeners:
            await listener.aclose()

        raise

    return MultiListener(listeners)

361 

362 

async def create_unix_listener(
    path: str | bytes | PathLike[Any],
    *,
    mode: int | None = None,
    backlog: int = 65536,
) -> SocketListener:
    """
    Create a listener for incoming connections on a UNIX domain socket.

    Not available on Windows.

    :param path: path of the socket
    :param mode: permissions to set on the socket
    :param backlog: maximum number of queued incoming connections (up to a maximum of
        2**16, or 65536)
    :return: a listener object

    .. versionchanged:: 3.0
        If a socket already exists on the file system in the given path, it will be
        removed first.

    """
    # Cap the backlog before any socket is created
    queue_limit = min(backlog, 65536)
    unix_sock = await setup_unix_local_socket(path, mode, socket.SOCK_STREAM)
    try:
        unix_sock.listen(queue_limit)
        listener = get_async_backend().create_unix_listener(unix_sock)
    except BaseException:
        # Don't leak the bound socket if it could not be turned into a listener
        unix_sock.close()
        raise

    return listener

393 

394 

async def create_udp_socket(
    family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
    *,
    local_host: IPAddressType | None = None,
    local_port: int = 0,
    reuse_port: bool = False,
) -> UDPSocket:
    """
    Create a UDP socket.

    If ``local_port`` has been given, the socket will be bound to this port on the
    local machine, making this socket suitable for providing UDP based services.

    :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
        determined from ``local_host`` if omitted
    :param local_host: IP address or host name of the local interface to bind to
    :param local_port: local port to bind to
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
        address/port (not supported on Windows)
    :return: a UDP socket
    :raises ValueError: if neither ``family`` nor ``local_host`` was given

    """
    if family is AddressFamily.AF_UNSPEC and not local_host:
        raise ValueError('Either "family" or "local_host" must be given')

    if local_host:
        gai_res = await getaddrinfo(
            str(local_host),
            local_port,
            family=family,
            type=socket.SOCK_DGRAM,
            flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
        )
        family = cast(AnyIPAddressFamily, gai_res[0][0])
        local_address = gai_res[0][-1]
    elif family is AddressFamily.AF_INET6:
        # Bind to all IPv6 interfaces, honoring the requested port
        local_address = ("::", local_port)
    else:
        # Bind to all IPv4 interfaces, honoring the requested port
        local_address = ("0.0.0.0", local_port)

    sock = await get_async_backend().create_udp_socket(
        family, local_address, None, reuse_port
    )
    return cast(UDPSocket, sock)

439 

440 

async def create_connected_udp_socket(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
    local_host: IPAddressType | None = None,
    local_port: int = 0,
    reuse_port: bool = False,
) -> ConnectedUDPSocket:
    """
    Create a UDP socket that is connected to a single remote endpoint.

    Connected UDP sockets can only communicate with the specified remote host/port, and
    any packets sent from other sources are dropped.

    :param remote_host: remote host to set as the default target
    :param remote_port: port on the remote host to set as the default target
    :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
        determined from ``local_host`` or ``remote_host`` if omitted
    :param local_host: IP address or host name of the local interface to bind to
    :param local_port: local port to bind to
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
        address/port (not supported on Windows)
    :return: a connected UDP socket

    """
    local_address = None
    if local_host:
        local_results = await getaddrinfo(
            str(local_host),
            local_port,
            family=family,
            type=socket.SOCK_DGRAM,
            flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
        )
        # The first result decides both the bind address and the family used for
        # resolving the remote host
        first_local = local_results[0]
        family = cast(AnyIPAddressFamily, first_local[0])
        local_address = first_local[-1]

    remote_results = await getaddrinfo(
        str(remote_host), remote_port, family=family, type=socket.SOCK_DGRAM
    )
    first_remote = remote_results[0]
    family = cast(AnyIPAddressFamily, first_remote[0])
    remote_address = first_remote[-1]

    backend = get_async_backend()
    udp_socket = await backend.create_udp_socket(
        family, local_address, remote_address, reuse_port
    )
    return cast(ConnectedUDPSocket, udp_socket)

489 

490 

async def create_unix_datagram_socket(
    *,
    local_path: None | str | bytes | PathLike[Any] = None,
    local_mode: int | None = None,
) -> UNIXDatagramSocket:
    """
    Create an unconnected UNIX datagram socket.

    Not available on Windows.

    If ``local_path`` has been given, the socket will be bound to this path, making this
    socket suitable for receiving datagrams from other processes. Other processes can
    send datagrams to this socket only if ``local_path`` is set.

    If a socket already exists on the file system in the ``local_path``, it will be
    removed first.

    :param local_path: the path on which to bind to
    :param local_mode: permissions to set on the local socket
    :return: a UNIX datagram socket

    """
    unix_sock = await setup_unix_local_socket(
        local_path, local_mode, socket.SOCK_DGRAM
    )
    backend = get_async_backend()
    # No remote path: the resulting socket is not connected to any peer
    return await backend.create_unix_datagram_socket(unix_sock, None)

517 

518 

async def create_connected_unix_datagram_socket(
    remote_path: str | bytes | PathLike[Any],
    *,
    local_path: None | str | bytes | PathLike[Any] = None,
    local_mode: int | None = None,
) -> ConnectedUNIXDatagramSocket:
    """
    Create a UNIX datagram socket that is connected to a single remote path.

    Connected datagram sockets can only communicate with the specified remote path.

    If ``local_path`` has been given, the socket will be bound to this path, making
    this socket suitable for receiving datagrams from other processes. Other processes
    can send datagrams to this socket only if ``local_path`` is set.

    If a socket already exists on the file system in the ``local_path``, it will be
    removed first.

    :param remote_path: the path to set as the default target
    :param local_path: the path on which to bind to
    :param local_mode: permissions to set on the local socket
    :return: a connected UNIX datagram socket

    """
    # Normalize the target path before creating any socket
    target_path = os.fspath(remote_path)
    unix_sock = await setup_unix_local_socket(
        local_path, local_mode, socket.SOCK_DGRAM
    )
    backend = get_async_backend()
    return await backend.create_unix_datagram_socket(unix_sock, target_path)

550 

551 

async def getaddrinfo(
    host: bytes | str | None,
    port: str | int | None,
    *,
    family: int | AddressFamily = 0,
    type: int | SocketKind = 0,
    proto: int = 0,
    flags: int = 0,
) -> list[tuple[AddressFamily, SocketKind, int, str, tuple[str, int]]]:
    """
    Resolve a host name (and port) into a list of socket addresses.

    Internationalized domain names are translated according to the (non-transitional)
    IDNA 2008 standard.

    .. note:: 4-tuple IPv6 socket addresses are automatically converted to 2-tuples of
        (host, port), unlike what :func:`socket.getaddrinfo` does.

    :param host: host name
    :param port: port number
    :param family: socket family (``AF_INET``, ...)
    :param type: socket type (``SOCK_STREAM``, ...)
    :param proto: protocol number
    :param flags: flags to pass to upstream ``getaddrinfo()``
    :return: list of tuples containing (family, type, proto, canonname, sockaddr)

    .. seealso:: :func:`socket.getaddrinfo`

    """
    encoded_host: bytes | None
    if not isinstance(host, str):
        # bytes and None are passed through to the backend untouched
        encoded_host = host
    else:
        try:
            encoded_host = host.encode("ascii")
        except UnicodeEncodeError:
            # Non-ASCII host name: encode it with the idna package
            import idna

            encoded_host = idna.encode(host, uts46=True)

    raw_results = await get_async_backend().getaddrinfo(
        encoded_host, port, family=family, type=type, proto=proto, flags=flags
    )

    results = []
    for res_family, res_type, res_proto, canonname, sockaddr in raw_results:
        # filter out IPv6 results when IPv6 is disabled
        if isinstance(sockaddr[0], int):
            continue

        results.append(
            (
                res_family,
                res_type,
                res_proto,
                canonname,
                convert_ipv6_sockaddr(sockaddr),
            )
        )

    return results

601 

602 

def getnameinfo(sockaddr: IPSockAddrType, flags: int = 0) -> Awaitable[tuple[str, str]]:
    """
    Resolve a socket address into a host name and a service name.

    :param sockaddr: socket address (e.g. (ipaddress, port) for IPv4)
    :param flags: flags to pass to upstream ``getnameinfo()``
    :return: a tuple of (host name, service name)

    .. seealso:: :func:`socket.getnameinfo`

    """
    # The lookup is delegated to the active backend, which returns the awaitable
    backend = get_async_backend()
    return backend.getnameinfo(sockaddr, flags)

615 

616 

@deprecated("This function is deprecated; use `wait_readable` instead")
def wait_socket_readable(sock: socket.socket) -> Awaitable[None]:
    """
    .. deprecated:: 4.7.0
       Use :func:`wait_readable` instead.

    Wait until there is data to be read from the given socket.

    .. warning:: Only use this on raw sockets that have not been wrapped by any higher
        level constructs like socket streams!

    :param sock: a socket object
    :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
        socket to become readable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
        to become readable

    """
    backend = get_async_backend()
    # The backend is given the socket's file descriptor, not the socket itself
    return backend.wait_readable(sock.fileno())

636 

637 

@deprecated("This function is deprecated; use `wait_writable` instead")
def wait_socket_writable(sock: socket.socket) -> Awaitable[None]:
    """
    .. deprecated:: 4.7.0
       Use :func:`wait_writable` instead.

    Wait until data can be written to the given socket.

    This does **NOT** work on Windows when using the asyncio backend with a proactor
    event loop (default on py3.8+).

    .. warning:: Only use this on raw sockets that have not been wrapped by any higher
        level constructs like socket streams!

    :param sock: a socket object
    :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
        socket to become writable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
        to become writable

    """
    backend = get_async_backend()
    # The backend is given the socket's file descriptor, not the socket itself
    return backend.wait_writable(sock.fileno())

660 

661 

def wait_readable(obj: FileDescriptorLike) -> Awaitable[None]:
    """
    Wait until there is data to be read from the given object.

    On Unix systems, ``obj`` must either be an integer file descriptor, or else an
    object with a ``.fileno()`` method which returns an integer file descriptor. Any
    kind of file descriptor can be passed, though the exact semantics will depend on
    your kernel. For example, this probably won't do anything useful for on-disk files.

    On Windows systems, ``obj`` must either be an integer ``SOCKET`` handle, or else an
    object with a ``.fileno()`` method which returns an integer ``SOCKET`` handle. File
    descriptors aren't supported, and neither are handles that refer to anything besides
    a ``SOCKET``.

    On backends where this functionality is not natively provided (asyncio
    ``ProactorEventLoop`` on Windows), it is provided using a separate selector thread
    which is set to shut down when the interpreter shuts down.

    .. warning:: Don't use this on raw sockets that have been wrapped by any higher
        level constructs like socket streams!

    :param obj: an object with a ``.fileno()`` method or an integer handle
    :raises ~anyio.ClosedResourceError: if the object was closed while waiting for the
        object to become readable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the object
        to become readable

    """
    # ``obj`` is handed to the backend as-is
    backend = get_async_backend()
    return backend.wait_readable(obj)

691 

692 

def wait_writable(obj: FileDescriptorLike) -> Awaitable[None]:
    """
    Wait until data can be written to the given object.

    :param obj: an object with a ``.fileno()`` method or an integer handle
    :raises ~anyio.ClosedResourceError: if the object was closed while waiting for the
        object to become writable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the object
        to become writable

    .. seealso:: See the documentation of :func:`wait_readable` for the definition of
       ``obj`` and notes on backend compatibility.

    .. warning:: Don't use this on raw sockets that have been wrapped by any higher
        level constructs like socket streams!

    """
    # ``obj`` is handed to the backend as-is
    backend = get_async_backend()
    return backend.wait_writable(obj)

711 

712 

def notify_closing(obj: FileDescriptorLike) -> None:
    """
    Announce that a file descriptor (on Unix) or socket (on Windows) is about
    to be closed. Any `wait_readable` or `wait_writable` calls on the given
    object will then immediately wake up and raise
    `~anyio.ClosedResourceError`.

    The object itself is not closed by this call – you still have to do that
    yourself afterwards. Also, take care that no new tasks start waiting on
    the object between this call and the moment it's actually closed. To
    close something properly, you usually want to do these steps in order:

    1. Explicitly mark the object as closed, so that any new attempts
       to use it will abort before they start.
    2. Call `notify_closing` to wake up any already-existing users.
    3. Actually close the object.

    The steps may also be done in a different order if that's more
    convenient, *but only if* there are no checkpoints in between them.
    That way they all happen in a single atomic step, so other tasks
    won't be able to tell what order they happened in anyway.

    :param obj: an object with a ``.fileno()`` method or an integer handle

    """
    backend = get_async_backend()
    backend.notify_closing(obj)

741 

742 

743# 

744# Private API 

745# 

746 

747 

def convert_ipv6_sockaddr(
    sockaddr: tuple[str, int, int, int] | tuple[str, int],
) -> tuple[str, int]:
    """
    Reduce a 4-tuple IPv6 socket address to the 2-tuple (address, port) format.

    A nonzero scope ID is appended to the address after a ``%`` separator; a zero
    scope ID is dropped together with the flow info. Anything that is not a 4-tuple
    is returned unchanged.

    :param sockaddr: the result of :meth:`~socket.socket.getsockname`
    :return: the converted socket address

    """
    # Only 4-tuples are IPv6 addresses in need of conversion
    if not isinstance(sockaddr, tuple) or len(sockaddr) != 4:
        return sockaddr

    address, port, _flowinfo, scope = sockaddr
    if not scope:
        return address, port

    # PyPy (as of v7.3.11) leaves the interface name in the result, so
    # we discard it and only get the scope ID from the end
    # (https://foss.heptapod.net/pypy/pypy/-/issues/3938)
    bare_address = address.partition("%")[0]
    return f"{bare_address}%{scope}", port

777 

778 

async def setup_unix_local_socket(
    path: None | str | bytes | PathLike[Any],
    mode: int | None,
    socktype: int,
) -> socket.socket:
    """
    Create a non-blocking UNIX local socket, optionally bound to a path.

    If ``path`` is given and a socket file already exists there, that file is removed
    before binding.

    Not available on Windows.

    :param path: path of the socket
    :param mode: permissions to set on the socket
    :param socktype: socket.SOCK_STREAM or socket.SOCK_DGRAM

    """
    bind_path: str | None = None
    if path is not None:
        bind_path = os.fsdecode(path)

        # Linux abstract namespace sockets aren't backed by a concrete file so skip stat call
        if not bind_path.startswith("\0"):
            # Copied from pathlib...
            try:
                existing = os.stat(path)
            except OSError as exc:
                # These error codes are treated as "nothing to remove"
                ignorable = (errno.ENOENT, errno.ENOTDIR, errno.EBADF, errno.ELOOP)
                if exc.errno not in ignorable:
                    raise
            else:
                # Only ever remove a socket file, never any other kind of file
                if stat.S_ISSOCK(existing.st_mode):
                    os.unlink(path)

    sock = socket.socket(socket.AF_UNIX, socktype)
    sock.setblocking(False)
    if bind_path is None:
        return sock

    try:
        # bind() and chmod() are run in worker threads
        await to_thread.run_sync(sock.bind, bind_path, abandon_on_cancel=True)
        if mode is not None:
            await to_thread.run_sync(chmod, bind_path, mode, abandon_on_cancel=True)
    except BaseException:
        sock.close()
        raise

    return sock

831 

832 

@dataclass
class TCPConnectable(ByteStreamConnectable):
    """
    A connectable that opens TCP connections to a fixed host and port.

    :param host: host name or IP address of the server
    :param port: TCP port number of the server
    """

    host: str | IPv4Address | IPv6Address
    port: int

    def __post_init__(self) -> None:
        # Reject ports outside the 1-65535 range at construction time
        port = self.port
        if port < 1 or port > 65535:
            raise ValueError("TCP port number out of range")

    @override
    async def connect(self) -> SocketStream:
        try:
            stream = await connect_tcp(self.host, self.port)
        except OSError as exc:
            # Re-raise as ConnectionFailed, naming the target in the message
            raise ConnectionFailed(
                f"error connecting to {self.host}:{self.port}: {exc}"
            ) from exc

        return stream

857 

858 

@dataclass
class UNIXConnectable(ByteStreamConnectable):
    """
    A connectable that opens connections to a UNIX domain socket at a fixed path.

    :param path: the file system path of the socket
    """

    path: str | bytes | PathLike[str] | PathLike[bytes]

    @override
    async def connect(self) -> UNIXSocketStream:
        try:
            stream = await connect_unix(self.path)
        except OSError as exc:
            # Re-raise as ConnectionFailed, naming the socket path in the message
            raise ConnectionFailed(f"error connecting to {self.path!r}: {exc}") from exc

        return stream

875 

876 

def as_connectable(
    remote: ByteStreamConnectable
    | tuple[str | IPv4Address | IPv6Address, int]
    | str
    | bytes
    | PathLike[str],
    /,
    *,
    tls: bool = False,
    ssl_context: ssl.SSLContext | None = None,
    tls_hostname: str | None = None,
    tls_standard_compatible: bool = True,
) -> ByteStreamConnectable:
    """
    Turn the given object into a byte stream connectable.

    If a bytestream connectable is given, it is returned unchanged (the TLS options
    are not applied to it).
    If a tuple of (host, port) is given, a TCP connectable is returned.
    If a string, bytes or path-like object is given, a UNIX connectable is returned.

    If ``tls=True``, the connectable will be wrapped in a
    :class:`~.streams.tls.TLSConnectable`.

    :param remote: a connectable, a tuple of (host, port) or a path to a UNIX socket
    :param tls: if ``True``, wrap the plaintext connectable in a
        :class:`~.streams.tls.TLSConnectable`, using the provided TLS settings
    :param ssl_context: if ``tls=True``, the SSLContext object to use (if not provided,
        a secure default will be created)
    :param tls_hostname: if ``tls=True``, host name of the server to use for checking
        the server certificate (defaults to the host portion of the address for TCP
        connectables)
    :param tls_standard_compatible: if ``False`` and ``tls=True``, makes the TLS stream
        skip the closing handshake when closing the connection, so it won't raise an
        exception if the server does the same
    :raises TypeError: if ``remote`` is none of the supported kinds of objects

    """
    # Ready-made connectables are passed through untouched
    if isinstance(remote, ByteStreamConnectable):
        return remote

    plain: TCPConnectable | UNIXConnectable
    if isinstance(remote, tuple) and len(remote) == 2:
        plain = TCPConnectable(*remote)
    elif isinstance(remote, (str, bytes, PathLike)):
        plain = UNIXConnectable(remote)
    else:
        raise TypeError(f"cannot convert {remote!r} to a connectable")

    if not tls:
        return plain

    # Only TCP targets carry a host that can serve as the default TLS host name
    if not tls_hostname and isinstance(plain, TCPConnectable):
        tls_hostname = str(plain.host)

    return TLSConnectable(
        plain,
        ssl_context=ssl_context,
        hostname=tls_hostname,
        standard_compatible=tls_standard_compatible,
    )