1from __future__ import annotations
2
3import errno
4import os
5import socket
6import ssl
7import stat
8import sys
9from collections.abc import Awaitable
10from ipaddress import IPv6Address, ip_address
11from os import PathLike, chmod
12from socket import AddressFamily, SocketKind
13from typing import TYPE_CHECKING, Any, Literal, cast, overload
14
15from .. import to_thread
16from ..abc import (
17 ConnectedUDPSocket,
18 ConnectedUNIXDatagramSocket,
19 IPAddressType,
20 IPSockAddrType,
21 SocketListener,
22 SocketStream,
23 UDPSocket,
24 UNIXDatagramSocket,
25 UNIXSocketStream,
26)
27from ..streams.stapled import MultiListener
28from ..streams.tls import TLSStream
29from ._eventloop import get_async_backend
30from ._resources import aclose_forcefully
31from ._synchronization import Event
32from ._tasks import create_task_group, move_on_after
33
34if TYPE_CHECKING:
35 from _typeshed import FileDescriptorLike
36else:
37 FileDescriptorLike = object
38
39if sys.version_info < (3, 11):
40 from exceptiongroup import ExceptionGroup
41
42if sys.version_info < (3, 13):
43 from typing_extensions import deprecated
44else:
45 from warnings import deprecated
46
# Protocol level used for IPv6 socket options. Falls back to the numeric value 41
# when the ``socket`` module does not expose ``IPPROTO_IPV6``.
IPPROTO_IPV6 = getattr(socket, "IPPROTO_IPV6", 41)  # https://bugs.python.org/issue29515

# Address families accepted where the family may also be left unspecified
AnyIPAddressFamily = Literal[
    AddressFamily.AF_UNSPEC, AddressFamily.AF_INET, AddressFamily.AF_INET6
]
# Address families restricted to a concrete IP version (IPv4 or IPv6)
IPAddressFamily = Literal[AddressFamily.AF_INET, AddressFamily.AF_INET6]
53
54
# Overload: tls_hostname given (required keyword) -> the result is a TLSStream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    ssl_context: ssl.SSLContext | None = ...,
    tls_standard_compatible: bool = ...,
    tls_hostname: str,
    happy_eyeballs_delay: float = ...,
) -> TLSStream: ...


# Overload: ssl_context given (required keyword) -> the result is a TLSStream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    ssl_context: ssl.SSLContext,
    tls_standard_compatible: bool = ...,
    tls_hostname: str | None = ...,
    happy_eyeballs_delay: float = ...,
) -> TLSStream: ...


# Overload: tls=True -> the result is a TLSStream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    tls: Literal[True],
    ssl_context: ssl.SSLContext | None = ...,
    tls_standard_compatible: bool = ...,
    tls_hostname: str | None = ...,
    happy_eyeballs_delay: float = ...,
) -> TLSStream: ...


# Overload: tls=False -> the result is typed as a plain SocketStream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    tls: Literal[False],
    ssl_context: ssl.SSLContext | None = ...,
    tls_standard_compatible: bool = ...,
    tls_hostname: str | None = ...,
    happy_eyeballs_delay: float = ...,
) -> SocketStream: ...


