Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/anyio/_core/_sockets.py: 23%
207 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:38 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 06:38 +0000
1from __future__ import annotations
3import socket
4import ssl
5import sys
6from collections.abc import Awaitable
7from ipaddress import IPv6Address, ip_address
8from os import PathLike, chmod
9from pathlib import Path
10from socket import AddressFamily, SocketKind
11from typing import Literal, Tuple, cast, overload
13from .. import to_thread
14from ..abc import (
15 ConnectedUDPSocket,
16 ConnectedUNIXDatagramSocket,
17 IPAddressType,
18 IPSockAddrType,
19 SocketListener,
20 SocketStream,
21 UDPSocket,
22 UNIXDatagramSocket,
23 UNIXSocketStream,
24)
25from ..streams.stapled import MultiListener
26from ..streams.tls import TLSStream
27from ._eventloop import get_async_backend
28from ._resources import aclose_forcefully
29from ._synchronization import Event
30from ._tasks import create_task_group, move_on_after
32if sys.version_info < (3, 11):
33 from exceptiongroup import ExceptionGroup
35IPPROTO_IPV6 = getattr(socket, "IPPROTO_IPV6", 41) # https://bugs.python.org/issue29515
37AnyIPAddressFamily = Literal[
38 AddressFamily.AF_UNSPEC, AddressFamily.AF_INET, AddressFamily.AF_INET6
39]
40IPAddressFamily = Literal[AddressFamily.AF_INET, AddressFamily.AF_INET6]
43# tls_hostname given
44@overload
45async def connect_tcp(
46 remote_host: IPAddressType,
47 remote_port: int,
48 *,
49 local_host: IPAddressType | None = ...,
50 ssl_context: ssl.SSLContext | None = ...,
51 tls_standard_compatible: bool = ...,
52 tls_hostname: str,
53 happy_eyeballs_delay: float = ...,
54) -> TLSStream:
55 ...
58# ssl_context given
59@overload
60async def connect_tcp(
61 remote_host: IPAddressType,
62 remote_port: int,
63 *,
64 local_host: IPAddressType | None = ...,
65 ssl_context: ssl.SSLContext,
66 tls_standard_compatible: bool = ...,
67 tls_hostname: str | None = ...,
68 happy_eyeballs_delay: float = ...,
69) -> TLSStream:
70 ...
73# tls=True
74@overload
75async def connect_tcp(
76 remote_host: IPAddressType,
77 remote_port: int,
78 *,
79 local_host: IPAddressType | None = ...,
80 tls: Literal[True],
81 ssl_context: ssl.SSLContext | None = ...,
82 tls_standard_compatible: bool = ...,
83 tls_hostname: str | None = ...,
84 happy_eyeballs_delay: float = ...,
85) -> TLSStream:
86 ...
89# tls=False
90@overload
91async def connect_tcp(
92 remote_host: IPAddressType,
93 remote_port: int,
94 *,
95 local_host: IPAddressType | None = ...,
96 tls: Literal[False],
97 ssl_context: ssl.SSLContext | None = ...,
98 tls_standard_compatible: bool = ...,
99 tls_hostname: str | None = ...,
100 happy_eyeballs_delay: float = ...,
101) -> SocketStream:
102 ...
105# No TLS arguments
106@overload
107async def connect_tcp(
108 remote_host: IPAddressType,
109 remote_port: int,
110 *,
111 local_host: IPAddressType | None = ...,
112 happy_eyeballs_delay: float = ...,
113) -> SocketStream:
114 ...
117async def connect_tcp(
118 remote_host: IPAddressType,
119 remote_port: int,
120 *,
121 local_host: IPAddressType | None = None,
122 tls: bool = False,
123 ssl_context: ssl.SSLContext | None = None,
124 tls_standard_compatible: bool = True,
125 tls_hostname: str | None = None,
126 happy_eyeballs_delay: float = 0.25,
127) -> SocketStream | TLSStream:
128 """
129 Connect to a host using the TCP protocol.
131 This function implements the stateless version of the Happy Eyeballs algorithm (RFC
132 6555). If ``remote_host`` is a host name that resolves to multiple IP addresses,
133 each one is tried until one connection attempt succeeds. If the first attempt does
134 not connected within 250 milliseconds, a second attempt is started using the next
135 address in the list, and so on. On IPv6 enabled systems, an IPv6 address (if
136 available) is tried first.
138 When the connection has been established, a TLS handshake will be done if either
139 ``ssl_context`` or ``tls_hostname`` is not ``None``, or if ``tls`` is ``True``.
141 :param remote_host: the IP address or host name to connect to
142 :param remote_port: port on the target host to connect to
143 :param local_host: the interface address or name to bind the socket to before
144 connecting
145 :param tls: ``True`` to do a TLS handshake with the connected stream and return a
146 :class:`~anyio.streams.tls.TLSStream` instead
147 :param ssl_context: the SSL context object to use (if omitted, a default context is
148 created)
149 :param tls_standard_compatible: If ``True``, performs the TLS shutdown handshake
150 before closing the stream and requires that the server does this as well.
151 Otherwise, :exc:`~ssl.SSLEOFError` may be raised during reads from the stream.
152 Some protocols, such as HTTP, require this option to be ``False``.
153 See :meth:`~ssl.SSLContext.wrap_socket` for details.
154 :param tls_hostname: host name to check the server certificate against (defaults to
155 the value of ``remote_host``)
156 :param happy_eyeballs_delay: delay (in seconds) before starting the next connection
157 attempt
158 :return: a socket stream object if no TLS handshake was done, otherwise a TLS stream
159 :raises OSError: if the connection attempt fails
161 """
162 # Placed here due to https://github.com/python/mypy/issues/7057
163 connected_stream: SocketStream | None = None
165 async def try_connect(remote_host: str, event: Event) -> None:
166 nonlocal connected_stream
167 try:
168 stream = await asynclib.connect_tcp(remote_host, remote_port, local_address)
169 except OSError as exc:
170 oserrors.append(exc)
171 return
172 else:
173 if connected_stream is None:
174 connected_stream = stream
175 tg.cancel_scope.cancel()
176 else:
177 await stream.aclose()
178 finally:
179 event.set()
181 asynclib = get_async_backend()
182 local_address: IPSockAddrType | None = None
183 family = socket.AF_UNSPEC
184 if local_host:
185 gai_res = await getaddrinfo(str(local_host), None)
186 family, *_, local_address = gai_res[0]
188 target_host = str(remote_host)
189 try:
190 addr_obj = ip_address(remote_host)
191 except ValueError:
192 # getaddrinfo() will raise an exception if name resolution fails
193 gai_res = await getaddrinfo(
194 target_host, remote_port, family=family, type=socket.SOCK_STREAM
195 )
197 # Organize the list so that the first address is an IPv6 address (if available)
198 # and the second one is an IPv4 addresses. The rest can be in whatever order.
199 v6_found = v4_found = False
200 target_addrs: list[tuple[socket.AddressFamily, str]] = []
201 for af, *rest, sa in gai_res:
202 if af == socket.AF_INET6 and not v6_found:
203 v6_found = True
204 target_addrs.insert(0, (af, sa[0]))
205 elif af == socket.AF_INET and not v4_found and v6_found:
206 v4_found = True
207 target_addrs.insert(1, (af, sa[0]))
208 else:
209 target_addrs.append((af, sa[0]))
210 else:
211 if isinstance(addr_obj, IPv6Address):
212 target_addrs = [(socket.AF_INET6, addr_obj.compressed)]
213 else:
214 target_addrs = [(socket.AF_INET, addr_obj.compressed)]
216 oserrors: list[OSError] = []
217 async with create_task_group() as tg:
218 for i, (af, addr) in enumerate(target_addrs):
219 event = Event()
220 tg.start_soon(try_connect, addr, event)
221 with move_on_after(happy_eyeballs_delay):
222 await event.wait()
224 if connected_stream is None:
225 cause = (
226 oserrors[0]
227 if len(oserrors) == 1
228 else ExceptionGroup("multiple connection attempts failed", oserrors)
229 )
230 raise OSError("All connection attempts failed") from cause
232 if tls or tls_hostname or ssl_context:
233 try:
234 return await TLSStream.wrap(
235 connected_stream,
236 server_side=False,
237 hostname=tls_hostname or str(remote_host),
238 ssl_context=ssl_context,
239 standard_compatible=tls_standard_compatible,
240 )
241 except BaseException:
242 await aclose_forcefully(connected_stream)
243 raise
245 return connected_stream
248async def connect_unix(path: str | PathLike[str]) -> UNIXSocketStream:
249 """
250 Connect to the given UNIX socket.
252 Not available on Windows.
254 :param path: path to the socket
255 :return: a socket stream object
257 """
258 path = str(Path(path))
259 return await get_async_backend().connect_unix(path)
262async def create_tcp_listener(
263 *,
264 local_host: IPAddressType | None = None,
265 local_port: int = 0,
266 family: AnyIPAddressFamily = socket.AddressFamily.AF_UNSPEC,
267 backlog: int = 65536,
268 reuse_port: bool = False,
269) -> MultiListener[SocketStream]:
270 """
271 Create a TCP socket listener.
273 :param local_port: port number to listen on
274 :param local_host: IP address of the interface to listen on. If omitted, listen on
275 all IPv4 and IPv6 interfaces. To listen on all interfaces on a specific address
276 family, use ``0.0.0.0`` for IPv4 or ``::`` for IPv6.
277 :param family: address family (used if ``local_host`` was omitted)
278 :param backlog: maximum number of queued incoming connections (up to a maximum of
279 2**16, or 65536)
280 :param reuse_port: ``True`` to allow multiple sockets to bind to the same
281 address/port (not supported on Windows)
282 :return: a list of listener objects
284 """
285 asynclib = get_async_backend()
286 backlog = min(backlog, 65536)
287 local_host = str(local_host) if local_host is not None else None
288 gai_res = await getaddrinfo(
289 local_host,
290 local_port,
291 family=family,
292 type=socket.SocketKind.SOCK_STREAM if sys.platform == "win32" else 0,
293 flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
294 )
295 listeners: list[SocketListener] = []
296 try:
297 # The set() is here to work around a glibc bug:
298 # https://sourceware.org/bugzilla/show_bug.cgi?id=14969
299 sockaddr: tuple[str, int] | tuple[str, int, int, int]
300 for fam, kind, *_, sockaddr in sorted(set(gai_res)):
301 # Workaround for an uvloop bug where we don't get the correct scope ID for
302 # IPv6 link-local addresses when passing type=socket.SOCK_STREAM to
303 # getaddrinfo(): https://github.com/MagicStack/uvloop/issues/539
304 if sys.platform != "win32" and kind is not SocketKind.SOCK_STREAM:
305 continue
307 raw_socket = socket.socket(fam)
308 raw_socket.setblocking(False)
310 # For Windows, enable exclusive address use. For others, enable address
311 # reuse.
312 if sys.platform == "win32":
313 raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_EXCLUSIVEADDRUSE, 1)
314 else:
315 raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
317 if reuse_port:
318 raw_socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEPORT, 1)
320 # If only IPv6 was requested, disable dual stack operation
321 if fam == socket.AF_INET6:
322 raw_socket.setsockopt(IPPROTO_IPV6, socket.IPV6_V6ONLY, 1)
324 # Workaround for #554
325 if "%" in sockaddr[0]:
326 addr, scope_id = sockaddr[0].split("%", 1)
327 sockaddr = (addr, sockaddr[1], 0, int(scope_id))
329 raw_socket.bind(sockaddr)
330 raw_socket.listen(backlog)
331 listener = asynclib.create_tcp_listener(raw_socket)
332 listeners.append(listener)
333 except BaseException:
334 for listener in listeners:
335 await listener.aclose()
337 raise
339 return MultiListener(listeners)
342async def create_unix_listener(
343 path: str | PathLike[str], *, mode: int | None = None, backlog: int = 65536
344) -> SocketListener:
345 """
346 Create a UNIX socket listener.
348 Not available on Windows.
350 :param path: path of the socket
351 :param mode: permissions to set on the socket
352 :param backlog: maximum number of queued incoming connections (up to a maximum of
353 2**16, or 65536)
354 :return: a listener object
356 .. versionchanged:: 3.0
357 If a socket already exists on the file system in the given path, it will be
358 removed first.
360 """
361 backlog = min(backlog, 65536)
362 raw_socket = await setup_unix_local_socket(path, mode, socket.SOCK_STREAM)
363 try:
364 raw_socket.listen(backlog)
365 return get_async_backend().create_unix_listener(raw_socket)
366 except BaseException:
367 raw_socket.close()
368 raise
371async def create_udp_socket(
372 family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
373 *,
374 local_host: IPAddressType | None = None,
375 local_port: int = 0,
376 reuse_port: bool = False,
377) -> UDPSocket:
378 """
379 Create a UDP socket.
381 If ``port`` has been given, the socket will be bound to this port on the local
382 machine, making this socket suitable for providing UDP based services.
384 :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
385 determined from ``local_host`` if omitted
386 :param local_host: IP address or host name of the local interface to bind to
387 :param local_port: local port to bind to
388 :param reuse_port: ``True`` to allow multiple sockets to bind to the same
389 address/port (not supported on Windows)
390 :return: a UDP socket
392 """
393 if family is AddressFamily.AF_UNSPEC and not local_host:
394 raise ValueError('Either "family" or "local_host" must be given')
396 if local_host:
397 gai_res = await getaddrinfo(
398 str(local_host),
399 local_port,
400 family=family,
401 type=socket.SOCK_DGRAM,
402 flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
403 )
404 family = cast(AnyIPAddressFamily, gai_res[0][0])
405 local_address = gai_res[0][-1]
406 elif family is AddressFamily.AF_INET6:
407 local_address = ("::", 0)
408 else:
409 local_address = ("0.0.0.0", 0)
411 sock = await get_async_backend().create_udp_socket(
412 family, local_address, None, reuse_port
413 )
414 return cast(UDPSocket, sock)
417async def create_connected_udp_socket(
418 remote_host: IPAddressType,
419 remote_port: int,
420 *,
421 family: AnyIPAddressFamily = AddressFamily.AF_UNSPEC,
422 local_host: IPAddressType | None = None,
423 local_port: int = 0,
424 reuse_port: bool = False,
425) -> ConnectedUDPSocket:
426 """
427 Create a connected UDP socket.
429 Connected UDP sockets can only communicate with the specified remote host/port, an
430 any packets sent from other sources are dropped.
432 :param remote_host: remote host to set as the default target
433 :param remote_port: port on the remote host to set as the default target
434 :param family: address family (``AF_INET`` or ``AF_INET6``) – automatically
435 determined from ``local_host`` or ``remote_host`` if omitted
436 :param local_host: IP address or host name of the local interface to bind to
437 :param local_port: local port to bind to
438 :param reuse_port: ``True`` to allow multiple sockets to bind to the same
439 address/port (not supported on Windows)
440 :return: a connected UDP socket
442 """
443 local_address = None
444 if local_host:
445 gai_res = await getaddrinfo(
446 str(local_host),
447 local_port,
448 family=family,
449 type=socket.SOCK_DGRAM,
450 flags=socket.AI_PASSIVE | socket.AI_ADDRCONFIG,
451 )
452 family = cast(AnyIPAddressFamily, gai_res[0][0])
453 local_address = gai_res[0][-1]
455 gai_res = await getaddrinfo(
456 str(remote_host), remote_port, family=family, type=socket.SOCK_DGRAM
457 )
458 family = cast(AnyIPAddressFamily, gai_res[0][0])
459 remote_address = gai_res[0][-1]
461 sock = await get_async_backend().create_udp_socket(
462 family, local_address, remote_address, reuse_port
463 )
464 return cast(ConnectedUDPSocket, sock)
467async def create_unix_datagram_socket(
468 *,
469 local_path: None | str | PathLike[str] = None,
470 local_mode: int | None = None,
471) -> UNIXDatagramSocket:
472 """
473 Create a UNIX datagram socket.
475 Not available on Windows.
477 If ``local_path`` has been given, the socket will be bound to this path, making this
478 socket suitable for receiving datagrams from other processes. Other processes can
479 send datagrams to this socket only if ``local_path`` is set.
481 If a socket already exists on the file system in the ``local_path``, it will be
482 removed first.
484 :param local_path: the path on which to bind to
485 :param local_mode: permissions to set on the local socket
486 :return: a UNIX datagram socket
488 """
489 raw_socket = await setup_unix_local_socket(
490 local_path, local_mode, socket.SOCK_DGRAM
491 )
492 return await get_async_backend().create_unix_datagram_socket(raw_socket, None)
495async def create_connected_unix_datagram_socket(
496 remote_path: str | PathLike[str],
497 *,
498 local_path: None | str | PathLike[str] = None,
499 local_mode: int | None = None,
500) -> ConnectedUNIXDatagramSocket:
501 """
502 Create a connected UNIX datagram socket.
504 Connected datagram sockets can only communicate with the specified remote path.
506 If ``local_path`` has been given, the socket will be bound to this path, making
507 this socket suitable for receiving datagrams from other processes. Other processes
508 can send datagrams to this socket only if ``local_path`` is set.
510 If a socket already exists on the file system in the ``local_path``, it will be
511 removed first.
513 :param remote_path: the path to set as the default target
514 :param local_path: the path on which to bind to
515 :param local_mode: permissions to set on the local socket
516 :return: a connected UNIX datagram socket
518 """
519 remote_path = str(Path(remote_path))
520 raw_socket = await setup_unix_local_socket(
521 local_path, local_mode, socket.SOCK_DGRAM
522 )
523 return await get_async_backend().create_unix_datagram_socket(
524 raw_socket, remote_path
525 )
528async def getaddrinfo(
529 host: bytes | str | None,
530 port: str | int | None,
531 *,
532 family: int | AddressFamily = 0,
533 type: int | SocketKind = 0,
534 proto: int = 0,
535 flags: int = 0,
536) -> list[tuple[AddressFamily, SocketKind, int, str, tuple[str, int]]]:
537 """
538 Look up a numeric IP address given a host name.
540 Internationalized domain names are translated according to the (non-transitional)
541 IDNA 2008 standard.
543 .. note:: 4-tuple IPv6 socket addresses are automatically converted to 2-tuples of
544 (host, port), unlike what :func:`socket.getaddrinfo` does.
546 :param host: host name
547 :param port: port number
548 :param family: socket family (`'AF_INET``, ...)
549 :param type: socket type (``SOCK_STREAM``, ...)
550 :param proto: protocol number
551 :param flags: flags to pass to upstream ``getaddrinfo()``
552 :return: list of tuples containing (family, type, proto, canonname, sockaddr)
554 .. seealso:: :func:`socket.getaddrinfo`
556 """
557 # Handle unicode hostnames
558 if isinstance(host, str):
559 try:
560 encoded_host: bytes | None = host.encode("ascii")
561 except UnicodeEncodeError:
562 import idna
564 encoded_host = idna.encode(host, uts46=True)
565 else:
566 encoded_host = host
568 gai_res = await get_async_backend().getaddrinfo(
569 encoded_host, port, family=family, type=type, proto=proto, flags=flags
570 )
571 return [
572 (family, type, proto, canonname, convert_ipv6_sockaddr(sockaddr))
573 for family, type, proto, canonname, sockaddr in gai_res
574 ]
577def getnameinfo(sockaddr: IPSockAddrType, flags: int = 0) -> Awaitable[tuple[str, str]]:
578 """
579 Look up the host name of an IP address.
581 :param sockaddr: socket address (e.g. (ipaddress, port) for IPv4)
582 :param flags: flags to pass to upstream ``getnameinfo()``
583 :return: a tuple of (host name, service name)
585 .. seealso:: :func:`socket.getnameinfo`
587 """
588 return get_async_backend().getnameinfo(sockaddr, flags)
591def wait_socket_readable(sock: socket.socket) -> Awaitable[None]:
592 """
593 Wait until the given socket has data to be read.
595 This does **NOT** work on Windows when using the asyncio backend with a proactor
596 event loop (default on py3.8+).
598 .. warning:: Only use this on raw sockets that have not been wrapped by any higher
599 level constructs like socket streams!
601 :param sock: a socket object
602 :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
603 socket to become readable
604 :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
605 to become readable
607 """
608 return get_async_backend().wait_socket_readable(sock)
611def wait_socket_writable(sock: socket.socket) -> Awaitable[None]:
612 """
613 Wait until the given socket can be written to.
615 This does **NOT** work on Windows when using the asyncio backend with a proactor
616 event loop (default on py3.8+).
618 .. warning:: Only use this on raw sockets that have not been wrapped by any higher
619 level constructs like socket streams!
621 :param sock: a socket object
622 :raises ~anyio.ClosedResourceError: if the socket was closed while waiting for the
623 socket to become writable
624 :raises ~anyio.BusyResourceError: if another task is already waiting for the socket
625 to become writable
627 """
628 return get_async_backend().wait_socket_writable(sock)
631#
632# Private API
633#
636def convert_ipv6_sockaddr(
637 sockaddr: tuple[str, int, int, int] | tuple[str, int]
638) -> tuple[str, int]:
639 """
640 Convert a 4-tuple IPv6 socket address to a 2-tuple (address, port) format.
642 If the scope ID is nonzero, it is added to the address, separated with ``%``.
643 Otherwise the flow id and scope id are simply cut off from the tuple.
644 Any other kinds of socket addresses are returned as-is.
646 :param sockaddr: the result of :meth:`~socket.socket.getsockname`
647 :return: the converted socket address
649 """
650 # This is more complicated than it should be because of MyPy
651 if isinstance(sockaddr, tuple) and len(sockaddr) == 4:
652 host, port, flowinfo, scope_id = cast(Tuple[str, int, int, int], sockaddr)
653 if scope_id:
654 # PyPy (as of v7.3.11) leaves the interface name in the result, so
655 # we discard it and only get the scope ID from the end
656 # (https://foss.heptapod.net/pypy/pypy/-/issues/3938)
657 host = host.split("%")[0]
659 # Add scope_id to the address
660 return f"{host}%{scope_id}", port
661 else:
662 return host, port
663 else:
664 return cast(Tuple[str, int], sockaddr)
667async def setup_unix_local_socket(
668 path: None | str | PathLike[str],
669 mode: int | None,
670 socktype: int,
671) -> socket.socket:
672 """
673 Create a UNIX local socket object, deleting the socket at the given path if it
674 exists.
676 Not available on Windows.
678 :param path: path of the socket
679 :param mode: permissions to set on the socket
680 :param socktype: socket.SOCK_STREAM or socket.SOCK_DGRAM
682 """
683 if path is not None:
684 path_str = str(path)
685 path = Path(path)
686 if path.is_socket():
687 path.unlink()
688 else:
689 path_str = None
691 raw_socket = socket.socket(socket.AF_UNIX, socktype)
692 raw_socket.setblocking(False)
694 if path_str is not None:
695 try:
696 await to_thread.run_sync(raw_socket.bind, path_str, cancellable=True)
697 if mode is not None:
698 await to_thread.run_sync(chmod, path_str, mode, cancellable=True)
699 except BaseException:
700 raw_socket.close()
701 raise
703 return raw_socket