Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_decoders.py: 26%
178 statements
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
« prev ^ index » next coverage.py v7.2.2, created at 2023-03-26 06:12 +0000
1"""
2Handlers for Content-Encoding.
4See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
5"""
6import codecs
7import io
8import typing
9import zlib
11from ._compat import brotli
12from ._exceptions import DecodingError
15class ContentDecoder:
16 def decode(self, data: bytes) -> bytes:
17 raise NotImplementedError() # pragma: no cover
19 def flush(self) -> bytes:
20 raise NotImplementedError() # pragma: no cover
23class IdentityDecoder(ContentDecoder):
24 """
25 Handle unencoded data.
26 """
28 def decode(self, data: bytes) -> bytes:
29 return data
31 def flush(self) -> bytes:
32 return b""
35class DeflateDecoder(ContentDecoder):
36 """
37 Handle 'deflate' decoding.
39 See: https://stackoverflow.com/questions/1838699
40 """
42 def __init__(self) -> None:
43 self.first_attempt = True
44 self.decompressor = zlib.decompressobj()
46 def decode(self, data: bytes) -> bytes:
47 was_first_attempt = self.first_attempt
48 self.first_attempt = False
49 try:
50 return self.decompressor.decompress(data)
51 except zlib.error as exc:
52 if was_first_attempt:
53 self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
54 return self.decode(data)
55 raise DecodingError(str(exc)) from exc
57 def flush(self) -> bytes:
58 try:
59 return self.decompressor.flush()
60 except zlib.error as exc: # pragma: no cover
61 raise DecodingError(str(exc)) from exc
64class GZipDecoder(ContentDecoder):
65 """
66 Handle 'gzip' decoding.
68 See: https://stackoverflow.com/questions/1838699
69 """
71 def __init__(self) -> None:
72 self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)
74 def decode(self, data: bytes) -> bytes:
75 try:
76 return self.decompressor.decompress(data)
77 except zlib.error as exc:
78 raise DecodingError(str(exc)) from exc
80 def flush(self) -> bytes:
81 try:
82 return self.decompressor.flush()
83 except zlib.error as exc: # pragma: no cover
84 raise DecodingError(str(exc)) from exc
87class BrotliDecoder(ContentDecoder):
88 """
89 Handle 'brotli' decoding.
91 Requires `pip install brotlipy`. See: https://brotlipy.readthedocs.io/
92 or `pip install brotli`. See https://github.com/google/brotli
93 Supports both 'brotlipy' and 'Brotli' packages since they share an import
94 name. The top branches are for 'brotlipy' and bottom branches for 'Brotli'
95 """
97 def __init__(self) -> None:
98 if brotli is None: # pragma: no cover
99 raise ImportError(
100 "Using 'BrotliDecoder', but neither of the 'brotlicffi' or 'brotli' "
101 "packages have been installed. "
102 "Make sure to install httpx using `pip install httpx[brotli]`."
103 ) from None
105 self.decompressor = brotli.Decompressor()
106 self.seen_data = False
107 self._decompress: typing.Callable[[bytes], bytes]
108 if hasattr(self.decompressor, "decompress"):
109 # The 'brotlicffi' package.
110 self._decompress = self.decompressor.decompress # pragma: no cover
111 else:
112 # The 'brotli' package.
113 self._decompress = self.decompressor.process # pragma: no cover
115 def decode(self, data: bytes) -> bytes:
116 if not data:
117 return b""
118 self.seen_data = True
119 try:
120 return self._decompress(data)
121 except brotli.error as exc:
122 raise DecodingError(str(exc)) from exc
124 def flush(self) -> bytes:
125 if not self.seen_data:
126 return b""
127 try:
128 if hasattr(self.decompressor, "finish"):
129 # Only available in the 'brotlicffi' package.
131 # As the decompressor decompresses eagerly, this
132 # will never actually emit any data. However, it will potentially throw
133 # errors if a truncated or damaged data stream has been used.
134 self.decompressor.finish() # pragma: no cover
135 return b""
136 except brotli.error as exc: # pragma: no cover
137 raise DecodingError(str(exc)) from exc
140class MultiDecoder(ContentDecoder):
141 """
142 Handle the case where multiple encodings have been applied.
143 """
145 def __init__(self, children: typing.Sequence[ContentDecoder]) -> None:
146 """
147 'children' should be a sequence of decoders in the order in which
148 each was applied.
149 """
150 # Note that we reverse the order for decoding.
151 self.children = list(reversed(children))
153 def decode(self, data: bytes) -> bytes:
154 for child in self.children:
155 data = child.decode(data)
156 return data
158 def flush(self) -> bytes:
159 data = b""
160 for child in self.children:
161 data = child.decode(data) + child.flush()
162 return data
165class ByteChunker:
166 """
167 Handles returning byte content in fixed-size chunks.
168 """
170 def __init__(self, chunk_size: typing.Optional[int] = None) -> None:
171 self._buffer = io.BytesIO()
172 self._chunk_size = chunk_size
174 def decode(self, content: bytes) -> typing.List[bytes]:
175 if self._chunk_size is None:
176 return [content] if content else []
178 self._buffer.write(content)
179 if self._buffer.tell() >= self._chunk_size:
180 value = self._buffer.getvalue()
181 chunks = [
182 value[i : i + self._chunk_size]
183 for i in range(0, len(value), self._chunk_size)
184 ]
185 if len(chunks[-1]) == self._chunk_size:
186 self._buffer.seek(0)
187 self._buffer.truncate()
188 return chunks
189 else:
190 self._buffer.seek(0)
191 self._buffer.write(chunks[-1])
192 self._buffer.truncate()
193 return chunks[:-1]
194 else:
195 return []
197 def flush(self) -> typing.List[bytes]:
198 value = self._buffer.getvalue()
199 self._buffer.seek(0)
200 self._buffer.truncate()
201 return [value] if value else []
204class TextChunker:
205 """
206 Handles returning text content in fixed-size chunks.
207 """
209 def __init__(self, chunk_size: typing.Optional[int] = None) -> None:
210 self._buffer = io.StringIO()
211 self._chunk_size = chunk_size
213 def decode(self, content: str) -> typing.List[str]:
214 if self._chunk_size is None:
215 return [content]
217 self._buffer.write(content)
218 if self._buffer.tell() >= self._chunk_size:
219 value = self._buffer.getvalue()
220 chunks = [
221 value[i : i + self._chunk_size]
222 for i in range(0, len(value), self._chunk_size)
223 ]
224 if len(chunks[-1]) == self._chunk_size:
225 self._buffer.seek(0)
226 self._buffer.truncate()
227 return chunks
228 else:
229 self._buffer.seek(0)
230 self._buffer.write(chunks[-1])
231 self._buffer.truncate()
232 return chunks[:-1]
233 else:
234 return []
236 def flush(self) -> typing.List[str]:
237 value = self._buffer.getvalue()
238 self._buffer.seek(0)
239 self._buffer.truncate()
240 return [value] if value else []
243class TextDecoder:
244 """
245 Handles incrementally decoding bytes into text
246 """
248 def __init__(self, encoding: str = "utf-8"):
249 self.decoder = codecs.getincrementaldecoder(encoding)(errors="replace")
251 def decode(self, data: bytes) -> str:
252 return self.decoder.decode(data)
254 def flush(self) -> str:
255 return self.decoder.decode(b"", True)
258class LineDecoder:
259 """
260 Handles incrementally reading lines from text.
262 Uses universal line decoding, supporting any of `\n`, `\r`, or `\r\n`
263 as line endings, normalizing to `\n`.
264 """
266 def __init__(self) -> None:
267 self.buffer = ""
269 def decode(self, text: str) -> typing.List[str]:
270 lines = []
272 if text and self.buffer and self.buffer[-1] == "\r":
273 if text.startswith("\n"):
274 # Handle the case where we have an "\r\n" split across
275 # our previous input, and our new chunk.
276 lines.append(self.buffer[:-1] + "\n")
277 self.buffer = ""
278 text = text[1:]
279 else:
280 # Handle the case where we have "\r" at the end of our
281 # previous input.
282 lines.append(self.buffer[:-1] + "\n")
283 self.buffer = ""
285 while text:
286 num_chars = len(text)
287 for idx in range(num_chars):
288 char = text[idx]
289 next_char = None if idx + 1 == num_chars else text[idx + 1]
290 if char == "\n":
291 lines.append(self.buffer + text[: idx + 1])
292 self.buffer = ""
293 text = text[idx + 1 :]
294 break
295 elif char == "\r" and next_char == "\n":
296 lines.append(self.buffer + text[:idx] + "\n")
297 self.buffer = ""
298 text = text[idx + 2 :]
299 break
300 elif char == "\r" and next_char is not None:
301 lines.append(self.buffer + text[:idx] + "\n")
302 self.buffer = ""
303 text = text[idx + 1 :]
304 break
305 elif next_char is None:
306 self.buffer += text
307 text = ""
308 break
310 return lines
312 def flush(self) -> typing.List[str]:
313 if self.buffer.endswith("\r"):
314 # Handle the case where we had a trailing '\r', which could have
315 # been a '\r\n' pair.
316 lines = [self.buffer[:-1] + "\n"]
317 elif self.buffer:
318 lines = [self.buffer]
319 else:
320 lines = []
321 self.buffer = ""
322 return lines
325SUPPORTED_DECODERS = {
326 "identity": IdentityDecoder,
327 "gzip": GZipDecoder,
328 "deflate": DeflateDecoder,
329 "br": BrotliDecoder,
330}
333if brotli is None:
334 SUPPORTED_DECODERS.pop("br") # pragma: no cover