Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_decoders.py: 28%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
"""
Handlers for Content-Encoding.

See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
"""
7from __future__ import annotations
9import codecs
10import io
11import typing
12import zlib
14from ._compat import brotli, zstd
15from ._exceptions import DecodingError
class ContentDecoder:
    """
    Base interface shared by the Content-Encoding decoders in this module.

    Subclasses override `decode()`, which is fed the encoded body one chunk
    at a time, and `flush()`, which is called once the body is exhausted.
    Both methods raise `NotImplementedError` here.
    """

    def decode(self, data: bytes) -> bytes:
        """Decode one chunk of encoded bytes and return the decoded bytes."""
        raise NotImplementedError()  # pragma: no cover

    def flush(self) -> bytes:
        """Return any decoded bytes still pending at the end of the stream."""
        raise NotImplementedError()  # pragma: no cover
class IdentityDecoder(ContentDecoder):
    """
    Handle unencoded data.

    Every chunk is passed through untouched, so nothing is ever buffered.
    """

    def decode(self, data: bytes) -> bytes:
        """Return `data` unchanged."""
        return data

    def flush(self) -> bytes:
        """Return empty bytes; `decode()` never holds anything back."""
        return b""
class DeflateDecoder(ContentDecoder):
    """
    Handle 'deflate' decoding.

    The first chunk is tried with the default zlib settings. If that fails,
    the decoder switches once to a raw deflate stream and retries the same
    chunk.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        # True until the first call to `decode()`; the raw-deflate fallback
        # is only attempted for that first chunk.
        self.first_attempt = True
        self.decompressor = zlib.decompressobj()

    def decode(self, data: bytes) -> bytes:
        """
        Decompress one chunk of deflate data.

        Raises `DecodingError` if zlib rejects the data and the one-off
        fallback is no longer available (or has itself failed).
        """
        was_first_attempt = self.first_attempt
        self.first_attempt = False
        try:
            return self.decompressor.decompress(data)
        except zlib.error as exc:
            if was_first_attempt:
                # Negative wbits selects a raw deflate stream without the
                # zlib header/trailer. `first_attempt` is already False, so
                # a failure inside this retry raises DecodingError instead
                # of retrying again.
                self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
                return self.decode(data)
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        """Return whatever the decompressor still holds, as decoded bytes."""
        try:
            return self.decompressor.flush()
        except zlib.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
class GZipDecoder(ContentDecoder):
    """
    Handle 'gzip' decoding.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        # OR-ing 16 into wbits tells zlib to expect a gzip header and trailer.
        self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)

    def decode(self, data: bytes) -> bytes:
        """
        Decompress one chunk of gzip data.

        Raises `DecodingError` if zlib rejects the data.
        """
        try:
            return self.decompressor.decompress(data)
        except zlib.error as exc:
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        """Return whatever the decompressor still holds, as decoded bytes."""
        try:
            return self.decompressor.flush()
        except zlib.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
class BrotliDecoder(ContentDecoder):
    """
    Handle 'brotli' decoding.

    Requires `pip install brotlicffi`. See: https://github.com/python-hyper/brotlicffi
    or `pip install brotli`. See https://github.com/google/brotli
    Supports both 'brotlicffi' and 'Brotli' packages since they share an import
    name. The top branches are for 'brotlicffi' and bottom branches for 'Brotli'
    """

    def __init__(self) -> None:
        if brotli is None:  # pragma: no cover
            raise ImportError(
                "Using 'BrotliDecoder', but neither of the 'brotlicffi' or 'brotli' "
                "packages have been installed. "
                "Make sure to install httpx using `pip install httpx[brotli]`."
            ) from None

        self.decompressor = brotli.Decompressor()
        # Set once a non-empty chunk has been decoded; `flush()` uses it to
        # skip the end-of-stream check for a body that had no data at all.
        self.seen_data = False
        self._decompress: typing.Callable[[bytes], bytes]
        if hasattr(self.decompressor, "decompress"):
            # The 'brotlicffi' package.
            self._decompress = self.decompressor.decompress  # pragma: no cover
        else:
            # The 'brotli' package.
            self._decompress = self.decompressor.process  # pragma: no cover

    def decode(self, data: bytes) -> bytes:
        """
        Decompress one chunk of brotli data.

        An empty chunk returns empty bytes without touching the decompressor.
        Raises `DecodingError` if the brotli library rejects the data.
        """
        if not data:
            return b""
        self.seen_data = True
        try:
            return self._decompress(data)
        except brotli.error as exc:
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        """
        Finish the stream; always returns empty bytes.

        Raises `DecodingError` if the decompressor reports an error while
        finishing.
        """
        if not self.seen_data:
            return b""
        try:
            if hasattr(self.decompressor, "finish"):
                # Only available in the 'brotlicffi' package.

                # As the decompressor decompresses eagerly, this
                # will never actually emit any data. However, it will potentially throw
                # errors if a truncated or damaged data stream has been used.
                self.decompressor.finish()  # pragma: no cover
            return b""
        except brotli.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