# Overload: no TLS arguments at all -> the result is a plain SocketStream
@overload
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = ...,
    happy_eyeballs_delay: float = ...,
) -> SocketStream: ...
122
123
async def connect_tcp(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    local_host: IPAddressType | None = None,
    tls: bool = False,
    ssl_context: ssl.SSLContext | None = None,
    tls_standard_compatible: bool = True,
    tls_hostname: str | None = None,
    happy_eyeballs_delay: float = 0.25,
) -> SocketStream | TLSStream:
    """
    Connect to a host using the TCP protocol.

    This function implements the stateless version of the Happy Eyeballs algorithm (RFC
    6555). If ``remote_host`` is a host name that resolves to multiple IP addresses,
    each one is tried until one connection attempt succeeds. If the first attempt does
    not connect within ``happy_eyeballs_delay`` seconds (250 milliseconds by default),
    a second attempt is started using the next address in the list, and so on. On IPv6
    enabled systems, an IPv6 address (if available) is tried first.

    When the connection has been established, a TLS handshake will be done if either
    ``ssl_context`` or ``tls_hostname`` is not ``None``, or if ``tls`` is ``True``.

    :param remote_host: the IP address or host name to connect to
    :param remote_port: port on the target host to connect to
    :param local_host: the interface address or name to bind the socket to before
        connecting
    :param tls: ``True`` to do a TLS handshake with the connected stream and return a
        :class:`~anyio.streams.tls.TLSStream` instead
    :param ssl_context: the SSL context object to use (if omitted, a default context is
        created)
    :param tls_standard_compatible: If ``True``, performs the TLS shutdown handshake
        before closing the stream and requires that the server does this as well.
        Otherwise, :exc:`~ssl.SSLEOFError` may be raised during reads from the stream.
        Some protocols, such as HTTP, require this option to be ``False``.
        See :meth:`~ssl.SSLContext.wrap_socket` for details.
    :param tls_hostname: host name to check the server certificate against (defaults to
        the value of ``remote_host``)
    :param happy_eyeballs_delay: delay (in seconds) before starting the next connection
        attempt
    :return: a socket stream object if no TLS handshake was done, otherwise a TLS stream
    :raises OSError: if the connection attempt fails

    """
    # Placed here due to https://github.com/python/mypy/issues/7057
    connected_stream: SocketStream | None = None

    async def try_connect(remote_host: str, event: Event) -> None:
        nonlocal connected_stream
        try:
            stream = await asynclib.connect_tcp(remote_host, remote_port, local_address)
        except OSError as exc:
            oserrors.append(exc)
            return
        else:
            if connected_stream is None:
                # First successful attempt wins; cancel all the other attempts
                connected_stream = stream
                tg.cancel_scope.cancel()
            else:
                # Another attempt already won, so this stream is surplus
                await stream.aclose()
        finally:
            # Let the spawning loop start the next attempt (or finish) right away
            event.set()

    asynclib = get_async_backend()
    local_address: IPSockAddrType | None = None
    family = socket.AF_UNSPEC
    if local_host:
        gai_res = await getaddrinfo(str(local_host), None)
        family, *_, local_address = gai_res[0]

    target_host = str(remote_host)
    try:
        addr_obj = ip_address(remote_host)
    except ValueError:
        addr_obj = None

    if addr_obj is not None:
        # A literal IP address was given, so no name resolution is needed
        if isinstance(addr_obj, IPv6Address):
            target_addrs = [(socket.AF_INET6, addr_obj.compressed)]
        else:
            target_addrs = [(socket.AF_INET, addr_obj.compressed)]
    else:
        # getaddrinfo() will raise an exception if name resolution fails
        gai_res = await getaddrinfo(
            target_host, remote_port, family=family, type=socket.SOCK_STREAM
        )

        # Organize the list so that the first address is an IPv6 address (if available)
        # and the second one is an IPv4 address. The rest can be in whatever order.
        v6_found = v4_found = False
        target_addrs = []
        for af, *_, sa in gai_res:
            if af == socket.AF_INET6 and not v6_found:
                v6_found = True
                target_addrs.insert(0, (af, sa[0]))
            elif af == socket.AF_INET and not v4_found and v6_found:
                v4_found = True
                target_addrs.insert(1, (af, sa[0]))
            else:
                target_addrs.append((af, sa[0]))

    oserrors: list[OSError] = []
    try:
        async with create_task_group() as tg:
            for _af, addr in target_addrs:
                event = Event()
                tg.start_soon(try_connect, addr, event)
                # Wait for this attempt to finish, but no longer than the delay
                with move_on_after(happy_eyeballs_delay):
                    await event.wait()

        if connected_stream is None:
            cause: BaseException | None
            if not oserrors:
                # No attempt was made at all (no usable addresses). An empty
                # ExceptionGroup cannot be constructed, so raise without a cause.
                cause = None
            elif len(oserrors) == 1:
                cause = oserrors[0]
            else:
                cause = ExceptionGroup("multiple connection attempts failed", oserrors)

            raise OSError("All connection attempts failed") from cause
    finally:
        # Drop the references to the collected exceptions
        oserrors.clear()

    if tls or tls_hostname or ssl_context:
        try:
            return await TLSStream.wrap(
                connected_stream,
                server_side=False,
                hostname=tls_hostname or str(remote_host),
                ssl_context=ssl_context,
                standard_compatible=tls_standard_compatible,
            )
        except BaseException:
            # The handshake failed or was cancelled; don't leak the TCP connection
            await aclose_forcefully(connected_stream)
            raise

    return connected_stream
