Coverage for /pythoncovmergedfiles/medio/medio/src/httplib2/httplib2/decode.py: 27%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

115 statements  

1from typing import Protocol 

2import zlib 

3 

4 

class DecodeRatioError(Exception):
    """Raised when decoded output outgrows the consumed input by more than the allowed ratio."""

7 

8 

class DecodeLimitError(Exception):
    """Raised when the total number of decoded bytes goes past the hard limit."""

11 

12 

class DecoderProtocol(Protocol):
    """Structural interface for streaming decoders.

    Implementations provide ``needs_input``, ``decode`` and ``flush``;
    ``consume_bytes`` is a ready-made one-shot helper built on top of them.
    """

    @property
    def needs_input(self) -> bool:
        """Whether the decoder wants more input before it can make progress."""
        ...

    def decode(self, b: bytes) -> bytes:
        """Feed ``b`` to the decoder and return the bytes it produced."""
        ...

    def flush(self) -> bytes:
        """Finish the stream and return any remaining output."""
        ...

    def consume_bytes(self, data: bytes, chunk_size: int = 64 << 10) -> bytes:
        """Decode all of ``data`` and flush, returning the concatenated output.

        Args:
            data: Complete encoded input.
            chunk_size: Maximum number of bytes handed to ``decode`` per call.
                ``0`` means "pass everything in a single call".

        Returns:
            Every byte produced by the ``decode`` calls followed by the
            output of ``flush``.

        Raises:
            ValueError: If ``chunk_size`` is negative.
        """
        if chunk_size < 0:
            # A negative step would make range() empty and silently drop the input.
            raise ValueError(f"consume_bytes() chunk_size={chunk_size} expected >= 0")

        # 0 selects the whole input; the trailing `or 1` keeps the range() step
        # non-zero when the input itself is empty.
        step = chunk_size or len(data) or 1

        out = bytearray()
        for start in range(0, len(data), step):
            out.extend(self.decode(data[start : start + step]))
        out.extend(self.flush())
        return bytes(out)

33 

34 

class ZlibDecoder(DecoderProtocol):
    """
    Decoder-interface adapter over a ``zlib`` decompression object.

    ``decode`` hands its input straight to ``decompress`` with no output cap,
    so there is never pending output that would need a ``decode(b"")`` call
    to drain. ``needs_input`` is therefore True until the underlying object
    reports end of stream.

    The instance is single-use: ``flush`` releases the underlying object and
    every later call raises ``RuntimeError``.
    """

    __slots__ = ("_decoder",)

    WBITS_DEFLATE = -15
    WBITS_ZLIB = 15
    WBITS_GZIP = WBITS_ZLIB | 16
    WBITS_AUTO_GZIP_ZLIB = WBITS_ZLIB | 32  # but not deflate

    def __init__(self, wbits: int = WBITS_AUTO_GZIP_ZLIB):
        self._decoder: zlib._Decompress | None = zlib.decompressobj(wbits)

    def _require_decoder(self):
        """Return the live decompression object or raise if already flushed."""
        active = self._decoder
        if active is None:
            raise RuntimeError("used after flush()")
        return active

    @property
    def needs_input(self) -> bool:
        """True while the underlying stream has not reached its end."""
        return not self._require_decoder().eof

    def decode(self, b: bytes) -> bytes:
        """Decompress ``b`` and return whatever output it yields."""
        return self._require_decoder().decompress(b)

    def flush(self) -> bytes:
        """Return the remaining output and retire this decoder."""
        active = self._require_decoder()
        tail = active.flush()
        # Only drop the reference once flush() has succeeded.
        self._decoder = None
        return tail

71 

72 

def DeflateDecoder() -> ZlibDecoder:
    """Build a ``ZlibDecoder`` configured with ``ZlibDecoder.WBITS_DEFLATE``."""
    deflate_wbits = ZlibDecoder.WBITS_DEFLATE
    return ZlibDecoder(deflate_wbits)

75 

76 

