Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/resp3.py: 12%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

145 statements  

1from logging import getLogger 

2from typing import Any, Union 

3 

4from ..exceptions import ConnectionError, InvalidResponse, ResponseError 

5from ..typing import EncodableT 

6from ..utils import SENTINEL 

7from .base import ( 

8 AsyncPushNotificationsParser, 

9 PushNotificationsParser, 

10 _AsyncRESPBase, 

11 _RESPBase, 

12) 

13from .socket import SERVER_CLOSED_CONNECTION_ERROR 

14 

15 

class _RESP3Parser(_RESPBase, PushNotificationsParser):
    """RESP3 protocol implementation"""

    def __init__(self, socket_read_size):
        super().__init__(socket_read_size)
        # Pub/sub push messages are logged and passed through by default;
        # the remaining push handlers start out unset.
        self.pubsub_push_handler_func = self.handle_pubsub_push_response
        self.node_moving_push_handler_func = None
        self.maintenance_push_handler_func = None
        self.oss_cluster_maint_push_handler_func = None
        self.invalidation_push_handler_func = None

    def handle_pubsub_push_response(self, response):
        """Log *response* at DEBUG level on the "push_response" logger and
        return it unchanged."""
        logger = getLogger("push_response")
        # Lazy %-style arguments: the message is only rendered when DEBUG
        # logging is actually enabled.
        logger.debug("Push response: %s", response)
        return response

    def read_response(
        self,
        disable_decoding=False,
        push_request=False,
        timeout: Union[float, object] = SENTINEL,
    ):
        """Read and parse one complete response from the buffer.

        The buffer position is recorded before parsing. If parsing raises
        anything (including ``BaseException`` subclasses), the buffer is
        rewound to that position and the exception is re-raised. On success
        the buffer's ``purge()`` is called and the parsed result returned.
        """
        pos = self._buffer.get_pos() if self._buffer is not None else None
        try:
            result = self._read_response(
                disable_decoding=disable_decoding,
                push_request=push_request,
                timeout=timeout,
            )
        except BaseException:
            if self._buffer is not None:
                self._buffer.rewind(pos)
            raise
        else:
            if self._buffer is not None:
                try:
                    self._buffer.purge()
                except AttributeError:
                    # Buffer may have been set to None by another thread after
                    # the check above; result is still valid so we don't raise
                    pass
            return result

    def _read_response(
        self,
        disable_decoding=False,
        push_request=False,
        timeout: Union[float, object] = SENTINEL,
    ):
        """Parse a single RESP3 value, recursing for aggregate types.

        Returns the parsed value. Error replies are returned as exception
        instances rather than raised, except for ``ConnectionError`` which
        is raised immediately. Byte payloads are decoded through
        ``self.encoder`` unless ``disable_decoding`` is true.

        Raises:
            ConnectionError: if the buffer yields an empty line.
            InvalidResponse: if the leading type byte is not recognised.
        """
        raw = self._buffer.readline(timeout=timeout)
        if not raw:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

        byte, response = raw[:1], raw[1:]

        # server returned an error
        if byte in (b"-", b"!"):
            if byte == b"!":
                # bulk error: the header line carries the payload length
                response = self._buffer.read(int(response), timeout=timeout)
            response = response.decode("utf-8", errors="replace")
            error = self.parse_error(response)
            # if the error is a ConnectionError, raise immediately so the user
            # is notified
            if isinstance(error, ConnectionError):
                raise error
            # otherwise, we're dealing with a ResponseError that might belong
            # inside a pipeline response. the connection's read_response()
            # and/or the pipeline's execute() will raise this error if
            # necessary, so just return the exception instance here.
            return error
        # single value
        elif byte == b"+":
            pass
        # null value
        elif byte == b"_":
            return None
        # int and big int values
        elif byte in (b":", b"("):
            return int(response)
        # double value
        elif byte == b",":
            return float(response)
        # bool value
        elif byte == b"#":
            return response == b"t"
        # bulk response
        elif byte == b"$":
            response = self._buffer.read(int(response), timeout=timeout)
        # verbatim string response
        elif byte == b"=":
            # drop the first four bytes of the payload (the format prefix)
            response = self._buffer.read(int(response), timeout=timeout)[4:]
        # array and set responses
        elif byte in (b"*", b"~"):
            # redis can return unhashable types (like dict) in a set,
            # so we return sets as list, all the time, for predictability
            response = [
                self._read_response(disable_decoding=disable_decoding, timeout=timeout)
                for _ in range(int(response))
            ]
        # map response
        elif byte == b"%":
            # We cannot use a dict-comprehension to parse stream.
            # Evaluation order of key:val expression in dict comprehension only
            # became defined to be left-right in version 3.8
            resp_dict = {}
            for _ in range(int(response)):
                key = self._read_response(
                    disable_decoding=disable_decoding, timeout=timeout
                )
                resp_dict[key] = self._read_response(
                    disable_decoding=disable_decoding,
                    push_request=push_request,
                    timeout=timeout,
                )
            response = resp_dict
        # push response
        elif byte == b">":
            response = [
                self._read_response(
                    disable_decoding=disable_decoding,
                    push_request=push_request,
                    timeout=timeout,
                )
                for _ in range(int(response))
            ]
            response = self.handle_push_response(response)

            # if this is a push request return the push response
            if push_request:
                return response

            # The push message was consumed out-of-band; read the reply the
            # caller is actually waiting for. Forward the caller's timeout so
            # this follow-up read honours it like every other nested read.
            return self._read_response(
                disable_decoding=disable_decoding,
                push_request=push_request,
                timeout=timeout,
            )
        else:
            raise InvalidResponse(f"Protocol Error: {raw!r}")

        if isinstance(response, bytes) and disable_decoding is False:
            response = self.encoder.decode(response)

        return response