259
260
async def connect_unix(path: str | bytes | PathLike[Any]) -> UNIXSocketStream:
    """
    Open a stream connection to a UNIX socket.

    Not available on Windows.

    :param path: file system path of the socket to connect to
    :return: a socket stream object

    """
    # Normalize path-like objects to str/bytes before handing them to the backend
    socket_path = os.fspath(path)
    backend = get_async_backend()
    return await backend.connect_unix(socket_path)
273
274
async def create_tcp_listener(
    *,
    local_host: IPAddressType | None = None,
    local_port: int = 0,
    family: AnyIPAddressFamily = socket.AddressFamily.AF_UNSPEC,
    backlog: int = 65536,
    reuse_port: bool = False,
) -> MultiListener[SocketStream]:
    """
    Create a TCP socket listener.

    :param local_port: port number to listen on
    :param local_host: IP address of the interface to listen on. If omitted, listen on
        all IPv4 and IPv6 interfaces. To listen on all interfaces on a specific address
        family, use ``0.0.0.0`` for IPv4 or ``::`` for IPv6.
    :param family: address family (used if ``local_host`` was omitted)
    :param backlog: maximum number of queued incoming connections (up to a maximum of
        2**16, or 65536)
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
        address/port (not supported on Windows)
    :return: a multi-listener object wrapping one socket listener per bound address

    """
    asynclib = get_async_backend()
    backlog = min(backlog, 65536)
    local_host = str(local_host) if local_host is not None else None
    gai_res = await getaddrinfo(
        local_host,
        local_port,
        family=family,
        type=socket.SocketKind.SOCK_STREAM if sys.platform == "win32" else 0,
        flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
    )
    listeners: list[SocketListener] = []
    try:
        # The set() is here to work around a glibc bug:
        # https://sourceware.org/bugzilla/show_bug.cgi?id=14969
        sockaddr: tuple[str, int] | tuple[str, int, int, int]
        for fam, kind, *_, sockaddr in sorted(set(gai_res)):
            # Workaround for an uvloop bug where we don't get the correct scope ID for
            # IPv6 link-local addresses when passing type=socket.SOCK_STREAM to
            # getaddrinfo(): https://github.com/MagicStack/uvloop/issues/539
            if sys.platform != "win32" and kind is not SocketKind.SOCK_STREAM:
                continue

            raw_socket = socket.socket(fam)
            try:
                raw_socket.setblocking(False)

                # For Windows, enable exclusive address use. For others, enable
                # address reuse.
                if sys.platform == "win32":
                    raw_socket.setsockopt(
                        socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1
                    )
                else:
                    raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)

                if reuse_port:
                    raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)

                # Disable dual stack operation on IPv6 sockets
                if fam == socket.AF_INET6:
                    raw_socket.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)

                # Workaround for #554
                if "%" in sockaddr[0]:
                    addr, scope_id = sockaddr[0].split("%", 1)
                    sockaddr = (addr, sockaddr[1], 0, int(scope_id))

                raw_socket.bind(sockaddr)
                raw_socket.listen(backlog)
                listener = asynclib.create_tcp_listener(raw_socket)
            except BaseException:
                # This socket has not been handed over to a listener yet, so the
                # outer cleanup would miss it; close it here to avoid a leak
                raw_socket.close()
                raise

            listeners.append(listener)
    except BaseException:
        # Roll back: close every listener that was successfully set up
        for listener in listeners:
            await listener.aclose()

        raise

    return MultiListener(listeners)
