Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/resp2.py: 14%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from typing import Any, Union
3from ..exceptions import ConnectionError, InvalidResponse, ResponseError
4from ..typing import EncodableT
5from .base import _AsyncRESPBase, _RESPBase
6from .socket import SENTINEL, SERVER_CLOSED_CONNECTION_ERROR
class _RESP2Parser(_RESPBase):
    """Blocking parser for replies encoded with the RESP2 wire protocol."""

    def read_response(
        self, disable_decoding=False, timeout: Union[float, object] = SENTINEL
    ):
        """Parse one complete reply from the buffer and return it.

        The buffer position is recorded before parsing starts. If parsing
        raises anything (``BaseException`` included), the buffer is rewound
        to the recorded position and the exception is re-raised unchanged.
        On success the buffer is purged and the parsed reply is returned.
        """
        if self._buffer:
            start = self._buffer.get_pos()
        else:
            start = None

        try:
            parsed = self._read_response(
                disable_decoding=disable_decoding, timeout=timeout
            )
        except BaseException:
            # Restore the recorded position before propagating the failure.
            if self._buffer:
                self._buffer.rewind(start)
            raise

        self._buffer.purge()
        return parsed

    def _read_response(
        self, disable_decoding=False, timeout: Union[float, object] = SENTINEL
    ):
        """Read one RESP2 value, recursing for the elements of an array.

        The first byte of the line selects the reply type:

        * ``-`` error: a ``ConnectionError`` is raised, any other parsed
          error is returned as an exception instance.
        * ``+`` simple string: the remainder of the line.
        * ``:`` integer: returned as ``int``.
        * ``$`` bulk string: ``None`` for length ``-1``, otherwise that many
          bytes are read from the buffer.
        * ``*`` array: ``None`` for length ``-1``, otherwise a list holding
          that many recursively parsed values.

        Simple strings, bulk strings and arrays are passed through
        ``self.encoder.decode`` unless ``disable_decoding`` is something
        other than ``False``.

        Raises ``ConnectionError`` when the line read is empty and
        ``InvalidResponse`` when the type byte is not recognised.
        """
        line = self._buffer.readline(timeout=timeout)
        if not line:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

        marker = line[:1]
        payload = line[1:]

        # Error reply from the server.
        if marker == b"-":
            error = self.parse_error(payload.decode("utf-8", errors="replace"))
            # Connection-level errors are surfaced to the caller right away.
            if isinstance(error, ConnectionError):
                raise error
            # Any other error may be one entry of a pipeline result, so the
            # instance is handed back; the connection's read_response()
            # and/or the pipeline's execute() decide whether to raise it.
            return error

        # Integer reply.
        if marker == b":":
            return int(payload)

        # Null bulk string and null array are both reported as None.
        if marker in (b"$", b"*") and payload == b"-1":
            return None

        if marker == b"+":
            # Simple string: the rest of the line is the value.
            value = payload
        elif marker == b"$":
            # Bulk string: the line carries the byte length of the body.
            value = self._buffer.read(int(payload), timeout=timeout)
        elif marker == b"*":
            # Array: the line carries the number of nested replies.
            count = int(payload)
            value = []
            for _ in range(count):
                value.append(
                    self._read_response(
                        disable_decoding=disable_decoding, timeout=timeout
                    )
                )
        else:
            raise InvalidResponse(f"Protocol Error: {line!r}")

        if disable_decoding is False:
            value = self.encoder.decode(value)
        return value
class _AsyncRESP2Parser(_AsyncRESPBase):
    """Asyncio parser for replies encoded with the RESP2 wire protocol."""

    async def read_response(self, disable_decoding: bool = False):
        """Parse one complete reply and return it.

        Raises ``ConnectionError`` when the parser is not connected. Any
        chunks collected earlier are appended to the parsing buffer first,
        the parse position is reset to zero, and after a successful parse
        the parser state is cleared via ``self._clear()``.
        """
        if not self._connected:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

        pending = self._chunks
        if pending:
            # Fold data read on earlier attempts into the parsing buffer.
            self._buffer += b"".join(pending)
            pending.clear()
        self._pos = 0

        parsed = await self._read_response(disable_decoding=disable_decoding)
        # A full reply was parsed, so the parsing buffer can be dropped.
        self._clear()
        return parsed

    async def _read_response(
        self, disable_decoding: bool = False
    ) -> Union[EncodableT, ResponseError, None]:
        """Read one RESP2 value, recursing for the elements of an array.

        The first byte of the line selects the reply type:

        * ``-`` error: a ``ConnectionError`` is raised (after clearing the
          parser state), any other parsed error is returned as an
          exception instance.
        * ``+`` simple string: the remainder of the line.
        * ``:`` integer: returned as ``int``.
        * ``$`` bulk string: ``None`` for length ``-1``, otherwise that many
          bytes are read.
        * ``*`` array: ``None`` for length ``-1``, otherwise a list holding
          that many recursively parsed values.

        Simple strings, bulk strings and arrays are passed through
        ``self.encoder.decode`` unless ``disable_decoding`` is something
        other than ``False``.

        Raises ``InvalidResponse`` when the type byte is not recognised.
        """
        line = await self._readline()
        payload: Any
        marker = line[:1]
        payload = line[1:]

        # Error reply from the server.
        if marker == b"-":
            error = self.parse_error(payload.decode("utf-8", errors="replace"))
            # Connection-level errors are surfaced to the caller right away.
            if isinstance(error, ConnectionError):
                self._clear()  # the error line itself was parsed successfully
                raise error
            # Any other error may be one entry of a pipeline result, so the
            # instance is handed back; the connection's read_response()
            # and/or the pipeline's execute() decide whether to raise it.
            return error

        # Integer reply.
        if marker == b":":
            return int(payload)

        # Null bulk string and null array are both reported as None.
        if marker in (b"$", b"*") and payload == b"-1":
            return None

        if marker == b"+":
            # Simple string: the rest of the line is the value.
            value = payload
        elif marker == b"$":
            # Bulk string: the line carries the byte length of the body.
            value = await self._read(int(payload))
        elif marker == b"*":
            # Array: the line carries the number of nested replies.
            count = int(payload)
            value = []
            for _ in range(count):
                value.append(await self._read_response(disable_decoding))
        else:
            raise InvalidResponse(f"Protocol Error: {line!r}")

        if disable_decoding is False:
            value = self.encoder.decode(value)
        return value