Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_decoders.py: 28%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

165 statements  

1""" 

2Handlers for Content-Encoding. 

3 

4See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding 

5""" 

6from __future__ import annotations 

7 

8import codecs 

9import io 

10import typing 

11import zlib 

12 

13from ._compat import brotli 

14from ._exceptions import DecodingError 

15 

16 

class ContentDecoder:
    """
    Abstract interface for incremental content decoders.

    Subclasses receive the response body piecewise via `decode()` and
    emit any remaining buffered output from `flush()` at end-of-stream.
    """

    def decode(self, data: bytes) -> bytes:
        raise NotImplementedError()  # pragma: no cover

    def flush(self) -> bytes:
        raise NotImplementedError()  # pragma: no cover

23 

24 

class IdentityDecoder(ContentDecoder):
    """
    Handle unencoded data.
    """

    def decode(self, data: bytes) -> bytes:
        # Nothing to undo: hand the payload back untouched.
        return data

    def flush(self) -> bytes:
        # No state is ever buffered, so there is nothing left to emit.
        return b""

35 

36 

class DeflateDecoder(ContentDecoder):
    """
    Handle 'deflate' decoding.

    Some servers send a zlib-wrapped stream while others send raw deflate,
    so the first chunk may be retried with a headerless decompressor.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        self.first_attempt = True
        self.decompressor = zlib.decompressobj()

    def decode(self, data: bytes) -> bytes:
        may_retry_raw = self.first_attempt
        self.first_attempt = False
        try:
            return self.decompressor.decompress(data)
        except zlib.error as exc:
            if may_retry_raw:
                # Retry the same chunk as a raw (headerless) deflate stream.
                self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
                return self.decode(data)
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        try:
            return self.decompressor.flush()
        except zlib.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc

64 

65 

class GZipDecoder(ContentDecoder):
    """
    Handle 'gzip' decoding.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        # wbits of MAX_WBITS | 16 selects gzip (RFC 1952) framing.
        self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)

    def decode(self, data: bytes) -> bytes:
        try:
            decoded = self.decompressor.decompress(data)
        except zlib.error as exc:
            raise DecodingError(str(exc)) from exc
        return decoded

    def flush(self) -> bytes:
        try:
            remainder = self.decompressor.flush()
        except zlib.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
        return remainder

87 

88 

class BrotliDecoder(ContentDecoder):
    """
    Handle 'brotli' decoding.

    Requires `pip install brotlipy`. See: https://brotlipy.readthedocs.io/
    or `pip install brotli`. See https://github.com/google/brotli
    Supports both 'brotlipy' and 'Brotli' packages since they share an import
    name. The top branches are for 'brotlipy' and bottom branches for 'Brotli'
    """

    def __init__(self) -> None:
        if brotli is None:  # pragma: no cover
            raise ImportError(
                "Using 'BrotliDecoder', but neither of the 'brotlicffi' or 'brotli' "
                "packages have been installed. "
                "Make sure to install httpx using `pip install httpx[brotli]`."
            ) from None

        self.decompressor = brotli.Decompressor()
        self.seen_data = False
        # 'brotlicffi' exposes `.decompress(...)` while 'brotli' exposes
        # `.process(...)`; resolve the correct bound method once up front.
        self._decompress: typing.Callable[[bytes], bytes]
        if hasattr(self.decompressor, "decompress"):
            self._decompress = self.decompressor.decompress  # pragma: no cover
        else:
            self._decompress = self.decompressor.process  # pragma: no cover

    def decode(self, data: bytes) -> bytes:
        if not data:
            # Nothing to feed to the decompressor for an empty chunk.
            return b""
        self.seen_data = True
        try:
            return self._decompress(data)
        except brotli.error as exc:
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        if not self.seen_data:
            # Never fed any input, so there is nothing to finalize.
            return b""
        try:
            if hasattr(self.decompressor, "finish"):
                # Only available in the 'brotlicffi' package.
                # As the decompressor decompresses eagerly, this will never
                # actually emit any data. However, it will potentially throw
                # errors if a truncated or damaged data stream has been used.
                self.decompressor.finish()  # pragma: no cover
            return b""
        except brotli.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc

140 

141 

class MultiDecoder(ContentDecoder):
    """
    Handle the case where multiple encodings have been applied.
    """

    def __init__(self, children: typing.Sequence[ContentDecoder]) -> None:
        """
        'children' should be a sequence of decoders in the order in which
        each was applied.
        """
        # Encodings must be unwrapped in reverse application order.
        self.children = list(reversed(children))

    def decode(self, data: bytes) -> bytes:
        # Thread the data through each child decoder in turn.
        for decoder in self.children:
            data = decoder.decode(data)
        return data

    def flush(self) -> bytes:
        # Flush outermost-encoding-first, feeding each stage's tail output
        # into the next decoder in the chain.
        data = b""
        for decoder in self.children:
            data = decoder.decode(data) + decoder.flush()
        return data

165 

166 

