Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/resp3.py: 11%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

144 statements  

1from logging import getLogger 

2from typing import Any, Union 

3 

4from ..exceptions import ConnectionError, InvalidResponse, ResponseError 

5from ..typing import EncodableT 

6from .base import ( 

7 AsyncPushNotificationsParser, 

8 PushNotificationsParser, 

9 _AsyncRESPBase, 

10 _RESPBase, 

11) 

12from .socket import SERVER_CLOSED_CONNECTION_ERROR 

13 

14 

class _RESP3Parser(_RESPBase, PushNotificationsParser):
    """RESP3 protocol implementation.

    Reads one reply at a time from ``self._buffer`` and converts it to a
    Python value according to the reply's leading type byte.
    """

    def __init__(self, socket_read_size):
        super().__init__(socket_read_size)
        # Pub/sub push messages go to the logging handler below by default;
        # every other push handler starts out unset.
        self.pubsub_push_handler_func = self.handle_pubsub_push_response
        self.node_moving_push_handler_func = None
        self.maintenance_push_handler_func = None
        self.oss_cluster_maint_push_handler_func = None
        self.invalidation_push_handler_func = None

    def handle_pubsub_push_response(self, response):
        """Log a push message at DEBUG level and return it unchanged."""
        logger = getLogger("push_response")
        # Pass the payload as a logging argument so it is only converted to
        # a string when DEBUG logging is actually enabled.
        logger.debug("Push response: %s", response)
        return response

    def read_response(self, disable_decoding=False, push_request=False):
        """Read and return one complete reply from the buffer.

        If parsing raises anything (including ``BaseException`` subclasses),
        the buffer is rewound to the position it had before the read and the
        exception is re-raised. On success the buffer is purged.

        :param disable_decoding: when true, ``bytes`` payloads are returned
            as-is instead of being passed through ``self.encoder.decode``.
        :param push_request: when true, a push message is returned to the
            caller instead of being skipped in favour of the next reply.
        """
        pos = self._buffer.get_pos() if self._buffer is not None else None
        try:
            result = self._read_response(
                disable_decoding=disable_decoding, push_request=push_request
            )
        except BaseException:
            if self._buffer is not None:
                self._buffer.rewind(pos)
            raise
        else:
            if self._buffer is not None:
                try:
                    self._buffer.purge()
                except AttributeError:
                    # Buffer may have been set to None by another thread after
                    # the check above; result is still valid so we don't raise
                    pass
            return result

    def _read_response(self, disable_decoding=False, push_request=False):
        """Parse a single (possibly nested) RESP3 value from the buffer.

        :raises ConnectionError: if the buffer yields an empty line, or if
            the server error parses to a ``ConnectionError``.
        :raises InvalidResponse: if the leading type byte is not recognised.
        :return: the parsed value. Server errors that are not
            ``ConnectionError`` are returned as exception instances rather
            than raised.
        """
        raw = self._buffer.readline()
        if not raw:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

        # First byte is the type marker, the rest is the inline payload
        # (or a length / element count for aggregate and bulk types).
        byte, response = raw[:1], raw[1:]

        # server returned an error
        if byte in (b"-", b"!"):
            if byte == b"!":
                # bulk error: the header carries the length of the body
                response = self._buffer.read(int(response))
            response = response.decode("utf-8", errors="replace")
            error = self.parse_error(response)
            # if the error is a ConnectionError, raise immediately so the user
            # is notified
            if isinstance(error, ConnectionError):
                raise error
            # otherwise, we're dealing with a ResponseError that might belong
            # inside a pipeline response. the connection's read_response()
            # and/or the pipeline's execute() will raise this error if
            # necessary, so just return the exception instance here.
            return error
        # single value
        elif byte == b"+":
            pass
        # null value
        elif byte == b"_":
            return None
        # int and big int values
        elif byte in (b":", b"("):
            return int(response)
        # double value
        elif byte == b",":
            return float(response)
        # bool value
        elif byte == b"#":
            return response == b"t"
        # bulk response
        elif byte == b"$":
            response = self._buffer.read(int(response))
        # verbatim string response
        elif byte == b"=":
            # drop the first four bytes of the payload (presumably the
            # three-character format marker plus ":" -- see the RESP3 spec)
            response = self._buffer.read(int(response))[4:]
        # array and set responses
        elif byte in (b"*", b"~"):
            # redis can return unhashable types (like dict) in a set,
            # so we return sets as list, all the time, for predictability
            response = [
                self._read_response(disable_decoding=disable_decoding)
                for _ in range(int(response))
            ]
        # map response
        elif byte == b"%":
            # We cannot use a dict-comprehension to parse stream.
            # Evaluation order of key:val expression in dict comprehension only
            # became defined to be left-right in version 3.8
            resp_dict = {}
            for _ in range(int(response)):
                key = self._read_response(disable_decoding=disable_decoding)
                resp_dict[key] = self._read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
            response = resp_dict
        # push response
        elif byte == b">":
            response = [
                self._read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
                for _ in range(int(response))
            ]
            response = self.handle_push_response(response)

            # if this is a push request return the push response
            if push_request:
                return response

            # otherwise the push was out-of-band: keep reading until the
            # actual reply shows up
            return self._read_response(
                disable_decoding=disable_decoding,
                push_request=push_request,
            )
        else:
            raise InvalidResponse(f"Protocol Error: {raw!r}")

        # Only top-level bytes payloads are decoded here; nested values were
        # already decoded by their own recursive call.
        if isinstance(response, bytes) and disable_decoding is False:
            response = self.encoder.decode(response)

        return response

