Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/backends/sync.py: 29%
56 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1import socket
2import ssl
3import sys
4import typing
6from .._exceptions import (
7 ConnectError,
8 ConnectTimeout,
9 ExceptionMapping,
10 ReadError,
11 ReadTimeout,
12 WriteError,
13 WriteTimeout,
14 map_exceptions,
15)
16from .._utils import is_socket_readable
17from .base import SOCKET_OPTION, NetworkBackend, NetworkStream
class SyncStream(NetworkStream):
    """Blocking network stream that wraps a standard-library socket."""

    def __init__(self, sock: socket.socket) -> None:
        """Store the socket that every stream operation is performed on."""
        self._sock = sock

    def read(self, max_bytes: int, timeout: typing.Optional[float] = None) -> bytes:
        """Receive at most `max_bytes` bytes from the socket.

        The socket timeout is set to `timeout` before receiving.
        `socket.timeout` and `OSError` are handed to `map_exceptions` with
        `ReadTimeout` and `ReadError` as their respective targets.
        """
        read_errors: ExceptionMapping = {
            socket.timeout: ReadTimeout,
            OSError: ReadError,
        }
        with map_exceptions(read_errors):
            self._sock.settimeout(timeout)
            return self._sock.recv(max_bytes)

    def write(self, buffer: bytes, timeout: typing.Optional[float] = None) -> None:
        """Send all of `buffer` over the socket.

        An empty buffer is a no-op. Otherwise the data is sent in as many
        `send()` calls as needed, re-applying `timeout` before each one.
        `socket.timeout` and `OSError` are handed to `map_exceptions` with
        `WriteTimeout` and `WriteError` as their respective targets.
        """
        if not buffer:
            return

        write_errors: ExceptionMapping = {
            socket.timeout: WriteTimeout,
            OSError: WriteError,
        }
        with map_exceptions(write_errors):
            remaining = buffer
            while remaining:
                self._sock.settimeout(timeout)
                sent = self._sock.send(remaining)
                # `send()` may accept only part of the data; keep the rest.
                remaining = remaining[sent:]

    def close(self) -> None:
        """Close the underlying socket."""
        self._sock.close()

    def start_tls(
        self,
        ssl_context: ssl.SSLContext,
        server_hostname: typing.Optional[str] = None,
        timeout: typing.Optional[float] = None,
    ) -> NetworkStream:
        """Wrap the socket with `ssl_context` and return a new stream for it.

        The socket timeout is set to `timeout` before wrapping. If anything
        fails, this stream is closed and the exception is re-raised.
        `socket.timeout` and `OSError` are handed to `map_exceptions` with
        `ConnectTimeout` and `ConnectError` as their respective targets.
        """
        tls_errors: ExceptionMapping = {
            socket.timeout: ConnectTimeout,
            OSError: ConnectError,
        }
        with map_exceptions(tls_errors):
            try:
                self._sock.settimeout(timeout)
                tls_sock = ssl_context.wrap_socket(
                    self._sock, server_hostname=server_hostname
                )
            except Exception as error:  # pragma: nocover
                self.close()
                raise error
        return SyncStream(tls_sock)

    def get_extra_info(self, info: str) -> typing.Any:
        """Return socket-level details identified by the key `info`.

        Recognised keys:

        * "ssl_object"  - the socket's `_sslobj`, only when the socket is an
          `ssl.SSLSocket`.
        * "client_addr" - result of `getsockname()`.
        * "server_addr" - result of `getpeername()`.
        * "socket"      - the underlying socket itself.
        * "is_readable" - result of `is_socket_readable()` for the socket.

        Any other key (or "ssl_object" on a non-TLS socket) yields `None`.
        """
        sock = self._sock
        if info == "ssl_object":
            if isinstance(sock, ssl.SSLSocket):
                return sock._sslobj  # type: ignore
            return None
        if info == "client_addr":
            return sock.getsockname()
        if info == "server_addr":
            return sock.getpeername()
        if info == "socket":
            return sock
        if info == "is_readable":
            return is_socket_readable(sock)
        return None
class SyncBackend(NetworkBackend):
    """Network backend that opens blocking sockets and wraps them in `SyncStream`."""

    def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: typing.Optional[float] = None,
        local_address: typing.Optional[str] = None,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> NetworkStream:
        """Open a TCP connection to `(host, port)` and return it as a stream.

        `timeout` is passed to `socket.create_connection()`. When
        `local_address` is given, the connection is made from
        `(local_address, 0)`. Each entry of `socket_options` is unpacked into
        `setsockopt()`, after which `TCP_NODELAY` is enabled.

        `socket.timeout` and `OSError` are handed to `map_exceptions` with
        `ConnectTimeout` and `ConnectError` as their respective targets. If
        configuring the freshly connected socket fails, the socket is closed
        before the error propagates so that it is not leaked.
        """
        # Note that we automatically include `TCP_NODELAY`
        # in addition to any other custom socket options.
        if socket_options is None:
            socket_options = []  # pragma: no cover
        address = (host, port)
        source_address = None if local_address is None else (local_address, 0)
        exc_map: ExceptionMapping = {
            socket.timeout: ConnectTimeout,
            OSError: ConnectError,
        }

        with map_exceptions(exc_map):
            sock = socket.create_connection(
                address,
                timeout,
                source_address=source_address,
            )
            try:
                for option in socket_options:
                    sock.setsockopt(*option)  # pragma: no cover
                sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
            except Exception:  # pragma: nocover
                # The connection is already open at this point; release it
                # rather than leaving it to the garbage collector.
                sock.close()
                raise
        return SyncStream(sock)

    def connect_unix_socket(
        self,
        path: str,
        timeout: typing.Optional[float] = None,
        socket_options: typing.Optional[typing.Iterable[SOCKET_OPTION]] = None,
    ) -> NetworkStream:  # pragma: nocover
        """Connect to the UNIX domain socket at `path` and return it as a stream.

        Raises `RuntimeError` when `sys.platform` is "win32". Each entry of
        `socket_options` is unpacked into `setsockopt()`, then `timeout` is
        applied and the socket is connected.

        `socket.timeout` and `OSError` are handed to `map_exceptions` with
        `ConnectTimeout` and `ConnectError` as their respective targets. If
        configuring or connecting the socket fails, the socket is closed
        before the error propagates so that it is not leaked.
        """
        if sys.platform == "win32":
            raise RuntimeError(
                "Attempted to connect to a UNIX socket on a Windows system."
            )
        if socket_options is None:
            socket_options = []

        exc_map: ExceptionMapping = {
            socket.timeout: ConnectTimeout,
            OSError: ConnectError,
        }
        with map_exceptions(exc_map):
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                for option in socket_options:
                    sock.setsockopt(*option)
                sock.settimeout(timeout)
                sock.connect(path)
            except Exception:
                # Release the file descriptor when setup or connect fails.
                sock.close()
                raise
        return SyncStream(sock)