Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/resp3.py: 11%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

144 statements  

1from logging import getLogger 

2from typing import Any, Union 

3 

4from ..exceptions import ConnectionError, InvalidResponse, ResponseError 

5from ..typing import EncodableT 

6from .base import ( 

7 AsyncPushNotificationsParser, 

8 PushNotificationsParser, 

9 _AsyncRESPBase, 

10 _RESPBase, 

11) 

12from .socket import SENTINEL, SERVER_CLOSED_CONNECTION_ERROR 

13 

14 

15class _RESP3Parser(_RESPBase, PushNotificationsParser): 

16 """RESP3 protocol implementation""" 

17 

18 def __init__(self, socket_read_size): 

19 super().__init__(socket_read_size) 

20 self.pubsub_push_handler_func = self.handle_pubsub_push_response 

21 self.node_moving_push_handler_func = None 

22 self.maintenance_push_handler_func = None 

23 self.oss_cluster_maint_push_handler_func = None 

24 self.invalidation_push_handler_func = None 

25 

26 def handle_pubsub_push_response(self, response): 

27 logger = getLogger("push_response") 

28 logger.debug("Push response: " + str(response)) 

29 return response 

30 

31 def read_response( 

32 self, 

33 disable_decoding=False, 

34 push_request=False, 

35 timeout: Union[float, object] = SENTINEL, 

36 ): 

37 pos = self._buffer.get_pos() if self._buffer is not None else None 

38 try: 

39 result = self._read_response( 

40 disable_decoding=disable_decoding, 

41 push_request=push_request, 

42 timeout=timeout, 

43 ) 

44 except BaseException: 

45 if self._buffer is not None: 

46 self._buffer.rewind(pos) 

47 raise 

48 else: 

49 if self._buffer is not None: 

50 try: 

51 self._buffer.purge() 

52 except AttributeError: 

53 # Buffer may have been set to None by another thread after 

54 # the check above; result is still valid so we don't raise 

55 pass 

56 return result 

57 

58 def _read_response( 

59 self, 

60 disable_decoding=False, 

61 push_request=False, 

62 timeout: Union[float, object] = SENTINEL, 

63 ): 

64 raw = self._buffer.readline(timeout=timeout) 

65 if not raw: 

66 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) 

67 

68 byte, response = raw[:1], raw[1:] 

69 

70 # server returned an error 

71 if byte in (b"-", b"!"): 

72 if byte == b"!": 

73 response = self._buffer.read(int(response), timeout=timeout) 

74 response = response.decode("utf-8", errors="replace") 

75 error = self.parse_error(response) 

76 # if the error is a ConnectionError, raise immediately so the user 

77 # is notified 

78 if isinstance(error, ConnectionError): 

79 raise error 

80 # otherwise, we're dealing with a ResponseError that might belong 

81 # inside a pipeline response. the connection's read_response() 

82 # and/or the pipeline's execute() will raise this error if 

83 # necessary, so just return the exception instance here. 

84 return error 

85 # single value 

86 elif byte == b"+": 

87 pass 

88 # null value 

89 elif byte == b"_": 

90 return None 

91 # int and big int values 

92 elif byte in (b":", b"("): 

93 return int(response) 

94 # double value 

95 elif byte == b",": 

96 return float(response) 

97 # bool value 

98 elif byte == b"#": 

99 return response == b"t" 

100 # bulk response 

101 elif byte == b"$": 

102 response = self._buffer.read(int(response), timeout=timeout) 

103 # verbatim string response 

104 elif byte == b"=": 

105 response = self._buffer.read(int(response), timeout=timeout)[4:] 

106 # array response 

107 elif byte == b"*": 

108 response = [ 

109 self._read_response(disable_decoding=disable_decoding, timeout=timeout) 

110 for _ in range(int(response)) 

111 ] 

112 # set response 

113 elif byte == b"~": 

114 # redis can return unhashable types (like dict) in a set, 

115 # so we return sets as list, all the time, for predictability 

116 response = [ 

117 self._read_response(disable_decoding=disable_decoding, timeout=timeout) 

118 for _ in range(int(response)) 

119 ] 

120 # map response 

121 elif byte == b"%": 

122 # We cannot use a dict-comprehension to parse stream. 

123 # Evaluation order of key:val expression in dict comprehension only 

124 # became defined to be left-right in version 3.8 

125 resp_dict = {} 

126 for _ in range(int(response)): 

127 key = self._read_response( 

128 disable_decoding=disable_decoding, timeout=timeout 

129 ) 

130 resp_dict[key] = self._read_response( 

131 disable_decoding=disable_decoding, 

132 push_request=push_request, 

133 timeout=timeout, 

134 ) 

135 response = resp_dict 

136 # push response 

137 elif byte == b">": 

138 response = [ 

139 self._read_response( 

140 disable_decoding=disable_decoding, 

141 push_request=push_request, 

142 timeout=timeout, 

143 ) 

144 for _ in range(int(response)) 

145 ] 

