Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/resp2.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

81 statements  

1from typing import Any, Union 

2 

3from ..exceptions import ConnectionError, InvalidResponse, ResponseError 

4from ..typing import EncodableT 

5from ..utils import SENTINEL 

6from .base import _AsyncRESPBase, _RESPBase 

7from .socket import SERVER_CLOSED_CONNECTION_ERROR 

8 

9 

10class _RESP2Parser(_RESPBase): 

11 """RESP2 protocol implementation""" 

12 

13 def read_response( 

14 self, disable_decoding=False, timeout: Union[float, object] = SENTINEL 

15 ): 

16 pos = self._buffer.get_pos() if self._buffer else None 

17 try: 

18 result = self._read_response( 

19 disable_decoding=disable_decoding, timeout=timeout 

20 ) 

21 except BaseException: 

22 if self._buffer: 

23 self._buffer.rewind(pos) 

24 raise 

25 else: 

26 self._buffer.purge() 

27 return result 

28 

29 def _read_response( 

30 self, disable_decoding=False, timeout: Union[float, object] = SENTINEL 

31 ): 

32 raw = self._buffer.readline(timeout=timeout) 

33 if not raw: 

34 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) 

35 

36 byte, response = raw[:1], raw[1:] 

37 

38 # server returned an error 

39 if byte == b"-": 

40 response = response.decode("utf-8", errors="replace") 

41 error = self.parse_error(response) 

42 # if the error is a ConnectionError, raise immediately so the user 

43 # is notified 

44 if isinstance(error, ConnectionError): 

45 raise error 

46 # otherwise, we're dealing with a ResponseError that might belong 

47 # inside a pipeline response. the connection's read_response() 

48 # and/or the pipeline's execute() will raise this error if 

49 # necessary, so just return the exception instance here. 

50 return error 

51 # single value 

52 elif byte == b"+": 

53 pass 

54 # int value 

55 elif byte == b":": 

56 return int(response) 

57 # bulk response 

58 elif byte == b"$" and response == b"-1": 

59 return None 

60 elif byte == b"$": 

61 response = self._buffer.read(int(response), timeout=timeout) 

62 # multi-bulk response 

63 elif byte == b"*" and response == b"-1": 

64 return None 

65 elif byte == b"*": 

66 response = [ 

67 self._read_response(disable_decoding=disable_decoding, timeout=timeout) 

68 for i in range(int(response)) 

69 ] 

70 else: 

71 raise InvalidResponse(f"Protocol Error: {raw!r}") 

72 

73 if disable_decoding is False: 

74 response = self.encoder.decode(response) 

75 return response 

76 

77 

78class _AsyncRESP2Parser(_AsyncRESPBase): 

79 """Async class for the RESP2 protocol""" 

80 

81 async def read_response(self, disable_decoding: bool = False): 

82 if not self._connected: 

83 raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR) 

84 if self._chunks: 

85 # augment parsing buffer with previously read data 

86 self._buffer += b"".join(self._chunks) 

87 self._chunks.clear() 

88 self._pos = 0 

89 response = await self._read_response(disable_decoding=disable_decoding) 

90 # Successfully parsing a response allows us to clear our parsing buffer 

91 self._clear() 

92 return response 

93 

94 async def _read_response( 

95 self, disable_decoding: bool = False 

96 ) -> Union[EncodableT, ResponseError, None]: 

97 raw = await self._readline() 

98 response: Any 

99 byte, response = raw[:1], raw[1:] 

100 

101 # server returned an error 

102 if byte == b"-": 

103 response = response.decode("utf-8", errors="replace") 

104 error = self.parse_error(response) 

105 # if the error is a ConnectionError, raise immediately so the user 

106 # is notified 

107 if isinstance(error, ConnectionError): 

108 self._clear() # Successful parse 

109 raise error 

110 # otherwise, we're dealing with a ResponseError that might belong 

111 # inside a pipeline response. the connection's read_response() 

112 # and/or the pipeline's execute() will raise this error if 

113 # necessary, so just return the exception instance here. 

114 return error 

115 # single value 

116 elif byte == b"+": 

117 pass 

118 # int value 

119 elif byte == b":": 

120 return int(response) 

121 # bulk response 

122 elif byte == b"$" and response == b"-1": 

123 return None 

124 elif byte == b"$": 

125 response = await self._read(int(response)) 

126 # multi-bulk response 

127 elif byte == b"*" and response == b"-1": 

128 return None 

129 elif byte == b"*": 

130 response = [ 

131 (await self._read_response(disable_decoding)) 

132 for _ in range(int(response)) # noqa 

133 ] 

134 else: 

135 raise InvalidResponse(f"Protocol Error: {raw!r}") 

136 

137 if disable_decoding is False: 

138 response = self.encoder.decode(response) 

139 return response