Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/resp3.py: 11%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from logging import getLogger
2from typing import Any, Union
4from ..exceptions import ConnectionError, InvalidResponse, ResponseError
5from ..typing import EncodableT
6from .base import (
7 AsyncPushNotificationsParser,
8 PushNotificationsParser,
9 _AsyncRESPBase,
10 _RESPBase,
11)
12from .socket import SERVER_CLOSED_CONNECTION_ERROR
15class _RESP3Parser(_RESPBase, PushNotificationsParser):
16 """RESP3 protocol implementation"""
18 def __init__(self, socket_read_size):
19 super().__init__(socket_read_size)
20 self.pubsub_push_handler_func = self.handle_pubsub_push_response
21 self.node_moving_push_handler_func = None
22 self.maintenance_push_handler_func = None
23 self.oss_cluster_maint_push_handler_func = None
24 self.invalidation_push_handler_func = None
26 def handle_pubsub_push_response(self, response):
27 logger = getLogger("push_response")
28 logger.debug("Push response: " + str(response))
29 return response
31 def read_response(self, disable_decoding=False, push_request=False):
32 pos = self._buffer.get_pos() if self._buffer else None
33 try:
34 result = self._read_response(
35 disable_decoding=disable_decoding, push_request=push_request
36 )
37 except BaseException:
38 if self._buffer:
39 self._buffer.rewind(pos)
40 raise
41 else:
42 self._buffer.purge()
43 return result
45 def _read_response(self, disable_decoding=False, push_request=False):
46 raw = self._buffer.readline()
47 if not raw:
48 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
50 byte, response = raw[:1], raw[1:]
52 # server returned an error
53 if byte in (b"-", b"!"):
54 if byte == b"!":
55 response = self._buffer.read(int(response))
56 response = response.decode("utf-8", errors="replace")
57 error = self.parse_error(response)
58 # if the error is a ConnectionError, raise immediately so the user
59 # is notified
60 if isinstance(error, ConnectionError):
61 raise error
62 # otherwise, we're dealing with a ResponseError that might belong
63 # inside a pipeline response. the connection's read_response()
64 # and/or the pipeline's execute() will raise this error if
65 # necessary, so just return the exception instance here.
66 return error
67 # single value
68 elif byte == b"+":
69 pass
70 # null value
71 elif byte == b"_":
72 return None
73 # int and big int values
74 elif byte in (b":", b"("):
75 return int(response)
76 # double value
77 elif byte == b",":
78 return float(response)
79 # bool value
80 elif byte == b"#":
81 return response == b"t"
82 # bulk response
83 elif byte == b"$":
84 response = self._buffer.read(int(response))
85 # verbatim string response
86 elif byte == b"=":
87 response = self._buffer.read(int(response))[4:]
88 # array response
89 elif byte == b"*":
90 response = [
91 self._read_response(disable_decoding=disable_decoding)
92 for _ in range(int(response))
93 ]
94 # set response
95 elif byte == b"~":
96 # redis can return unhashable types (like dict) in a set,
97 # so we return sets as list, all the time, for predictability
98 response = [
99 self._read_response(disable_decoding=disable_decoding)
100 for _ in range(int(response))
101 ]
102 # map response
103 elif byte == b"%":
104 # We cannot use a dict-comprehension to parse stream.
105 # Evaluation order of key:val expression in dict comprehension only
106 # became defined to be left-right in version 3.8
107 resp_dict = {}
108 for _ in range(int(response)):
109 key = self._read_response(disable_decoding=disable_decoding)
110 resp_dict[key] = self._read_response(
111 disable_decoding=disable_decoding, push_request=push_request
112 )
113 response = resp_dict
114 # push response
115 elif byte == b">":
116 response = [
117 self._read_response(
118 disable_decoding=disable_decoding, push_request=push_request
119 )
120 for _ in range(int(response))
121 ]
122 response = self.handle_push_response(response)
124 # if this is a push request return the push response
125 if push_request:
126 return response
128 return self._read_response(
129 disable_decoding=disable_decoding,
130 push_request=push_request,
131 )
132 else:
133 raise InvalidResponse(f"Protocol Error: {raw!r}")
135 if isinstance(response, bytes) and disable_decoding is False:
136 response = self.encoder.decode(response)
138 return response
141class _AsyncRESP3Parser(_AsyncRESPBase, AsyncPushNotificationsParser):
142 def __init__(self, socket_read_size):
143 super().__init__(socket_read_size)
144 self.pubsub_push_handler_func = self.handle_pubsub_push_response
145 self.invalidation_push_handler_func = None
147 async def handle_pubsub_push_response(self, response):
148 logger = getLogger("push_response")
149 logger.debug("Push response: " + str(response))
150 return response
152 async def read_response(
153 self, disable_decoding: bool = False, push_request: bool = False
154 ):
155 if self._chunks:
156 # augment parsing buffer with previously read data
157 self._buffer += b"".join(self._chunks)
158 self._chunks.clear()
159 self._pos = 0
160 response = await self._read_response(
161 disable_decoding=disable_decoding, push_request=push_request
162 )
163 # Successfully parsing a response allows us to clear our parsing buffer
164 self._clear()
165 return response
167 async def _read_response(
168 self, disable_decoding: bool = False, push_request: bool = False
169 ) -> Union[EncodableT, ResponseError, None]:
170 if not self._stream or not self.encoder:
171 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
172 raw = await self._readline()
173 response: Any
174 byte, response = raw[:1], raw[1:]
176 # if byte not in (b"-", b"+", b":", b"$", b"*"):
177 # raise InvalidResponse(f"Protocol Error: {raw!r}")
179 # server returned an error
180 if byte in (b"-", b"!"):
181 if byte == b"!":
182 response = await self._read(int(response))
183 response = response.decode("utf-8", errors="replace")
184 error = self.parse_error(response)
185 # if the error is a ConnectionError, raise immediately so the user
186 # is notified
187 if isinstance(error, ConnectionError):
188 self._clear() # Successful parse
189 raise error
190 # otherwise, we're dealing with a ResponseError that might belong
191 # inside a pipeline response. the connection's read_response()
192 # and/or the pipeline's execute() will raise this error if
193 # necessary, so just return the exception instance here.
194 return error
195 # single value
196 elif byte == b"+":
197 pass
198 # null value
199 elif byte == b"_":
200 return None
201 # int and big int values
202 elif byte in (b":", b"("):
203 return int(response)
204 # double value
205 elif byte == b",":
206 return float(response)
207 # bool value
208 elif byte == b"#":
209 return response == b"t"
210 # bulk response
211 elif byte == b"$":
212 response = await self._read(int(response))
213 # verbatim string response
214 elif byte == b"=":
215 response = (await self._read(int(response)))[4:]
216 # array response
217 elif byte == b"*":
218 response = [
219 (await self._read_response(disable_decoding=disable_decoding))
220 for _ in range(int(response))
221 ]
222 # set response
223 elif byte == b"~":
224 # redis can return unhashable types (like dict) in a set,
225 # so we always convert to a list, to have predictable return types
226 response = [
227 (await self._read_response(disable_decoding=disable_decoding))
228 for _ in range(int(response))
229 ]
230 # map response
231 elif byte == b"%":
232 # We cannot use a dict-comprehension to parse stream.
233 # Evaluation order of key:val expression in dict comprehension only
234 # became defined to be left-right in version 3.8
235 resp_dict = {}
236 for _ in range(int(response)):
237 key = await self._read_response(disable_decoding=disable_decoding)
238 resp_dict[key] = await self._read_response(
239 disable_decoding=disable_decoding, push_request=push_request
240 )
241 response = resp_dict
242 # push response
243 elif byte == b">":
244 response = [
245 (
246 await self._read_response(
247 disable_decoding=disable_decoding, push_request=push_request
248 )
249 )
250 for _ in range(int(response))
251 ]
252 response = await self.handle_push_response(response)
253 if not push_request:
254 return await self._read_response(
255 disable_decoding=disable_decoding, push_request=push_request
256 )
257 else:
258 return response
259 else:
260 raise InvalidResponse(f"Protocol Error: {raw!r}")
262 if isinstance(response, bytes) and disable_decoding is False:
263 response = self.encoder.decode(response)
264 return response