Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpcore/backends/sync.py: 30%
53 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1import socket
2import ssl
3import sys
4import typing
6from .._exceptions import (
7 ConnectError,
8 ConnectTimeout,
9 ExceptionMapping,
10 ReadError,
11 ReadTimeout,
12 WriteError,
13 WriteTimeout,
14 map_exceptions,
15)
16from .._utils import is_socket_readable
17from .base import NetworkBackend, NetworkStream
class SyncStream(NetworkStream):
    """Blocking `NetworkStream` implementation wrapping a `socket.socket`.

    Every I/O method first applies the caller-supplied timeout to the socket
    and then performs the operation inside `map_exceptions`, handing it a
    mapping from `socket.timeout` / `OSError` to the exception classes
    imported from `.._exceptions`.
    """

    def __init__(self, sock: socket.socket) -> None:
        # The socket all other methods operate on; `close()` closes it.
        self._sock = sock

    def read(self, max_bytes: int, timeout: typing.Optional[float] = None) -> bytes:
        """Perform one `recv(max_bytes)` on the socket and return its result.

        `timeout` is passed to `settimeout` before receiving. The mapping
        given to `map_exceptions` pairs `socket.timeout` with `ReadTimeout`
        and `OSError` with `ReadError`.
        """
        read_errors: ExceptionMapping = {
            socket.timeout: ReadTimeout,
            OSError: ReadError,
        }
        with map_exceptions(read_errors):
            self._sock.settimeout(timeout)
            received = self._sock.recv(max_bytes)
            return received

    def write(self, buffer: bytes, timeout: typing.Optional[float] = None) -> None:
        """Send all of `buffer`, calling `send` until nothing is left.

        An empty `buffer` is a no-op. `timeout` is re-applied before every
        `send` call. The mapping given to `map_exceptions` pairs
        `socket.timeout` with `WriteTimeout` and `OSError` with `WriteError`.
        """
        if buffer:
            write_errors: ExceptionMapping = {
                socket.timeout: WriteTimeout,
                OSError: WriteError,
            }
            with map_exceptions(write_errors):
                pending = buffer
                while pending:
                    self._sock.settimeout(timeout)
                    sent = self._sock.send(pending)
                    # `send` may accept only part of the data; keep the rest.
                    pending = pending[sent:]

    def close(self) -> None:
        """Close the underlying socket."""
        self._sock.close()

    def start_tls(
        self,
        ssl_context: ssl.SSLContext,
        server_hostname: typing.Optional[str] = None,
        timeout: typing.Optional[float] = None,
    ) -> NetworkStream:
        """Wrap the socket with `ssl_context` and return a new `SyncStream`.

        `timeout` is applied to the socket before wrapping. If anything in
        that step raises, this stream is closed and the exception is
        re-raised. The mapping given to `map_exceptions` pairs
        `socket.timeout` with `ConnectTimeout` and `OSError` with
        `ConnectError`.
        """
        tls_errors: ExceptionMapping = {
            socket.timeout: ConnectTimeout,
            OSError: ConnectError,
        }
        with map_exceptions(tls_errors):
            try:
                self._sock.settimeout(timeout)
                tls_sock = ssl_context.wrap_socket(
                    self._sock, server_hostname=server_hostname
                )
            except Exception as exc:  # pragma: nocover
                # Do not leave the socket open after a failed upgrade.
                self.close()
                raise exc
        return SyncStream(tls_sock)

    def get_extra_info(self, info: str) -> typing.Any:
        """Return a piece of connection metadata selected by `info`.

        Recognised keys:
          * "ssl_object"  - the socket's `_sslobj`, only when the socket is
            an `ssl.SSLSocket`
          * "client_addr" - `getsockname()` of the socket
          * "server_addr" - `getpeername()` of the socket
          * "socket"      - the socket object itself
          * "is_readable" - result of `is_socket_readable(socket)`

        Any other key (or "ssl_object" on a non-TLS socket) yields `None`.
        """
        sock = self._sock
        if info == "ssl_object" and isinstance(sock, ssl.SSLSocket):
            value = sock._sslobj  # type: ignore
        elif info == "client_addr":
            value = sock.getsockname()
        elif info == "server_addr":
            value = sock.getpeername()
        elif info == "socket":
            value = sock
        elif info == "is_readable":
            value = is_socket_readable(sock)
        else:
            value = None
        return value
class SyncBackend(NetworkBackend):
    """Blocking `NetworkBackend` that opens sockets and wraps them in `SyncStream`.

    Both connect methods run inside `map_exceptions` with a mapping that
    pairs `socket.timeout` with `ConnectTimeout` and `OSError` with
    `ConnectError`.
    """

    def connect_tcp(
        self,
        host: str,
        port: int,
        timeout: typing.Optional[float] = None,
        local_address: typing.Optional[str] = None,
    ) -> NetworkStream:
        """Open a TCP connection to `(host, port)` and return it as a stream.

        Args:
            host: Remote host, passed to `socket.create_connection`.
            port: Remote port.
            timeout: Forwarded to `socket.create_connection` as its timeout.
            local_address: Optional local address to bind to; when given it
                is paired with port 0 and used as the source address.

        Returns:
            A `SyncStream` wrapping the connected socket.
        """
        address = (host, port)
        source_address = None if local_address is None else (local_address, 0)
        exc_map: ExceptionMapping = {
            socket.timeout: ConnectTimeout,
            OSError: ConnectError,
        }
        with map_exceptions(exc_map):
            sock = socket.create_connection(
                address, timeout, source_address=source_address
            )
        return SyncStream(sock)

    def connect_unix_socket(
        self, path: str, timeout: typing.Optional[float] = None
    ) -> NetworkStream:  # pragma: nocover
        """Connect to the UNIX domain socket at `path` and return it as a stream.

        Args:
            path: Filesystem path of the UNIX socket.
            timeout: Applied to the socket with `settimeout` before connecting.

        Returns:
            A `SyncStream` wrapping the connected socket.

        Raises:
            RuntimeError: If `sys.platform` is "win32".
        """
        if sys.platform == "win32":
            raise RuntimeError(
                "Attempted to connect to a UNIX socket on a Windows system."
            )

        exc_map: ExceptionMapping = {
            socket.timeout: ConnectTimeout,
            OSError: ConnectError,
        }
        with map_exceptions(exc_map):
            sock = socket.socket(socket.AF_UNIX, socket.SOCK_STREAM)
            try:
                sock.settimeout(timeout)
                sock.connect(path)
            except BaseException:
                # The socket was created by this method, so nobody else can
                # close it if connecting fails; release the descriptor here
                # and let the original error propagate through map_exceptions.
                sock.close()
                raise
        return SyncStream(sock)