class ZStandardDecoder(ContentDecoder):
    """
    Handle 'zstd' RFC 8878 decoding.

    Requires `pip install zstandard`.
    Can be installed as a dependency of httpx using `pip install httpx[zstd]`.
    """

    # inspired by the ZstdDecoder implementation in urllib3
    def __init__(self) -> None:
        if zstd is None:  # pragma: no cover
            raise ImportError(
                "Using 'ZStandardDecoder', but the 'zstandard' package has not "
                "been installed. "
                "Make sure to install httpx using `pip install httpx[zstd]`."
            ) from None

        self.decompressor = zstd.ZstdDecompressor().decompressobj()
        # Set once a non-empty chunk has been decoded. `flush()` uses it so
        # that a body with no data at all is not reported as a truncated
        # stream (same approach as BrotliDecoder).
        self.seen_data = False

    def decode(self, data: bytes) -> bytes:
        """
        Decompress one chunk of zstd data.

        An empty chunk returns empty bytes without touching the decompressor.
        Raises `DecodingError` if the zstd library rejects the data.
        """
        assert zstd is not None
        if not data:
            return b""
        self.seen_data = True
        output = io.BytesIO()
        try:
            output.write(self.decompressor.decompress(data))
            # A body may consist of several concatenated frames. When one
            # frame ends mid-chunk, start a fresh decompressor and feed it
            # the bytes left over after the finished frame.
            while self.decompressor.eof and self.decompressor.unused_data:
                unused_data = self.decompressor.unused_data
                self.decompressor = zstd.ZstdDecompressor().decompressobj()
                output.write(self.decompressor.decompress(unused_data))
        except zstd.ZstdError as exc:
            raise DecodingError(str(exc)) from exc
        return output.getvalue()

    def flush(self) -> bytes:
        """
        Finish the stream.

        Returns empty bytes if no data was ever decoded. Otherwise raises
        `DecodingError` when the current frame has not reached its end.
        """
        if not self.seen_data:
            return b""
        ret = self.decompressor.flush()  # note: this is a no-op
        if not self.decompressor.eof:
            raise DecodingError("Zstandard data is incomplete")  # pragma: no cover
        return bytes(ret)
class MultiDecoder(ContentDecoder):
    """
    Handle the case where multiple encodings have been applied.
    """

    def __init__(self, children: typing.Sequence[ContentDecoder]) -> None:
        """
        'children' should be a sequence of decoders in the order in which
        each was applied.
        """
        # Note that we reverse the order for decoding.
        self.children = list(reversed(children))

    def decode(self, data: bytes) -> bytes:
        """Pass `data` through every child decoder in decoding order."""
        for child in self.children:
            data = child.decode(data)
        return data

    def flush(self) -> bytes:
        """
        Flush every child decoder in decoding order.

        Bytes flushed by one child are still encoded from the point of view
        of the next one, so each child first decodes what the previous child
        produced and then appends its own flushed output.
        """
        data = b""
        for child in self.children:
            data = child.decode(data) + child.flush()
        return data
class ByteChunker:
    """
    Handles returning byte content in fixed-size chunks.

    With `chunk_size=None` the content is passed through as-is, minus empty
    chunks. Otherwise content is buffered and only complete chunks of exactly
    `chunk_size` bytes are emitted; `flush()` returns the remainder.
    """

    def __init__(self, chunk_size: int | None = None) -> None:
        self._buffer = io.BytesIO()
        self._chunk_size = chunk_size

    def decode(self, content: bytes) -> list[bytes]:
        """Add `content` and return the complete chunks now available."""
        size = self._chunk_size
        if size is None:
            return [content] if content else []

        self._buffer.write(content)
        if self._buffer.tell() < size:
            # Not enough buffered yet for even one full chunk.
            return []

        value = self._buffer.getvalue()
        chunks = [value[i : i + size] for i in range(0, len(value), size)]
        # Only the final slice can be short; hold it back for the next call.
        remainder = b"" if len(chunks[-1]) == size else chunks.pop()
        self._buffer.seek(0)
        self._buffer.write(remainder)
        self._buffer.truncate()
        return chunks

    def flush(self) -> list[bytes]:
        """Return the buffered remainder (if any) and empty the buffer."""
        value = self._buffer.getvalue()
        self._buffer.seek(0)
        self._buffer.truncate()
        return [value] if value else []