class ByteChunker:
    """
    Handles returning byte content in fixed-size chunks.
    """

    def __init__(self, chunk_size: int | None = None) -> None:
        self._buffer = io.BytesIO()
        self._chunk_size = chunk_size

    def decode(self, content: bytes) -> list[bytes]:
        if self._chunk_size is None:
            # No chunking requested: pass content straight through (if any).
            return [content] if content else []

        self._buffer.write(content)
        if self._buffer.tell() < self._chunk_size:
            # Not enough buffered yet to produce a full chunk.
            return []

        pending = self._buffer.getvalue()
        size = self._chunk_size
        chunks = [pending[start : start + size] for start in range(0, len(pending), size)]

        self._buffer.seek(0)
        if len(chunks[-1]) == size:
            # The buffered data divides evenly: emit everything, reset buffer.
            self._buffer.truncate()
            return chunks
        # Hold the short tail back for the next call.
        self._buffer.write(chunks[-1])
        self._buffer.truncate()
        return chunks[:-1]

    def flush(self) -> list[bytes]:
        # Emit whatever remains buffered, even if shorter than chunk_size.
        remainder = self._buffer.getvalue()
        self._buffer.seek(0)
        self._buffer.truncate()
        return [remainder] if remainder else []

204 

205 

class TextChunker:
    """
    Handles returning text content in fixed-size chunks.
    """

    def __init__(self, chunk_size: int | None = None) -> None:
        self._buffer = io.StringIO()
        self._chunk_size = chunk_size

    def decode(self, content: str) -> list[str]:
        if self._chunk_size is None:
            # No chunking requested: pass content straight through (if any).
            return [content] if content else []

        self._buffer.write(content)
        if self._buffer.tell() < self._chunk_size:
            # Not enough buffered yet to produce a full chunk.
            return []

        pending = self._buffer.getvalue()
        size = self._chunk_size
        chunks = [pending[start : start + size] for start in range(0, len(pending), size)]

        self._buffer.seek(0)
        if len(chunks[-1]) == size:
            # The buffered text divides evenly: emit everything, reset buffer.
            self._buffer.truncate()
            return chunks
        # Hold the short tail back for the next call.
        self._buffer.write(chunks[-1])
        self._buffer.truncate()
        return chunks[:-1]

    def flush(self) -> list[str]:
        # Emit whatever remains buffered, even if shorter than chunk_size.
        remainder = self._buffer.getvalue()
        self._buffer.seek(0)
        self._buffer.truncate()
        return [remainder] if remainder else []

243 

244 

class TextDecoder:
    """
    Handles incrementally decoding bytes into text
    """

    def __init__(self, encoding: str = "utf-8") -> None:
        # An incremental decoder carries partial multi-byte sequences across
        # calls; undecodable bytes become U+FFFD via errors="replace".
        self.decoder = codecs.getincrementaldecoder(encoding)(errors="replace")

    def decode(self, data: bytes) -> str:
        return self.decoder.decode(data)

    def flush(self) -> str:
        # Final call: signals end-of-input so any dangling partial
        # sequence is resolved and emitted.
        return self.decoder.decode(b"", True)

258 

259 

class LineDecoder:
    """
    Handles incrementally reading lines from text.

    Has the same behaviour as the stdllib splitlines,
    but handling the input iteratively.
    """

    def __init__(self) -> None:
        self.buffer: list[str] = []
        self.trailing_cr: bool = False

    def decode(self, text: str) -> list[str]:
        # See https://docs.python.org/3/library/stdtypes.html#str.splitlines
        NEWLINE_CHARS = "\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029"

        # A chunk-final '\r' is deferred to the next call, since it may be
        # the first half of a '\r\n' pair that continues in the next chunk.
        if self.trailing_cr:
            text = "\r" + text
            self.trailing_cr = False
        if text.endswith("\r"):
            self.trailing_cr = True
            text = text[:-1]

        if not text:
            # NOTE: the edge case input of empty text doesn't occur in practice,
            # because other httpx internals filter out this value
            return []  # pragma: no cover

        ends_with_newline = text[-1] in NEWLINE_CHARS
        lines = text.splitlines()

        if len(lines) == 1 and not ends_with_newline:
            # No line break anywhere in this chunk: keep accumulating.
            self.buffer.append(lines[0])
            return []

        if self.buffer:
            # Prepend previously accumulated text onto the first line.
            lines[0] = "".join(self.buffer) + lines[0]
            self.buffer = []

        if not ends_with_newline:
            # The final segment is unterminated: hold it back as the
            # start of a new buffer rather than emitting it.
            self.buffer = [lines.pop()]

        return lines

    def flush(self) -> list[str]:
        if not self.buffer and not self.trailing_cr:
            return []

        # Emit whatever partial line remains (a lone deferred '\r'
        # flushes as an empty line, matching splitlines behaviour).
        lines = ["".join(self.buffer)]
        self.buffer = []
        self.trailing_cr = False
        return lines

318 

319 

# Maps lower-cased Content-Encoding header values to decoder classes.
SUPPORTED_DECODERS = {
    "identity": IdentityDecoder,
    "gzip": GZipDecoder,
    "deflate": DeflateDecoder,
    "br": BrotliDecoder,
}


# Brotli support is optional: remove the "br" entry when neither the
# 'brotlicffi' nor the 'brotli' package is installed (the ._compat shim
# sets `brotli` to None in that case).
if brotli is None:
    SUPPORTED_DECODERS.pop("br")  # pragma: no cover