353
354
async def create_unix_listener(
    path: str | bytes | PathLike[Any],
    *,
    mode: int | None = None,
    backlog: int = 65536,
) -> SocketListener:
    """
    Create a listener that accepts connections on a UNIX socket.

    Not available on Windows.

    :param path: path of the socket
    :param mode: permissions to set on the socket
    :param backlog: maximum number of queued incoming connections (up to a maximum of
        2**16, or 65536)
    :return: a listener object

    .. versionchanged:: 3.0
        If a socket already exists on the file system in the given path, it will be
        removed first.

    """
    # Cap the backlog at the documented maximum
    effective_backlog = min(backlog, 65536)
    raw_socket = await setup_unix_local_socket(path, mode, socket.SOCK_STREAM)
    try:
        raw_socket.listen(effective_backlog)
        listener = get_async_backend().create_unix_listener(raw_socket)
    except BaseException:
        # The socket was never handed over to a listener, so close it ourselves
        raw_socket.close()
        raise

    return listener
385
386
async def create_udp_socket(
    family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
    *,
    local_host: IPAddressType | None = None,
    local_port: int = 0,
    reuse_port: bool = False,
) -> UDPSocket:
    """
    Create a UDP socket.

    If ``local_port`` has been given, the socket will be bound to this port on the
    local machine, making this socket suitable for providing UDP based services. This
    applies whether or not ``local_host`` was given; without ``local_host``, the socket
    is bound to the wildcard address of the chosen address family.

    :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
        determined from ``local_host`` if omitted
    :param local_host: IP address or host name of the local interface to bind to
    :param local_port: local port to bind to
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
        address/port (not supported on Windows)
    :return: a UDP socket
    :raises ValueError: if neither ``family`` nor ``local_host`` was given

    """
    if family is AddressFamily.AF_UNSPEC and not local_host:
        raise ValueError('Either "family" or "local_host" must be given')

    if local_host:
        gai_res = await getaddrinfo(
            str(local_host),
            local_port,
            family=family,
            type=socket.SOCK_DGRAM,
            flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
        )
        # Use the first resolved address and adopt its address family
        family = cast(AnyIPAddressFamily, gai_res[0][0])
        local_address = gai_res[0][-1]
    elif family is AddressFamily.AF_INET6:
        # No host given: bind to the IPv6 wildcard address, honoring local_port
        local_address = ("::", local_port)
    else:
        # No host given: bind to the IPv4 wildcard address, honoring local_port
        local_address = ("0.0.0.0", local_port)

    sock = await get_async_backend().create_udp_socket(
        family, local_address, None, reuse_port
    )
    return cast(UDPSocket, sock)
