Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/resp3.py: 11%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

140 statements  

1from logging import getLogger 

2from typing import Any, Union 

3 

4from ..exceptions import ConnectionError, InvalidResponse, ResponseError 

5from ..typing import EncodableT 

6from .base import ( 

7 AsyncPushNotificationsParser, 

8 PushNotificationsParser, 

9 _AsyncRESPBase, 

10 _RESPBase, 

11) 

12from .socket import SERVER_CLOSED_CONNECTION_ERROR 

13 

14 

class _RESP3Parser(_RESPBase, PushNotificationsParser):
    """RESP3 protocol implementation"""

    def __init__(self, socket_read_size):
        super().__init__(socket_read_size)
        # Pub/sub pushes get the logging handler defined below; every other
        # push handler slot starts out unset.
        self.pubsub_push_handler_func = self.handle_pubsub_push_response
        self.node_moving_push_handler_func = None
        self.maintenance_push_handler_func = None
        self.oss_cluster_maint_push_handler_func = None
        self.invalidation_push_handler_func = None

    def handle_pubsub_push_response(self, response):
        """Log ``response`` at DEBUG level on the "push_response" logger and
        return it unchanged."""
        logger = getLogger("push_response")
        # Lazy %-style argument: the response is only stringified when a
        # handler actually emits the DEBUG record.
        logger.debug("Push response: %s", response)
        return response

    def read_response(self, disable_decoding=False, push_request=False):
        """Parse one reply from the buffer and return it.

        :param disable_decoding: forwarded to ``_read_response``; when true,
            bytes payloads are returned without being decoded.
        :param push_request: forwarded to ``_read_response``; when true, a
            push message is returned instead of the reply that follows it.
        :raises: whatever ``_read_response`` raises. Before re-raising, the
            buffer position is rewound to where it was on entry.
        """
        pos = self._buffer.get_pos() if self._buffer else None
        try:
            result = self._read_response(
                disable_decoding=disable_decoding, push_request=push_request
            )
        except BaseException:
            # BaseException on purpose: the position is restored for any
            # failure, not only for Exception subclasses.
            if self._buffer:
                self._buffer.rewind(pos)
            raise
        else:
            self._buffer.purge()
            return result

    def _read_response(self, disable_decoding=False, push_request=False):
        """Read a single RESP3 value, recursing for aggregate types.

        The first byte of the line selects the type; the rest of the line is
        either the value itself or the length/element count.

        :returns: the parsed value. Server errors other than
            ``ConnectionError`` are *returned* as exception instances.
        :raises ConnectionError: if the buffer yields no data, or if the
            server error parses to a ``ConnectionError``.
        :raises InvalidResponse: if the type byte is not recognised.
        """
        raw = self._buffer.readline()
        if not raw:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

        byte, response = raw[:1], raw[1:]

        # server returned an error ("-" simple error, "!" blob error)
        if byte in (b"-", b"!"):
            if byte == b"!":
                # blob error: the line only carries the payload length
                response = self._buffer.read(int(response))
            response = response.decode("utf-8", errors="replace")
            error = self.parse_error(response)
            # if the error is a ConnectionError, raise immediately so the user
            # is notified
            if isinstance(error, ConnectionError):
                raise error
            # otherwise, we're dealing with a ResponseError that might belong
            # inside a pipeline response. the connection's read_response()
            # and/or the pipeline's execute() will raise this error if
            # necessary, so just return the exception instance here.
            return error
        # single value
        elif byte == b"+":
            pass
        # null value
        elif byte == b"_":
            return None
        # int and big int values
        elif byte in (b":", b"("):
            return int(response)
        # double value
        elif byte == b",":
            return float(response)
        # bool value
        elif byte == b"#":
            return response == b"t"
        # bulk response
        elif byte == b"$":
            response = self._buffer.read(int(response))
        # verbatim string response
        elif byte == b"=":
            # drop the first four bytes of the payload (in RESP3 that is the
            # three-character format tag followed by ":")
            response = self._buffer.read(int(response))[4:]
        # array and set responses
        elif byte in (b"*", b"~"):
            # redis can return unhashable types (like dict) in a set,
            # so we return sets as list, all the time, for predictability
            response = [
                self._read_response(disable_decoding=disable_decoding)
                for _ in range(int(response))
            ]
        # map response
        elif byte == b"%":
            # We cannot use a dict-comprehension to parse stream.
            # Evaluation order of key:val expression in dict comprehension only
            # became defined to be left-right in version 3.8
            resp_dict = {}
            for _ in range(int(response)):
                key = self._read_response(disable_decoding=disable_decoding)
                resp_dict[key] = self._read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
            response = resp_dict
        # push response
        elif byte == b">":
            response = [
                self._read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
                for _ in range(int(response))
            ]
            response = self.handle_push_response(response)

            # if this is a push request return the push response
            if push_request:
                return response

            # otherwise the push was out-of-band: keep reading until the
            # actual reply shows up
            return self._read_response(
                disable_decoding=disable_decoding,
                push_request=push_request,
            )
        else:
            raise InvalidResponse(f"Protocol Error: {raw!r}")

        # Only bytes payloads reach the decoder; lists and dicts built above
        # already hold decoded elements.
        if isinstance(response, bytes) and disable_decoding is False:
            response = self.encoder.decode(response)

        return response

