Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/anyio/_core/_sockets.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

225 statements  

1from __future__ import annotations 

2 

3import errno 

4import os 

5import socket 

6import ssl 

7import stat 

8import sys 

9from collections.abc import Awaitable 

10from ipaddress import IPv6Address, ip_address 

11from os import PathLike, chmod 

12from socket import AddressFamily, SocketKind 

13from typing import TYPE_CHECKING, Any, Literal, cast, overload 

14 

15from .. import to_thread 

16from ..abc import ( 

17 ConnectedUDPSocket, 

18 ConnectedUNIXDatagramSocket, 

19 IPAddressType, 

20 IPSockAddrType, 

21 SocketListener, 

22 SocketStream, 

23 UDPSocket, 

24 UNIXDatagramSocket, 

25 UNIXSocketStream, 

26) 

27from ..streams.stapled import MultiListener 

28from ..streams.tls import TLSStream 

29from ._eventloop import get_async_backend 

30from ._resources import aclose_forcefully 

31from ._synchronization import Event 

32from ._tasks import create_task_group, move_on_after 

33 

34if TYPE_CHECKING: 

35 from _typeshed import FileDescriptorLike 

36else: 

37 FileDescriptorLike = object 

38 

39if sys.version_info < (3, 11): 

40 from exceptiongroup import ExceptionGroup 

41 

42if sys.version_info < (3, 13): 

43 from typing_extensions import deprecated 

44else: 

45 from warnings import deprecated 

46 

# Use the well-known protocol number 41 when the socket module does not expose
# the constant (https://bugs.python.org/issue29515)
IPPROTO_IPV6 = getattr(socket, "IPPROTO_IPV6", 41)

# Address families accepted where "unspecified" is a valid choice
AnyIPAddressFamily = Literal[
    AddressFamily.AF_UNSPEC,
    AddressFamily.AF_INET,
    AddressFamily.AF_INET6,
]
# Concrete IP address families only
IPAddressFamily = Literal[
    AddressFamily.AF_INET,
    AddressFamily.AF_INET6,
]

53 

54 

# Overload: ``tls_hostname`` is passed explicitly -> the result is a TLS stream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    tls_standard_compatible: bool = ...,
    tls_hostname: str,
    happy_eyeballs_delay: float = ...,
) -> TLSStream: ...


# Overload: ``ssl_context`` is passed explicitly -> the result is a TLS stream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    ssl_context: ssl.SSLContext,
    tls_standard_compatible: bool = ...,
    tls_hostname: str | None = ...,
    happy_eyeballs_delay: float = ...,
) -> TLSStream: ...


# Overload: ``tls=True`` -> the result is a TLS stream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    tls: Literal[True],
    ssl_context: ssl.SSLContext | None = ...,
    tls_standard_compatible: bool = ...,
    tls_hostname: str | None = ...,
    happy_eyeballs_delay: float = ...,
) -> TLSStream: ...


# Overload: ``tls=False`` -> the result is a plain socket stream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    tls: Literal[False],
    ssl_context: ssl.SSLContext | None = ...,
    tls_standard_compatible: bool = ...,
    tls_hostname: str | None = ...,
    happy_eyeballs_delay: float = ...,
) -> SocketStream: ...


# Overload: no TLS related arguments at all -> the result is a plain socket stream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    happy_eyeballs_delay: float = ...,
) -> SocketStream: ...

122 

123 

