1import errno
2import io
3import socket
4from io import SEEK_END
5from typing import Optional, Union
6
7from ..exceptions import ConnectionError, TimeoutError
8from ..utils import SENTINEL, SSL_AVAILABLE
9
# Maps every exception type that a non-blocking ``recv`` may raise to the
# ``errno`` value that means "no data available right now". An exception of
# one of these types carrying a *different* errno is treated as a real error.
NONBLOCKING_EXCEPTION_ERROR_NUMBERS = {BlockingIOError: errno.EWOULDBLOCK}

if SSL_AVAILABLE:
    import ssl

    if hasattr(ssl, "SSLWantReadError"):
        NONBLOCKING_EXCEPTION_ERROR_NUMBERS[ssl.SSLWantReadError] = (
            ssl.SSL_ERROR_WANT_READ
        )
        # SSLWantWriteError is raised with errno SSL_ERROR_WANT_WRITE (3),
        # not SSL_ERROR_WANT_READ (2); registering 2 here meant the entry
        # could never match the exception it was registered for.
        NONBLOCKING_EXCEPTION_ERROR_NUMBERS[ssl.SSLWantWriteError] = (
            ssl.SSL_ERROR_WANT_WRITE
        )
    else:
        # ssl modules without the dedicated subclasses only expose the
        # generic SSLError; the want-read errno marks the would-block case.
        NONBLOCKING_EXCEPTION_ERROR_NUMBERS[ssl.SSLError] = ssl.SSL_ERROR_WANT_READ

# Tuple form of the keys above, usable directly in an ``except`` clause.
NONBLOCKING_EXCEPTIONS = tuple(NONBLOCKING_EXCEPTION_ERROR_NUMBERS.keys())

# Message used when ``recv`` returns no bytes, i.e. the peer closed the socket.
SERVER_CLOSED_CONNECTION_ERROR = "Connection closed by server."

