Coverage for /pythoncovmergedfiles/medio/medio/src/httplib2/httplib2/decode.py: 27%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from typing import Protocol
2import zlib
class DecodeRatioError(Exception):
    """Raised when decoded output outgrows the consumed input by more than the allowed ratio."""
class DecodeLimitError(Exception):
    """Raised when the total amount of decoded output goes past the hard limit."""
class DecoderProtocol(Protocol):
    """Structural interface for streaming decoders.

    Implementations provide ``needs_input``, ``decode()`` and ``flush()``;
    ``consume_bytes()`` is a ready-made convenience built on top of them.
    """

    @property
    def needs_input(self) -> bool:
        """Whether the decoder is waiting for more input.

        NOTE(review): the exact contract is up to the implementation; callers
        in this module treat ``False`` as "try ``decode(b"")`` for more output".
        """
        ...

    def decode(self, b: bytes) -> bytes:
        """Feed the input bytes *b* and return whatever decoded output is available."""
        ...

    def flush(self) -> bytes:
        """Finish the stream and return any remaining decoded output."""
        ...

    def consume_bytes(self, data: bytes, chunk_size: int = 64 << 10) -> bytes:
        """Decode *data* in one call and return the complete output.

        The input is passed to ``decode()`` in slices of at most *chunk_size*
        bytes, after which ``flush()`` is called exactly once.

        Args:
            data: The complete encoded input.
            chunk_size: Maximum slice size; ``0`` means "pass *data* whole".

        Returns:
            The concatenation of every ``decode()`` result plus ``flush()``.

        Raises:
            ValueError: If *chunk_size* is negative.
        """
        if chunk_size < 0:
            # A negative step would yield an empty range and silently drop data.
            raise ValueError(f"consume_bytes() chunk_size={chunk_size} expected >= 0")
        out = bytearray()
        if chunk_size == 0:
            # max(..., 1) keeps range() valid (step must be non-zero) when data is empty.
            chunk_size = max(len(data), 1)
        for i in range(0, len(data), chunk_size):
            chunk = data[i : i + chunk_size]
            out.extend(self.decode(chunk))
        out.extend(self.flush())
        return bytes(out)
class ZlibDecoder(DecoderProtocol):
    """
    Adapter exposing a ``zlib`` decompression object through the Decoder interface.

    Note: zlib hands back all decompressed data it can as soon as it is given
    input, so there is never pending output that would need ``decode(b"")``
    to be pulled out. ``needs_input`` therefore stays True until the
    underlying stream reports end-of-stream.
    """

    __slots__ = ("_decoder",)

    # ``wbits`` presets accepted by zlib.decompressobj().
    WBITS_DEFLATE = -15
    WBITS_ZLIB = 15
    WBITS_GZIP = 15 | 16
    WBITS_AUTO_GZIP_ZLIB = 15 | 32  # but not deflate

    def __init__(self, wbits: int = WBITS_AUTO_GZIP_ZLIB):
        self._decoder: zlib._Decompress | None = zlib.decompressobj(wbits)

    def _live_decoder(self):
        """Return the underlying decompressor, or raise once flush() has discarded it."""
        decoder = self._decoder
        if decoder is None:
            raise RuntimeError("used after flush()")
        return decoder

    @property
    def needs_input(self) -> bool:
        """True while the underlying stream has not reached its end."""
        reached_end = self._live_decoder().eof
        return not reached_end

    def decode(self, b: bytes) -> bytes:
        """Decompress *b* and return the output produced for it."""
        return self._live_decoder().decompress(b)

    def flush(self) -> bytes:
        """Return the remaining output and retire the decoder; later calls raise."""
        tail = self._live_decoder().flush()
        self._decoder = None
        return tail
def DeflateDecoder() -> ZlibDecoder:
    """Build a ZlibDecoder configured with the ``WBITS_DEFLATE`` window setting."""
    deflate_wbits = ZlibDecoder.WBITS_DEFLATE
    return ZlibDecoder(deflate_wbits)
