Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/resp3.py: 11%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from logging import getLogger
2from typing import Any, Union
4from ..exceptions import ConnectionError, InvalidResponse, ResponseError
5from ..typing import EncodableT
6from .base import (
7 AsyncPushNotificationsParser,
8 PushNotificationsParser,
9 _AsyncRESPBase,
10 _RESPBase,
11)
12from .socket import SENTINEL, SERVER_CLOSED_CONNECTION_ERROR
class _RESP3Parser(_RESPBase, PushNotificationsParser):
    """RESP3 protocol implementation.

    Replies are read through ``self._buffer`` and each RESP3 type byte is
    mapped to a Python value in :meth:`_read_response`.
    """

    def __init__(self, socket_read_size):
        """Initialise the base parser and the push-handler hooks.

        Only the pub/sub hook receives a default (a debug logger); every
        other hook starts out as ``None``.
        """
        super().__init__(socket_read_size)
        self.pubsub_push_handler_func = self.handle_pubsub_push_response
        self.node_moving_push_handler_func = None
        self.maintenance_push_handler_func = None
        self.oss_cluster_maint_push_handler_func = None
        self.invalidation_push_handler_func = None

    def handle_pubsub_push_response(self, response):
        """Log ``response`` at DEBUG level and return it unchanged."""
        # Lazy %-style arguments: the message is only rendered when the
        # "push_response" logger actually has DEBUG enabled.
        getLogger("push_response").debug("Push response: %s", response)
        return response

    def read_response(
        self,
        disable_decoding=False,
        push_request=False,
        timeout: Union[float, object] = SENTINEL,
    ):
        """Read one complete reply while keeping the buffer consistent.

        The buffer position is recorded before parsing. If parsing raises
        anything (``BaseException`` included), the buffer is rewound to that
        position and the exception propagates; on success the buffer is
        purged and the parsed value is returned.

        Args:
            disable_decoding: When true, ``bytes`` payloads are returned
                as-is instead of going through ``self.encoder.decode``.
            push_request: When true, a push message is returned to the
                caller instead of being followed by another read.
            timeout: Forwarded to every buffer read; defaults to
                ``SENTINEL``.
        """
        pos = self._buffer.get_pos() if self._buffer is not None else None
        try:
            result = self._read_response(
                disable_decoding=disable_decoding,
                push_request=push_request,
                timeout=timeout,
            )
        except BaseException:
            # Undo the partial read so the same bytes can be parsed again.
            if self._buffer is not None:
                self._buffer.rewind(pos)
            raise
        else:
            if self._buffer is not None:
                try:
                    self._buffer.purge()
                except AttributeError:
                    # Buffer may have been set to None by another thread after
                    # the check above; result is still valid so we don't raise
                    pass
            return result

    def _read_response(
        self,
        disable_decoding=False,
        push_request=False,
        timeout: Union[float, object] = SENTINEL,
    ):
        """Parse a single RESP3 value from the buffer.

        Aggregate types (array, set, map, push) are parsed recursively.

        Returns:
            The parsed value. For a server error that ``parse_error`` does
            not map to ``ConnectionError``, the exception *instance* is
            returned rather than raised.

        Raises:
            ConnectionError: If ``readline`` yields nothing, or the server
                error maps to a ``ConnectionError``.
            InvalidResponse: If the leading type byte is not recognised.
        """
        raw = self._buffer.readline(timeout=timeout)
        if not raw:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

        byte, response = raw[:1], raw[1:]

        # server returned an error
        if byte in (b"-", b"!"):
            if byte == b"!":
                # blob error: the header line carries the payload length
                response = self._buffer.read(int(response), timeout=timeout)
            response = response.decode("utf-8", errors="replace")
            error = self.parse_error(response)
            # if the error is a ConnectionError, raise immediately so the user
            # is notified
            if isinstance(error, ConnectionError):
                raise error
            # otherwise, we're dealing with a ResponseError that might belong
            # inside a pipeline response. the connection's read_response()
            # and/or the pipeline's execute() will raise this error if
            # necessary, so just return the exception instance here.
            return error
        # single value
        elif byte == b"+":
            pass
        # null value
        elif byte == b"_":
            return None
        # int and big int values
        elif byte in (b":", b"("):
            return int(response)
        # double value
        elif byte == b",":
            return float(response)
        # bool value
        elif byte == b"#":
            return response == b"t"
        # bulk response
        elif byte == b"$":
            response = self._buffer.read(int(response), timeout=timeout)
        # verbatim string response
        elif byte == b"=":
            # drop the first four bytes of the payload before decoding
            response = self._buffer.read(int(response), timeout=timeout)[4:]
        # array and set responses
        elif byte in (b"*", b"~"):
            # redis can return unhashable types (like dict) in a set,
            # so we return sets as list, all the time, for predictability
            response = [
                self._read_response(disable_decoding=disable_decoding, timeout=timeout)
                for _ in range(int(response))
            ]
        # map response
        elif byte == b"%":
            # We cannot use a dict-comprehension to parse stream.
            # Evaluation order of key:val expression in dict comprehension only
            # became defined to be left-right in version 3.8
            resp_dict = {}
            for _ in range(int(response)):
                key = self._read_response(
                    disable_decoding=disable_decoding, timeout=timeout
                )
                resp_dict[key] = self._read_response(
                    disable_decoding=disable_decoding,
                    push_request=push_request,
                    timeout=timeout,
                )
            response = resp_dict
        # push response
        elif byte == b">":
            response = [
                self._read_response(
                    disable_decoding=disable_decoding,
                    push_request=push_request,
                    timeout=timeout,
                )
                for _ in range(int(response))
            ]
            response = self.handle_push_response(response)

            # if this is a push request return the push response
            if push_request:
                return response

            # The caller wanted a regular reply, so read the next one. The
            # caller's timeout is forwarded here like in every other
            # recursive read, instead of falling back to the default.
            return self._read_response(
                disable_decoding=disable_decoding,
                push_request=push_request,
                timeout=timeout,
            )
        else:
            raise InvalidResponse(f"Protocol Error: {raw!r}")

        if isinstance(response, bytes) and disable_decoding is False:
            response = self.encoder.decode(response)

        return response