async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = None,
    tls: bool = False,
    ssl_context: ssl.SSLContext | None = None,
    tls_standard_compatible: bool = True,
    tls_hostname: str | None = None,
    happy_eyeballs_delay: float = 0.25,
) -> SocketStream | TLSStream:
    """
    Connect to a host using the TCP protocol.

    This function implements the stateless version of the Happy Eyeballs algorithm (RFC
    6555). If ``remote_host`` is a host name that resolves to multiple IP addresses,
    each one is tried until one connection attempt succeeds. If the first attempt does
    not connect within ``happy_eyeballs_delay`` seconds (250 milliseconds by default),
    a second attempt is started using the next address in the list, and so on. On IPv6
    enabled systems, an IPv6 address (if available) is tried first.

    When the connection has been established, a TLS handshake will be done if either
    ``ssl_context`` or ``tls_hostname`` is not ``None``, or if ``tls`` is ``True``.

    :param remote_host: the IP address or host name to connect to
    :param remote_port: port on the target host to connect to
    :param local_host: the interface address or name to bind the socket to before
        connecting
    :param tls: ``True`` to do a TLS handshake with the connected stream and return a
        :class:`~anyio.streams.tls.TLSStream` instead
    :param ssl_context: the SSL context object to use (if omitted, a default context is
        created)
    :param tls_standard_compatible: If ``True``, performs the TLS shutdown handshake
        before closing the stream and requires that the server does this as well.
        Otherwise, :exc:`~ssl.SSLEOFError` may be raised during reads from the stream.
        Some protocols, such as HTTP, require this option to be ``False``.
        See :meth:`~ssl.SSLContext.wrap_socket` for details.
    :param tls_hostname: host name to check the server certificate against (defaults to
        the value of ``remote_host``)
    :param happy_eyeballs_delay: delay (in seconds) before starting the next connection
        attempt
    :return: a socket stream object if no TLS handshake was done, otherwise a TLS stream
    :raises OSError: if the connection attempt fails

    """
    # Placed here due to https://github.com/python/mypy/issues/7057
    connected_stream: SocketStream | None = None

    async def try_connect(remote_host: str, event: Event) -> None:
        nonlocal connected_stream
        try:
            stream = await asynclib.connect_tcp(remote_host, remote_port, local_address)
        except OSError as exc:
            oserrors.append(exc)
            return
        else:
            if connected_stream is None:
                # First successful attempt wins; stop all the other attempts
                connected_stream = stream
                tg.cancel_scope.cancel()
            else:
                # Another attempt already succeeded, so this stream is surplus
                await stream.aclose()
        finally:
            # Let the main loop start the next attempt (or finish) right away
            event.set()

    asynclib = get_async_backend()
    local_address: IPSockAddrType | None = None
    family = socket.AF_UNSPEC
    if local_host:
        gai_res = await getaddrinfo(str(local_host), None)
        family, *_, local_address = gai_res[0]

    target_host = str(remote_host)
    try:
        addr_obj = ip_address(remote_host)
    except ValueError:
        addr_obj = None

    if addr_obj is not None:
        if isinstance(addr_obj, IPv6Address):
            target_addrs = [(socket.AF_INET6, addr_obj.compressed)]
        else:
            target_addrs = [(socket.AF_INET, addr_obj.compressed)]
    else:
        # getaddrinfo() will raise an exception if name resolution fails
        gai_res = await getaddrinfo(
            target_host, remote_port, family=family, type=socket.SOCK_STREAM
        )

        # Organize the list so that the first address is an IPv6 address (if available)
        # and the second one is an IPv4 address. The rest can be in whatever order.
        v6_found = v4_found = False
        target_addrs = []
        for af, *_, sa in gai_res:
            if af == socket.AF_INET6 and not v6_found:
                v6_found = True
                target_addrs.insert(0, (af, sa[0]))
            elif af == socket.AF_INET and not v4_found and v6_found:
                v4_found = True
                target_addrs.insert(1, (af, sa[0]))
            else:
                target_addrs.append((af, sa[0]))

    oserrors: list[OSError] = []
    try:
        async with create_task_group() as tg:
            for _af, addr in target_addrs:
                event = Event()
                tg.start_soon(try_connect, addr, event)
                # Wait for this attempt to finish, but no longer than the delay
                with move_on_after(happy_eyeballs_delay):
                    await event.wait()

        if connected_stream is None:
            cause: BaseException | None
            if not oserrors:
                # No attempt was recorded (e.g. there was nothing to try);
                # ExceptionGroup() rejects an empty sequence, so raise without a cause
                cause = None
            elif len(oserrors) == 1:
                cause = oserrors[0]
            else:
                cause = ExceptionGroup("multiple connection attempts failed", oserrors)

            raise OSError("All connection attempts failed") from cause
    finally:
        oserrors.clear()

    if tls or tls_hostname or ssl_context:
        try:
            return await TLSStream.wrap(
                connected_stream,
                server_side=False,
                hostname=tls_hostname or str(remote_host),
                ssl_context=ssl_context,
                standard_compatible=tls_standard_compatible,
            )
        except BaseException:
            # Don't leave the TCP connection open if the handshake did not complete
            await aclose_forcefully(connected_stream)
            raise

    return connected_stream

259 

260 

async def connect_unix(path: str | bytes | PathLike[Any]) -> UNIXSocketStream:
    """
    Open a stream connection to a UNIX socket.

    Not available on Windows.

    :param path: file system path of the socket to connect to
    :return: a socket stream object

    """
    # Normalize path-like objects to plain str/bytes before handing them over
    socket_path = os.fspath(path)
    backend = get_async_backend()
    return await backend.connect_unix(socket_path)

