Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/httpx/_decoders.py: 29%


193 statements  

1""" 

2Handlers for Content-Encoding. 

3 

4See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding 

5""" 

6 

7from __future__ import annotations 

8 

9import codecs 

10import io 

11import typing 

12import zlib 

13 

14from ._exceptions import DecodingError 

15 

16# Brotli support is optional 

17try: 

18 # The C bindings in `brotli` are recommended for CPython. 

19 import brotli 

20except ImportError: # pragma: no cover 

21 try: 

22 # The CFFI bindings in `brotlicffi` are recommended for PyPy 

23 # and other environments. 

24 import brotlicffi as brotli 

25 except ImportError: 

26 brotli = None 

27 

28 

29# Zstandard support is optional 

30try: 

31 import zstandard 

32except ImportError: # pragma: no cover 

33 zstandard = None # type: ignore 

34 

35 

class ContentDecoder:
    def decode(self, data: bytes) -> bytes:
        raise NotImplementedError()  # pragma: no cover

    def flush(self) -> bytes:
        raise NotImplementedError()  # pragma: no cover

class IdentityDecoder(ContentDecoder):
    """
    Handle unencoded data.
    """

    def decode(self, data: bytes) -> bytes:
        return data

    def flush(self) -> bytes:
        return b""

class DeflateDecoder(ContentDecoder):
    """
    Handle 'deflate' decoding.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        self.first_attempt = True
        self.decompressor = zlib.decompressobj()

    def decode(self, data: bytes) -> bytes:
        was_first_attempt = self.first_attempt
        self.first_attempt = False
        try:
            return self.decompressor.decompress(data)
        except zlib.error as exc:
            if was_first_attempt:
                # Some servers send a raw deflate stream without the zlib
                # header, so retry once with a raw-deflate decompressor.
                self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
                return self.decode(data)
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        try:
            return self.decompressor.flush()
        except zlib.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
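The retry above lets the decoder cope with servers that send a raw deflate stream instead of a zlib-wrapped one. A minimal usage sketch (illustrative only, not part of this module), using only the standard library:

# Usage sketch: DeflateDecoder transparently handles a raw deflate stream.
raw = zlib.compressobj(wbits=-zlib.MAX_WBITS)
payload = raw.compress(b"hello world") + raw.flush()

decoder = DeflateDecoder()
assert decoder.decode(payload) + decoder.flush() == b"hello world"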

class GZipDecoder(ContentDecoder):
    """
    Handle 'gzip' decoding.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)

    def decode(self, data: bytes) -> bytes:
        try:
            return self.decompressor.decompress(data)
        except zlib.error as exc:
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        try:
            return self.decompressor.flush()
        except zlib.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
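The `zlib.MAX_WBITS | 16` window value tells zlib to expect a gzip header and trailer. A short sketch of incremental use (illustrative only, standard library only):

# Usage sketch: feed gzip-compressed bytes to GZipDecoder in two chunks.
import gzip

compressed = gzip.compress(b"x" * 1000)
decoder = GZipDecoder()
out = decoder.decode(compressed[:50]) + decoder.decode(compressed[50:]) + decoder.flush()
assert out == b"x" * 1000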

class BrotliDecoder(ContentDecoder):
    """
    Handle 'brotli' decoding.

    Requires `pip install brotlipy`. See: https://brotlipy.readthedocs.io/
    or `pip install brotli`. See https://github.com/google/brotli
    Supports both 'brotlipy' and 'Brotli' packages since they share an import
    name. The top branches are for 'brotlipy' and bottom branches for 'Brotli'
    """

    def __init__(self) -> None:
        if brotli is None:  # pragma: no cover
            raise ImportError(
                "Using 'BrotliDecoder', but neither of the 'brotlicffi' or 'brotli' "
                "packages have been installed. "
                "Make sure to install httpx using `pip install httpx[brotli]`."
            ) from None

        self.decompressor = brotli.Decompressor()
        self.seen_data = False
        self._decompress: typing.Callable[[bytes], bytes]
        if hasattr(self.decompressor, "decompress"):
            # The 'brotlicffi' package.
            self._decompress = self.decompressor.decompress  # pragma: no cover
        else:
            # The 'brotli' package.
            self._decompress = self.decompressor.process  # pragma: no cover

    def decode(self, data: bytes) -> bytes:
        if not data:
            return b""
        self.seen_data = True
        try:
            return self._decompress(data)
        except brotli.error as exc:
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        if not self.seen_data:
            return b""
        try:
            if hasattr(self.decompressor, "finish"):
                # Only available in the 'brotlicffi' package.

                # As the decompressor decompresses eagerly, this
                # will never actually emit any data. However, it will potentially throw
                # errors if a truncated or damaged data stream has been used.
                self.decompressor.finish()  # pragma: no cover
            return b""
        except brotli.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
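A round-trip sketch (illustrative only), assuming the optional `brotli` or `brotlicffi` package is installed; both expose a module-level `compress()`:

# Usage sketch: requires the optional 'brotli' (or 'brotlicffi') package.
if brotli is not None:
    compressed = brotli.compress(b"hello brotli")
    decoder = BrotliDecoder()
    assert decoder.decode(compressed) + decoder.flush() == b"hello brotli"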

class ZStandardDecoder(ContentDecoder):
    """
    Handle 'zstd' RFC 8878 decoding.

    Requires `pip install zstandard`.
    Can be installed as a dependency of httpx using `pip install httpx[zstd]`.
    """

    # inspired by the ZstdDecoder implementation in urllib3
    def __init__(self) -> None:
        if zstandard is None:  # pragma: no cover
            raise ImportError(
                "Using 'ZStandardDecoder', ..."
                "Make sure to install httpx using `pip install httpx[zstd]`."
            ) from None

        self.decompressor = zstandard.ZstdDecompressor().decompressobj()
        self.seen_data = False

    def decode(self, data: bytes) -> bytes:
        assert zstandard is not None
        self.seen_data = True
        output = io.BytesIO()
        try:
            output.write(self.decompressor.decompress(data))
            # A body may consist of several concatenated zstd frames: once one
            # frame is complete, start a fresh decompressor for the remainder.
            while self.decompressor.eof and self.decompressor.unused_data:
                unused_data = self.decompressor.unused_data
                self.decompressor = zstandard.ZstdDecompressor().decompressobj()
                output.write(self.decompressor.decompress(unused_data))
        except zstandard.ZstdError as exc:
            raise DecodingError(str(exc)) from exc
        return output.getvalue()

    def flush(self) -> bytes:
        if not self.seen_data:
            return b""
        ret = self.decompressor.flush()  # note: this is a no-op
        if not self.decompressor.eof:
            raise DecodingError("Zstandard data is incomplete")  # pragma: no cover
        return bytes(ret)
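The loop over `unused_data` handles bodies made of several concatenated zstd frames. A sketch (illustrative only), assuming the optional `zstandard` package is installed:

# Usage sketch: two concatenated zstd frames decode into one byte string.
if zstandard is not None:
    frame_a = zstandard.ZstdCompressor().compress(b"first frame ")
    frame_b = zstandard.ZstdCompressor().compress(b"second frame")
    decoder = ZStandardDecoder()
    out = decoder.decode(frame_a + frame_b) + decoder.flush()
    assert out == b"first frame second frame"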

class MultiDecoder(ContentDecoder):
    """
    Handle the case where multiple encodings have been applied.
    """

    def __init__(self, children: typing.Sequence[ContentDecoder]) -> None:
        """
        'children' should be a sequence of decoders in the order in which
        each was applied.
        """
        # Note that we reverse the order for decoding.
        self.children = list(reversed(children))

    def decode(self, data: bytes) -> bytes:
        for child in self.children:
            data = child.decode(data)
        return data

    def flush(self) -> bytes:
        data = b""
        for child in self.children:
            data = child.decode(data) + child.flush()
        return data
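For example, `Content-Encoding: deflate, gzip` means deflate was applied first and gzip second, so decoding must undo gzip first; reversing the children achieves that. A sketch (illustrative only) using the decoders defined above:

# Usage sketch: body compressed with deflate first, then gzip on top.
import gzip

inner = zlib.compress(b"payload")   # deflate (zlib-wrapped)
body = gzip.compress(inner)         # then gzip
decoder = MultiDecoder([DeflateDecoder(), GZipDecoder()])
assert decoder.decode(body) + decoder.flush() == b"payload"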

class ByteChunker:
    """
    Handles returning byte content in fixed-size chunks.
    """

    def __init__(self, chunk_size: int | None = None) -> None:
        self._buffer = io.BytesIO()
        self._chunk_size = chunk_size

    def decode(self, content: bytes) -> list[bytes]:
        if self._chunk_size is None:
            return [content] if content else []

        self._buffer.write(content)
        if self._buffer.tell() >= self._chunk_size:
            value = self._buffer.getvalue()
            chunks = [
                value[i : i + self._chunk_size]
                for i in range(0, len(value), self._chunk_size)
            ]
            if len(chunks[-1]) == self._chunk_size:
                # Everything split evenly: emit all chunks and clear the buffer.
                self._buffer.seek(0)
                self._buffer.truncate()
                return chunks
            else:
                # Keep the short trailing chunk buffered until more data arrives.
                self._buffer.seek(0)
                self._buffer.write(chunks[-1])
                self._buffer.truncate()
                return chunks[:-1]
        else:
            return []

    def flush(self) -> list[bytes]:
        value = self._buffer.getvalue()
        self._buffer.seek(0)
        self._buffer.truncate()
        return [value] if value else []
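A sketch of the chunking behaviour (illustrative only): partial trailing data stays buffered until enough bytes arrive or `flush()` is called:

# Usage sketch: re-chunk an arbitrary byte stream into 4-byte pieces.
chunker = ByteChunker(chunk_size=4)
pieces: list[bytes] = []
for part in (b"ab", b"cdefg", b"h"):
    pieces.extend(chunker.decode(part))
pieces.extend(chunker.flush())
assert pieces == [b"abcd", b"efgh"]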

class TextChunker:
    """
    Handles returning text content in fixed-size chunks.
    """

    def __init__(self, chunk_size: int | None = None) -> None:
        self._buffer = io.StringIO()
        self._chunk_size = chunk_size

    def decode(self, content: str) -> list[str]:
        if self._chunk_size is None:
            return [content] if content else []

        self._buffer.write(content)
        if self._buffer.tell() >= self._chunk_size:
            value = self._buffer.getvalue()
            chunks = [
                value[i : i + self._chunk_size]
                for i in range(0, len(value), self._chunk_size)
            ]
            if len(chunks[-1]) == self._chunk_size:
                self._buffer.seek(0)
                self._buffer.truncate()
                return chunks
            else:
                self._buffer.seek(0)
                self._buffer.write(chunks[-1])
                self._buffer.truncate()
                return chunks[:-1]
        else:
            return []

    def flush(self) -> list[str]:
        value = self._buffer.getvalue()
        self._buffer.seek(0)
        self._buffer.truncate()
        return [value] if value else []

class TextDecoder:
    """
    Handles incrementally decoding bytes into text.
    """

    def __init__(self, encoding: str = "utf-8") -> None:
        self.decoder = codecs.getincrementaldecoder(encoding)(errors="replace")

    def decode(self, data: bytes) -> str:
        return self.decoder.decode(data)

    def flush(self) -> str:
        return self.decoder.decode(b"", True)
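A sketch (illustrative only) showing why an incremental decoder is needed: a multi-byte UTF-8 character split across chunks still decodes cleanly:

# Usage sketch: the incremental decoder buffers partial multi-byte sequences.
data = "héllo".encode("utf-8")  # b"h\xc3\xa9llo"
decoder = TextDecoder(encoding="utf-8")
text = decoder.decode(data[:2]) + decoder.decode(data[2:]) + decoder.flush()
assert text == "héllo"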

class LineDecoder:
    """
    Handles incrementally reading lines from text.

    Has the same behaviour as the stdlib splitlines,
    but handling the input iteratively.
    """

    def __init__(self) -> None:
        self.buffer: list[str] = []
        self.trailing_cr: bool = False

    def decode(self, text: str) -> list[str]:
        # See https://docs.python.org/3/library/stdtypes.html#str.splitlines
        NEWLINE_CHARS = "\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029"

        # We always push a trailing `\r` into the next decode iteration.
        if self.trailing_cr:
            text = "\r" + text
            self.trailing_cr = False
        if text.endswith("\r"):
            self.trailing_cr = True
            text = text[:-1]

        if not text:
            # NOTE: the edge case input of empty text doesn't occur in practice,
            # because other httpx internals filter out this value
            return []  # pragma: no cover

        trailing_newline = text[-1] in NEWLINE_CHARS
        lines = text.splitlines()

        if len(lines) == 1 and not trailing_newline:
            # No new lines, buffer the input and continue.
            self.buffer.append(lines[0])
            return []

        if self.buffer:
            # Include any existing buffer in the first portion of the
            # splitlines result.
            lines = ["".join(self.buffer) + lines[0]] + lines[1:]
            self.buffer = []

        if not trailing_newline:
            # If the last segment of splitlines is not newline terminated,
            # then drop it from our output and start a new buffer.
            self.buffer = [lines.pop()]

        return lines

    def flush(self) -> list[str]:
        if not self.buffer and not self.trailing_cr:
            return []

        lines = ["".join(self.buffer)]
        self.buffer = []
        self.trailing_cr = False
        return lines
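A sketch of the incremental behaviour (illustrative only), including the held-back trailing `\r`:

# Usage sketch: lines are emitted only once their newline has been seen,
# and a trailing "\r" is deferred in case the next chunk starts with "\n".
decoder = LineDecoder()
assert decoder.decode("hello\nwor") == ["hello"]
assert decoder.decode("ld\r") == []
assert decoder.decode("\npartial") == ["world"]
assert decoder.flush() == ["partial"]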

SUPPORTED_DECODERS = {
    "identity": IdentityDecoder,
    "gzip": GZipDecoder,
    "deflate": DeflateDecoder,
    "br": BrotliDecoder,
    "zstd": ZStandardDecoder,
}


if brotli is None:
    SUPPORTED_DECODERS.pop("br")  # pragma: no cover
if zstandard is None:
    SUPPORTED_DECODERS.pop("zstd")  # pragma: no cover
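A sketch of how the mapping might be used to build a decoder from a `Content-Encoding` header value. The `decoder_for` helper below is hypothetical and not part of httpx; the library's actual wiring lives outside this module:

# Usage sketch: map a Content-Encoding header value onto a ContentDecoder.
def decoder_for(content_encoding: str) -> ContentDecoder:
    # "deflate, gzip" lists encodings in the order they were applied.
    encodings = [token.strip() for token in content_encoding.split(",") if token.strip()]
    decoders = [SUPPORTED_DECODERS[encoding]() for encoding in encodings]
    if not decoders:
        return IdentityDecoder()
    if len(decoders) == 1:
        return decoders[0]
    return MultiDecoder(children=decoders)

assert isinstance(decoder_for("gzip"), GZipDecoder)
assert isinstance(decoder_for("deflate, gzip"), MultiDecoder)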