1"""
2Handlers for Content-Encoding.
4See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
5"""
7from __future__ import annotations
9import codecs
10import io
11import typing
12import zlib
14from ._exceptions import DecodingError
16# Brotli support is optional
17try:
18 # The C bindings in `brotli` are recommended for CPython.
19 import brotli
20except ImportError: # pragma: no cover
21 try:
22 # The CFFI bindings in `brotlicffi` are recommended for PyPy
23 # and other environments.
24 import brotlicffi as brotli
25 except ImportError:
26 brotli = None
29# Zstandard support is optional
30try:
31 import zstandard
32except ImportError: # pragma: no cover
33 zstandard = None # type: ignore


class ContentDecoder:
    def decode(self, data: bytes) -> bytes:
        raise NotImplementedError()  # pragma: no cover

    def flush(self) -> bytes:
        raise NotImplementedError()  # pragma: no cover


class IdentityDecoder(ContentDecoder):
    """
    Handle unencoded data.
    """

    def decode(self, data: bytes) -> bytes:
        return data

    def flush(self) -> bytes:
        return b""


class DeflateDecoder(ContentDecoder):
    """
    Handle 'deflate' decoding.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        self.first_attempt = True
        self.decompressor = zlib.decompressobj()

    def decode(self, data: bytes) -> bytes:
        was_first_attempt = self.first_attempt
        self.first_attempt = False
        try:
            return self.decompressor.decompress(data)
        except zlib.error as exc:
            if was_first_attempt:
                self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
                return self.decode(data)
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        try:
            return self.decompressor.flush()
        except zlib.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
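

# Illustrative sketch, not part of the httpx module: DeflateDecoder first tries
# a zlib-wrapped stream and, on the first error, falls back to raw deflate,
# since some servers send 'deflate' bodies without the zlib header. The helper
# name below is hypothetical and exists only for demonstration.
def _demo_deflate_decoder() -> None:
    payload = b"hello, deflate"

    # zlib-wrapped stream (RFC 1950): handled by the initial decompressobj().
    wrapped = zlib.compress(payload)
    decoder = DeflateDecoder()
    assert decoder.decode(wrapped) + decoder.flush() == payload

    # Raw deflate stream (RFC 1951): triggers the -zlib.MAX_WBITS fallback.
    raw = zlib.compressobj(wbits=-zlib.MAX_WBITS)
    raw_stream = raw.compress(payload) + raw.flush()
    decoder = DeflateDecoder()
    assert decoder.decode(raw_stream) + decoder.flush() == payload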


class GZipDecoder(ContentDecoder):
    """
    Handle 'gzip' decoding.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)

    def decode(self, data: bytes) -> bytes:
        try:
            return self.decompressor.decompress(data)
        except zlib.error as exc:
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        try:
            return self.decompressor.flush()
        except zlib.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
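

# Illustrative sketch, not part of the httpx module: GZipDecoder wraps
# zlib.decompressobj(zlib.MAX_WBITS | 16), which expects a gzip header and
# trailer, and copes with data arriving in arbitrary chunks. The helper name
# below is hypothetical.
def _demo_gzip_decoder() -> None:
    import gzip

    payload = b"hello, gzip"
    body = gzip.compress(payload)

    # Feed the body in two pieces to mimic streaming a response.
    decoder = GZipDecoder()
    decoded = decoder.decode(body[:5]) + decoder.decode(body[5:]) + decoder.flush()
    assert decoded == payload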


class BrotliDecoder(ContentDecoder):
    """
    Handle 'brotli' decoding.

    Requires `pip install brotlicffi`. See: https://pypi.org/project/brotlicffi/
    or `pip install brotli`. See: https://github.com/google/brotli
    Supports both the 'brotlicffi' and 'brotli' packages, since they share an
    import name. The top branches are for 'brotlicffi' and the bottom branches
    for 'brotli'.
    """

    def __init__(self) -> None:
        if brotli is None:  # pragma: no cover
            raise ImportError(
                "Using 'BrotliDecoder', but neither of the 'brotlicffi' or 'brotli' "
                "packages have been installed. "
                "Make sure to install httpx using `pip install httpx[brotli]`."
            ) from None

        self.decompressor = brotli.Decompressor()
        self.seen_data = False
        self._decompress: typing.Callable[[bytes], bytes]
        if hasattr(self.decompressor, "decompress"):
            # The 'brotlicffi' package.
            self._decompress = self.decompressor.decompress  # pragma: no cover
        else:
            # The 'brotli' package.
            self._decompress = self.decompressor.process  # pragma: no cover

    def decode(self, data: bytes) -> bytes:
        if not data:
            return b""
        self.seen_data = True
        try:
            return self._decompress(data)
        except brotli.error as exc:
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        if not self.seen_data:
            return b""
        try:
            if hasattr(self.decompressor, "finish"):
                # Only available in the 'brotlicffi' package.

                # As the decompressor decompresses eagerly, this
                # will never actually emit any data. However, it will potentially throw
                # errors if a truncated or damaged data stream has been used.
                self.decompressor.finish()  # pragma: no cover
            return b""
        except brotli.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
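

# Illustrative sketch, not part of the httpx module: both the 'brotli' and
# 'brotlicffi' packages expose a module-level compress(), so this demo works
# with whichever one was imported above, and is a no-op if neither is
# installed. The helper name is hypothetical.
def _demo_brotli_decoder() -> None:
    if brotli is None:  # pragma: no cover
        return

    payload = b"hello, brotli"
    body = brotli.compress(payload)

    decoder = BrotliDecoder()
    assert decoder.decode(body) + decoder.flush() == payload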


class ZStandardDecoder(ContentDecoder):
    """
    Handle 'zstd' RFC 8878 decoding.

    Requires `pip install zstandard`.
    Can be installed as a dependency of httpx using `pip install httpx[zstd]`.
    """

    # inspired by the ZstdDecoder implementation in urllib3
    def __init__(self) -> None:
        if zstandard is None:  # pragma: no cover
            raise ImportError(
                "Using 'ZStandardDecoder', ..."
                "Make sure to install httpx using `pip install httpx[zstd]`."
            ) from None

        self.decompressor = zstandard.ZstdDecompressor().decompressobj()
        self.seen_data = False

    def decode(self, data: bytes) -> bytes:
        assert zstandard is not None
        self.seen_data = True
        output = io.BytesIO()
        try:
            output.write(self.decompressor.decompress(data))
            while self.decompressor.eof and self.decompressor.unused_data:
                unused_data = self.decompressor.unused_data
                self.decompressor = zstandard.ZstdDecompressor().decompressobj()
                output.write(self.decompressor.decompress(unused_data))
        except zstandard.ZstdError as exc:
            raise DecodingError(str(exc)) from exc
        return output.getvalue()

    def flush(self) -> bytes:
        if not self.seen_data:
            return b""
        ret = self.decompressor.flush()  # note: this is a no-op
        if not self.decompressor.eof:
            raise DecodingError("Zstandard data is incomplete")  # pragma: no cover
        return bytes(ret)
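

# Illustrative sketch, not part of the httpx module: a 'zstd' body may contain
# several concatenated frames, which is why decode() re-creates the
# decompressobj whenever it reaches end-of-frame with unused data left over.
# This demo is a no-op if 'zstandard' is not installed; the helper name is
# hypothetical.
def _demo_zstandard_decoder() -> None:
    if zstandard is None:  # pragma: no cover
        return

    compressor = zstandard.ZstdCompressor()
    # Two independent frames concatenated into a single body.
    body = compressor.compress(b"first frame. ") + compressor.compress(b"second frame.")

    decoder = ZStandardDecoder()
    assert decoder.decode(body) + decoder.flush() == b"first frame. second frame."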


class MultiDecoder(ContentDecoder):
    """
    Handle the case where multiple encodings have been applied.
    """

    def __init__(self, children: typing.Sequence[ContentDecoder]) -> None:
        """
        'children' should be a sequence of decoders in the order in which
        each was applied.
        """
        # Note that we reverse the order for decoding.
        self.children = list(reversed(children))

    def decode(self, data: bytes) -> bytes:
        for child in self.children:
            data = child.decode(data)
        return data

    def flush(self) -> bytes:
        data = b""
        for child in self.children:
            data = child.decode(data) + child.flush()
        return data
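

# Illustrative sketch, not part of the httpx module: for a response with
# `Content-Encoding: deflate, gzip` the body was deflate-encoded first and
# gzip-encoded second, so MultiDecoder receives the children in that order and
# applies them in reverse while decoding. The helper name is hypothetical.
def _demo_multi_decoder() -> None:
    import gzip

    payload = b"hello, multiple encodings"
    body = gzip.compress(zlib.compress(payload))

    decoder = MultiDecoder(children=[DeflateDecoder(), GZipDecoder()])
    assert decoder.decode(body) + decoder.flush() == payload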


class ByteChunker:
    """
    Handles returning byte content in fixed-size chunks.
    """

    def __init__(self, chunk_size: int | None = None) -> None:
        self._buffer = io.BytesIO()
        self._chunk_size = chunk_size

    def decode(self, content: bytes) -> list[bytes]:
        if self._chunk_size is None:
            return [content] if content else []

        self._buffer.write(content)
        if self._buffer.tell() >= self._chunk_size:
            value = self._buffer.getvalue()
            chunks = [
                value[i : i + self._chunk_size]
                for i in range(0, len(value), self._chunk_size)
            ]
            if len(chunks[-1]) == self._chunk_size:
                self._buffer.seek(0)
                self._buffer.truncate()
                return chunks
            else:
                self._buffer.seek(0)
                self._buffer.write(chunks[-1])
                self._buffer.truncate()
                return chunks[:-1]
        else:
            return []

    def flush(self) -> list[bytes]:
        value = self._buffer.getvalue()
        self._buffer.seek(0)
        self._buffer.truncate()
        return [value] if value else []
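

# Illustrative sketch, not part of the httpx module: ByteChunker re-slices an
# arbitrary stream of byte blocks into fixed-size chunks, buffering any
# remainder until flush(). The helper name is hypothetical.
def _demo_byte_chunker() -> None:
    chunker = ByteChunker(chunk_size=4)
    chunks: list[bytes] = []
    for block in [b"abc", b"defg", b"hi"]:
        chunks.extend(chunker.decode(block))
    chunks.extend(chunker.flush())
    assert chunks == [b"abcd", b"efgh", b"i"]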


class TextChunker:
    """
    Handles returning text content in fixed-size chunks.
    """

    def __init__(self, chunk_size: int | None = None) -> None:
        self._buffer = io.StringIO()
        self._chunk_size = chunk_size

    def decode(self, content: str) -> list[str]:
        if self._chunk_size is None:
            return [content] if content else []

        self._buffer.write(content)
        if self._buffer.tell() >= self._chunk_size:
            value = self._buffer.getvalue()
            chunks = [
                value[i : i + self._chunk_size]
                for i in range(0, len(value), self._chunk_size)
            ]
            if len(chunks[-1]) == self._chunk_size:
                self._buffer.seek(0)
                self._buffer.truncate()
                return chunks
            else:
                self._buffer.seek(0)
                self._buffer.write(chunks[-1])
                self._buffer.truncate()
                return chunks[:-1]
        else:
            return []

    def flush(self) -> list[str]:
        value = self._buffer.getvalue()
        self._buffer.seek(0)
        self._buffer.truncate()
        return [value] if value else []


class TextDecoder:
    """
    Handles incrementally decoding bytes into text
    """

    def __init__(self, encoding: str = "utf-8") -> None:
        self.decoder = codecs.getincrementaldecoder(encoding)(errors="replace")

    def decode(self, data: bytes) -> str:
        return self.decoder.decode(data)

    def flush(self) -> str:
        return self.decoder.decode(b"", True)
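

# Illustrative sketch, not part of the httpx module: the incremental codec
# holds back partial multi-byte sequences, so a UTF-8 character split across
# two network chunks still decodes correctly. The helper name is hypothetical.
def _demo_text_decoder() -> None:
    decoder = TextDecoder(encoding="utf-8")
    data = "café".encode("utf-8")  # b"caf\xc3\xa9"

    # Split in the middle of the two-byte sequence for 'é'.
    text = decoder.decode(data[:4]) + decoder.decode(data[4:]) + decoder.flush()
    assert text == "café"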


class LineDecoder:
    """
    Handles incrementally reading lines from text.

    Has the same behaviour as the stdlib splitlines,
    but handles the input iteratively.
    """

    def __init__(self) -> None:
        self.buffer: list[str] = []
        self.trailing_cr: bool = False

    def decode(self, text: str) -> list[str]:
        # See https://docs.python.org/3/library/stdtypes.html#str.splitlines
        NEWLINE_CHARS = "\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029"

        # We always push a trailing `\r` into the next decode iteration.
        if self.trailing_cr:
            text = "\r" + text
            self.trailing_cr = False
        if text.endswith("\r"):
            self.trailing_cr = True
            text = text[:-1]

        if not text:
            # NOTE: the edge case input of empty text doesn't occur in practice,
            # because other httpx internals filter out this value
            return []  # pragma: no cover

        trailing_newline = text[-1] in NEWLINE_CHARS
        lines = text.splitlines()

        if len(lines) == 1 and not trailing_newline:
            # No new lines, buffer the input and continue.
            self.buffer.append(lines[0])
            return []

        if self.buffer:
            # Include any existing buffer in the first portion of the
            # splitlines result.
            lines = ["".join(self.buffer) + lines[0]] + lines[1:]
            self.buffer = []

        if not trailing_newline:
            # If the last segment of splitlines is not newline terminated,
            # then drop it from our output and start a new buffer.
            self.buffer = [lines.pop()]

        return lines

    def flush(self) -> list[str]:
        if not self.buffer and not self.trailing_cr:
            return []

        lines = ["".join(self.buffer)]
        self.buffer = []
        self.trailing_cr = False
        return lines
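

# Illustrative sketch, not part of the httpx module: a trailing '\r' is held
# back until the next chunk, so a '\r\n' split across two chunks is still
# treated as a single line break. The helper name is hypothetical.
def _demo_line_decoder() -> None:
    decoder = LineDecoder()
    lines: list[str] = []
    for chunk in ["first\r", "\nsecond\nthi", "rd"]:
        lines.extend(decoder.decode(chunk))
    lines.extend(decoder.flush())
    assert lines == ["first", "second", "third"]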


SUPPORTED_DECODERS = {
    "identity": IdentityDecoder,
    "gzip": GZipDecoder,
    "deflate": DeflateDecoder,
    "br": BrotliDecoder,
    "zstd": ZStandardDecoder,
}


if brotli is None:
    SUPPORTED_DECODERS.pop("br")  # pragma: no cover
if zstandard is None:
    SUPPORTED_DECODERS.pop("zstd")  # pragma: no cover
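

# Illustrative sketch, not part of the httpx module: the registry above maps a
# single Content-Encoding token to a decoder class, and comma-separated values
# can be combined via MultiDecoder. The header parsing below is hypothetical
# and simplified; httpx's actual response plumbing lives elsewhere, and unknown
# tokens would raise KeyError here rather than being handled gracefully.
def _demo_decoder_selection(content_encoding: str = "deflate, gzip") -> ContentDecoder:
    tokens = [
        token.strip() for token in content_encoding.lower().split(",") if token.strip()
    ]
    children = [SUPPORTED_DECODERS[token]() for token in tokens]
    if not children:
        return IdentityDecoder()
    if len(children) == 1:
        return children[0]
    return MultiDecoder(children=children)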