Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_decoders.py: 59%
165 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 07:19 +0000
1"""
2Handlers for Content-Encoding.
4See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
5"""
6import codecs
7import io
8import typing
9import zlib
11from ._compat import brotli
12from ._exceptions import DecodingError
15class ContentDecoder:
16 def decode(self, data: bytes) -> bytes:
17 raise NotImplementedError() # pragma: no cover
19 def flush(self) -> bytes:
20 raise NotImplementedError() # pragma: no cover
23class IdentityDecoder(ContentDecoder):
24 """
25 Handle unencoded data.
26 """
28 def decode(self, data: bytes) -> bytes:
29 return data
31 def flush(self) -> bytes:
32 return b""
35class DeflateDecoder(ContentDecoder):
36 """
37 Handle 'deflate' decoding.
39 See: https://stackoverflow.com/questions/1838699
40 """
42 def __init__(self) -> None:
43 self.first_attempt = True
44 self.decompressor = zlib.decompressobj()
46 def decode(self, data: bytes) -> bytes:
47 was_first_attempt = self.first_attempt
48 self.first_attempt = False
49 try:
50 return self.decompressor.decompress(data)
51 except zlib.error as exc:
52 if was_first_attempt:
53 self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
54 return self.decode(data)
55 raise DecodingError(str(exc)) from exc
57 def flush(self) -> bytes:
58 try:
59 return self.decompressor.flush()
60 except zlib.error as exc: # pragma: no cover
61 raise DecodingError(str(exc)) from exc
64class GZipDecoder(ContentDecoder):
65 """
66 Handle 'gzip' decoding.
68 See: https://stackoverflow.com/questions/1838699
69 """
71 def __init__(self) -> None:
72 self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)
74 def decode(self, data: bytes) -> bytes:
75 try:
76 return self.decompressor.decompress(data)
77 except zlib.error as exc:
78 raise DecodingError(str(exc)) from exc
80 def flush(self) -> bytes:
81 try:
82 return self.decompressor.flush()
83 except zlib.error as exc: # pragma: no cover
84 raise DecodingError(str(exc)) from exc
87class BrotliDecoder(ContentDecoder):
88 """
89 Handle 'brotli' decoding.
91 Requires `pip install brotlipy`. See: https://brotlipy.readthedocs.io/
92 or `pip install brotli`. See https://github.com/google/brotli
93 Supports both 'brotlipy' and 'Brotli' packages since they share an import
94 name. The top branches are for 'brotlipy' and bottom branches for 'Brotli'
95 """
97 def __init__(self) -> None:
98 if brotli is None: # pragma: no cover
99 raise ImportError(
100 "Using 'BrotliDecoder', but neither of the 'brotlicffi' or 'brotli' "
101 "packages have been installed. "
102 "Make sure to install httpx using `pip install httpx[brotli]`."
103 ) from None
105 self.decompressor = brotli.Decompressor()
106 self.seen_data = False
107 self._decompress: typing.Callable[[bytes], bytes]
108 if hasattr(self.decompressor, "decompress"):
109 # The 'brotlicffi' package.
110 self._decompress = self.decompressor.decompress # pragma: no cover
111 else:
112 # The 'brotli' package.
113 self._decompress = self.decompressor.process # pragma: no cover
115 def decode(self, data: bytes) -> bytes:
116 if not data:
117 return b""
118 self.seen_data = True
119 try:
120 return self._decompress(data)
121 except brotli.error as exc:
122 raise DecodingError(str(exc)) from exc
124 def flush(self) -> bytes:
125 if not self.seen_data:
126 return b""
127 try:
128 if hasattr(self.decompressor, "finish"):
129 # Only available in the 'brotlicffi' package.
131 # As the decompressor decompresses eagerly, this
132 # will never actually emit any data. However, it will potentially throw
133 # errors if a truncated or damaged data stream has been used.
134 self.decompressor.finish() # pragma: no cover
135 return b""
136 except brotli.error as exc: # pragma: no cover
137 raise DecodingError(str(exc)) from exc
140class MultiDecoder(ContentDecoder):
141 """
142 Handle the case where multiple encodings have been applied.
143 """
145 def __init__(self, children: typing.Sequence[ContentDecoder]) -> None:
146 """
147 'children' should be a sequence of decoders in the order in which
148 each was applied.
149 """
150 # Note that we reverse the order for decoding.
151 self.children = list(reversed(children))
153 def decode(self, data: bytes) -> bytes:
154 for child in self.children:
155 data = child.decode(data)
156 return data
158 def flush(self) -> bytes:
159 data = b""
160 for child in self.children:
161 data = child.decode(data) + child.flush()
162 return data
165class ByteChunker:
166 """
167 Handles returning byte content in fixed-size chunks.
168 """
170 def __init__(self, chunk_size: typing.Optional[int] = None) -> None:
171 self._buffer = io.BytesIO()
172 self._chunk_size = chunk_size
174 def decode(self, content: bytes) -> typing.List[bytes]:
175 if self._chunk_size is None:
176 return [content] if content else []
178 self._buffer.write(content)
179 if self._buffer.tell() >= self._chunk_size:
180 value = self._buffer.getvalue()
181 chunks = [
182 value[i : i + self._chunk_size]
183 for i in range(0, len(value), self._chunk_size)
184 ]
185 if len(chunks[-1]) == self._chunk_size:
186 self._buffer.seek(0)
187 self._buffer.truncate()
188 return chunks
189 else:
190 self._buffer.seek(0)
191 self._buffer.write(chunks[-1])
192 self._buffer.truncate()
193 return chunks[:-1]
194 else:
195 return []
197 def flush(self) -> typing.List[bytes]:
198 value = self._buffer.getvalue()
199 self._buffer.seek(0)
200 self._buffer.truncate()
201 return [value] if value else []
204class TextChunker:
205 """
206 Handles returning text content in fixed-size chunks.
207 """
209 def __init__(self, chunk_size: typing.Optional[int] = None) -> None:
210 self._buffer = io.StringIO()
211 self._chunk_size = chunk_size
213 def decode(self, content: str) -> typing.List[str]:
214 if self._chunk_size is None:
215 return [content]
217 self._buffer.write(content)
218 if self._buffer.tell() >= self._chunk_size:
219 value = self._buffer.getvalue()
220 chunks = [
221 value[i : i + self._chunk_size]
222 for i in range(0, len(value), self._chunk_size)
223 ]
224 if len(chunks[-1]) == self._chunk_size:
225 self._buffer.seek(0)
226 self._buffer.truncate()
227 return chunks
228 else:
229 self._buffer.seek(0)
230 self._buffer.write(chunks[-1])
231 self._buffer.truncate()
232 return chunks[:-1]
233 else:
234 return []
236 def flush(self) -> typing.List[str]:
237 value = self._buffer.getvalue()
238 self._buffer.seek(0)
239 self._buffer.truncate()
240 return [value] if value else []
243class TextDecoder:
244 """
245 Handles incrementally decoding bytes into text
246 """
248 def __init__(self, encoding: str = "utf-8"):
249 self.decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
251 def decode(self, data: bytes) -> str:
252 return self.decoder.decode(data)
254 def flush(self) -> str:
255 return self.decoder.decode(b"", True)
258class LineDecoder:
259 """
260 Handles incrementally reading lines from text.
262 Has the same behaviour as the stdllib splitlines, but handling the input iteratively.
263 """
265 def __init__(self) -> None:
266 self.buffer: typing.List[str] = []
267 self.trailing_cr: bool = False
269 def decode(self, text: str) -> typing.List[str]:
270 # See https://docs.python.org/3/library/stdtypes.html#str.splitlines
271 NEWLINE_CHARS = "\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029"
273 # We always push a trailing `\r` into the next decode iteration.
274 if self.trailing_cr:
275 text = "\r" + text
276 self.trailing_cr = False
277 if text.endswith("\r"):
278 self.trailing_cr = True
279 text = text[:-1]
281 if not text:
282 return []
284 trailing_newline = text[-1] in NEWLINE_CHARS
285 lines = text.splitlines()
287 if len(lines) == 1 and not trailing_newline:
288 # No new lines, buffer the input and continue.
289 self.buffer.append(lines[0])
290 return []
292 if self.buffer:
293 # Include any existing buffer in the first portion of the
294 # splitlines result.
295 lines = ["".join(self.buffer) + lines[0]] + lines[1:]
296 self.buffer = []
298 if not trailing_newline:
299 # If the last segment of splitlines is not newline terminated,
300 # then drop it from our output and start a new buffer.
301 self.buffer = [lines.pop()]
303 return lines
305 def flush(self) -> typing.List[str]:
306 if not self.buffer and not self.trailing_cr:
307 return []
309 lines = ["".join(self.buffer)]
310 self.buffer = []
311 self.trailing_cr = False
312 return lines
315SUPPORTED_DECODERS = {
316 "identity": IdentityDecoder,
317 "gzip": GZipDecoder,
318 "deflate": DeflateDecoder,
319 "br": BrotliDecoder,
320}
323if brotli is None:
324 SUPPORTED_DECODERS.pop("br") # pragma: no cover