class LimitDecoder(DecoderProtocol):
    """Wrap another decoder and bound how much output it may produce.

    Input is buffered and fed to the wrapped decoder in chunks; after every
    piece of output the running totals are checked against the limits.

    Args:
        decoder: The decoder doing the actual work.
        ratio: Maximum allowed ``output_length / consumed_length``.
            ``0`` disables the ratio check.
        chunk_size: Maximum number of buffered bytes passed to the wrapped
            decoder per call. ``0`` means "no chunking" (pass everything
            buffered at once).
        safe_limit: While the total output is below this many bytes the
            ratio check is skipped. ``0`` means the ratio check always applies.
        hard_limit: Absolute cap on total output bytes. ``0`` disables it.

    Raises:
        ValueError: If any numeric argument is negative.
    """

    __slots__ = (
        "_decoder",
        "_ratio",
        "_chunk_size",
        "_safe_limit",
        "_hard_limit",
        "_consumed_length",
        "_output_length",
        "_input_buffer",
        "_flushed",
    )

    def __init__(
        self,
        decoder: DecoderProtocol,
        ratio: float = 100,
        chunk_size: int = 64 << 10,
        safe_limit: int = 10 << 20,
        hard_limit: int = 10 << 30,
    ) -> None:
        if ratio < 0:
            raise ValueError(f"LimitDecoder() ratio={ratio} expected >= 0")
        if chunk_size < 0:
            raise ValueError(f"LimitDecoder() chunk_size={chunk_size} expected >= 0")
        if safe_limit < 0:
            raise ValueError(f"LimitDecoder() safe_limit={safe_limit} expected >= 0")
        if hard_limit < 0:
            raise ValueError(f"LimitDecoder() hard_limit={hard_limit} expected >= 0")

        self._decoder: DecoderProtocol = decoder
        self._ratio: float = ratio
        self._chunk_size: int = chunk_size
        self._safe_limit: int = safe_limit
        self._hard_limit: int = hard_limit
        self._consumed_length: int = 0
        self._output_length: int = 0
        self._input_buffer: bytearray = bytearray()
        self._flushed: bool = False

    def _check_limits(self) -> None:
        """Raise if the running totals violate the hard limit or the ratio.

        Raises:
            DecodeLimitError: Total output exceeds ``hard_limit``.
            DecodeRatioError: Output has reached ``safe_limit`` and exceeds
                ``consumed_length * ratio``.
        """
        if (self._hard_limit > 0) and (self._output_length > self._hard_limit):
            raise DecodeLimitError(f"Output length {self._output_length} exceeds hard limit {self._hard_limit}")
        # Small outputs are exempt from the ratio check.
        if (self._safe_limit > 0) and (self._output_length < self._safe_limit):
            return
        if (self._ratio > 0) and (self._output_length > self._consumed_length * self._ratio):
            actual_ratio = self._output_length / self._consumed_length if self._consumed_length > 0 else float("inf")
            raise DecodeRatioError(
                f"Amplification ratio {actual_ratio:.1f} ({self._output_length}/{self._consumed_length})"
                f" exceeds limit {self._ratio}"
            )

    def _record_output(self, output: bytearray, data: bytes) -> None:
        """Append ``data`` to ``output``, update the total and enforce limits."""
        output.extend(data)
        self._output_length += len(data)
        self._check_limits()

    @property
    def needs_input(self) -> bool:
        """Delegate to the wrapped decoder's ``needs_input``."""
        return self._decoder.needs_input

    def decode(self, b: bytes) -> bytes:
        """Buffer ``b``, pump the wrapped decoder and return the new output.

        Raises:
            RuntimeError: If called after ``flush()``.
            DecodeLimitError, DecodeRatioError: If a limit is exceeded.
        """
        if self._flushed:
            raise RuntimeError("decode() called after flush()")
        output = self._pump(b)
        return bytes(output)

    def flush(self) -> bytes:
        """Drain buffered input, flush the wrapped decoder and return the output.

        Raises:
            RuntimeError: If called more than once.
            DecodeLimitError, DecodeRatioError: If a limit is exceeded.
        """
        if self._flushed:
            raise RuntimeError("flush() called more than once")
        self._flushed = True

        output = self._pump(b"")

        # Limits are re-checked even when the final flush yields nothing.
        self._record_output(output, self._decoder.flush())

        return bytes(output)

    def _pump(self, b: bytes) -> bytearray:
        """Append ``b`` to the input buffer and run the wrapped decoder until idle.

        Returns the output produced during this call. The loop ends once the
        input buffer is empty and the wrapped decoder yields nothing further.
        """
        self._input_buffer.extend(b)

        output = bytearray()
        while True:
            if not self._decoder.needs_input:
                # The decoder reports it does not need input: try to drain it first.
                data = self._decoder.decode(b"")
                if data:
                    self._record_output(output, data)
                    continue

            if self._input_buffer:
                # chunk_size == 0 means "no chunking". Slicing with a literal 0
                # would take nothing from the buffer and spin here forever.
                size = self._chunk_size or len(self._input_buffer)
                chunk = bytes(self._input_buffer[:size])
                del self._input_buffer[:size]

                data = self._decoder.decode(chunk)
                self._consumed_length += len(chunk)

                if data:
                    self._record_output(output, data)

                continue

            # neither input nor decoder progress
            break

        return output