class LimitDecoder(DecoderProtocol):
    """Wrap another decoder and abort when its output grows beyond set limits.

    Input is buffered and fed to the wrapped decoder in bounded chunks; after
    every piece of output the running totals are checked:

    * ``hard_limit`` (when > 0): total output above it raises DecodeLimitError.
    * ``safe_limit`` (when > 0): while total output is below it, the ratio
      check is skipped.
    * ``ratio`` (when > 0): total output greater than ``consumed * ratio``
      raises DecodeRatioError.

    Args:
        decoder: The decoder doing the actual work.
        ratio: Maximum allowed output/input amplification; ``0`` disables it.
        chunk_size: Maximum number of buffered input bytes handed to the
            wrapped decoder per call; ``0`` means "no chunking".
        safe_limit: Output size below which the ratio is not enforced.
        hard_limit: Absolute cap on total output; ``0`` disables it.

    Raises:
        ValueError: If any numeric argument is negative.
    """

    __slots__ = (
        "_decoder",
        "_ratio",
        "_chunk_size",
        "_safe_limit",
        "_hard_limit",
        "_consumed_length",
        "_output_length",
        "_input_buffer",
        "_flushed",
    )

    def __init__(
        self,
        decoder: DecoderProtocol,
        ratio: float = 100,
        chunk_size: int = 64 << 10,
        safe_limit: int = 10 << 20,
        hard_limit: int = 10 << 30,
    ) -> None:
        if ratio < 0:
            raise ValueError(f"LimitDecoder() ratio={ratio} expected >= 0")
        if chunk_size < 0:
            raise ValueError(f"LimitDecoder() chunk_size={chunk_size} expected >= 0")
        if safe_limit < 0:
            raise ValueError(f"LimitDecoder() safe_limit={safe_limit} expected >= 0")
        if hard_limit < 0:
            # Report the offending parameter (previously named safe_limit by mistake).
            raise ValueError(f"LimitDecoder() hard_limit={hard_limit} expected >= 0")

        self._decoder: DecoderProtocol = decoder
        self._ratio: float = ratio
        self._chunk_size: int = chunk_size
        self._safe_limit: int = safe_limit
        self._hard_limit: int = hard_limit
        self._consumed_length: int = 0  # input bytes handed to the wrapped decoder
        self._output_length: int = 0  # output bytes received from the wrapped decoder
        self._input_buffer: bytearray = bytearray()  # input not yet handed over
        self._flushed: bool = False

    def _check_limits(self) -> None:
        """Raise if the running totals violate the hard limit or the ratio.

        Raises:
            DecodeLimitError: Total output exceeds the (enabled) hard limit.
            DecodeRatioError: Total output exceeds ``consumed * ratio`` once
                output is no longer below the (enabled) safe limit.
        """
        if (self._hard_limit > 0) and (self._output_length > self._hard_limit):
            raise DecodeLimitError(f"Output length {self._output_length} exceeds hard limit {self._hard_limit}")
        if (self._safe_limit > 0) and (self._output_length < self._safe_limit):
            # Small outputs are exempt from the ratio check.
            return
        if (self._ratio > 0) and (self._output_length > self._consumed_length * self._ratio):
            # Guard the division: output may exist before any input was consumed.
            actual_ratio = self._output_length / self._consumed_length if self._consumed_length > 0 else float("inf")
            raise DecodeRatioError(
                f"Amplification ratio {actual_ratio:.1f} ({self._output_length}/{self._consumed_length})"
                f" exceeds limit {self._ratio}"
            )

    @property
    def needs_input(self) -> bool:
        """Mirror the wrapped decoder's ``needs_input``."""
        return self._decoder.needs_input

    def decode(self, b: bytes) -> bytes:
        """Buffer *b*, run it through the wrapped decoder and return the output.

        Raises:
            RuntimeError: If called after flush().
            DecodeLimitError, DecodeRatioError: If a limit is exceeded.
        """
        if self._flushed:
            raise RuntimeError("decode() called after flush()")
        output = self._pump(b)
        return bytes(output)

    def flush(self) -> bytes:
        """Drain buffered input, flush the wrapped decoder and return the tail.

        Raises:
            RuntimeError: If called more than once.
            DecodeLimitError, DecodeRatioError: If a limit is exceeded.
        """
        if self._flushed:
            raise RuntimeError("flush() called more than once")
        self._flushed = True

        output = self._pump(b"")

        data = self._decoder.flush()
        output.extend(data)
        self._output_length += len(data)
        self._check_limits()

        return bytes(output)

    def _pump(self, b: bytes) -> bytearray:
        """Append *b* to the input buffer and decode until no progress is possible.

        Each iteration either pulls pending output from a decoder that does
        not need input, or feeds it the next chunk of buffered input. Limits
        are checked after every non-empty piece of output.
        """
        self._input_buffer.extend(b)

        output = bytearray()
        while True:
            if not self._decoder.needs_input:
                # Decoder reports it does not need input: ask for pending output first.
                data = self._decoder.decode(b"")
                if data:
                    output.extend(data)
                    self._output_length += len(data)
                    self._check_limits()
                    continue

            if self._input_buffer:
                # chunk_size == 0 means "no chunking". Slicing [:0] would never
                # shrink the buffer and this loop would spin forever.
                size = self._chunk_size or len(self._input_buffer)
                chunk = bytes(self._input_buffer[:size])
                del self._input_buffer[:size]

                data = self._decoder.decode(chunk)
                self._consumed_length += len(chunk)

                if data:
                    output.extend(data)
                    self._output_length += len(data)
                    self._check_limits()

                continue

            # neither input nor decoder progress
            break

        return output