Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_decoders.py: 28%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1"""
2Handlers for Content-Encoding.
4See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
5"""
6from __future__ import annotations
8import codecs
9import io
10import typing
11import zlib
13from ._compat import brotli
14from ._exceptions import DecodingError
class ContentDecoder:
    """
    Abstract interface for a streaming content decoder.

    Subclasses implement `decode()` to process each incoming chunk of
    bytes, and `flush()` to emit any remaining buffered output at the
    end of the stream.
    """

    def decode(self, data: bytes) -> bytes:
        raise NotImplementedError()  # pragma: no cover

    def flush(self) -> bytes:
        raise NotImplementedError()  # pragma: no cover
class IdentityDecoder(ContentDecoder):
    """
    A no-op decoder, used when the response carries no content encoding.
    """

    def decode(self, data: bytes) -> bytes:
        # Bytes pass straight through, untouched.
        return data

    def flush(self) -> bytes:
        # Nothing is ever buffered, so there is never anything to emit.
        return b""
class DeflateDecoder(ContentDecoder):
    """
    Handle 'deflate' decoding.

    Servers are inconsistent about whether 'deflate' means a raw DEFLATE
    stream or one wrapped in a zlib container, so the very first chunk is
    used to detect which variant is in play.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        self.first_attempt = True
        # Start out assuming zlib-wrapped data; may be swapped below.
        self.decompressor = zlib.decompressobj()

    def decode(self, data: bytes) -> bytes:
        is_first = self.first_attempt
        self.first_attempt = False
        try:
            return self.decompressor.decompress(data)
        except zlib.error as exc:
            if not is_first:
                raise DecodingError(str(exc)) from exc
            # Retry the initial chunk as a raw DEFLATE stream instead.
            self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
            return self.decode(data)

    def flush(self) -> bytes:
        try:
            return self.decompressor.flush()
        except zlib.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
class GZipDecoder(ContentDecoder):
    """
    Handle 'gzip' decoding.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        # wbits of MAX_WBITS | 16 instructs zlib to expect a gzip header.
        self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)

    def decode(self, data: bytes) -> bytes:
        try:
            decoded = self.decompressor.decompress(data)
        except zlib.error as exc:
            raise DecodingError(str(exc)) from exc
        return decoded

    def flush(self) -> bytes:
        try:
            remainder = self.decompressor.flush()
        except zlib.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
        return remainder
class BrotliDecoder(ContentDecoder):
    """
    Handle 'brotli' decoding.

    Requires `pip install brotlipy`. See: https://brotlipy.readthedocs.io/
    or `pip install brotli`. See https://github.com/google/brotli
    Supports both 'brotlipy' and 'Brotli' packages since they share an import
    name. The top branches are for 'brotlipy' and bottom branches for 'Brotli'
    """

    def __init__(self) -> None:
        if brotli is None:  # pragma: no cover
            raise ImportError(
                "Using 'BrotliDecoder', but neither of the 'brotlicffi' or 'brotli' "
                "packages have been installed. "
                "Make sure to install httpx using `pip install httpx[brotli]`."
            ) from None

        self.decompressor = brotli.Decompressor()
        self.seen_data = False
        self._decompress: typing.Callable[[bytes], bytes]
        # 'brotlicffi' exposes `.decompress`, while 'brotli' exposes `.process`.
        decompress = getattr(self.decompressor, "decompress", None)
        if decompress is not None:
            self._decompress = decompress  # pragma: no cover
        else:
            self._decompress = self.decompressor.process  # pragma: no cover

    def decode(self, data: bytes) -> bytes:
        if not data:
            return b""
        self.seen_data = True
        try:
            return self._decompress(data)
        except brotli.error as exc:
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        if not self.seen_data:
            return b""
        try:
            finish = getattr(self.decompressor, "finish", None)
            if finish is not None:
                # Only available in the 'brotlicffi' package.
                #
                # As the decompressor decompresses eagerly, this will never
                # actually emit any data. However, it will potentially throw
                # errors if a truncated or damaged data stream has been used.
                finish()  # pragma: no cover
            return b""
        except brotli.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
class MultiDecoder(ContentDecoder):
    """
    Handle the case where multiple encodings have been applied.
    """

    def __init__(self, children: typing.Sequence[ContentDecoder]) -> None:
        """
        'children' should be a sequence of decoders in the order in which
        each was applied.
        """
        # Decoding must undo the encodings in reverse application order.
        self.children = list(children)[::-1]

    def decode(self, data: bytes) -> bytes:
        for decoder in self.children:
            data = decoder.decode(data)
        return data

    def flush(self) -> bytes:
        # Drain each decoder in turn, feeding its output into the next.
        data = b""
        for decoder in self.children:
            data = decoder.decode(data) + decoder.flush()
        return data
