Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/redis/_parsers/hiredis.py: 26%
141 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-04-23 06:16 +0000
1import asyncio
2import socket
3import sys
4from typing import Callable, List, Optional, TypedDict, Union
6if sys.version_info.major >= 3 and sys.version_info.minor >= 11:
7 from asyncio import timeout as async_timeout
8else:
9 from async_timeout import timeout as async_timeout
11from ..exceptions import ConnectionError, InvalidResponse, RedisError
12from ..typing import EncodableT
13from ..utils import HIREDIS_AVAILABLE
14from .base import AsyncBaseParser, BaseParser
15from .socket import (
16 NONBLOCKING_EXCEPTION_ERROR_NUMBERS,
17 NONBLOCKING_EXCEPTIONS,
18 SENTINEL,
19 SERVER_CLOSED_CONNECTION_ERROR,
20)
class _HiredisReaderArgs(TypedDict, total=False):
    """Keyword arguments passed to ``hiredis.Reader(**kwargs)``.

    Declared with ``total=False``, so every key is optional; the parsers in
    this module only set ``encoding`` when responses are to be decoded.
    """

    # Factory invoked with a message to build the protocol-error exception.
    protocolError: Callable[[str], Exception]
    # Factory invoked with a message to build the exception for error replies.
    replyError: Callable[[str], Exception]
    # Name of the text encoding used to decode replies.
    encoding: Optional[str]
    # Error-handling policy used together with ``encoding``.
    errors: Optional[str]
class _HiredisParser(BaseParser):
    """Parser class for connections using Hiredis.

    Bytes received from the connection's socket are fed to a
    ``hiredis.Reader``; replies are pulled back out with ``Reader.gets()``,
    whose ``False`` return value is treated as "no complete reply yet".
    """

    def __init__(self, socket_read_size):
        """Allocate the reusable receive buffer.

        :param socket_read_size: size in bytes of the buffer handed to
            ``recv_into`` on every socket read.
        :raises RedisError: if ``HIREDIS_AVAILABLE`` is false.
        """
        if not HIREDIS_AVAILABLE:
            raise RedisError("Hiredis is not installed")
        self.socket_read_size = socket_read_size
        self._buffer = bytearray(socket_read_size)

    def __del__(self):
        """Best-effort cleanup: drop socket/reader references, never raise."""
        try:
            self.on_disconnect()
        except Exception:
            pass

    def on_connect(self, connection, **kwargs):
        """Bind the parser to ``connection`` and create a fresh reader.

        :param connection: object exposing ``_sock``, ``socket_timeout`` and
            an ``encoder`` with ``encoding``, ``encoding_errors`` and
            ``decode_responses`` attributes.
        :param kwargs: accepted but ignored.
        """
        import hiredis

        self._sock = connection._sock
        self._socket_timeout = connection.socket_timeout
        # Built under its own name so the ``**kwargs`` parameter is not
        # silently overwritten.
        reader_kwargs: _HiredisReaderArgs = {
            "protocolError": InvalidResponse,
            "replyError": self.parse_error,
            "errors": connection.encoder.encoding_errors,
        }

        # Only ask hiredis to decode when the connection wants text replies.
        if connection.encoder.decode_responses:
            reader_kwargs["encoding"] = connection.encoder.encoding
        self._reader = hiredis.Reader(**reader_kwargs)
        # ``False`` is the "nothing cached" sentinel (same value gets() uses).
        self._next_response = False

    def on_disconnect(self):
        """Forget the socket, the reader and any cached reply."""
        self._sock = None
        self._reader = None
        self._next_response = False

    def can_read(self, timeout):
        """Report whether data is available, waiting up to ``timeout``.

        A reply obtained from the reader here is cached in
        ``_next_response`` so that the next ``read_response()`` returns it.

        :returns: ``True`` if a reply is cached, otherwise the result of
            ``read_from_socket(timeout=timeout, raise_on_timeout=False)``.
        :raises ConnectionError: if no reader is attached.
        """
        if not self._reader:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

        if self._next_response is False:
            self._next_response = self._reader.gets()
            if self._next_response is False:
                return self.read_from_socket(timeout=timeout, raise_on_timeout=False)
        return True

    def read_from_socket(self, timeout=SENTINEL, raise_on_timeout=True):
        """Read once from the socket and feed the bytes to the reader.

        :param timeout: when given (anything other than ``SENTINEL``), it is
            applied to the socket for this read only; the timeout captured
            in ``on_connect`` is restored afterwards.
        :param raise_on_timeout: when false, a socket timeout or a matching
            non-blocking error yields ``False`` instead of raising.
        :returns: ``True`` if bytes were read and fed, ``False`` otherwise.
        :raises ConnectionError: if ``recv_into`` returns 0 bytes, or on a
            non-blocking exception that is not tolerated.
        :raises TimeoutError: on socket timeout when ``raise_on_timeout``.
        """
        sock = self._sock
        custom_timeout = timeout is not SENTINEL
        try:
            if custom_timeout:
                sock.settimeout(timeout)
            bufflen = sock.recv_into(self._buffer)
            if bufflen == 0:
                raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
            # Only the first ``bufflen`` bytes of the reused buffer are valid.
            self._reader.feed(self._buffer, 0, bufflen)
            # data was read from the socket and added to the buffer.
            # return True to indicate that data was read.
            return True
        except socket.timeout:
            if raise_on_timeout:
                # NOTE(review): ``TimeoutError`` is not among the names
                # imported from ``..exceptions`` in the visible import block,
                # so this appears to be the builtin - confirm that is what
                # callers expect.
                raise TimeoutError("Timeout reading from socket")
            return False
        except NONBLOCKING_EXCEPTIONS as ex:
            # if we're in nonblocking mode and the recv raises a
            # blocking error, simply return False indicating that
            # there's no data to be read. otherwise raise the
            # original exception.
            allowed = NONBLOCKING_EXCEPTION_ERROR_NUMBERS.get(ex.__class__, -1)
            if not raise_on_timeout and ex.errno == allowed:
                return False
            raise ConnectionError(f"Error while reading from socket: {ex.args}")
        finally:
            if custom_timeout:
                sock.settimeout(self._socket_timeout)

    def read_response(self, disable_decoding=False):
        """Return the next reply, reading from the socket until one is whole.

        :param disable_decoding: when true, ``Reader.gets(False)`` is used
            instead of ``Reader.gets()``. A reply already cached by
            ``can_read()`` is returned as cached regardless of this flag.
        :raises ConnectionError: if no reader is attached, or if the reply
            (or the first element of a list reply) is a ``ConnectionError``.
        """
        if not self._reader:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

        # _next_response might be cached from a can_read() call
        if self._next_response is not False:
            response = self._next_response
            self._next_response = False
            return response

        # Preserve the exact call shapes: gets(False) vs. gets().
        gets_args = (False,) if disable_decoding else ()
        response = self._reader.gets(*gets_args)
        while response is False:
            self.read_from_socket()
            response = self._reader.gets(*gets_args)

        # if the response is a ConnectionError or the response is a list and
        # the first item is a ConnectionError, raise it as something bad
        # happened
        if isinstance(response, ConnectionError):
            raise response
        elif (
            isinstance(response, list)
            and response
            and isinstance(response[0], ConnectionError)
        ):
            raise response[0]
        return response