139 

140 

class _AsyncRESP3Parser(_AsyncRESPBase, AsyncPushNotificationsParser):
    """Async RESP3 protocol implementation"""

    def __init__(self, socket_read_size):
        super().__init__(socket_read_size)
        # Pub/sub pushes get the logging handler defined below; the
        # invalidation handler slot starts out unset.
        self.pubsub_push_handler_func = self.handle_pubsub_push_response
        self.invalidation_push_handler_func = None

    async def handle_pubsub_push_response(self, response):
        """Log ``response`` at DEBUG level on the "push_response" logger and
        return it unchanged."""
        logger = getLogger("push_response")
        # Lazy %-style argument: the response is only stringified when a
        # handler actually emits the DEBUG record.
        logger.debug("Push response: %s", response)
        return response

    async def read_response(
        self, disable_decoding: bool = False, push_request: bool = False
    ):
        """Parse one reply and return it.

        Data left in ``self._chunks`` is appended to the parse buffer first,
        the parse position is reset to 0, and after a successful parse the
        parser state is cleared via ``self._clear()``.

        :param disable_decoding: forwarded to ``_read_response``; when true,
            bytes payloads are returned without being decoded.
        :param push_request: forwarded to ``_read_response``; when true, a
            push message is returned instead of the reply that follows it.
        """
        if self._chunks:
            # augment parsing buffer with previously read data
            self._buffer += b"".join(self._chunks)
            self._chunks.clear()
        self._pos = 0
        response = await self._read_response(
            disable_decoding=disable_decoding, push_request=push_request
        )
        # Successfully parsing a response allows us to clear our parsing buffer
        self._clear()
        return response

    async def _read_response(
        self, disable_decoding: bool = False, push_request: bool = False
    ) -> Union[EncodableT, ResponseError, None]:
        """Read a single RESP3 value, recursing for aggregate types.

        The first byte of the line selects the type; the rest of the line is
        either the value itself or the length/element count.

        :returns: the parsed value. Server errors other than
            ``ConnectionError`` are *returned* as exception instances.
        :raises ConnectionError: if there is no stream or encoder, or if the
            server error parses to a ``ConnectionError``.
        :raises InvalidResponse: if the type byte is not recognised.
        """
        if not self._stream or not self.encoder:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
        raw = await self._readline()
        response: Any
        byte, response = raw[:1], raw[1:]

        # server returned an error ("-" simple error, "!" blob error)
        if byte in (b"-", b"!"):
            if byte == b"!":
                # blob error: the line only carries the payload length
                response = await self._read(int(response))
            response = response.decode("utf-8", errors="replace")
            error = self.parse_error(response)
            # if the error is a ConnectionError, raise immediately so the user
            # is notified
            if isinstance(error, ConnectionError):
                self._clear()  # Successful parse
                raise error
            # otherwise, we're dealing with a ResponseError that might belong
            # inside a pipeline response. the connection's read_response()
            # and/or the pipeline's execute() will raise this error if
            # necessary, so just return the exception instance here.
            return error
        # single value
        elif byte == b"+":
            pass
        # null value
        elif byte == b"_":
            return None
        # int and big int values
        elif byte in (b":", b"("):
            return int(response)
        # double value
        elif byte == b",":
            return float(response)
        # bool value
        elif byte == b"#":
            return response == b"t"
        # bulk response
        elif byte == b"$":
            response = await self._read(int(response))
        # verbatim string response
        elif byte == b"=":
            # drop the first four bytes of the payload (in RESP3 that is the
            # three-character format tag followed by ":")
            response = (await self._read(int(response)))[4:]
        # array and set responses
        elif byte in (b"*", b"~"):
            # redis can return unhashable types (like dict) in a set,
            # so we always convert to a list, to have predictable return types
            response = [
                (await self._read_response(disable_decoding=disable_decoding))
                for _ in range(int(response))
            ]
        # map response
        elif byte == b"%":
            # We cannot use a dict-comprehension to parse stream.
            # Evaluation order of key:val expression in dict comprehension only
            # became defined to be left-right in version 3.8
            resp_dict = {}
            for _ in range(int(response)):
                key = await self._read_response(disable_decoding=disable_decoding)
                resp_dict[key] = await self._read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
            response = resp_dict
        # push response
        elif byte == b">":
            response = [
                (
                    await self._read_response(
                        disable_decoding=disable_decoding, push_request=push_request
                    )
                )
                for _ in range(int(response))
            ]
            response = await self.handle_push_response(response)
            if not push_request:
                # the push was out-of-band: keep reading until the actual
                # reply shows up
                return await self._read_response(
                    disable_decoding=disable_decoding, push_request=push_request
                )
            else:
                return response
        else:
            raise InvalidResponse(f"Protocol Error: {raw!r}")

        # Only bytes payloads reach the decoder; lists and dicts built above
        # already hold decoded elements.
        if isinstance(response, bytes) and disable_decoding is False:
            response = self.encoder.decode(response)
        return response