Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/_parsers/hiredis.py: 26%

141 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-04-23 06:16 +0000

1import asyncio 

2import socket 

3import sys 

4from typing import Callable, List, Optional, TypedDict, Union 

5 

6if sys.version_info.major >= 3 and sys.version_info.minor >= 11: 

7 from asyncio import timeout as async_timeout 

8else: 

9 from async_timeout import timeout as async_timeout 

10 

11from ..exceptions import ConnectionError, InvalidResponse, RedisError 

12from ..typing import EncodableT 

13from ..utils import HIREDIS_AVAILABLE 

14from .base import AsyncBaseParser, BaseParser 

15from .socket import ( 

16 NONBLOCKING_EXCEPTION_ERROR_NUMBERS, 

17 NONBLOCKING_EXCEPTIONS, 

18 SENTINEL, 

19 SERVER_CLOSED_CONNECTION_ERROR, 

20) 

21 

22 

23class _HiredisReaderArgs(TypedDict, total=False): 

24 protocolError: Callable[[str], Exception] 

25 replyError: Callable[[str], Exception] 

26 encoding: Optional[str] 

27 errors: Optional[str] 

28 

29 

30class _HiredisParser(BaseParser): 

31 "Parser class for connections using Hiredis" 

32 

33 def __init__(self, socket_read_size): 

34 if not HIREDIS_AVAILABLE: 

35 raise RedisError("Hiredis is not installed") 

36 self.socket_read_size = socket_read_size 

37 self._buffer = bytearray(socket_read_size) 

38 

39 def __del__(self): 

40 try: 

41 self.on_disconnect() 

42 except Exception: 

43 pass 

44 

45 def on_connect(self, connection, **kwargs): 

46 import hiredis 

47 

48 self._sock = connection._sock 

49 self._socket_timeout = connection.socket_timeout 

50 kwargs = { 

51 "protocolError": InvalidResponse, 

52 "replyError": self.parse_error, 

53 "errors": connection.encoder.encoding_errors, 

54 } 

55 

56 if connection.encoder.decode_responses: 

57 kwargs["encoding"] = connection.encoder.encoding 

58 self._reader = hiredis.Reader(**kwargs) 

59 self._next_response = False 

60 

61 def on_disconnect(self): 

62 self._sock = None 

63 self._reader = None 

64 self._next_response = False 

65 

66 def can_read(self, timeout): 

67 if not self._reader: 

68 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) 

69 

70 if self._next_response is False: 

71 self._next_response = self._reader.gets() 

72 if self._next_response is False: 

73 return self.read_from_socket(timeout=timeout, raise_on_timeout=False) 

74 return True 

75 

76 def read_from_socket(self, timeout=SENTINEL, raise_on_timeout=True): 

77 sock = self._sock 

78 custom_timeout = timeout is not SENTINEL 

79 try: 

80 if custom_timeout: 

81 sock.settimeout(timeout) 

82 bufflen = self._sock.recv_into(self._buffer) 

83 if bufflen == 0: 

84 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) 

85 self._reader.feed(self._buffer, 0, bufflen) 

86 # data was read from the socket and added to the buffer. 

87 # return True to indicate that data was read. 

88 return True 

89 except socket.timeout: 

90 if raise_on_timeout: 

91 raise TimeoutError("Timeout reading from socket") 

92 return False 

93 except NONBLOCKING_EXCEPTIONS as ex: 

94 # if we're in nonblocking mode and the recv raises a 

95 # blocking error, simply return False indicating that 

96 # there's no data to be read. otherwise raise the 

97 # original exception. 

98 allowed = NONBLOCKING_EXCEPTION_ERROR_NUMBERS.get(ex.__class__, -1) 

99 if not raise_on_timeout and ex.errno == allowed: 

100 return False 

101 raise ConnectionError(f"Error while reading from socket: {ex.args}") 

102 finally: 

103 if custom_timeout: 

104 sock.settimeout(self._socket_timeout) 

105 

106 def read_response(self, disable_decoding=False): 

107 if not self._reader: 

108 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) 

109 

110 # _next_response might be cached from a can_read() call 

111 if self._next_response is not False: 

112 response = self._next_response 

113 self._next_response = False 

114 return response 

115 

116 if disable_decoding: 

117 response = self._reader.gets(False) 

118 else: 

119 response = self._reader.gets() 

120 

121 while response is False: 

122 self.read_from_socket() 

123 if disable_decoding: 

124 response = self._reader.gets(False) 

125 else: 

126 response = self._reader.gets() 

127 # if the response is a ConnectionError or the response is a list and 

128 # the first item is a ConnectionError, raise it as something bad 

129 # happened 

130 if isinstance(response, ConnectionError): 

131 raise response 

132 elif ( 

133 isinstance(response, list) 

134 and response 

135 and isinstance(response[0], ConnectionError) 

136 ): 

137 raise response[0] 

138 return response 

139 

140 

141class _AsyncHiredisParser(AsyncBaseParser): 

142 """Async implementation of parser class for connections using Hiredis""" 

143 

144 __slots__ = ("_reader",) 

145 

146 def __init__(self, socket_read_size: int): 

147 if not HIREDIS_AVAILABLE: 

148 raise RedisError("Hiredis is not available.") 

149 super().__init__(socket_read_size=socket_read_size) 

150 self._reader = None 

151 

152 def on_connect(self, connection): 

153 import hiredis 

154 

155 self._stream = connection._reader 

156 kwargs: _HiredisReaderArgs = { 

157 "protocolError": InvalidResponse, 

158 "replyError": self.parse_error, 

159 } 

160 if connection.encoder.decode_responses: 

161 kwargs["encoding"] = connection.encoder.encoding 

162 kwargs["errors"] = connection.encoder.encoding_errors 

163 

164 self._reader = hiredis.Reader(**kwargs) 

165 self._connected = True 

166 

167 def on_disconnect(self): 

168 self._connected = False 

169 

170 async def can_read_destructive(self): 

171 if not self._connected: 

172 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) 

173 if self._reader.gets(): 

174 return True 

175 try: 

176 async with async_timeout(0): 

177 return await self.read_from_socket() 

178 except asyncio.TimeoutError: 

179 return False 

180 

181 async def read_from_socket(self): 

182 buffer = await self._stream.read(self._read_size) 

183 if not buffer or not isinstance(buffer, bytes): 

184 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None 

185 self._reader.feed(buffer) 

186 # data was read from the socket and added to the buffer. 

187 # return True to indicate that data was read. 

188 return True 

189 

190 async def read_response( 

191 self, disable_decoding: bool = False 

192 ) -> Union[EncodableT, List[EncodableT]]: 

193 # If `on_disconnect()` has been called, prohibit any more reads 

194 # even if they could happen because data might be present. 

195 # We still allow reads in progress to finish 

196 if not self._connected: 

197 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None 

198 

199 if disable_decoding: 

200 response = self._reader.gets(False) 

201 else: 

202 response = self._reader.gets() 

203 while response is False: 

204 await self.read_from_socket() 

205 if disable_decoding: 

206 response = self._reader.gets(False) 

207 else: 

208 response = self._reader.gets() 

209 

210 # if the response is a ConnectionError or the response is a list and 

211 # the first item is a ConnectionError, raise it as something bad 

212 # happened 

213 if isinstance(response, ConnectionError): 

214 raise response 

215 elif ( 

216 isinstance(response, list) 

217 and response 

218 and isinstance(response[0], ConnectionError) 

219 ): 

220 raise response[0] 

221 return response