164 

165 

class _AsyncRESP3Parser(_AsyncRESPBase, AsyncPushNotificationsParser):
    """Async RESP3 protocol implementation"""

    def __init__(self, socket_read_size):
        super().__init__(socket_read_size)
        # Pub/sub push messages are logged and passed through by default;
        # the invalidation handler starts out unset.
        self.pubsub_push_handler_func = self.handle_pubsub_push_response
        self.invalidation_push_handler_func = None

    async def handle_pubsub_push_response(self, response):
        """Log *response* at DEBUG level on the "push_response" logger and
        return it unchanged."""
        logger = getLogger("push_response")
        # Lazy %-style arguments: the message is only rendered when DEBUG
        # logging is actually enabled.
        logger.debug("Push response: %s", response)
        return response

    async def read_response(
        self, disable_decoding: bool = False, push_request: bool = False
    ):
        """Read and parse one complete response.

        Any previously read chunks are appended to the parsing buffer first,
        the parse position is reset to 0, and after a successful parse the
        parser state is reset via ``self._clear()``.
        """
        if self._chunks:
            # augment parsing buffer with previously read data
            self._buffer += b"".join(self._chunks)
            self._chunks.clear()
        self._pos = 0
        response = await self._read_response(
            disable_decoding=disable_decoding, push_request=push_request
        )
        # Successfully parsing a response allows us to clear our parsing buffer
        self._clear()
        return response

    async def _read_response(
        self, disable_decoding: bool = False, push_request: bool = False
    ) -> Union[EncodableT, ResponseError, None]:
        """Parse a single RESP3 value, recursing for aggregate types.

        Returns the parsed value. Error replies are returned as exception
        instances rather than raised, except for ``ConnectionError`` which
        is raised immediately. Byte payloads are decoded through
        ``self.encoder`` unless ``disable_decoding`` is true.

        Raises:
            ConnectionError: if there is no stream or no encoder.
            InvalidResponse: if the leading type byte is not recognised.
        """
        if not self._stream or not self.encoder:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
        raw = await self._readline()
        response: Any
        byte, response = raw[:1], raw[1:]

        # server returned an error
        if byte in (b"-", b"!"):
            if byte == b"!":
                # bulk error: the header line carries the payload length
                response = await self._read(int(response))
            response = response.decode("utf-8", errors="replace")
            error = self.parse_error(response)
            # if the error is a ConnectionError, raise immediately so the user
            # is notified
            if isinstance(error, ConnectionError):
                self._clear()  # Successful parse
                raise error
            # otherwise, we're dealing with a ResponseError that might belong
            # inside a pipeline response. the connection's read_response()
            # and/or the pipeline's execute() will raise this error if
            # necessary, so just return the exception instance here.
            return error
        # single value
        elif byte == b"+":
            pass
        # null value
        elif byte == b"_":
            return None
        # int and big int values
        elif byte in (b":", b"("):
            return int(response)
        # double value
        elif byte == b",":
            return float(response)
        # bool value
        elif byte == b"#":
            return response == b"t"
        # bulk response
        elif byte == b"$":
            response = await self._read(int(response))
        # verbatim string response
        elif byte == b"=":
            # drop the first four bytes of the payload (the format prefix)
            response = (await self._read(int(response)))[4:]
        # array and set responses
        elif byte in (b"*", b"~"):
            # redis can return unhashable types (like dict) in a set,
            # so we always convert to a list, to have predictable return types
            response = [
                (await self._read_response(disable_decoding=disable_decoding))
                for _ in range(int(response))
            ]
        # map response
        elif byte == b"%":
            # We cannot use a dict-comprehension to parse stream.
            # Evaluation order of key:val expression in dict comprehension only
            # became defined to be left-right in version 3.8
            resp_dict = {}
            for _ in range(int(response)):
                key = await self._read_response(disable_decoding=disable_decoding)
                resp_dict[key] = await self._read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
            response = resp_dict
        # push response
        elif byte == b">":
            response = [
                (
                    await self._read_response(
                        disable_decoding=disable_decoding, push_request=push_request
                    )
                )
                for _ in range(int(response))
            ]
            response = await self.handle_push_response(response)

            # if this is a push request return the push response
            if push_request:
                return response

            # The push message was consumed out-of-band; read the reply the
            # caller is actually waiting for.
            return await self._read_response(
                disable_decoding=disable_decoding, push_request=push_request
            )
        else:
            raise InvalidResponse(f"Protocol Error: {raw!r}")

        if isinstance(response, bytes) and disable_decoding is False:
            response = self.encoder.decode(response)
        return response