273 

274 

async def create_tcp_listener(
    *,
    local_host: IPAddressType | None = None,
    local_port: int = 0,
    family: AnyIPAddressFamily = socket.AddressFamily.AF_UNSPEC,
    backlog: int = 65536,
    reuse_port: bool = False,
) -> MultiListener[SocketStream]:
    """
    Create a TCP socket listener.

    :param local_port: port number to listen on
    :param local_host: IP address of the interface to listen on. If omitted, listen on
        all IPv4 and IPv6 interfaces. To listen on all interfaces on a specific address
        family, use ``0.0.0.0`` for IPv4 or ``::`` for IPv6.
    :param family: address family (used if ``local_host`` was omitted)
    :param backlog: maximum number of queued incoming connections (up to a maximum of
        2**16, or 65536)
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
        address/port (not supported on Windows)
    :return: a multi-listener object wrapping one socket listener per bound address

    """
    asynclib = get_async_backend()
    backlog = min(backlog, 65536)
    local_host = str(local_host) if local_host is not None else None
    gai_res = await getaddrinfo(
        local_host,
        local_port,
        family=family,
        type=socket.SocketKind.SOCK_STREAM if sys.platform == "win32" else 0,
        flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
    )
    listeners: list[SocketListener] = []
    try:
        # The set() is here to work around a glibc bug:
        # https://sourceware.org/bugzilla/show_bug.cgi?id=14969
        sockaddr: tuple[str, int] | tuple[str, int, int, int]
        for fam, kind, *_, sockaddr in sorted(set(gai_res)):
            # Workaround for an uvloop bug where we don't get the correct scope ID for
            # IPv6 link-local addresses when passing type=socket.SOCK_STREAM to
            # getaddrinfo(): https://github.com/MagicStack/uvloop/issues/539
            if sys.platform != "win32" and kind is not SocketKind.SOCK_STREAM:
                continue

            raw_socket = socket.socket(fam)
            try:
                raw_socket.setblocking(False)

                # For Windows, enable exclusive address use. For others, enable
                # address reuse.
                if sys.platform == "win32":
                    raw_socket.setsockopt(
                        socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1
                    )
                else:
                    raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                if reuse_port:
                    raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)

                # Disable dual stack operation on IPv6 sockets
                if fam == socket.AF_INET6:
                    raw_socket.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)

                # Workaround for #554
                if "%" in sockaddr[0]:
                    addr, scope_id = sockaddr[0].split("%", 1)
                    sockaddr = (addr, sockaddr[1], 0, int(scope_id))

                raw_socket.bind(sockaddr)
                raw_socket.listen(backlog)
                listener = asynclib.create_tcp_listener(raw_socket)
            except BaseException:
                # This socket is not tracked in ``listeners`` yet, so it has to be
                # closed here or it would leak
                raw_socket.close()
                raise

            listeners.append(listener)
    except BaseException:
        # Roll back: close every listener that was already set up successfully
        for listener in listeners:
            await listener.aclose()

        raise

    return MultiListener(listeners)

353 

354 

async def create_unix_listener(
    path: str | bytes | PathLike[Any],
    *,
    mode: int | None = None,
    backlog: int = 65536,
) -> SocketListener:
    """
    Start listening for connections on a UNIX socket.

    Not available on Windows.

    :param path: file system path of the socket
    :param mode: permissions to apply to the socket
    :param backlog: maximum number of queued incoming connections (capped at 2**16,
        or 65536)
    :return: a listener object

    .. versionchanged:: 3.0
        If a socket already exists on the file system in the given path, it will be
        removed first.

    """
    capped_backlog = min(backlog, 65536)
    sock = await setup_unix_local_socket(path, mode, socket.SOCK_STREAM)
    try:
        sock.listen(capped_backlog)
        listener = get_async_backend().create_unix_listener(sock)
    except BaseException:
        # Release the socket if it could not be turned into a listener
        sock.close()
        raise

    return listener

385 

386 