431
432
async def create_connected_udp_socket(
    remote_host: IPAddressType,
    remote_port: int,
    *,
    family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
    local_host: IPAddressType | None = None,
    local_port: int = 0,
    reuse_port: bool = False,
) -> ConnectedUDPSocket:
    """
    Create a connected UDP socket.

    Connected UDP sockets can only communicate with the specified remote host/port, and
    any packets sent from other sources are dropped.

    :param remote_host: remote host to set as the default target
    :param remote_port: port on the remote host to set as the default target
    :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
        determined from ``local_host`` or ``remote_host`` if omitted
    :param local_host: IP address or host name of the local interface to bind to
    :param local_port: local port to bind to
    :param reuse_port: ``True`` to allow multiple sockets to bind to the same
        address/port (not supported on Windows)
    :return: a connected UDP socket

    """
    bind_address = None
    if local_host:
        local_candidates = await getaddrinfo(
            str(local_host),
            local_port,
            family=family,
            type=socket.SOCK_DGRAM,
            flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
        )
        # Take the first result; its family constrains the remote lookup below
        local_family, *_, bind_address = local_candidates[0]
        family = cast(AnyIPAddressFamily, local_family)

    remote_candidates = await getaddrinfo(
        str(remote_host), remote_port, family=family, type=socket.SOCK_DGRAM
    )
    remote_family, *_, target_address = remote_candidates[0]
    family = cast(AnyIPAddressFamily, remote_family)

    backend = get_async_backend()
    udp_socket = await backend.create_udp_socket(
        family, bind_address, target_address, reuse_port
    )
    return cast(ConnectedUDPSocket, udp_socket)
481
482
async def create_unix_datagram_socket(
    *,
    local_path: None | str | bytes | PathLike[Any] = None,
    local_mode: int | None = None,
) -> UNIXDatagramSocket:
    """
    Create a UNIX datagram socket.

    Not available on Windows.

    If ``local_path`` has been given, the socket will be bound to this path, making this
    socket suitable for receiving datagrams from other processes. Other processes can
    send datagrams to this socket only if ``local_path`` is set.

    If a socket already exists on the file system in the ``local_path``, it will be
    removed first.

    :param local_path: the path on which to bind to
    :param local_mode: permissions to set on the local socket
    :return: a UNIX datagram socket

    """
    unbound_or_bound_socket = await setup_unix_local_socket(
        local_path, local_mode, socket.SOCK_DGRAM
    )
    backend = get_async_backend()
    # No remote path is passed: the socket is left unconnected
    return await backend.create_unix_datagram_socket(unbound_or_bound_socket, None)
509
510
async def create_connected_unix_datagram_socket(
    remote_path: str | bytes | PathLike[Any],
    *,
    local_path: None | str | bytes | PathLike[Any] = None,
    local_mode: int | None = None,
) -> ConnectedUNIXDatagramSocket:
    """
    Create a connected UNIX datagram socket.

    Connected datagram sockets can only communicate with the specified remote path.

    If ``local_path`` has been given, the socket will be bound to this path, making
    this socket suitable for receiving datagrams from other processes. Other processes
    can send datagrams to this socket only if ``local_path`` is set.

    If a socket already exists on the file system in the ``local_path``, it will be
    removed first.

    :param remote_path: the path to set as the default target
    :param local_path: the path on which to bind to
    :param local_mode: permissions to set on the local socket
    :return: a connected UNIX datagram socket

    """
    # Normalize the target first so an invalid path fails before a socket is created
    target_path = os.fspath(remote_path)
    local_socket = await setup_unix_local_socket(
        local_path, local_mode, socket.SOCK_DGRAM
    )
    backend = get_async_backend()
    return await backend.create_unix_datagram_socket(local_socket, target_path)