class _AsyncRESP3Parser(_AsyncRESPBase, AsyncPushNotificationsParser):
    """RESP3 protocol implementation for asyncio connections.

    Each RESP3 type byte is mapped to a Python value in
    :meth:`_read_response`.
    """

    def __init__(self, socket_read_size):
        """Initialise the base parser and the push-handler hooks.

        The pub/sub hook defaults to a debug logger; the invalidation hook
        starts out as ``None``.
        """
        super().__init__(socket_read_size)
        self.pubsub_push_handler_func = self.handle_pubsub_push_response
        self.invalidation_push_handler_func = None

    async def handle_pubsub_push_response(self, response):
        """Log ``response`` at DEBUG level and return it unchanged."""
        # Lazy %-style arguments: the message is only rendered when the
        # "push_response" logger actually has DEBUG enabled.
        getLogger("push_response").debug("Push response: %s", response)
        return response

    async def read_response(
        self, disable_decoding: bool = False, push_request: bool = False
    ):
        """Read one complete reply and clear the parsing buffer afterwards.

        Args:
            disable_decoding: When true, ``bytes`` payloads are returned
                as-is instead of going through ``self.encoder.decode``.
            push_request: When true, a push message is returned to the
                caller instead of being followed by another read.
        """
        if self._chunks:
            # augment parsing buffer with previously read data
            self._buffer += b"".join(self._chunks)
            self._chunks.clear()
        # Always start parsing from the beginning of the buffer.
        self._pos = 0
        response = await self._read_response(
            disable_decoding=disable_decoding, push_request=push_request
        )
        # Successfully parsing a response allows us to clear our parsing buffer
        self._clear()
        return response

    async def _read_response(
        self, disable_decoding: bool = False, push_request: bool = False
    ) -> Union[EncodableT, ResponseError, None]:
        """Parse a single RESP3 value from the stream.

        Aggregate types (array, set, map, push) are parsed recursively.

        Returns:
            The parsed value. For a server error that ``parse_error`` does
            not map to ``ConnectionError``, the exception *instance* is
            returned rather than raised.

        Raises:
            ConnectionError: If there is no stream or encoder, or the server
                error maps to a ``ConnectionError``.
            InvalidResponse: If the leading type byte is not recognised.
        """
        if not self._stream or not self.encoder:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
        raw = await self._readline()
        response: Any
        byte, response = raw[:1], raw[1:]

        # server returned an error
        if byte in (b"-", b"!"):
            if byte == b"!":
                # blob error: the header line carries the payload length
                response = await self._read(int(response))
            response = response.decode("utf-8", errors="replace")
            error = self.parse_error(response)
            # if the error is a ConnectionError, raise immediately so the user
            # is notified
            if isinstance(error, ConnectionError):
                self._clear()  # Successful parse
                raise error
            # otherwise, we're dealing with a ResponseError that might belong
            # inside a pipeline response. the connection's read_response()
            # and/or the pipeline's execute() will raise this error if
            # necessary, so just return the exception instance here.
            return error
        # single value
        elif byte == b"+":
            pass
        # null value
        elif byte == b"_":
            return None
        # int and big int values
        elif byte in (b":", b"("):
            return int(response)
        # double value
        elif byte == b",":
            return float(response)
        # bool value
        elif byte == b"#":
            return response == b"t"
        # bulk response
        elif byte == b"$":
            response = await self._read(int(response))
        # verbatim string response
        elif byte == b"=":
            # drop the first four bytes of the payload before decoding
            response = (await self._read(int(response)))[4:]
        # array and set responses
        elif byte in (b"*", b"~"):
            # redis can return unhashable types (like dict) in a set,
            # so we always convert to a list, to have predictable return types
            response = [
                (await self._read_response(disable_decoding=disable_decoding))
                for _ in range(int(response))
            ]
        # map response
        elif byte == b"%":
            # We cannot use a dict-comprehension to parse stream.
            # Evaluation order of key:val expression in dict comprehension only
            # became defined to be left-right in version 3.8
            resp_dict = {}
            for _ in range(int(response)):
                key = await self._read_response(disable_decoding=disable_decoding)
                resp_dict[key] = await self._read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
            response = resp_dict
        # push response
        elif byte == b">":
            response = [
                (
                    await self._read_response(
                        disable_decoding=disable_decoding, push_request=push_request
                    )
                )
                for _ in range(int(response))
            ]
            response = await self.handle_push_response(response)
            if not push_request:
                # The caller wanted a regular reply, so read the next one.
                return await self._read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
            else:
                return response
        else:
            raise InvalidResponse(f"Protocol Error: {raw!r}")

        if isinstance(response, bytes) and disable_decoding is False:
            response = self.encoder.decode(response)
        return response