145 

146 

class _AsyncRESP3Parser(_AsyncRESPBase, AsyncPushNotificationsParser):
    """Asynchronous RESP3 protocol implementation.

    Reads one reply at a time via the ``_readline`` / ``_read`` coroutines
    and converts it to a Python value according to its leading type byte.
    """

    def __init__(self, socket_read_size):
        super().__init__(socket_read_size)
        # Pub/sub push messages go to the logging handler below by default;
        # the invalidation handler starts out unset.
        self.pubsub_push_handler_func = self.handle_pubsub_push_response
        self.invalidation_push_handler_func = None

    async def handle_pubsub_push_response(self, response):
        """Log a push message at DEBUG level and return it unchanged."""
        logger = getLogger("push_response")
        # Pass the payload as a logging argument so it is only converted to
        # a string when DEBUG logging is actually enabled.
        logger.debug("Push response: %s", response)
        return response

    async def read_response(
        self, disable_decoding: bool = False, push_request: bool = False
    ):
        """Read and return one complete reply.

        Any chunks left in ``self._chunks`` are appended to the parsing
        buffer first and the read position is reset to 0 before parsing.
        The parsing buffer is cleared once a reply was parsed successfully.

        :param disable_decoding: when true, ``bytes`` payloads are returned
            as-is instead of being passed through ``self.encoder.decode``.
        :param push_request: when true, a push message is returned to the
            caller instead of being skipped in favour of the next reply.
        """
        if self._chunks:
            # augment parsing buffer with previously read data
            self._buffer += b"".join(self._chunks)
            self._chunks.clear()
        self._pos = 0
        response = await self._read_response(
            disable_decoding=disable_decoding, push_request=push_request
        )
        # Successfully parsing a response allows us to clear our parsing buffer
        self._clear()
        return response

    async def _read_response(
        self, disable_decoding: bool = False, push_request: bool = False
    ) -> Union[EncodableT, ResponseError, None]:
        """Parse a single (possibly nested) RESP3 value.

        :raises ConnectionError: if there is no stream or no encoder, or if
            the server error parses to a ``ConnectionError``.
        :raises InvalidResponse: if the leading type byte is not recognised.
        :return: the parsed value. Server errors that are not
            ``ConnectionError`` are returned as exception instances rather
            than raised.
        """
        if not self._stream or not self.encoder:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
        raw = await self._readline()
        response: Any
        # First byte is the type marker, the rest is the inline payload
        # (or a length / element count for aggregate and bulk types).
        byte, response = raw[:1], raw[1:]

        # server returned an error
        if byte in (b"-", b"!"):
            if byte == b"!":
                # bulk error: the header carries the length of the body
                response = await self._read(int(response))
            response = response.decode("utf-8", errors="replace")
            error = self.parse_error(response)
            # if the error is a ConnectionError, raise immediately so the user
            # is notified
            if isinstance(error, ConnectionError):
                self._clear()  # Successful parse
                raise error
            # otherwise, we're dealing with a ResponseError that might belong
            # inside a pipeline response. the connection's read_response()
            # and/or the pipeline's execute() will raise this error if
            # necessary, so just return the exception instance here.
            return error
        # single value
        elif byte == b"+":
            pass
        # null value
        elif byte == b"_":
            return None
        # int and big int values
        elif byte in (b":", b"("):
            return int(response)
        # double value
        elif byte == b",":
            return float(response)
        # bool value
        elif byte == b"#":
            return response == b"t"
        # bulk response
        elif byte == b"$":
            response = await self._read(int(response))
        # verbatim string response
        elif byte == b"=":
            # drop the first four bytes of the payload (presumably the
            # three-character format marker plus ":" -- see the RESP3 spec)
            response = (await self._read(int(response)))[4:]
        # array and set responses
        elif byte in (b"*", b"~"):
            # redis can return unhashable types (like dict) in a set,
            # so we always convert to a list, to have predictable return types
            response = [
                (await self._read_response(disable_decoding=disable_decoding))
                for _ in range(int(response))
            ]
        # map response
        elif byte == b"%":
            # We cannot use a dict-comprehension to parse stream.
            # Evaluation order of key:val expression in dict comprehension only
            # became defined to be left-right in version 3.8
            resp_dict = {}
            for _ in range(int(response)):
                key = await self._read_response(disable_decoding=disable_decoding)
                resp_dict[key] = await self._read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
            response = resp_dict
        # push response
        elif byte == b">":
            response = [
                (
                    await self._read_response(
                        disable_decoding=disable_decoding, push_request=push_request
                    )
                )
                for _ in range(int(response))
            ]
            response = await self.handle_push_response(response)
            if not push_request:
                # the push was out-of-band: keep reading until the actual
                # reply shows up
                return await self._read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
            else:
                return response
        else:
            raise InvalidResponse(f"Protocol Error: {raw!r}")

        # Only top-level bytes payloads are decoded here; nested values were
        # already decoded by their own recursive call.
        if isinstance(response, bytes) and disable_decoding is False:
            response = self.encoder.decode(response)
        return response