542
543
async def getaddrinfo(
    host: bytes | str | None,
    port: str | int | None,
    *,
    family: int | AddressFamily = 0,
    type: int | SocketKind = 0,
    proto: int = 0,
    flags: int = 0,
) -> list[tuple[AddressFamily, SocketKind, int, str, tuple[str, int]]]:
    """
    Look up a numeric IP address given a host name.

    Internationalized domain names are translated according to the (non-transitional)
    IDNA 2008 standard.

    .. note:: 4-tuple IPv6 socket addresses are automatically converted to 2-tuples of
        (host, port), unlike what :func:`socket.getaddrinfo` does.

    :param host: host name
    :param port: port number
    :param family: socket family (``AF_INET``, ...)
    :param type: socket type (``SOCK_STREAM``, ...)
    :param proto: protocol number
    :param flags: flags to pass to upstream ``getaddrinfo()``
    :return: list of tuples containing (family, type, proto, canonname, sockaddr)

    .. seealso:: :func:`socket.getaddrinfo`

    """
    encoded_host: bytes | None
    if isinstance(host, str):
        # Plain ASCII names are passed through; anything else goes through IDNA
        try:
            encoded_host = host.encode("ascii")
        except UnicodeEncodeError:
            import idna

            encoded_host = idna.encode(host, uts46=True)
    else:
        encoded_host = host

    backend = get_async_backend()
    raw_results = await backend.getaddrinfo(
        encoded_host, port, family=family, type=type, proto=proto, flags=flags
    )

    results = []
    for res_family, res_type, res_proto, canonname, sockaddr in raw_results:
        # filter out IPv6 results when IPv6 is disabled
        if isinstance(sockaddr[0], int):
            continue

        results.append(
            (res_family, res_type, res_proto, canonname, convert_ipv6_sockaddr(sockaddr))
        )

    return results
593
594
def getnameinfo(sockaddr: IPSockAddrType, flags: int = 0) -> Awaitable[tuple[str, str]]:
    """
    Resolve a socket address to a host name and a service name.

    :param sockaddr: socket address (e.g. (ipaddress, port) for IPv4)
    :param flags: flags to pass to upstream ``getnameinfo()``
    :return: a tuple of (host name, service name)

    .. seealso:: :func:`socket.getnameinfo`

    """
    backend = get_async_backend()
    return backend.getnameinfo(sockaddr, flags)
607
608
@deprecated("This function is deprecated; use `wait_readable` instead")
def wait_socket_readable(sock: socket.socket) -> Awaitable[None]:
    """
    .. deprecated:: 4.7.0
       Use :func:`wait_readable` instead.

    Wait until the given socket has data to be read.

    .. warning:: Only use this on raw sockets that have not been wrapped by any higher
        level constructs like socket streams!

    :param sock: a socket object
    :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
        socket to become readable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
        to become readable

    """
    backend = get_async_backend()
    # The backend is handed the socket's file descriptor, not the socket object
    return backend.wait_readable(sock.fileno())
628
629
@deprecated("This function is deprecated; use `wait_writable` instead")
def wait_socket_writable(sock: socket.socket) -> Awaitable[None]:
    """
    .. deprecated:: 4.7.0
       Use :func:`wait_writable` instead.

    Wait until the given socket can be written to.

    This does **NOT** work on Windows when using the asyncio backend with a proactor
    event loop (default on py3.8+).

    .. warning:: Only use this on raw sockets that have not been wrapped by any higher
        level constructs like socket streams!

    :param sock: a socket object
    :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
        socket to become writable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
        to become writable

    """
    backend = get_async_backend()
    # The backend is handed the socket's file descriptor, not the socket object
    return backend.wait_writable(sock.fileno())
652
653
def wait_readable(obj: FileDescriptorLike) -> Awaitable[None]:
    """
    Wait until the given object has data to be read.

    On Unix systems, ``obj`` must either be an integer file descriptor, or else an
    object with a ``.fileno()`` method which returns an integer file descriptor. Any
    kind of file descriptor can be passed, though the exact semantics will depend on
    your kernel. For example, this probably won't do anything useful for on-disk files.

    On Windows systems, ``obj`` must either be an integer ``SOCKET`` handle, or else an
    object with a ``.fileno()`` method which returns an integer ``SOCKET`` handle. File
    descriptors aren't supported, and neither are handles that refer to anything besides
    a ``SOCKET``.

    On backends where this functionality is not natively provided (asyncio
    ``ProactorEventLoop`` on Windows), it is provided using a separate selector thread
    which is set to shut down when the interpreter shuts down.

    .. warning:: Don't use this on raw sockets that have been wrapped by any higher
        level constructs like socket streams!

    :param obj: an object with a ``.fileno()`` method or an integer handle
    :raises ~anyio.ClosedResourceError: if the object was closed while waiting for the
        object to become readable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the object
        to become readable

    """
    backend = get_async_backend()
    return backend.wait_readable(obj)
