Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/resp3.py: 12%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from logging import getLogger
2from typing import Any, Union
4from ..exceptions import ConnectionError, InvalidResponse, ResponseError
5from ..typing import EncodableT
6from ..utils import SENTINEL
7from .base import (
8 AsyncPushNotificationsParser,
9 PushNotificationsParser,
10 _AsyncRESPBase,
11 _RESPBase,
12)
13from .socket import SERVER_CLOSED_CONNECTION_ERROR
16class _RESP3Parser(_RESPBase, PushNotificationsParser):
17 """RESP3 protocol implementation"""
19 def __init__(self, socket_read_size):
20 super().__init__(socket_read_size)
21 self.pubsub_push_handler_func = self.handle_pubsub_push_response
22 self.node_moving_push_handler_func = None
23 self.maintenance_push_handler_func = None
24 self.oss_cluster_maint_push_handler_func = None
25 self.invalidation_push_handler_func = None
27 def handle_pubsub_push_response(self, response):
28 logger = getLogger("push_response")
29 logger.debug("Push response: " + str(response))
30 return response
32 def read_response(
33 self,
34 disable_decoding=False,
35 push_request=False,
36 timeout: Union[float, object] = SENTINEL,
37 ):
38 pos = self._buffer.get_pos() if self._buffer is not None else None
39 try:
40 result = self._read_response(
41 disable_decoding=disable_decoding,
42 push_request=push_request,
43 timeout=timeout,
44 )
45 except BaseException:
46 if self._buffer is not None:
47 self._buffer.rewind(pos)
48 raise
49 else:
50 if self._buffer is not None:
51 try:
52 self._buffer.purge()
53 except AttributeError:
54 # Buffer may have been set to None by another thread after
55 # the check above; result is still valid so we don't raise
56 pass
57 return result
59 def _read_response(
60 self,
61 disable_decoding=False,
62 push_request=False,
63 timeout: Union[float, object] = SENTINEL,
64 ):
65 raw = self._buffer.readline(timeout=timeout)
66 if not raw:
67 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
69 byte, response = raw[:1], raw[1:]
71 # server returned an error
72 if byte in (b"-", b"!"):
73 if byte == b"!":
74 response = self._buffer.read(int(response), timeout=timeout)
75 response = response.decode("utf-8", errors="replace")
76 error = self.parse_error(response)
77 # if the error is a ConnectionError, raise immediately so the user
78 # is notified
79 if isinstance(error, ConnectionError):
80 raise error
81 # otherwise, we're dealing with a ResponseError that might belong
82 # inside a pipeline response. the connection's read_response()
83 # and/or the pipeline's execute() will raise this error if
84 # necessary, so just return the exception instance here.
85 return error
86 # single value
87 elif byte == b"+":
88 pass
89 # null value
90 elif byte == b"_":
91 return None
92 # int and big int values
93 elif byte in (b":", b"("):
94 return int(response)
95 # double value
96 elif byte == b",":
97 return float(response)
98 # bool value
99 elif byte == b"#":
100 return response == b"t"
101 # bulk response
102 elif byte == b"$":
103 response = self._buffer.read(int(response), timeout=timeout)
104 # verbatim string response
105 elif byte == b"=":
106 response = self._buffer.read(int(response), timeout=timeout)[4:]
107 # array response
108 elif byte == b"*":
109 response = [
110 self._read_response(disable_decoding=disable_decoding, timeout=timeout)
111 for _ in range(int(response))
112 ]
113 # set response
114 elif byte == b"~":
115 # redis can return unhashable types (like dict) in a set,
116 # so we return sets as list, all the time, for predictability
117 response = [
118 self._read_response(disable_decoding=disable_decoding, timeout=timeout)
119 for _ in range(int(response))
120 ]
121 # map response
122 elif byte == b"%":
123 # We cannot use a dict-comprehension to parse stream.
124 # Evaluation order of key:val expression in dict comprehension only
125 # became defined to be left-right in version 3.8
126 resp_dict = {}
127 for _ in range(int(response)):
128 key = self._read_response(
129 disable_decoding=disable_decoding, timeout=timeout
130 )
131 resp_dict[key] = self._read_response(
132 disable_decoding=disable_decoding,
133 push_request=push_request,
134 timeout=timeout,
135 )
136 response = resp_dict
137 # push response
138 elif byte == b">":
139 response = [
140 self._read_response(
141 disable_decoding=disable_decoding,
142 push_request=push_request,
143 timeout=timeout,
144 )
145 for _ in range(int(response))
146 ]
147 response = self.handle_push_response(response)
149 # if this is a push request return the push response
150 if push_request:
151 return response
153 return self._read_response(
154 disable_decoding=disable_decoding,
155 push_request=push_request,
156 )
157 else:
158 raise InvalidResponse(f"Protocol Error: {raw!r}")
160 if isinstance(response, bytes) and disable_decoding is False:
161 response = self.encoder.decode(response)
163 return response
166class _AsyncRESP3Parser(_AsyncRESPBase, AsyncPushNotificationsParser):
167 def __init__(self, socket_read_size):
168 super().__init__(socket_read_size)
169 self.pubsub_push_handler_func = self.handle_pubsub_push_response
170 self.invalidation_push_handler_func = None
172 async def handle_pubsub_push_response(self, response):
173 logger = getLogger("push_response")
174 logger.debug("Push response: " + str(response))
175 return response
177 async def read_response(
178 self, disable_decoding: bool = False, push_request: bool = False
179 ):
180 if self._chunks:
181 # augment parsing buffer with previously read data
182 self._buffer += b"".join(self._chunks)
183 self._chunks.clear()
184 self._pos = 0
185 response = await self._read_response(
186 disable_decoding=disable_decoding, push_request=push_request
187 )
188 # Successfully parsing a response allows us to clear our parsing buffer
189 self._clear()
190 return response
192 async def _read_response(
193 self, disable_decoding: bool = False, push_request: bool = False
194 ) -> Union[EncodableT, ResponseError, None]:
195 if not self._stream or not self.encoder:
196 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
197 raw = await self._readline()
198 response: Any
199 byte, response = raw[:1], raw[1:]
201 # if byte not in (b"-", b"+", b":", b"$", b"*"):
202 # raise InvalidResponse(f"Protocol Error: {raw!r}")
204 # server returned an error
205 if byte in (b"-", b"!"):
206 if byte == b"!":
207 response = await self._read(int(response))
208 response = response.decode("utf-8", errors="replace")
209 error = self.parse_error(response)
210 # if the error is a ConnectionError, raise immediately so the user
211 # is notified
212 if isinstance(error, ConnectionError):
213 self._clear() # Successful parse
214 raise error
215 # otherwise, we're dealing with a ResponseError that might belong
216 # inside a pipeline response. the connection's read_response()
217 # and/or the pipeline's execute() will raise this error if
218 # necessary, so just return the exception instance here.
219 return error
220 # single value
221 elif byte == b"+":
222 pass
223 # null value
224 elif byte == b"_":
225 return None
226 # int and big int values
227 elif byte in (b":", b"("):
228 return int(response)
229 # double value
230 elif byte == b",":
231 return float(response)
232 # bool value
233 elif byte == b"#":
234 return response == b"t"
235 # bulk response
236 elif byte == b"$":
237 response = await self._read(int(response))
238 # verbatim string response
239 elif byte == b"=":
240 response = (await self._read(int(response)))[4:]
241 # array response
242 elif byte == b"*":
243 response = [
244 (await self._read_response(disable_decoding=disable_decoding))
245 for _ in range(int(response))
246 ]
247 # set response
248 elif byte == b"~":
249 # redis can return unhashable types (like dict) in a set,
250 # so we always convert to a list, to have predictable return types
251 response = [
252 (await self._read_response(disable_decoding=disable_decoding))
253 for _ in range(int(response))
254 ]
255 # map response
256 elif byte == b"%":
257 # We cannot use a dict-comprehension to parse stream.
258 # Evaluation order of key:val expression in dict comprehension only
259 # became defined to be left-right in version 3.8
260 resp_dict = {}
261 for _ in range(int(response)):
262 key = await self._read_response(disable_decoding=disable_decoding)
263 resp_dict[key] = await self._read_response(
264 disable_decoding=disable_decoding, push_request=push_request
265 )
266 response = resp_dict
267 # push response
268 elif byte == b">":
269 response = [
270 (
271 await self._read_response(
272 disable_decoding=disable_decoding, push_request=push_request
273 )
274 )
275 for _ in range(int(response))
276 ]
277 response = await self.handle_push_response(response)
278 if not push_request:
279 return await self._read_response(
280 disable_decoding=disable_decoding, push_request=push_request
281 )
282 else:
283 return response
284 else:
285 raise InvalidResponse(f"Protocol Error: {raw!r}")
287 if isinstance(response, bytes) and disable_decoding is False:
288 response = self.encoder.decode(response)
289 return response