class TextChunker:
    """
    Handles returning text content in fixed-size chunks.

    With `chunk_size=None` the content is passed through as-is, minus empty
    chunks. Otherwise content is buffered and only complete chunks of exactly
    `chunk_size` characters are emitted; `flush()` returns the remainder.
    """

    def __init__(self, chunk_size: int | None = None) -> None:
        self._buffer = io.StringIO()
        self._chunk_size = chunk_size

    def decode(self, content: str) -> list[str]:
        """Add `content` and return the complete chunks now available."""
        size = self._chunk_size
        if size is None:
            return [content] if content else []

        self._buffer.write(content)
        if self._buffer.tell() < size:
            # Not enough buffered yet for even one full chunk.
            return []

        value = self._buffer.getvalue()
        chunks = [value[i : i + size] for i in range(0, len(value), size)]
        # Only the final slice can be short; hold it back for the next call.
        remainder = "" if len(chunks[-1]) == size else chunks.pop()
        self._buffer.seek(0)
        self._buffer.write(remainder)
        self._buffer.truncate()
        return chunks

    def flush(self) -> list[str]:
        """Return the buffered remainder (if any) and empty the buffer."""
        value = self._buffer.getvalue()
        self._buffer.seek(0)
        self._buffer.truncate()
        return [value] if value else []
class TextDecoder:
    """
    Handles incrementally decoding bytes into text.

    Uses the incremental decoder for `encoding` with `errors="replace"`, so
    undecodable input is substituted rather than raising.
    """

    def __init__(self, encoding: str = "utf-8") -> None:
        self.decoder = codecs.getincrementaldecoder(encoding)(errors="replace")

    def decode(self, data: bytes) -> str:
        """Decode one chunk; an incomplete trailing sequence is held back."""
        return self.decoder.decode(data)

    def flush(self) -> str:
        """Signal end of input and return any text still pending."""
        return self.decoder.decode(b"", True)
class LineDecoder:
    """
    Handles incrementally reading lines from text.

    Has the same behaviour as the stdlib splitlines,
    but handling the input iteratively.
    """

    def __init__(self) -> None:
        # Pieces of the current line that has not been terminated yet.
        self.buffer: list[str] = []
        # True when the previous chunk ended in "\r"; that character is held
        # back so a "\r\n" pair split across two chunks counts as one break.
        self.trailing_cr: bool = False

    def decode(self, text: str) -> list[str]:
        """Add `text` and return the lines that are now complete."""
        # See https://docs.python.org/3/library/stdtypes.html#str.splitlines
        NEWLINE_CHARS = "\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029"

        # We always push a trailing `\r` into the next decode iteration.
        if self.trailing_cr:
            text = "\r" + text
            self.trailing_cr = False
        if text.endswith("\r"):
            self.trailing_cr = True
            text = text[:-1]

        if not text:
            # NOTE: the edge case input of empty text doesn't occur in practice,
            # because other httpx internals filter out this value
            return []  # pragma: no cover

        trailing_newline = text[-1] in NEWLINE_CHARS
        lines = text.splitlines()

        if len(lines) == 1 and not trailing_newline:
            # No new lines, buffer the input and continue.
            self.buffer.append(lines[0])
            return []

        if self.buffer:
            # Include any existing buffer in the first portion of the
            # splitlines result.
            lines = ["".join(self.buffer) + lines[0]] + lines[1:]
            self.buffer = []

        if not trailing_newline:
            # If the last segment of splitlines is not newline terminated,
            # then drop it from our output and start a new buffer.
            self.buffer = [lines.pop()]

        return lines

    def flush(self) -> list[str]:
        """Return the pending unterminated line, if any, and reset state."""
        if not self.buffer and not self.trailing_cr:
            return []

        # A held-back "\r" terminates the pending line, so a line is emitted
        # even when the buffer itself is empty.
        lines = ["".join(self.buffer)]
        self.buffer = []
        self.trailing_cr = False
        return lines
# Maps each Content-Encoding token to the decoder class that handles it.
SUPPORTED_DECODERS = {
    "identity": IdentityDecoder,
    "gzip": GZipDecoder,
    "deflate": DeflateDecoder,
    "br": BrotliDecoder,
    "zstd": ZStandardDecoder,
}


# Drop the codecs whose optional backing package could not be imported.
if brotli is None:
    SUPPORTED_DECODERS.pop("br")  # pragma: no cover
if zstd is None:
    SUPPORTED_DECODERS.pop("zstd")  # pragma: no cover