Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_decoders.py: 28%


185 statements  

1""" 

2Handlers for Content-Encoding. 

3 

4See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding 

5""" 

6 

7from __future__ import annotations 

8 

9import codecs 

10import io 

11import typing 

12import zlib 

13 

14from ._compat import brotli, zstd 

15from ._exceptions import DecodingError 

16 

17 

class ContentDecoder:
    def decode(self, data: bytes) -> bytes:
        raise NotImplementedError()  # pragma: no cover

    def flush(self) -> bytes:
        raise NotImplementedError()  # pragma: no cover


class IdentityDecoder(ContentDecoder):
    """
    Handle unencoded data.
    """

    def decode(self, data: bytes) -> bytes:
        return data

    def flush(self) -> bytes:
        return b""


class DeflateDecoder(ContentDecoder):
    """
    Handle 'deflate' decoding.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        self.first_attempt = True
        self.decompressor = zlib.decompressobj()

    def decode(self, data: bytes) -> bytes:
        was_first_attempt = self.first_attempt
        self.first_attempt = False
        try:
            return self.decompressor.decompress(data)
        except zlib.error as exc:
            if was_first_attempt:
                self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
                return self.decode(data)
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        try:
            return self.decompressor.flush()
        except zlib.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
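
# Illustrative usage (added for this walkthrough, not part of httpx): a minimal
# sketch exercising DeflateDecoder against both a zlib-wrapped stream and a raw
# deflate stream. Stripping the 2-byte zlib header and 4-byte Adler-32 checksum
# from `zlib.compress()` output yields raw deflate, which triggers the retry with
# `-zlib.MAX_WBITS` above. The helper name and payload are made up.
def _deflate_decoder_example() -> None:
    wrapped = zlib.compress(b"hello world")  # RFC 1950 zlib container
    raw = wrapped[2:-4]  # strip header and checksum -> raw deflate
    for payload in (wrapped, raw):
        decoder = DeflateDecoder()
        assert decoder.decode(payload) + decoder.flush() == b"hello world"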

class GZipDecoder(ContentDecoder):
    """
    Handle 'gzip' decoding.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)

    def decode(self, data: bytes) -> bytes:
        try:
            return self.decompressor.decompress(data)
        except zlib.error as exc:
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        try:
            return self.decompressor.flush()
        except zlib.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
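
# Illustrative usage (added for this walkthrough, not part of httpx): round-trips
# a gzip payload through GZipDecoder. `gzip` is stdlib; the import is kept local
# so the module's real import list stays untouched.
def _gzip_decoder_example() -> bytes:
    import gzip

    decoder = GZipDecoder()
    return decoder.decode(gzip.compress(b"hello world")) + decoder.flush()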

class BrotliDecoder(ContentDecoder):
    """
    Handle 'brotli' decoding.

    Requires `pip install brotlicffi` (formerly 'brotlipy', see
    https://brotlipy.readthedocs.io/) or `pip install brotli`
    (see https://github.com/google/brotli).

    Both packages are supported: `._compat` imports whichever one is installed
    and exposes it here as `brotli`. The 'brotlicffi' decompressor exposes
    `decompress()`, while the 'brotli' decompressor exposes `process()`, so
    `__init__` selects the appropriate callable.
    """

    def __init__(self) -> None:
        if brotli is None:  # pragma: no cover
            raise ImportError(
                "Using 'BrotliDecoder', but neither the 'brotlicffi' nor the "
                "'brotli' package has been installed. "
                "Make sure to install httpx using `pip install httpx[brotli]`."
            ) from None

        self.decompressor = brotli.Decompressor()
        self.seen_data = False
        self._decompress: typing.Callable[[bytes], bytes]
        if hasattr(self.decompressor, "decompress"):
            # The 'brotlicffi' package.
            self._decompress = self.decompressor.decompress  # pragma: no cover
        else:
            # The 'brotli' package.
            self._decompress = self.decompressor.process  # pragma: no cover

    def decode(self, data: bytes) -> bytes:
        if not data:
            return b""
        self.seen_data = True
        try:
            return self._decompress(data)
        except brotli.error as exc:
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        if not self.seen_data:
            return b""
        try:
            if hasattr(self.decompressor, "finish"):
                # Only available in the 'brotlicffi' package.
                #
                # Because the decompressor decompresses eagerly, this never
                # emits any data, but it can raise an error if the stream was
                # truncated or damaged.
                self.decompressor.finish()  # pragma: no cover
            return b""
        except brotli.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
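
# Illustrative usage (added for this walkthrough, not part of httpx): feeds
# brotli-compressed bytes through BrotliDecoder. Guarded on the optional
# dependency being importable; `compress()` exists at module level in both the
# 'brotli' and 'brotlicffi' packages.
def _brotli_decoder_example() -> bytes:
    if brotli is None:
        return b""
    decoder = BrotliDecoder()
    return decoder.decode(brotli.compress(b"hello world")) + decoder.flush()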

class ZStandardDecoder(ContentDecoder):
    """
    Handle 'zstd' RFC 8878 decoding.

    Requires `pip install zstandard`.
    Can be installed as a dependency of httpx using `pip install httpx[zstd]`.
    """

    # Inspired by the ZstdDecoder implementation in urllib3.
    def __init__(self) -> None:
        if zstd is None:  # pragma: no cover
            raise ImportError(
                "Using 'ZStandardDecoder', but the 'zstandard' package has not "
                "been installed. "
                "Make sure to install httpx using `pip install httpx[zstd]`."
            ) from None

        self.decompressor = zstd.ZstdDecompressor().decompressobj()

    def decode(self, data: bytes) -> bytes:
        assert zstd is not None
        output = io.BytesIO()
        try:
            output.write(self.decompressor.decompress(data))
            while self.decompressor.eof and self.decompressor.unused_data:
                unused_data = self.decompressor.unused_data
                self.decompressor = zstd.ZstdDecompressor().decompressobj()
                output.write(self.decompressor.decompress(unused_data))
        except zstd.ZstdError as exc:
            raise DecodingError(str(exc)) from exc
        return output.getvalue()

    def flush(self) -> bytes:
        ret = self.decompressor.flush()  # note: this is a no-op
        if not self.decompressor.eof:
            raise DecodingError("Zstandard data is incomplete")  # pragma: no cover
        return bytes(ret)
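
# Illustrative usage (added for this walkthrough, not part of httpx): round-trips
# a zstd frame through ZStandardDecoder, guarded on the optional 'zstandard'
# dependency being present.
def _zstd_decoder_example() -> bytes:
    if zstd is None:
        return b""
    decoder = ZStandardDecoder()
    frame = zstd.ZstdCompressor().compress(b"hello world")
    return decoder.decode(frame) + decoder.flush()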

class MultiDecoder(ContentDecoder):
    """
    Handle the case where multiple encodings have been applied.
    """

    def __init__(self, children: typing.Sequence[ContentDecoder]) -> None:
        """
        'children' should be a sequence of decoders in the order in which
        each was applied.
        """
        # Note that we reverse the order for decoding.
        self.children = list(reversed(children))

    def decode(self, data: bytes) -> bytes:
        for child in self.children:
            data = child.decode(data)
        return data

    def flush(self) -> bytes:
        data = b""
        for child in self.children:
            data = child.decode(data) + child.flush()
        return data
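
# Illustrative usage (added for this walkthrough, not part of httpx): a body
# compressed with deflate first and then gzip (i.e. "Content-Encoding: deflate,
# gzip") is decoded by passing the decoders in application order; MultiDecoder
# reverses them internally.
def _multi_decoder_example() -> bytes:
    import gzip

    body = gzip.compress(zlib.compress(b"hello world"))
    decoder = MultiDecoder([DeflateDecoder(), GZipDecoder()])
    return decoder.decode(body) + decoder.flush()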

class ByteChunker:
    """
    Handles returning byte content in fixed-size chunks.
    """

    def __init__(self, chunk_size: int | None = None) -> None:
        self._buffer = io.BytesIO()
        self._chunk_size = chunk_size

    def decode(self, content: bytes) -> list[bytes]:
        if self._chunk_size is None:
            return [content] if content else []

        self._buffer.write(content)
        if self._buffer.tell() >= self._chunk_size:
            value = self._buffer.getvalue()
            chunks = [
                value[i : i + self._chunk_size]
                for i in range(0, len(value), self._chunk_size)
            ]
            if len(chunks[-1]) == self._chunk_size:
                self._buffer.seek(0)
                self._buffer.truncate()
                return chunks
            else:
                self._buffer.seek(0)
                self._buffer.write(chunks[-1])
                self._buffer.truncate()
                return chunks[:-1]
        else:
            return []

    def flush(self) -> list[bytes]:
        value = self._buffer.getvalue()
        self._buffer.seek(0)
        self._buffer.truncate()
        return [value] if value else []
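
# Illustrative usage (added for this walkthrough, not part of httpx): shows how
# ByteChunker re-slices arbitrarily sized writes into fixed 4-byte chunks, holding
# any remainder back until enough bytes arrive. TextChunker below mirrors this
# behaviour for str content.
def _byte_chunker_example() -> list[bytes]:
    chunker = ByteChunker(chunk_size=4)
    chunks = chunker.decode(b"abcdef")  # -> [b"abcd"]; b"ef" stays buffered
    chunks += chunker.decode(b"gh")     # -> [b"efgh"]; buffer is now empty
    chunks += chunker.flush()           # -> []
    return chunks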

class TextChunker:
    """
    Handles returning text content in fixed-size chunks.
    """

    def __init__(self, chunk_size: int | None = None) -> None:
        self._buffer = io.StringIO()
        self._chunk_size = chunk_size

    def decode(self, content: str) -> list[str]:
        if self._chunk_size is None:
            return [content] if content else []

        self._buffer.write(content)
        if self._buffer.tell() >= self._chunk_size:
            value = self._buffer.getvalue()
            chunks = [
                value[i : i + self._chunk_size]
                for i in range(0, len(value), self._chunk_size)
            ]
            if len(chunks[-1]) == self._chunk_size:
                self._buffer.seek(0)
                self._buffer.truncate()
                return chunks
            else:
                self._buffer.seek(0)
                self._buffer.write(chunks[-1])
                self._buffer.truncate()
                return chunks[:-1]
        else:
            return []

    def flush(self) -> list[str]:
        value = self._buffer.getvalue()
        self._buffer.seek(0)
        self._buffer.truncate()
        return [value] if value else []


class TextDecoder:
    """
    Handles incrementally decoding bytes into text.
    """

    def __init__(self, encoding: str = "utf-8") -> None:
        self.decoder = codecs.getincrementaldecoder(encoding)(errors="replace")

    def decode(self, data: bytes) -> str:
        return self.decoder.decode(data)

    def flush(self) -> str:
        return self.decoder.decode(b"", True)
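
# Illustrative usage (added for this walkthrough, not part of httpx): demonstrates
# why an incremental decoder is needed -- a multi-byte UTF-8 sequence split across
# two chunks still decodes cleanly instead of producing replacement characters.
def _text_decoder_example() -> str:
    decoder = TextDecoder(encoding="utf-8")
    euro = "\u20ac".encode("utf-8")  # three bytes: 0xE2 0x82 0xAC
    return decoder.decode(euro[:2]) + decoder.decode(euro[2:]) + decoder.flush()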

class LineDecoder:
    """
    Handles incrementally reading lines from text.

    Has the same behaviour as the stdlib splitlines,
    but handles the input iteratively.
    """

    def __init__(self) -> None:
        self.buffer: list[str] = []
        self.trailing_cr: bool = False

    def decode(self, text: str) -> list[str]:
        # See https://docs.python.org/3/library/stdtypes.html#str.splitlines
        NEWLINE_CHARS = "\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029"

        # We always push a trailing `\r` into the next decode iteration.
        if self.trailing_cr:
            text = "\r" + text
            self.trailing_cr = False
        if text.endswith("\r"):
            self.trailing_cr = True
            text = text[:-1]

        if not text:
            # NOTE: the edge case input of empty text doesn't occur in practice,
            # because other httpx internals filter out this value
            return []  # pragma: no cover

        trailing_newline = text[-1] in NEWLINE_CHARS
        lines = text.splitlines()

        if len(lines) == 1 and not trailing_newline:
            # No new lines, buffer the input and continue.
            self.buffer.append(lines[0])
            return []

        if self.buffer:
            # Include any existing buffer in the first portion of the
            # splitlines result.
            lines = ["".join(self.buffer) + lines[0]] + lines[1:]
            self.buffer = []

        if not trailing_newline:
            # If the last segment of splitlines is not newline terminated,
            # then drop it from our output and start a new buffer.
            self.buffer = [lines.pop()]

        return lines

    def flush(self) -> list[str]:
        if not self.buffer and not self.trailing_cr:
            return []

        lines = ["".join(self.buffer)]
        self.buffer = []
        self.trailing_cr = False
        return lines
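
# Illustrative usage (added for this walkthrough, not part of httpx): feeds text
# that splits a "\r\n" pair across two chunks. The held-back "\r" ensures the pair
# is still treated as a single newline rather than as two separate line breaks.
def _line_decoder_example() -> list[str]:
    decoder = LineDecoder()
    lines = decoder.decode("first\r")      # -> [] (the "\r" is carried over)
    lines += decoder.decode("\nsecond\n")  # -> ["first", "second"]
    lines += decoder.flush()               # -> []
    return lines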

SUPPORTED_DECODERS = {
    "identity": IdentityDecoder,
    "gzip": GZipDecoder,
    "deflate": DeflateDecoder,
    "br": BrotliDecoder,
    "zstd": ZStandardDecoder,
}


if brotli is None:
    SUPPORTED_DECODERS.pop("br")  # pragma: no cover
if zstd is None:
    SUPPORTED_DECODERS.pop("zstd")  # pragma: no cover
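
# Illustrative sketch (added for this walkthrough, not part of httpx): one way the
# SUPPORTED_DECODERS mapping could be turned into a decoder for a Content-Encoding
# header value. httpx's response handling does something along these lines
# internally; the helper below is a simplified, hypothetical stand-in and does not
# handle unknown encodings.
def _decoder_for_content_encoding(content_encoding: str) -> ContentDecoder:
    encodings = [part.strip() for part in content_encoding.split(",") if part.strip()]
    decoders = [SUPPORTED_DECODERS[encoding]() for encoding in encodings]
    if not decoders:
        return IdentityDecoder()
    if len(decoders) == 1:
        return decoders[0]
    return MultiDecoder(decoders)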