Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/resp2.py: 15%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from typing import Any, Union
3from ..exceptions import ConnectionError, InvalidResponse, ResponseError
4from ..typing import EncodableT
5from ..utils import SENTINEL
6from .base import _AsyncRESPBase, _RESPBase
7from .socket import SERVER_CLOSED_CONNECTION_ERROR
class _RESP2Parser(_RESPBase):
    """Synchronous parser for replies encoded with the RESP2 wire protocol."""

    def read_response(
        self, disable_decoding=False, timeout: Union[float, object] = SENTINEL
    ):
        """Parse one complete reply, restoring the buffer position on failure.

        The buffer position is captured before parsing begins. If parsing
        raises anything at all (``BaseException`` included), the buffer, when
        still set, is rewound to the captured position and the exception
        propagates unchanged. After a successful parse the buffer is purged
        and the parsed reply is returned.
        """
        if self._buffer:
            checkpoint = self._buffer.get_pos()
        else:
            checkpoint = None

        try:
            parsed = self._read_response(
                disable_decoding=disable_decoding, timeout=timeout
            )
        except BaseException:
            # Undo the partial read so the same reply can be parsed again.
            if self._buffer:
                self._buffer.rewind(checkpoint)
            raise

        self._buffer.purge()
        return parsed

    def _read_response(
        self, disable_decoding=False, timeout: Union[float, object] = SENTINEL
    ):
        """Read a single RESP2 value from the buffer, recursing for arrays.

        The first byte of the line selects the reply type:

        * ``-`` error: the text is handed to ``self.parse_error``; a
          ``ConnectionError`` result is raised, any other result is returned
          as an exception instance.
        * ``+`` simple value: the rest of the line.
        * ``:`` integer: returned as ``int``.
        * ``$`` bulk: ``None`` for a length of ``-1``, otherwise that many
          bytes are read from the buffer.
        * ``*`` multi-bulk: ``None`` for a length of ``-1``, otherwise a list
          holding that many nested replies.

        Simple, bulk and multi-bulk values are passed through
        ``self.encoder.decode`` when ``disable_decoding`` is ``False``.

        Raises:
            ConnectionError: if the line read from the buffer is empty, or if
                the server error parses to a ``ConnectionError``.
            InvalidResponse: if the type byte is not one of the above.
        """
        line = self._buffer.readline(timeout=timeout)
        if not line:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

        marker = line[:1]
        payload = line[1:]

        # Error reply from the server.
        if marker == b"-":
            parsed = self.parse_error(payload.decode("utf-8", errors="replace"))
            # Connection-level failures must surface to the caller right away.
            if isinstance(parsed, ConnectionError):
                raise parsed
            # Anything else may belong inside a pipeline response; the
            # connection's read_response() and/or the pipeline's execute()
            # decide whether to raise it, so hand back the instance itself.
            return parsed

        # Integer reply.
        if marker == b":":
            return int(payload)

        # Null bulk / null multi-bulk reply.
        if marker == b"$" and payload == b"-1":
            return None
        if marker == b"*" and payload == b"-1":
            return None

        if marker == b"+":
            # Simple value: the remainder of the line is the value.
            value = payload
        elif marker == b"$":
            # Bulk reply: the line carries the byte count of the body.
            value = self._buffer.read(int(payload), timeout=timeout)
        elif marker == b"*":
            # Multi-bulk reply: the line carries the number of nested replies.
            value = []
            for _ in range(int(payload)):
                value.append(
                    self._read_response(
                        disable_decoding=disable_decoding, timeout=timeout
                    )
                )
        else:
            raise InvalidResponse(f"Protocol Error: {line!r}")

        if disable_decoding is not False:
            return value
        return self.encoder.decode(value)
class _AsyncRESP2Parser(_AsyncRESPBase):
    """Asyncio parser for replies encoded with the RESP2 wire protocol."""

    async def read_response(self, disable_decoding: bool = False):
        """Parse one complete reply and clear the parsing buffer afterwards.

        Any chunks collected in ``self._chunks`` are appended to
        ``self._buffer`` and the chunk list is emptied; the parse position is
        reset to zero before parsing starts. The parsing state is cleared via
        ``self._clear()`` only after a reply was parsed successfully.

        Raises:
            ConnectionError: if the parser is not connected.
        """
        if not self._connected:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

        if self._chunks:
            # Fold previously read data into the parsing buffer.
            pending = b"".join(self._chunks)
            self._buffer += pending
            self._chunks.clear()

        self._pos = 0
        parsed = await self._read_response(disable_decoding=disable_decoding)
        # A fully parsed reply means the buffered bytes are no longer needed.
        self._clear()
        return parsed

    async def _read_response(
        self, disable_decoding: bool = False
    ) -> Union[EncodableT, ResponseError, None]:
        """Read a single RESP2 value, recursing for multi-bulk replies.

        The first byte of the line selects the reply type:

        * ``-`` error: the text is handed to ``self.parse_error``; a
          ``ConnectionError`` result is raised (after clearing the parsing
          state), any other result is returned as an exception instance.
        * ``+`` simple value: the rest of the line.
        * ``:`` integer: returned as ``int``.
        * ``$`` bulk: ``None`` for a length of ``-1``, otherwise that many
          bytes are read.
        * ``*`` multi-bulk: ``None`` for a length of ``-1``, otherwise a list
          holding that many nested replies.

        Simple, bulk and multi-bulk values are passed through
        ``self.encoder.decode`` when ``disable_decoding`` is ``False``.

        Raises:
            ConnectionError: if the server error parses to one.
            InvalidResponse: if the type byte is not one of the above.
        """
        line = await self._readline()
        marker = line[:1]
        payload: Any = line[1:]

        # Error reply from the server.
        if marker == b"-":
            parsed = self.parse_error(payload.decode("utf-8", errors="replace"))
            # Connection-level failures must surface to the caller right away.
            if isinstance(parsed, ConnectionError):
                self._clear()  # the error line itself was parsed successfully
                raise parsed
            # Anything else may belong inside a pipeline response; the
            # connection's read_response() and/or the pipeline's execute()
            # decide whether to raise it, so hand back the instance itself.
            return parsed

        # Integer reply.
        if marker == b":":
            return int(payload)

        # Null bulk / null multi-bulk reply.
        if marker == b"$" and payload == b"-1":
            return None
        if marker == b"*" and payload == b"-1":
            return None

        if marker == b"+":
            # Simple value: the remainder of the line is the value.
            value = payload
        elif marker == b"$":
            # Bulk reply: the line carries the byte count of the body.
            value = await self._read(int(payload))
        elif marker == b"*":
            # Multi-bulk reply: the line carries the number of nested replies.
            value = []
            for _ in range(int(payload)):
                value.append(await self._read_response(disable_decoding))
        else:
            raise InvalidResponse(f"Protocol Error: {line!r}")

        if disable_decoding is not False:
            return value
        return self.encoder.decode(value)