683
684
def wait_writable(obj: FileDescriptorLike) -> Awaitable[None]:
    """
    Wait until the given object can be written to.

    :param obj: an object with a ``.fileno()`` method or an integer handle
    :raises ~anyio.ClosedResourceError: if the object was closed while waiting for the
        object to become writable
    :raises ~anyio.BusyResourceError: if another task is already waiting for the object
        to become writable

    .. seealso:: See the documentation of :func:`wait_readable` for the definition of
       ``obj`` and notes on backend compatibility.

    .. warning:: Don't use this on raw sockets that have been wrapped by any higher
        level constructs like socket streams!

    """
    backend = get_async_backend()
    return backend.wait_writable(obj)
703
704
705#
706# Private API
707#
708
709
710def convert_ipv6_sockaddr(
711 sockaddr: tuple[str, int, int, int] | tuple[str, int],
712) -> tuple[str, int]:
713 """
714 Convert a 4-tuple IPv6 socket address to a 2-tuple (address, port) format.
715
716 If the scope ID is nonzero, it is added to the address, separated with ``%``.
717 Otherwise the flow id and scope id are simply cut off from the tuple.
718 Any other kinds of socket addresses are returned as-is.
719
720 :param sockaddr: the result of :meth:`~socket.socket.getsockname`
721 :return: the converted socket address
722
723 """
724 # This is more complicated than it should be because of MyPy
725 if isinstance(sockaddr, tuple) and len(sockaddr) == 4:
726 host, port, flowinfo, scope_id = sockaddr
727 if scope_id:
728 # PyPy (as of v7.3.11) leaves the interface name in the result, so
729 # we discard it and only get the scope ID from the end
730 # (https://foss.heptapod.net/pypy/pypy/-/issues/3938)
731 host = host.split("%")[0]
732
733 # Add scope_id to the address
734 return f"{host}%{scope_id}", port
735 else:
736 return host, port
737 else:
738 return sockaddr
739
740
741async def setup_unix_local_socket(
742 path: None | str | bytes | PathLike[Any],
743 mode: int | None,
744 socktype: int,
745) -> socket.socket:
746 """
747 Create a UNIX local socket object, deleting the socket at the given path if it
748 exists.
749
750 Not available on Windows.
751
752 :param path: path of the socket
753 :param mode: permissions to set on the socket
754 :param socktype: socket.SOCK_STREAM or socket.SOCK_DGRAM
755
756 """
757 path_str: str | None
758 if path is not None:
759 path_str = os.fsdecode(path)
760
761 # Linux abstract namespace sockets aren't backed by a concrete file so skip stat call
762 if not path_str.startswith("\0"):
763 # Copied from pathlib...
764 try:
765 stat_result = os.stat(path)
766 except OSError as e:
767 if e.errno not in (
768 errno.ENOENT,
769 errno.ENOTDIR,
770 errno.EBADF,
771 errno.ELOOP,
772 ):
773 raise
774 else:
775 if stat.S_ISSOCK(stat_result.st_mode):
776 os.unlink(path)
777 else:
778 path_str = None
779
780 raw_socket = socket.socket(socket.AF_UNIX, socktype)
781 raw_socket.setblocking(False)
782
783 if path_str is not None:
784 try:
785 await to_thread.run_sync(raw_socket.bind, path_str, abandon_on_cancel=True)
786 if mode is not None:
787 await to_thread.run_sync(chmod, path_str, mode, abandon_on_cancel=True)
788 except BaseException:
789 raw_socket.close()
790 raise
791
792 return raw_socket