async def create_udp_socket(
    family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
    *,
    local_host: IPAddressType | None = None,
    local_port: int = 0,
    reuse_port: bool = False,
) -> UDPSocket:
    """
    Create a UDP socket.

    If ``local_port`` has been given, the socket will be bound to this port on the
    local machine, making this socket suitable for providing UDP based services.

    :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
        determined from ``local_host`` if omitted
    :param local_host: IP address or host name of the local interface to bind to
    :param local_port: local port to bind to
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
        address/port (not supported on Windows)
    :return: a UDP socket
    :raises ValueError: if neither ``family`` nor ``local_host`` was given

    """
    if family is AddressFamily.AF_UNSPEC and not local_host:
        raise ValueError('Either "family" or "local_host" must be given')

    if local_host:
        gai_res = await getaddrinfo(
            str(local_host),
            local_port,
            family=family,
            type=socket.SOCK_DGRAM,
            flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
        )
        family = cast(AnyIPAddressFamily, gai_res[0][0])
        local_address = gai_res[0][-1]
    elif family is AddressFamily.AF_INET6:
        # No host given: bind to the IPv6 wildcard address, honoring local_port
        local_address = ("::", local_port)
    else:
        # No host given: bind to the IPv4 wildcard address, honoring local_port
        local_address = ("0.0.0.0", local_port)

    sock = await get_async_backend().create_udp_socket(
        family, local_address, None, reuse_port
    )
    return cast(UDPSocket, sock)

431 

432 

async def create_connected_udp_socket(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
    local_host: IPAddressType | None = None,
    local_port: int = 0,
    reuse_port: bool = False,
) -> ConnectedUDPSocket:
    """
    Create a connected UDP socket.

    Connected UDP sockets can only communicate with the specified remote host/port, and
    any packets sent from other sources are dropped.

    :param remote_host: remote host to set as the default target
    :param remote_port: port on the remote host to set as the default target
    :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
        determined from ``local_host`` or ``remote_host`` if omitted
    :param local_host: IP address or host name of the local interface to bind to
    :param local_port: local port to bind to
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
        address/port (not supported on Windows)
    :return: a connected UDP socket

    """
    bind_address = None
    if local_host:
        local_infos = await getaddrinfo(
            str(local_host),
            local_port,
            family=family,
            type=socket.SOCK_DGRAM,
            flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
        )
        # The first local result decides the family used for the remote lookup
        first_local = local_infos[0]
        family = cast(AnyIPAddressFamily, first_local[0])
        bind_address = first_local[-1]

    remote_infos = await getaddrinfo(
        str(remote_host), remote_port, family=family, type=socket.SOCK_DGRAM
    )
    first_remote = remote_infos[0]
    family = cast(AnyIPAddressFamily, first_remote[0])
    peer_address = first_remote[-1]

    backend = get_async_backend()
    udp_socket = await backend.create_udp_socket(
        family, bind_address, peer_address, reuse_port
    )
    return cast(ConnectedUDPSocket, udp_socket)

481 

482 

async def create_unix_datagram_socket(
    *,
    local_path: None | str | bytes | PathLike[Any] = None,
    local_mode: int | None = None,
) -> UNIXDatagramSocket:
    """
    Create an unconnected UNIX datagram socket.

    Not available on Windows.

    When ``local_path`` is given, the socket is bound to that path, which makes it
    suitable for receiving datagrams from other processes. Other processes can send
    datagrams to this socket only if ``local_path`` is set.

    If a socket already exists on the file system in the ``local_path``, it will be
    removed first.

    :param local_path: the path on which to bind to
    :param local_mode: permissions to set on the local socket
    :return: a UNIX datagram socket

    """
    sock = await setup_unix_local_socket(local_path, local_mode, socket.SOCK_DGRAM)
    backend = get_async_backend()
    # ``None`` as the remote path leaves the socket unconnected
    return await backend.create_unix_datagram_socket(sock, None)

509 

510 

async def create_connected_unix_datagram_socket(
    remote_path: str | bytes | PathLike[Any],
    *,
    local_path: None | str | bytes | PathLike[Any] = None,
    local_mode: int | None = None,
) -> ConnectedUNIXDatagramSocket:
    """
    Create a UNIX datagram socket connected to a remote path.

    Connected datagram sockets can only communicate with the specified remote path.

    When ``local_path`` is given, the socket is bound to that path, which makes it
    suitable for receiving datagrams from other processes. Other processes can send
    datagrams to this socket only if ``local_path`` is set.

    If a socket already exists on the file system in the ``local_path``, it will be
    removed first.

    :param remote_path: the path to set as the default target
    :param local_path: the path on which to bind to
    :param local_mode: permissions to set on the local socket
    :return: a connected UNIX datagram socket

    """
    # Normalize path-like objects to plain str/bytes before creating the socket
    target_path = os.fspath(remote_path)
    sock = await setup_unix_local_socket(local_path, local_mode, socket.SOCK_DGRAM)
    backend = get_async_backend()
    return await backend.create_unix_datagram_socket(sock, target_path)