class _AsyncHiredisParser(AsyncBaseParser):
    """Async implementation of parser class for connections using Hiredis.

    Bytes awaited from the connection's stream are fed to a
    ``hiredis.Reader``; ``Reader.gets()`` returning ``False`` is treated as
    "no complete reply yet".
    """

    __slots__ = ("_reader",)

    def __init__(self, socket_read_size: int):
        """Initialise the base parser and start with no reader attached.

        :param socket_read_size: forwarded to the base class initialiser.
        :raises RedisError: if ``HIREDIS_AVAILABLE`` is false.
        """
        if not HIREDIS_AVAILABLE:
            raise RedisError("Hiredis is not available.")
        super().__init__(socket_read_size=socket_read_size)
        self._reader = None

    def on_connect(self, connection):
        """Bind the parser to ``connection`` and create a fresh reader.

        :param connection: object exposing ``_reader`` (the stream to read
            from) and an ``encoder`` with ``decode_responses``, ``encoding``
            and ``encoding_errors`` attributes.
        """
        import hiredis

        self._stream = connection._reader
        kwargs: _HiredisReaderArgs = {
            "protocolError": InvalidResponse,
            "replyError": self.parse_error,
        }
        # Decoding options are only passed when text replies are wanted.
        if connection.encoder.decode_responses:
            kwargs["encoding"] = connection.encoder.encoding
            kwargs["errors"] = connection.encoder.encoding_errors

        self._reader = hiredis.Reader(**kwargs)
        self._connected = True

    def on_disconnect(self):
        """Mark the parser as disconnected so further reads are refused."""
        self._connected = False

    async def can_read_destructive(self):
        """Report whether data is available without waiting.

        "Destructive" because a reply returned by ``gets()`` here is
        consumed and discarded, not cached.

        :returns: ``True`` if the reader held a complete reply or bytes could
            be read immediately, ``False`` if the zero-second timeout fired.
        :raises ConnectionError: if the parser is not connected, or if the
            stream read reports a closed connection.
        """
        if not self._connected:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)
        # ``False`` is the "no complete reply" sentinel; complete replies can
        # themselves be falsy (None, 0, b"", []), so compare by identity
        # instead of truthiness.
        if self._reader.gets() is not False:
            return True
        try:
            async with async_timeout(0):
                return await self.read_from_socket()
        except asyncio.TimeoutError:
            return False

    async def read_from_socket(self):
        """Await one chunk from the stream and feed it to the reader.

        :returns: ``True`` once data has been fed.
        :raises ConnectionError: if the read yields nothing or a value that
            is not ``bytes``.
        """
        # ``_read_size`` is not assigned in this class; presumably set by the
        # base initialiser from ``socket_read_size`` - TODO confirm.
        buffer = await self._stream.read(self._read_size)
        if not buffer or not isinstance(buffer, bytes):
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None
        self._reader.feed(buffer)
        # data was read from the socket and added to the buffer.
        # return True to indicate that data was read.
        return True

    async def read_response(
        self, disable_decoding: bool = False
    ) -> Union[EncodableT, List[EncodableT]]:
        """Return the next reply, awaiting socket reads until one is whole.

        :param disable_decoding: when true, ``Reader.gets(False)`` is used
            instead of ``Reader.gets()``.
        :raises ConnectionError: if the parser is not connected, or if the
            reply (or the first element of a list reply) is a
            ``ConnectionError``.
        """
        # If `on_disconnect()` has been called, prohibit any more reads
        # even if they could happen because data might be present.
        # We still allow reads in progress to finish
        if not self._connected:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) from None

        # Preserve the exact call shapes: gets(False) vs. gets().
        gets_args = (False,) if disable_decoding else ()
        response = self._reader.gets(*gets_args)
        while response is False:
            await self.read_from_socket()
            response = self._reader.gets(*gets_args)

        # if the response is a ConnectionError or the response is a list and
        # the first item is a ConnectionError, raise it as something bad
        # happened
        if isinstance(response, ConnectionError):
            raise response
        elif (
            isinstance(response, list)
            and response
            and isinstance(response[0], ConnectionError)
        ):
            raise response[0]
        return response