167class ByteChunker:
168 """
169 Handles returning byte content in fixed-size chunks.
170 """
172 def __init__(self, chunk_size: int | None = None) -> None:
173 self._buffer = io.BytesIO()
174 self._chunk_size = chunk_size
176 def decode(self, content: bytes) -> list[bytes]:
177 if self._chunk_size is None:
178 return [content] if content else []
180 self._buffer.write(content)
181 if self._buffer.tell() >= self._chunk_size:
182 value = self._buffer.getvalue()
183 chunks = [
184 value[i : i + self._chunk_size]
185 for i in range(0, len(value), self._chunk_size)
186 ]
187 if len(chunks[-1]) == self._chunk_size:
188 self._buffer.seek(0)
189 self._buffer.truncate()
190 return chunks
191 else:
192 self._buffer.seek(0)
193 self._buffer.write(chunks[-1])
194 self._buffer.truncate()
195 return chunks[:-1]
196 else:
197 return []
199 def flush(self) -> list[bytes]:
200 value = self._buffer.getvalue()
201 self._buffer.seek(0)
202 self._buffer.truncate()
203 return [value] if value else []
206class TextChunker:
207 """
208 Handles returning text content in fixed-size chunks.
209 """
211 def __init__(self, chunk_size: int | None = None) -> None:
212 self._buffer = io.StringIO()
213 self._chunk_size = chunk_size
215 def decode(self, content: str) -> list[str]:
216 if self._chunk_size is None:
217 return [content] if content else []
219 self._buffer.write(content)
220 if self._buffer.tell() >= self._chunk_size:
221 value = self._buffer.getvalue()
222 chunks = [
223 value[i : i + self._chunk_size]
224 for i in range(0, len(value), self._chunk_size)
225 ]
226 if len(chunks[-1]) == self._chunk_size:
227 self._buffer.seek(0)
228 self._buffer.truncate()
229 return chunks
230 else:
231 self._buffer.seek(0)
232 self._buffer.write(chunks[-1])
233 self._buffer.truncate()
234 return chunks[:-1]
235 else:
236 return []
238 def flush(self) -> list[str]:
239 value = self._buffer.getvalue()
240 self._buffer.seek(0)
241 self._buffer.truncate()
242 return [value] if value else []
class TextDecoder:
    """
    Handles incrementally decoding bytes into text
    """

    def __init__(self, encoding: str = "utf-8") -> None:
        # An incremental decoder copes with multi-byte sequences split
        # across chunk boundaries; undecodable bytes become U+FFFD.
        make_decoder = codecs.getincrementaldecoder(encoding)
        self.decoder = make_decoder(errors="replace")

    def decode(self, data: bytes) -> str:
        return self.decoder.decode(data)

    def flush(self) -> str:
        # Signal end-of-input so any dangling partial sequence is emitted.
        return self.decoder.decode(b"", True)
class LineDecoder:
    """
    Handles incrementally reading lines from text.

    Has the same behaviour as the stdllib splitlines,
    but handling the input iteratively.
    """

    def __init__(self) -> None:
        self.buffer: list[str] = []
        self.trailing_cr: bool = False

    def decode(self, text: str) -> list[str]:
        # See https://docs.python.org/3/library/stdtypes.html#str.splitlines
        NEWLINE_CHARS = "\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029"

        # A chunk ending in '\r' is ambiguous (a '\n' might arrive next),
        # so the '\r' is always deferred into the next decode call.
        if self.trailing_cr:
            text = "\r" + text
            self.trailing_cr = False
        if text.endswith("\r"):
            self.trailing_cr = True
            text = text[:-1]

        if not text:
            # NOTE: the edge case input of empty text doesn't occur in practice,
            # because other httpx internals filter out this value
            return []  # pragma: no cover

        trailing_newline = text[-1] in NEWLINE_CHARS
        lines = text.splitlines()

        if len(lines) == 1 and not trailing_newline:
            # No complete line yet: stash the partial text and wait.
            self.buffer.append(lines[0])
            return []

        if self.buffer:
            # Prepend any previously buffered text onto the first line.
            lines[0] = "".join(self.buffer) + lines[0]
            self.buffer = []

        if not trailing_newline:
            # The final segment is unterminated; hold it back for later.
            self.buffer = [lines.pop()]

        return lines

    def flush(self) -> list[str]:
        if not self.buffer and not self.trailing_cr:
            return []

        lines = ["".join(self.buffer)]
        self.buffer = []
        self.trailing_cr = False
        return lines
# Mapping of supported Content-Encoding values to their decoder classes.
SUPPORTED_DECODERS = {
    "identity": IdentityDecoder,
    "gzip": GZipDecoder,
    "deflate": DeflateDecoder,
    "br": BrotliDecoder,
}

# Only advertise 'br' support when a brotli implementation is installed.
if brotli is None:
    del SUPPORTED_DECODERS["br"]  # pragma: no cover