Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_sockets.py: 23%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import errno
4import os
5import socket
6import ssl
7import stat
8import sys
9from collections.abc import Awaitable
10from ipaddress import IPv6Address, ip_address
11from os import PathLike, chmod
12from socket import AddressFamily, SocketKind
13from typing import Any, Literal, cast, overload
15from .. import to_thread
16from ..abc import (
17 ConnectedUDPSocket,
18 ConnectedUNIXDatagramSocket,
19 IPAddressType,
20 IPSockAddrType,
21 SocketListener,
22 SocketStream,
23 UDPSocket,
24 UNIXDatagramSocket,
25 UNIXSocketStream,
26)
27from ..streams.stapled import MultiListener
28from ..streams.tls import TLSStream
29from ._eventloop import get_async_backend
30from ._resources import aclose_forcefully
31from ._synchronization import Event
32from ._tasks import create_task_group, move_on_after
34if sys.version_info < (3, 11):
35 from exceptiongroup import ExceptionGroup
37IPPROTO_IPV6 = getattr(socket, "IPPROTO_IPV6", 41) # https://bugs.python.org/issue29515
39AnyIPAddressFamily = Literal[
40 AddressFamily.AF_UNSPEC, AddressFamily.AF_INET, AddressFamily.AF_INET6
41]
42IPAddressFamily = Literal[AddressFamily.AF_INET, AddressFamily.AF_INET6]
45# tls_hostname given
46@overload
47async def connect_tcp(
48 remote_host: IPAddressType,
49 remote_port: int,
50 *,
51 local_host: IPAddressType | None = ...,
52 ssl_context: ssl.SSLContext | None = ...,
53 tls_standard_compatible: bool = ...,
54 tls_hostname: str,
55 happy_eyeballs_delay: float = ...,
56) -> TLSStream: ...
59# ssl_context given
60@overload
61async def connect_tcp(
62 remote_host: IPAddressType,
63 remote_port: int,
64 *,
65 local_host: IPAddressType | None = ...,
66 ssl_context: ssl.SSLContext,
67 tls_standard_compatible: bool = ...,
68 tls_hostname: str | None = ...,
69 happy_eyeballs_delay: float = ...,
70) -> TLSStream: ...
73# tls=True
74@overload
75async def connect_tcp(
76 remote_host: IPAddressType,
77 remote_port: int,
78 *,
79 local_host: IPAddressType | None = ...,
80 tls: Literal[True],
81 ssl_context: ssl.SSLContext | None = ...,
82 tls_standard_compatible: bool = ...,
83 tls_hostname: str | None = ...,
84 happy_eyeballs_delay: float = ...,
85) -> TLSStream: ...
88# tls=False
89@overload
90async def connect_tcp(
91 remote_host: IPAddressType,
92 remote_port: int,
93 *,
94 local_host: IPAddressType | None = ...,
95 tls: Literal[False],
96 ssl_context: ssl.SSLContext | None = ...,
97 tls_standard_compatible: bool = ...,
98 tls_hostname: str | None = ...,
99 happy_eyeballs_delay: float = ...,
100) -> SocketStream: ...
103# No TLS arguments
104@overload
105async def connect_tcp(
106 remote_host: IPAddressType,
107 remote_port: int,
108 *,
109 local_host: IPAddressType | None = ...,
110 happy_eyeballs_delay: float = ...,
111) -> SocketStream: ...
114async def connect_tcp(
115 remote_host: IPAddressType,
116 remote_port: int,
117 *,
118 local_host: IPAddressType | None = None,
119 tls: bool = False,
120 ssl_context: ssl.SSLContext | None = None,
121 tls_standard_compatible: bool = True,
122 tls_hostname: str | None = None,
123 happy_eyeballs_delay: float = 0.25,
124) -> SocketStream | TLSStream:
125 """
126 Connect to a host using the TCP protocol.
128 This function implements the stateless version of the Happy Eyeballs algorithm (RFC
129 6555). If ``remote_host`` is a host name that resolves to multiple IP addresses,
130 each one is tried until one connection attempt succeeds. If the first attempt does
131 not connected within 250 milliseconds, a second attempt is started using the next
132 address in the list, and so on. On IPv6 enabled systems, an IPv6 address (if
133 available) is tried first.
135 When the connection has been established, a TLS handshake will be done if either
136 ``ssl_context`` or ``tls_hostname`` is not ``None``, or if ``tls`` is ``True``.
138 :param remote_host: the IP address or host name to connect to
139 :param remote_port: port on the target host to connect to
140 :param local_host: the interface address or name to bind the socket to before
141 connecting
142 :param tls: ``True`` to do a TLS handshake with the connected stream and return a
143 :class:`~anyio.streams.tls.TLSStream` instead
144 :param ssl_context: the SSL context object to use (if omitted, a default context is
145 created)
146 :param tls_standard_compatible: If ``True``, performs the TLS shutdown handshake
147 before closing the stream and requires that the server does this as well.
148 Otherwise, :exc:`~ssl.SSLEOFError` may be raised during reads from the stream.
149 Some protocols, such as HTTP, require this option to be ``False``.
150 See :meth:`~ssl.SSLContext.wrap_socket` for details.
151 :param tls_hostname: host name to check the server certificate against (defaults to
152 the value of ``remote_host``)
153 :param happy_eyeballs_delay: delay (in seconds) before starting the next connection
154 attempt
155 :return: a socket stream object if no TLS handshake was done, otherwise a TLS stream
156 :raises OSError: if the connection attempt fails
158 """
159 # Placed here due to https://github.com/python/mypy/issues/7057
160 connected_stream: SocketStream | None = None
162 async def try_connect(remote_host: str, event: Event) -> None:
163 nonlocal connected_stream
164 try:
165 stream = await asynclib.connect_tcp(remote_host, remote_port, local_address)
166 except OSError as exc:
167 oserrors.append(exc)
168 return
169 else:
170 if connected_stream is None:
171 connected_stream = stream
172 tg.cancel_scope.cancel()
173 else:
174 await stream.aclose()
175 finally:
176 event.set()
178 asynclib = get_async_backend()
179 local_address: IPSockAddrType | None = None
180 family = socket.AF_UNSPEC
181 if local_host:
182 gai_res = await getaddrinfo(str(local_host), None)
183 family, *_, local_address = gai_res[0]
185 target_host = str(remote_host)
186 try:
187 addr_obj = ip_address(remote_host)
188 except ValueError:
189 # getaddrinfo() will raise an exception if name resolution fails
190 gai_res = await getaddrinfo(
191 target_host, remote_port, family=family, type=socket.SOCK_STREAM
192 )
194 # Organize the list so that the first address is an IPv6 address (if available)
195 # and the second one is an IPv4 addresses. The rest can be in whatever order.
196 v6_found = v4_found = False
197 target_addrs: list[tuple[socket.AddressFamily, str]] = []
198 for af, *rest, sa in gai_res:
199 if af == socket.AF_INET6 and not v6_found:
200 v6_found = True
201 target_addrs.insert(0, (af, sa[0]))
202 elif af == socket.AF_INET and not v4_found and v6_found:
203 v4_found = True
204 target_addrs.insert(1, (af, sa[0]))
205 else:
206 target_addrs.append((af, sa[0]))
207 else:
208 if isinstance(addr_obj, IPv6Address):
209 target_addrs = [(socket.AF_INET6, addr_obj.compressed)]
210 else:
211 target_addrs = [(socket.AF_INET, addr_obj.compressed)]
213 oserrors: list[OSError] = []
214 async with create_task_group() as tg:
215 for i, (af, addr) in enumerate(target_addrs):
216 event = Event()
217 tg.start_soon(try_connect, addr, event)
218 with move_on_after(happy_eyeballs_delay):
219 await event.wait()
221 if connected_stream is None:
222 cause = (
223 oserrors[0]
224 if len(oserrors) == 1
225 else ExceptionGroup("multiple connection attempts failed", oserrors)
226 )
227 raise OSError("All connection attempts failed") from cause
229 if tls or tls_hostname or ssl_context:
230 try:
231 return await TLSStream.wrap(
232 connected_stream,
233 server_side=False,
234 hostname=tls_hostname or str(remote_host),
235 ssl_context=ssl_context,
236 standard_compatible=tls_standard_compatible,
237 )
238 except BaseException:
239 await aclose_forcefully(connected_stream)
240 raise
242 return connected_stream
245async def connect_unix(path: str | bytes | PathLike[Any]) -> UNIXSocketStream:
246 """
247 Connect to the given UNIX socket.
249 Not available on Windows.
251 :param path: path to the socket
252 :return: a socket stream object
254 """
255 path = os.fspath(path)
256 return await get_async_backend().connect_unix(path)
259async def create_tcp_listener(
260 *,
261 local_host: IPAddressType | None = None,
262 local_port: int = 0,
263 family: AnyIPAddressFamily = socket.AddressFamily.AF_UNSPEC,
264 backlog: int = 65536,
265 reuse_port: bool = False,
266) -> MultiListener[SocketStream]:
267 """
268 Create a TCP socket listener.
270 :param local_port: port number to listen on
271 :param local_host: IP address of the interface to listen on. If omitted, listen on
272 all IPv4 and IPv6 interfaces. To listen on all interfaces on a specific address
273 family, use ``0.0.0.0`` for IPv4 or ``::`` for IPv6.
274 :param family: address family (used if ``local_host`` was omitted)
275 :param backlog: maximum number of queued incoming connections (up to a maximum of
276 2**16, or 65536)
277 :param reuse_port: ``True`` to allow multiple sockets to bind to the same
278 address/port (not supported on Windows)
279 :return: a list of listener objects
281 """
282 asynclib = get_async_backend()
283 backlog = min(backlog, 65536)
284 local_host = str(local_host) if local_host is not None else None
285 gai_res = await getaddrinfo(
286 local_host,
287 local_port,
288 family=family,
289 type=socket.SocketKind.SOCK_STREAM if sys.platform == "win32" else 0,
290 flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
291 )
292 listeners: list[SocketListener] = []
293 try:
294 # The set() is here to work around a glibc bug:
295 # https://sourceware.org/bugzilla/show_bug.cgi?id=14969
296 sockaddr: tuple[str, int] | tuple[str, int, int, int]
297 for fam, kind, *_, sockaddr in sorted(set(gai_res)):
298 # Workaround for an uvloop bug where we don't get the correct scope ID for
299 # IPv6 link-local addresses when passing type=socket.SOCK_STREAM to
300 # getaddrinfo(): https://github.com/MagicStack/uvloop/issues/539
301 if sys.platform != "win32" and kind is not SocketKind.SOCK_STREAM:
302 continue
304 raw_socket = socket.socket(fam)
305 raw_socket.setblocking(False)
307 # For Windows, enable exclusive address use. For others, enable address
308 # reuse.
309 if sys.platform == "win32":
310 raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
311 else:
312 raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
314 if reuse_port:
315 raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
317 # If only IPv6 was requested, disable dual stack operation
318 if fam == socket.AF_INET6:
319 raw_socket.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
321 # Workaround for #554
322 if "%" in sockaddr[0]:
323 addr, scope_id = sockaddr[0].split("%", 1)
324 sockaddr = (addr, sockaddr[1], 0, int(scope_id))
326 raw_socket.bind(sockaddr)
327 raw_socket.listen(backlog)
328 listener = asynclib.create_tcp_listener(raw_socket)
329 listeners.append(listener)
330 except BaseException:
331 for listener in listeners:
332 await listener.aclose()
334 raise
336 return MultiListener(listeners)
339async def create_unix_listener(
340 path: str | bytes | PathLike[Any],
341 *,
342 mode: int | None = None,
343 backlog: int = 65536,
344) -> SocketListener:
345 """
346 Create a UNIX socket listener.
348 Not available on Windows.
350 :param path: path of the socket
351 :param mode: permissions to set on the socket
352 :param backlog: maximum number of queued incoming connections (up to a maximum of
353 2**16, or 65536)
354 :return: a listener object
356 .. versionchanged:: 3.0
357 If a socket already exists on the file system in the given path, it will be
358 removed first.
360 """
361 backlog = min(backlog, 65536)
362 raw_socket = await setup_unix_local_socket(path, mode, socket.SOCK_STREAM)
363 try:
364 raw_socket.listen(backlog)
365 return get_async_backend().create_unix_listener(raw_socket)
366 except BaseException:
367 raw_socket.close()
368 raise
371async def create_udp_socket(
372 family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
373 *,
374 local_host: IPAddressType | None = None,
375 local_port: int = 0,
376 reuse_port: bool = False,
377) -> UDPSocket:
378 """
379 Create a UDP socket.
381 If ``port`` has been given, the socket will be bound to this port on the local
382 machine, making this socket suitable for providing UDP based services.
384 :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
385 determined from ``local_host`` if omitted
386 :param local_host: IP address or host name of the local interface to bind to
387 :param local_port: local port to bind to
388 :param reuse_port: ``True`` to allow multiple sockets to bind to the same
389 address/port (not supported on Windows)
390 :return: a UDP socket
392 """
393 if family is AddressFamily.AF_UNSPEC and not local_host:
394 raise ValueError('Either "family" or "local_host" must be given')
396 if local_host:
397 gai_res = await getaddrinfo(
398 str(local_host),
399 local_port,
400 family=family,
401 type=socket.SOCK_DGRAM,
402 flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
403 )
404 family = cast(AnyIPAddressFamily, gai_res[0][0])
405 local_address = gai_res[0][-1]
406 elif family is AddressFamily.AF_INET6:
407 local_address = ("::", 0)
408 else:
409 local_address = ("0.0.0.0", 0)
411 sock = await get_async_backend().create_udp_socket(
412 family, local_address, None, reuse_port
413 )
414 return cast(UDPSocket, sock)
417async def create_connected_udp_socket(
418 remote_host: IPAddressType,
419 remote_port: int,
420 *,
421 family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
422 local_host: IPAddressType | None = None,
423 local_port: int = 0,
424 reuse_port: bool = False,
425) -> ConnectedUDPSocket:
426 """
427 Create a connected UDP socket.
429 Connected UDP sockets can only communicate with the specified remote host/port, an
430 any packets sent from other sources are dropped.
432 :param remote_host: remote host to set as the default target
433 :param remote_port: port on the remote host to set as the default target
434 :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
435 determined from ``local_host`` or ``remote_host`` if omitted
436 :param local_host: IP address or host name of the local interface to bind to
437 :param local_port: local port to bind to
438 :param reuse_port: ``True`` to allow multiple sockets to bind to the same
439 address/port (not supported on Windows)
440 :return: a connected UDP socket
442 """
443 local_address = None
444 if local_host:
445 gai_res = await getaddrinfo(
446 str(local_host),
447 local_port,
448 family=family,
449 type=socket.SOCK_DGRAM,
450 flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
451 )
452 family = cast(AnyIPAddressFamily, gai_res[0][0])
453 local_address = gai_res[0][-1]
455 gai_res = await getaddrinfo(
456 str(remote_host), remote_port, family=family, type=socket.SOCK_DGRAM
457 )
458 family = cast(AnyIPAddressFamily, gai_res[0][0])
459 remote_address = gai_res[0][-1]
461 sock = await get_async_backend().create_udp_socket(
462 family, local_address, remote_address, reuse_port
463 )
464 return cast(ConnectedUDPSocket, sock)
467async def create_unix_datagram_socket(
468 *,
469 local_path: None | str | bytes | PathLike[Any] = None,
470 local_mode: int | None = None,
471) -> UNIXDatagramSocket:
472 """
473 Create a UNIX datagram socket.
475 Not available on Windows.
477 If ``local_path`` has been given, the socket will be bound to this path, making this
478 socket suitable for receiving datagrams from other processes. Other processes can
479 send datagrams to this socket only if ``local_path`` is set.
481 If a socket already exists on the file system in the ``local_path``, it will be
482 removed first.
484 :param local_path: the path on which to bind to
485 :param local_mode: permissions to set on the local socket
486 :return: a UNIX datagram socket
488 """
489 raw_socket = await setup_unix_local_socket(
490 local_path, local_mode, socket.SOCK_DGRAM
491 )
492 return await get_async_backend().create_unix_datagram_socket(raw_socket, None)
495async def create_connected_unix_datagram_socket(
496 remote_path: str | bytes | PathLike[Any],
497 *,
498 local_path: None | str | bytes | PathLike[Any] = None,
499 local_mode: int | None = None,
500) -> ConnectedUNIXDatagramSocket:
501 """
502 Create a connected UNIX datagram socket.
504 Connected datagram sockets can only communicate with the specified remote path.
506 If ``local_path`` has been given, the socket will be bound to this path, making
507 this socket suitable for receiving datagrams from other processes. Other processes
508 can send datagrams to this socket only if ``local_path`` is set.
510 If a socket already exists on the file system in the ``local_path``, it will be
511 removed first.
513 :param remote_path: the path to set as the default target
514 :param local_path: the path on which to bind to
515 :param local_mode: permissions to set on the local socket
516 :return: a connected UNIX datagram socket
518 """
519 remote_path = os.fspath(remote_path)
520 raw_socket = await setup_unix_local_socket(
521 local_path, local_mode, socket.SOCK_DGRAM
522 )
523 return await get_async_backend().create_unix_datagram_socket(
524 raw_socket, remote_path
525 )
528async def getaddrinfo(
529 host: bytes | str | None,
530 port: str | int | None,
531 *,
532 family: int | AddressFamily = 0,
533 type: int | SocketKind = 0,
534 proto: int = 0,
535 flags: int = 0,
536) -> list[tuple[AddressFamily, SocketKind, int, str, tuple[str, int]]]:
537 """
538 Look up a numeric IP address given a host name.
540 Internationalized domain names are translated according to the (non-transitional)
541 IDNA 2008 standard.
543 .. note:: 4-tuple IPv6 socket addresses are automatically converted to 2-tuples of
544 (host, port), unlike what :func:`socket.getaddrinfo` does.
546 :param host: host name
547 :param port: port number
548 :param family: socket family (`'AF_INET``, ...)
549 :param type: socket type (``SOCK_STREAM``, ...)
550 :param proto: protocol number
551 :param flags: flags to pass to upstream ``getaddrinfo()``
552 :return: list of tuples containing (family, type, proto, canonname, sockaddr)
554 .. seealso:: :func:`socket.getaddrinfo`
556 """
557 # Handle unicode hostnames
558 if isinstance(host, str):
559 try:
560 encoded_host: bytes | None = host.encode("ascii")
561 except UnicodeEncodeError:
562 import idna
564 encoded_host = idna.encode(host, uts46=True)
565 else:
566 encoded_host = host
568 gai_res = await get_async_backend().getaddrinfo(
569 encoded_host, port, family=family, type=type, proto=proto, flags=flags
570 )
571 return [
572 (family, type, proto, canonname, convert_ipv6_sockaddr(sockaddr))
573 for family, type, proto, canonname, sockaddr in gai_res
574 ]
577def getnameinfo(sockaddr: IPSockAddrType, flags: int = 0) -> Awaitable[tuple[str, str]]:
578 """
579 Look up the host name of an IP address.
581 :param sockaddr: socket address (e.g. (ipaddress, port) for IPv4)
582 :param flags: flags to pass to upstream ``getnameinfo()``
583 :return: a tuple of (host name, service name)
585 .. seealso:: :func:`socket.getnameinfo`
587 """
588 return get_async_backend().getnameinfo(sockaddr, flags)
591def wait_socket_readable(sock: socket.socket) -> Awaitable[None]:
592 """
593 Wait until the given socket has data to be read.
595 This does **NOT** work on Windows when using the asyncio backend with a proactor
596 event loop (default on py3.8+).
598 .. warning:: Only use this on raw sockets that have not been wrapped by any higher
599 level constructs like socket streams!
601 :param sock: a socket object
602 :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
603 socket to become readable
604 :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
605 to become readable
607 """
608 return get_async_backend().wait_socket_readable(sock)
611def wait_socket_writable(sock: socket.socket) -> Awaitable[None]:
612 """
613 Wait until the given socket can be written to.
615 This does **NOT** work on Windows when using the asyncio backend with a proactor
616 event loop (default on py3.8+).
618 .. warning:: Only use this on raw sockets that have not been wrapped by any higher
619 level constructs like socket streams!
621 :param sock: a socket object
622 :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
623 socket to become writable
624 :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
625 to become writable
627 """
628 return get_async_backend().wait_socket_writable(sock)
631#
632# Private API
633#
636def convert_ipv6_sockaddr(
637 sockaddr: tuple[str, int, int, int] | tuple[str, int],
638) -> tuple[str, int]:
639 """
640 Convert a 4-tuple IPv6 socket address to a 2-tuple (address, port) format.
642 If the scope ID is nonzero, it is added to the address, separated with ``%``.
643 Otherwise the flow id and scope id are simply cut off from the tuple.
644 Any other kinds of socket addresses are returned as-is.
646 :param sockaddr: the result of :meth:`~socket.socket.getsockname`
647 :return: the converted socket address
649 """
650 # This is more complicated than it should be because of MyPy
651 if isinstance(sockaddr, tuple) and len(sockaddr) == 4:
652 host, port, flowinfo, scope_id = sockaddr
653 if scope_id:
654 # PyPy (as of v7.3.11) leaves the interface name in the result, so
655 # we discard it and only get the scope ID from the end
656 # (https://foss.heptapod.net/pypy/pypy/-/issues/3938)
657 host = host.split("%")[0]
659 # Add scope_id to the address
660 return f"{host}%{scope_id}", port
661 else:
662 return host, port
663 else:
664 return sockaddr
667async def setup_unix_local_socket(
668 path: None | str | bytes | PathLike[Any],
669 mode: int | None,
670 socktype: int,
671) -> socket.socket:
672 """
673 Create a UNIX local socket object, deleting the socket at the given path if it
674 exists.
676 Not available on Windows.
678 :param path: path of the socket
679 :param mode: permissions to set on the socket
680 :param socktype: socket.SOCK_STREAM or socket.SOCK_DGRAM
682 """
683 path_str: str | None
684 if path is not None:
685 path_str = os.fsdecode(path)
687 # Linux abstract namespace sockets aren't backed by a concrete file so skip stat call
688 if not path_str.startswith("\0"):
689 # Copied from pathlib...
690 try:
691 stat_result = os.stat(path)
692 except OSError as e:
693 if e.errno not in (
694 errno.ENOENT,
695 errno.ENOTDIR,
696 errno.EBADF,
697 errno.ELOOP,
698 ):
699 raise
700 else:
701 if stat.S_ISSOCK(stat_result.st_mode):
702 os.unlink(path)
703 else:
704 path_str = None
706 raw_socket = socket.socket(socket.AF_UNIX, socktype)
707 raw_socket.setblocking(False)
709 if path_str is not None:
710 try:
711 await to_thread.run_sync(raw_socket.bind, path_str, abandon_on_cancel=True)
712 if mode is not None:
713 await to_thread.run_sync(chmod, path_str, mode, abandon_on_cancel=True)
714 except BaseException:
715 raw_socket.close()
716 raise
718 return raw_socket