Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/resp3.py: 11%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from logging import getLogger
2from typing import Any, Union
4from ..exceptions import ConnectionError, InvalidResponse, ResponseError
5from ..typing import EncodableT
6from .base import (
7 AsyncPushNotificationsParser,
8 PushNotificationsParser,
9 _AsyncRESPBase,
10 _RESPBase,
11)
12from .socket import SERVER_CLOSED_CONNECTION_ERROR
15class _RESP3Parser(_RESPBase, PushNotificationsParser):
16 """RESP3 protocol implementation"""
18 def __init__(self, socket_read_size):
19 super().__init__(socket_read_size)
20 self.pubsub_push_handler_func = self.handle_pubsub_push_response
21 self.node_moving_push_handler_func = None
22 self.maintenance_push_handler_func = None
23 self.oss_cluster_maint_push_handler_func = None
24 self.invalidation_push_handler_func = None
26 def handle_pubsub_push_response(self, response):
27 logger = getLogger("push_response")
28 logger.debug("Push response: " + str(response))
29 return response
31 def read_response(self, disable_decoding=False, push_request=False):
32 pos = self._buffer.get_pos() if self._buffer is not None else None
33 try:
34 result = self._read_response(
35 disable_decoding=disable_decoding, push_request=push_request
36 )
37 except BaseException:
38 if self._buffer is not None:
39 self._buffer.rewind(pos)
40 raise
41 else:
42 if self._buffer is not None:
43 try:
44 self._buffer.purge()
45 except AttributeError:
46 # Buffer may have been set to None by another thread after
47 # the check above; result is still valid so we don't raise
48 pass
49 return result
51 def _read_response(self, disable_decoding=False, push_request=False):
52 raw = self._buffer.readline()
53 if not raw:
54 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
56 byte, response = raw[:1], raw[1:]
58 # server returned an error
59 if byte in (b"-", b"!"):
60 if byte == b"!":
61 response = self._buffer.read(int(response))
62 response = response.decode("utf-8", errors="replace")
63 error = self.parse_error(response)
64 # if the error is a ConnectionError, raise immediately so the user
65 # is notified
66 if isinstance(error, ConnectionError):
67 raise error
68 # otherwise, we're dealing with a ResponseError that might belong
69 # inside a pipeline response. the connection's read_response()
70 # and/or the pipeline's execute() will raise this error if
71 # necessary, so just return the exception instance here.
72 return error
73 # single value
74 elif byte == b"+":
75 pass
76 # null value
77 elif byte == b"_":
78 return None
79 # int and big int values
80 elif byte in (b":", b"("):
81 return int(response)
82 # double value
83 elif byte == b",":
84 return float(response)
85 # bool value
86 elif byte == b"#":
87 return response == b"t"
88 # bulk response
89 elif byte == b"$":
90 response = self._buffer.read(int(response))
91 # verbatim string response
92 elif byte == b"=":
93 response = self._buffer.read(int(response))[4:]
94 # array response
95 elif byte == b"*":
96 response = [
97 self._read_response(disable_decoding=disable_decoding)
98 for _ in range(int(response))
99 ]
100 # set response
101 elif byte == b"~":
102 # redis can return unhashable types (like dict) in a set,
103 # so we return sets as list, all the time, for predictability
104 response = [
105 self._read_response(disable_decoding=disable_decoding)
106 for _ in range(int(response))
107 ]
108 # map response
109 elif byte == b"%":
110 # We cannot use a dict-comprehension to parse stream.
111 # Evaluation order of key:val expression in dict comprehension only
112 # became defined to be left-right in version 3.8
113 resp_dict = {}
114 for _ in range(int(response)):
115 key = self._read_response(disable_decoding=disable_decoding)
116 resp_dict[key] = self._read_response(
117 disable_decoding=disable_decoding, push_request=push_request
118 )
119 response = resp_dict
120 # push response
121 elif byte == b">":
122 response = [
123 self._read_response(
124 disable_decoding=disable_decoding, push_request=push_request
125 )
126 for _ in range(int(response))
127 ]
128 response = self.handle_push_response(response)
130 # if this is a push request return the push response
131 if push_request:
132 return response
134 return self._read_response(
135 disable_decoding=disable_decoding,
136 push_request=push_request,
137 )
138 else:
139 raise InvalidResponse(f"Protocol Error: {raw!r}")
141 if isinstance(response, bytes) and disable_decoding is False:
142 response = self.encoder.decode(response)
144 return response
147class _AsyncRESP3Parser(_AsyncRESPBase, AsyncPushNotificationsParser):
148 def __init__(self, socket_read_size):
149 super().__init__(socket_read_size)
150 self.pubsub_push_handler_func = self.handle_pubsub_push_response
151 self.invalidation_push_handler_func = None
153 async def handle_pubsub_push_response(self, response):
154 logger = getLogger("push_response")
155 logger.debug("Push response: " + str(response))
156 return response
158 async def read_response(
159 self, disable_decoding: bool = False, push_request: bool = False
160 ):
161 if self._chunks:
162 # augment parsing buffer with previously read data
163 self._buffer += b"".join(self._chunks)
164 self._chunks.clear()
165 self._pos = 0
166 response = await self._read_response(
167 disable_decoding=disable_decoding, push_request=push_request
168 )
169 # Successfully parsing a response allows us to clear our parsing buffer
170 self._clear()
171 return response
173 async def _read_response(
174 self, disable_decoding: bool = False, push_request: bool = False
175 ) -> Union[EncodableT, ResponseError, None]:
176 if not self._stream or not self.encoder:
177 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
178 raw = await self._readline()
179 response: Any
180 byte, response = raw[:1], raw[1:]
182 # if byte not in (b"-", b"+", b":", b"$", b"*"):
183 # raise InvalidResponse(f"Protocol Error: {raw!r}")
185 # server returned an error
186 if byte in (b"-", b"!"):
187 if byte == b"!":
188 response = await self._read(int(response))
189 response = response.decode("utf-8", errors="replace")
190 error = self.parse_error(response)
191 # if the error is a ConnectionError, raise immediately so the user
192 # is notified
193 if isinstance(error, ConnectionError):
194 self._clear() # Successful parse
195 raise error
196 # otherwise, we're dealing with a ResponseError that might belong
197 # inside a pipeline response. the connection's read_response()
198 # and/or the pipeline's execute() will raise this error if
199 # necessary, so just return the exception instance here.
200 return error
201 # single value
202 elif byte == b"+":
203 pass
204 # null value
205 elif byte == b"_":
206 return None
207 # int and big int values
208 elif byte in (b":", b"("):
209 return int(response)
210 # double value
211 elif byte == b",":
212 return float(response)
213 # bool value
214 elif byte == b"#":
215 return response == b"t"
216 # bulk response
217 elif byte == b"$":
218 response = await self._read(int(response))
219 # verbatim string response
220 elif byte == b"=":
221 response = (await self._read(int(response)))[4:]
222 # array response
223 elif byte == b"*":
224 response = [
225 (await self._read_response(disable_decoding=disable_decoding))
226 for _ in range(int(response))
227 ]
228 # set response
229 elif byte == b"~":
230 # redis can return unhashable types (like dict) in a set,
231 # so we always convert to a list, to have predictable return types
232 response = [
233 (await self._read_response(disable_decoding=disable_decoding))
234 for _ in range(int(response))
235 ]
236 # map response
237 elif byte == b"%":
238 # We cannot use a dict-comprehension to parse stream.
239 # Evaluation order of key:val expression in dict comprehension only
240 # became defined to be left-right in version 3.8
241 resp_dict = {}
242 for _ in range(int(response)):
243 key = await self._read_response(disable_decoding=disable_decoding)
244 resp_dict[key] = await self._read_response(
245 disable_decoding=disable_decoding, push_request=push_request
246 )
247 response = resp_dict
248 # push response
249 elif byte == b">":
250 response = [
251 (
252 await self._read_response(
253 disable_decoding=disable_decoding, push_request=push_request
254 )
255 )
256 for _ in range(int(response))
257 ]
258 response = await self.handle_push_response(response)
259 if not push_request:
260 return await self._read_response(
261 disable_decoding=disable_decoding, push_request=push_request
262 )
263 else:
264 return response
265 else:
266 raise InvalidResponse(f"Protocol Error: {raw!r}")
268 if isinstance(response, bytes) and disable_decoding is False:
269 response = self.encoder.decode(response)
270 return response