542 

543 

async def getaddrinfo(
    host: bytes | str | None,
    port: str | int | None,
    *,
    family: int | AddressFamily = 0,
    type: int | SocketKind = 0,
    proto: int = 0,
    flags: int = 0,
) -> list[tuple[AddressFamily, SocketKind, int, str, tuple[str, int]]]:
    """
    Look up a numeric IP address given a host name.

    Internationalized domain names are translated according to the (non-transitional)
    IDNA 2008 standard.

    .. note:: 4-tuple IPv6 socket addresses are automatically converted to 2-tuples of
        (host, port), unlike what :func:`socket.getaddrinfo` does.

    :param host: host name
    :param port: port number
    :param family: socket family (``AF_INET``, ...)
    :param type: socket type (``SOCK_STREAM``, ...)
    :param proto: protocol number
    :param flags: flags to pass to upstream ``getaddrinfo()``
    :return: list of tuples containing (family, type, proto, canonname, sockaddr)

    .. seealso:: :func:`socket.getaddrinfo`

    """
    # Text host names are passed on as bytes; non-ASCII ones get IDNA encoded
    encoded_host: bytes | None
    if not isinstance(host, str):
        encoded_host = host
    else:
        try:
            encoded_host = host.encode("ascii")
        except UnicodeEncodeError:
            import idna

            encoded_host = idna.encode(host, uts46=True)

    backend = get_async_backend()
    raw_results = await backend.getaddrinfo(
        encoded_host, port, family=family, type=type, proto=proto, flags=flags
    )

    results: list[tuple[AddressFamily, SocketKind, int, str, tuple[str, int]]] = []
    for res_family, res_type, res_proto, canonname, sockaddr in raw_results:
        # Filter out IPv6 results when IPv6 is disabled (the first item of the
        # socket address is then an integer instead of a host string)
        if isinstance(sockaddr[0], int):
            continue

        results.append(
            (
                res_family,
                res_type,
                res_proto,
                canonname,
                convert_ipv6_sockaddr(sockaddr),
            )
        )

    return results

593 

594 

def getnameinfo(sockaddr: IPSockAddrType, flags: int = 0) -> Awaitable[tuple[str, str]]:
    """
    Resolve an IP socket address to a host name and a service name.

    :param sockaddr: socket address (e.g. (ipaddress, port) for IPv4)
    :param flags: flags to pass to upstream ``getnameinfo()``
    :return: a tuple of (host name, service name)

    .. seealso:: :func:`socket.getnameinfo`

    """
    backend = get_async_backend()
    return backend.getnameinfo(sockaddr, flags)

607 

608 

@deprecated("This function is deprecated; use `wait_readable` instead")
def wait_socket_readable(sock: socket.socket) -> Awaitable[None]:
    """
    .. deprecated:: 4.7.0
       Use :func:`wait_readable` instead.

    Wait until there is data to be read from the given socket.

    .. warning:: Only use this on raw sockets that have not been wrapped by any higher
        level constructs like socket streams!

    :param sock: a socket object
    :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
        socket to become readable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
        to become readable

    """
    backend = get_async_backend()
    # The backend is handed the socket's file descriptor, not the socket object
    return backend.wait_readable(sock.fileno())

628 

629 

@deprecated("This function is deprecated; use `wait_writable` instead")
def wait_socket_writable(sock: socket.socket) -> Awaitable[None]:
    """
    .. deprecated:: 4.7.0
       Use :func:`wait_writable` instead.

    Wait until it is possible to write to the given socket.

    This does **NOT** work on Windows when using the asyncio backend with a proactor
    event loop (default on py3.8+).

    .. warning:: Only use this on raw sockets that have not been wrapped by any higher
        level constructs like socket streams!

    :param sock: a socket object
    :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
        socket to become writable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
        to become writable

    """
    backend = get_async_backend()
    # The backend is handed the socket's file descriptor, not the socket object
    return backend.wait_writable(sock.fileno())

652 

653 