146 response = self.handle_push_response(response) 

147 

148 # if this is a push request return the push response 

149 if push_request: 

150 return response 

151 

152 return self._read_response( 

153 disable_decoding=disable_decoding, 

154 push_request=push_request, 

155 ) 

156 else: 

157 raise InvalidResponse(f"Protocol Error: {raw!r}") 

158 

159 if isinstance(response, bytes) and disable_decoding is False: 

160 response = self.encoder.decode(response) 

161 

162 return response 

163 

164 

165class _AsyncRESP3Parser(_AsyncRESPBase, AsyncPushNotificationsParser): 

166 def __init__(self, socket_read_size): 

167 super().__init__(socket_read_size) 

168 self.pubsub_push_handler_func = self.handle_pubsub_push_response 

169 self.invalidation_push_handler_func = None 

170 

171 async def handle_pubsub_push_response(self, response): 

172 logger = getLogger("push_response") 

173 logger.debug("Push response: " + str(response)) 

174 return response 

175 

176 async def read_response( 

177 self, disable_decoding: bool = False, push_request: bool = False 

178 ): 

179 if self._chunks: 

180 # augment parsing buffer with previously read data 

181 self._buffer += b"".join(self._chunks) 

182 self._chunks.clear() 

183 self._pos = 0 

184 response = await self._read_response( 

185 disable_decoding=disable_decoding, push_request=push_request 

186 ) 

187 # Successfully parsing a response allows us to clear our parsing buffer 

188 self._clear() 

189 return response 

190 

191 async def _read_response( 

192 self, disable_decoding: bool = False, push_request: bool = False 

193 ) -> Union[EncodableT, ResponseError, None]: 

194 if not self._stream or not self.encoder: 

195 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) 

196 raw = await self._readline() 

197 response: Any 

198 byte, response = raw[:1], raw[1:] 

199 

200 # if byte not in (b"-", b"+", b":", b"$", b"*"): 

201 # raise InvalidResponse(f"Protocol Error: {raw!r}") 

202 

203 # server returned an error 

204 if byte in (b"-", b"!"): 

205 if byte == b"!": 

206 response = await self._read(int(response)) 

207 response = response.decode("utf-8", errors="replace") 

208 error = self.parse_error(response) 

209 # if the error is a ConnectionError, raise immediately so the user 

210 # is notified 

211 if isinstance(error, ConnectionError): 

212 self._clear() # Successful parse 

213 raise error 

214 # otherwise, we're dealing with a ResponseError that might belong 

215 # inside a pipeline response. the connection's read_response() 

216 # and/or the pipeline's execute() will raise this error if 

217 # necessary, so just return the exception instance here. 

218 return error 

219 # single value 

220 elif byte == b"+": 

221 pass 

222 # null value 

223 elif byte == b"_": 

224 return None 

225 # int and big int values 

226 elif byte in (b":", b"("): 

227 return int(response) 

228 # double value 

229 elif byte == b",": 

230 return float(response) 

231 # bool value 

232 elif byte == b"#": 

233 return response == b"t" 

234 # bulk response 

235 elif byte == b"$": 

236 response = await self._read(int(response)) 

237 # verbatim string response 

238 elif byte == b"=": 

239 response = (await self._read(int(response)))[4:] 

240 # array response 

241 elif byte == b"*": 

242 response = [ 

243 (await self._read_response(disable_decoding=disable_decoding)) 

244 for _ in range(int(response)) 

245 ] 

246 # set response 

247 elif byte == b"~": 

248 # redis can return unhashable types (like dict) in a set, 

249 # so we always convert to a list, to have predictable return types 

250 response = [ 

251 (await self._read_response(disable_decoding=disable_decoding)) 

252 for _ in range(int(response)) 

253 ] 

254 # map response 

255 elif byte == b"%": 

256 # We cannot use a dict-comprehension to parse stream. 

257 # Evaluation order of key:val expression in dict comprehension only 

258 # became defined to be left-right in version 3.8 

259 resp_dict = {} 

260 for _ in range(int(response)): 

261 key = await self._read_response(disable_decoding=disable_decoding) 

262 resp_dict[key] = await self._read_response( 

263 disable_decoding=disable_decoding, push_request=push_request 

264 ) 

265 response = resp_dict 

266 # push response 

267 elif byte == b">": 

268 response = [ 

269 ( 

270 await self._read_response( 

271 disable_decoding=disable_decoding, push_request=push_request 

272 ) 

273 ) 

274 for _ in range(int(response)) 

275 ] 

276 response = await self.handle_push_response(response) 

277 if not push_request: 

278 return await self._read_response( 

279 disable_decoding=disable_decoding, push_request=push_request 

280 ) 

281 else: 

282 return response 

283 else: 

284 raise InvalidResponse(f"Protocol Error: {raw!r}") 

285 

286 if isinstance(response, bytes) and disable_decoding is False: 

287 response = self.encoder.decode(response) 

288 return response