Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/_parsers/socket.py: 29%
98 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
1import errno
2import io
3import socket
4from io import SEEK_END
5from typing import Optional, Union
7from ..exceptions import ConnectionError, TimeoutError
8from ..utils import SSL_AVAILABLE
10NONBLOCKING_EXCEPTION_ERROR_NUMBERS = {BlockingIOError: errno.EWOULDBLOCK}
12if SSL_AVAILABLE:
13 import ssl
15 if hasattr(ssl, "SSLWantReadError"):
16 NONBLOCKING_EXCEPTION_ERROR_NUMBERS[ssl.SSLWantReadError] = 2
17 NONBLOCKING_EXCEPTION_ERROR_NUMBERS[ssl.SSLWantWriteError] = 2
18 else:
19 NONBLOCKING_EXCEPTION_ERROR_NUMBERS[ssl.SSLError] = 2
21NONBLOCKING_EXCEPTIONS = tuple(NONBLOCKING_EXCEPTION_ERROR_NUMBERS.keys())
23SERVER_CLOSED_CONNECTION_ERROR = "Connection closed by server."
24SENTINEL = object()
26SYM_CRLF = b"\r\n"
29class SocketBuffer:
30 def __init__(
31 self, socket: socket.socket, socket_read_size: int, socket_timeout: float
32 ):
33 self._sock = socket
34 self.socket_read_size = socket_read_size
35 self.socket_timeout = socket_timeout
36 self._buffer = io.BytesIO()
38 def unread_bytes(self) -> int:
39 """
40 Remaining unread length of buffer
41 """
42 pos = self._buffer.tell()
43 end = self._buffer.seek(0, SEEK_END)
44 self._buffer.seek(pos)
45 return end - pos
47 def _read_from_socket(
48 self,
49 length: Optional[int] = None,
50 timeout: Union[float, object] = SENTINEL,
51 raise_on_timeout: Optional[bool] = True,
52 ) -> bool:
53 sock = self._sock
54 socket_read_size = self.socket_read_size
55 marker = 0
56 custom_timeout = timeout is not SENTINEL
58 buf = self._buffer
59 current_pos = buf.tell()
60 buf.seek(0, SEEK_END)
61 if custom_timeout:
62 sock.settimeout(timeout)
63 try:
64 while True:
65 data = self._sock.recv(socket_read_size)
66 # an empty string indicates the server shutdown the socket
67 if isinstance(data, bytes) and len(data) == 0:
68 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
69 buf.write(data)
70 data_length = len(data)
71 marker += data_length
73 if length is not None and length > marker:
74 continue
75 return True
76 except socket.timeout:
77 if raise_on_timeout:
78 raise TimeoutError("Timeout reading from socket")
79 return False
80 except NONBLOCKING_EXCEPTIONS as ex:
81 # if we're in nonblocking mode and the recv raises a
82 # blocking error, simply return False indicating that
83 # there's no data to be read. otherwise raise the
84 # original exception.
85 allowed = NONBLOCKING_EXCEPTION_ERROR_NUMBERS.get(ex.__class__, -1)
86 if not raise_on_timeout and ex.errno == allowed:
87 return False
88 raise ConnectionError(f"Error while reading from socket: {ex.args}")
89 finally:
90 buf.seek(current_pos)
91 if custom_timeout:
92 sock.settimeout(self.socket_timeout)
94 def can_read(self, timeout: float) -> bool:
95 return bool(self.unread_bytes()) or self._read_from_socket(
96 timeout=timeout, raise_on_timeout=False
97 )
99 def read(self, length: int) -> bytes:
100 length = length + 2 # make sure to read the \r\n terminator
101 # BufferIO will return less than requested if buffer is short
102 data = self._buffer.read(length)
103 missing = length - len(data)
104 if missing:
105 # fill up the buffer and read the remainder
106 self._read_from_socket(missing)
107 data += self._buffer.read(missing)
108 return data[:-2]
110 def readline(self) -> bytes:
111 buf = self._buffer
112 data = buf.readline()
113 while not data.endswith(SYM_CRLF):
114 # there's more data in the socket that we need
115 self._read_from_socket()
116 data += buf.readline()
118 return data[:-2]
120 def get_pos(self) -> int:
121 """
122 Get current read position
123 """
124 return self._buffer.tell()
126 def rewind(self, pos: int) -> None:
127 """
128 Rewind the buffer to a specific position, to re-start reading
129 """
130 self._buffer.seek(pos)
132 def purge(self) -> None:
133 """
134 After a successful read, purge the read part of buffer
135 """
136 unread = self.unread_bytes()
138 # Only if we have read all of the buffer do we truncate, to
139 # reduce the amount of memory thrashing. This heuristic
140 # can be changed or removed later.
141 if unread > 0:
142 return
144 if unread > 0:
145 # move unread data to the front
146 view = self._buffer.getbuffer()
147 view[:unread] = view[-unread:]
148 self._buffer.truncate(unread)
149 self._buffer.seek(0)
151 def close(self) -> None:
152 try:
153 self._buffer.close()
154 except Exception:
155 # issue #633 suggests the purge/close somehow raised a
156 # BadFileDescriptor error. Perhaps the client ran out of
157 # memory or something else? It's probably OK to ignore
158 # any error being raised from purge/close since we're
159 # removing the reference to the instance below.
160 pass
161 self._buffer = None
162 self._sock = None