def wait_readable(obj: FileDescriptorLike) -> Awaitable[None]:
    """
    Wait until there is data to be read from the given object.

    On Unix systems, ``obj`` must either be an integer file descriptor, or else an
    object with a ``.fileno()`` method which returns an integer file descriptor. Any
    kind of file descriptor can be passed, though the exact semantics will depend on
    your kernel. For example, this probably won't do anything useful for on-disk files.

    On Windows systems, ``obj`` must either be an integer ``SOCKET`` handle, or else an
    object with a ``.fileno()`` method which returns an integer ``SOCKET`` handle. File
    descriptors aren't supported, and neither are handles that refer to anything besides
    a ``SOCKET``.

    On backends where this functionality is not natively provided (asyncio
    ``ProactorEventLoop`` on Windows), it is provided using a separate selector thread
    which is set to shut down when the interpreter shuts down.

    .. warning:: Don't use this on raw sockets that have been wrapped by any higher
        level constructs like socket streams!

    :param obj: an object with a ``.fileno()`` method or an integer handle
    :raises ~anyio.ClosedResourceError: if the object was closed while waiting for the
        object to become readable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the object
        to become readable

    """
    backend = get_async_backend()
    return backend.wait_readable(obj)

683 

684 

def wait_writable(obj: FileDescriptorLike) -> Awaitable[None]:
    """
    Wait until it is possible to write to the given object.

    :param obj: an object with a ``.fileno()`` method or an integer handle
    :raises ~anyio.ClosedResourceError: if the object was closed while waiting for the
        object to become writable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the object
        to become writable

    .. seealso:: See the documentation of :func:`wait_readable` for the definition of
       ``obj`` and notes on backend compatibility.

    .. warning:: Don't use this on raw sockets that have been wrapped by any higher
        level constructs like socket streams!

    """
    backend = get_async_backend()
    return backend.wait_writable(obj)

703 

704 

705# 

706# Private API 

707# 

708 

709 

def convert_ipv6_sockaddr(
    sockaddr: tuple[str, int, int, int] | tuple[str, int],
) -> tuple[str, int]:
    """
    Reduce a 4-tuple IPv6 socket address to the 2-tuple (address, port) format.

    A nonzero scope ID is appended to the address after a ``%`` separator. A zero
    scope ID is dropped together with the flow info. Socket addresses that are not
    4-tuples are returned unchanged.

    :param sockaddr: the result of :meth:`~socket.socket.getsockname`
    :return: the converted socket address

    """
    # The isinstance() check is only here to keep MyPy happy
    if not (isinstance(sockaddr, tuple) and len(sockaddr) == 4):
        return sockaddr

    host, port, _flowinfo, scope_id = sockaddr
    if not scope_id:
        return host, port

    # PyPy (as of v7.3.11) leaves the interface name in the result, so
    # we discard it and only get the scope ID from the end
    # (https://foss.heptapod.net/pypy/pypy/-/issues/3938)
    bare_host = host.split("%")[0]

    # Add scope_id to the address
    return f"{bare_host}%{scope_id}", port

739 

740 

async def setup_unix_local_socket(
    path: None | str | bytes | PathLike[Any],
    mode: int | None,
    socktype: int,
) -> socket.socket:
    """
    Create a non-blocking UNIX local socket object, first deleting any socket that
    already exists at the given path.

    Not available on Windows.

    :param path: path of the socket (``None`` to leave the socket unbound)
    :param mode: permissions to set on the socket
    :param socktype: socket.SOCK_STREAM or socket.SOCK_DGRAM
    :return: the raw socket, bound to ``path`` if one was given

    """
    bind_target: str | None = None
    if path is not None:
        bind_target = os.fsdecode(path)

        # Linux abstract namespace sockets aren't backed by a concrete file so there
        # is nothing to stat or remove for them
        is_abstract = bind_target.startswith("\0")
        if not is_abstract:
            # Copied from pathlib...
            try:
                existing = os.stat(path)
            except OSError as exc:
                ignorable = (errno.ENOENT, errno.ENOTDIR, errno.EBADF, errno.ELOOP)
                if exc.errno not in ignorable:
                    raise
            else:
                # Only remove the existing entry if it really is a socket
                if stat.S_ISSOCK(existing.st_mode):
                    os.unlink(path)

    sock = socket.socket(socket.AF_UNIX, socktype)
    sock.setblocking(False)
    if bind_target is None:
        return sock

    try:
        # bind() and chmod() touch the file system, so run them in worker threads
        await to_thread.run_sync(sock.bind, bind_target, abandon_on_cancel=True)
        if mode is not None:
            await to_thread.run_sync(chmod, bind_target, mode, abandon_on_cancel=True)
    except BaseException:
        sock.close()
        raise

    return sock