# Line terminator expected at the end of every line / sized payload.
SYM_CRLF = b"\r\n"
26
27
class SocketBuffer:
    """
    Buffered reader on top of a socket, backed by an in-memory ``io.BytesIO``.

    Data received from the socket is always appended to the end of the
    buffer, while reads consume from the buffer's current position. The
    position can be saved with ``get_pos`` and restored with ``rewind``.
    """

    def __init__(
        self, socket: socket.socket, socket_read_size: int, socket_timeout: float
    ):
        """
        :param socket: socket to receive data from.
        :param socket_read_size: number of bytes requested per ``recv`` call.
        :param socket_timeout: timeout restored on the socket after a read
            that used a custom timeout.
        """
        self._sock = socket
        self.socket_read_size = socket_read_size
        self.socket_timeout = socket_timeout
        self._buffer = io.BytesIO()

    def unread_bytes(self) -> int:
        """
        Remaining unread length of buffer
        """
        pos = self._buffer.tell()
        # seek() returns the new absolute position, i.e. the buffer size.
        end = self._buffer.seek(0, SEEK_END)
        self._buffer.seek(pos)
        return end - pos

    def _read_from_socket(
        self,
        length: Optional[int] = None,
        timeout: Union[float, object] = SENTINEL,
        raise_on_timeout: Optional[bool] = True,
    ) -> bool:
        """
        Receive data from the socket and append it to the end of the buffer.

        The buffer's read position is left where it was before the call.

        :param length: if given, keep receiving until at least this many
            bytes have been appended; otherwise a single ``recv`` is done.
        :param timeout: socket timeout to use for this call only. When left
            as ``SENTINEL`` the socket's current timeout is not touched.
        :param raise_on_timeout: when false, a timeout or would-block
            condition returns ``False`` instead of raising.
        :return: ``True`` once the requested data was received, ``False`` if
            nothing could be read and ``raise_on_timeout`` is false.
        :raises ConnectionError: if the peer closed the socket, or a
            non-blocking-type exception was raised that is not handled as a
            would-block condition.
        :raises TimeoutError: if the read timed out and ``raise_on_timeout``
            is true.
        """
        sock = self._sock
        socket_read_size = self.socket_read_size
        marker = 0  # number of bytes appended so far during this call
        custom_timeout = timeout is not SENTINEL

        buf = self._buffer
        current_pos = buf.tell()
        # New data is always written at the end of the buffer.
        buf.seek(0, SEEK_END)
        if custom_timeout:
            sock.settimeout(timeout)
        try:
            while True:
                data = sock.recv(socket_read_size)
                # empty bytes indicate the server shut down the socket
                if isinstance(data, bytes) and len(data) == 0:
                    raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
                buf.write(data)
                data_length = len(data)
                marker += data_length

                if length is not None and length > marker:
                    continue
                return True
        except socket.timeout:
            if raise_on_timeout:
                raise TimeoutError("Timeout reading from socket")
            return False
        except NONBLOCKING_EXCEPTIONS as ex:
            # if we're in nonblocking mode and the recv raises a
            # blocking error, simply return False indicating that
            # there's no data to be read. otherwise raise the
            # original exception.
            allowed = NONBLOCKING_EXCEPTION_ERROR_NUMBERS.get(ex.__class__, -1)
            if ex.errno == allowed:
                if not raise_on_timeout:
                    return False
                # A would-block with a zero timeout is reported as a timeout.
                if timeout == 0:
                    raise TimeoutError("Timeout reading from socket")
            raise ConnectionError(f"Error while reading from socket: {ex.args}")
        finally:
            # Restore the read position and, if it was overridden, the
            # socket's configured timeout - on success and failure alike.
            buf.seek(current_pos)
            if custom_timeout:
                sock.settimeout(self.socket_timeout)

    def can_read(self, timeout: float = 0) -> bool:
        """
        Return whether data is available to read.

        True if the buffer already holds unread bytes; otherwise a socket
        read is attempted with ``timeout`` and its result is returned.
        """
        return bool(self.unread_bytes()) or self._read_from_socket(
            timeout=timeout, raise_on_timeout=False
        )

    def read(self, length: int, timeout: Union[float, object] = SENTINEL) -> bytes:
        """
        Read ``length`` bytes plus the two-byte terminator that follows them
        and return the data without those last two bytes.

        If the buffer does not hold enough data, the remainder is received
        from the socket first, using ``timeout`` for that socket read.
        """
        length = length + 2  # make sure to read the \r\n terminator
        # BytesIO will return less than requested if buffer is short
        data = self._buffer.read(length)
        missing = length - len(data)
        if missing:
            # fill up the buffer and read the remainder
            self._read_from_socket(length=missing, timeout=timeout)
            data += self._buffer.read(missing)
        return data[:-2]

    def readline(self, timeout: Union[float, object] = SENTINEL) -> bytes:
        """
        Read up to and including the next ``\\r\\n`` and return the line
        without that terminator, receiving from the socket as needed.
        """
        buf = self._buffer
        data = buf.readline()
        while not data.endswith(SYM_CRLF):
            # there's more data in the socket that we need
            self._read_from_socket(timeout=timeout)
            data += buf.readline()

        return data[:-2]

    def get_pos(self) -> int:
        """
        Get current read position
        """
        return self._buffer.tell()

    def rewind(self, pos: int) -> None:
        """
        Rewind the buffer to a specific position, to re-start reading
        """
        self._buffer.seek(pos)

    def purge(self) -> None:
        """
        After a successful read, purge the read part of buffer

        The buffer is only reset once everything in it has been read; while
        unread data remains this is a no-op.
        """
        # Only if we have read all of the buffer do we truncate, to
        # reduce the amount of memory thrashing.  This heuristic
        # can be changed or removed later.
        if self.unread_bytes() > 0:
            return

        # Nothing is left unread, so drop the whole content and start over.
        self._buffer.truncate(0)
        self._buffer.seek(0)

    def close(self) -> None:
        """
        Close the internal buffer and drop the references to it and to the
        socket. The socket itself is not closed here.
        """
        try:
            self._buffer.close()
        except Exception:
            # issue #633 suggests the purge/close somehow raised a
            # BadFileDescriptor error. Perhaps the client ran out of
            # memory or something else? It's probably OK to ignore
            # any error being raised from purge/close since we're
            # removing the reference to the instance below.
            pass
        self._buffer = None
        self._sock = None