Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/redis/_parsers/resp2.py: 14%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

80 statements  

1from typing import Any, Union 

2 

3from ..exceptions import ConnectionError, InvalidResponse, ResponseError 

4from ..typing import EncodableT 

5from .base import _AsyncRESPBase, _RESPBase 

6from .socket import SENTINEL, SERVER_CLOSED_CONNECTION_ERROR 

7 

8 

class _RESP2Parser(_RESPBase):
    """Synchronous parser for replies encoded with the RESP2 protocol."""

    def read_response(
        self, disable_decoding=False, timeout: Union[float, object] = SENTINEL
    ):
        """Read one complete reply, rewinding the buffer if parsing fails.

        The buffer position is captured before parsing. If anything is
        raised while reading (including ``BaseException`` subclasses) the
        buffer is rewound to that position and the exception propagates
        unchanged. On success the buffer is purged and the parsed reply is
        returned.
        """
        start = self._buffer.get_pos() if self._buffer else None
        try:
            parsed = self._read_response(
                disable_decoding=disable_decoding, timeout=timeout
            )
        except BaseException:
            if self._buffer:
                self._buffer.rewind(start)
            raise
        # Only reached when parsing succeeded: the handler above always
        # re-raises.
        self._buffer.purge()
        return parsed

    def _read_response(
        self, disable_decoding=False, timeout: Union[float, object] = SENTINEL
    ):
        """Parse a single reply, recursing for multi-bulk replies.

        The first byte of the line selects the reply type: ``-`` error,
        ``+`` single value, ``:`` integer, ``$`` bulk and ``*`` multi-bulk.
        A ``$`` or ``*`` line whose remainder is ``-1`` yields ``None``.

        Raises ``ConnectionError`` when the line read from the buffer is
        empty or when the server error parses to a ``ConnectionError``, and
        ``InvalidResponse`` for an unrecognised type byte. Any other parsed
        error is returned as an exception instance rather than raised.
        """
        line = self._buffer.readline(timeout=timeout)
        if not line:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

        marker, payload = line[:1], line[1:]

        # Error reply sent by the server.
        if marker == b"-":
            error = self.parse_error(payload.decode("utf-8", errors="replace"))
            # A ConnectionError is raised right away so the user is notified.
            if isinstance(error, ConnectionError):
                raise error
            # Anything else may belong inside a pipeline response; hand the
            # exception instance back and let the connection's
            # read_response() and/or the pipeline's execute() raise it if
            # necessary.
            return error

        # Integer reply.
        if marker == b":":
            return int(payload)

        # Null bulk / null multi-bulk reply.
        if marker == b"$" and payload == b"-1":
            return None
        if marker == b"*" and payload == b"-1":
            return None

        if marker == b"+":
            # Single value: the rest of the line is the reply itself.
            value = payload
        elif marker == b"$":
            # Bulk reply: the rest of the line is the number of bytes to read.
            value = self._buffer.read(int(payload), timeout=timeout)
        elif marker == b"*":
            # Multi-bulk reply: the rest of the line is the element count.
            value = []
            for _ in range(int(payload)):
                value.append(
                    self._read_response(
                        disable_decoding=disable_decoding, timeout=timeout
                    )
                )
        else:
            raise InvalidResponse(f"Protocol Error: {line!r}")

        if disable_decoding is False:
            value = self.encoder.decode(value)
        return value

75 

76 

class _AsyncRESP2Parser(_AsyncRESPBase):
    """Asynchronous parser for replies encoded with the RESP2 protocol."""

    async def read_response(self, disable_decoding: bool = False):
        """Read one complete reply from the connection.

        Raises ``ConnectionError`` when the parser is not connected. Any
        previously read chunks are appended to the parsing buffer and the
        read position is reset to zero before parsing starts. After a
        successful parse the parser state is cleared via ``_clear()``.
        """
        if not self._connected:
            raise ConnectionError(SERVER_CLOSED_CONNECTION_ERROR)

        pending = self._chunks
        if pending:
            # Fold data read earlier into the parsing buffer.
            self._buffer += b"".join(pending)
            pending.clear()
        self._pos = 0

        parsed = await self._read_response(disable_decoding=disable_decoding)
        # A fully parsed reply means the parsing buffer can be cleared.
        self._clear()
        return parsed

    async def _read_response(
        self, disable_decoding: bool = False
    ) -> Union[EncodableT, ResponseError, None]:
        """Parse a single reply, recursing for multi-bulk replies.

        The first byte of the line selects the reply type: ``-`` error,
        ``+`` single value, ``:`` integer, ``$`` bulk and ``*`` multi-bulk.
        A ``$`` or ``*`` line whose remainder is ``-1`` yields ``None``.

        Raises ``ConnectionError`` when the server error parses to one, and
        ``InvalidResponse`` for an unrecognised type byte. Any other parsed
        error is returned as an exception instance rather than raised.
        """
        line = await self._readline()
        value: Any
        marker, value = line[:1], line[1:]

        # Error reply sent by the server.
        if marker == b"-":
            error = self.parse_error(value.decode("utf-8", errors="replace"))
            # A ConnectionError is raised right away so the user is notified.
            if isinstance(error, ConnectionError):
                self._clear()  # Successful parse
                raise error
            # Anything else may belong inside a pipeline response; hand the
            # exception instance back and let the connection's
            # read_response() and/or the pipeline's execute() raise it if
            # necessary.
            return error

        # Integer reply.
        if marker == b":":
            return int(value)

        # Null bulk / null multi-bulk reply.
        if marker == b"$" and value == b"-1":
            return None
        if marker == b"*" and value == b"-1":
            return None

        if marker == b"+":
            # Single value: the rest of the line is the reply itself.
            pass
        elif marker == b"$":
            # Bulk reply: the rest of the line is the number of bytes to read.
            value = await self._read(int(value))
        elif marker == b"*":
            # Multi-bulk reply: the rest of the line is the element count.
            count = int(value)
            value = []
            for _ in range(count):
                value.append(await self._read_response(disable_decoding))
        else:
            raise InvalidResponse(f"Protocol Error: {line!r}")

        if disable_decoding is False:
            value = self.encoder.decode(value)
        return value