Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_decoders.py: 59%

165 statements  

coverage.py v7.2.7, created at 2023-06-07 07:19 +0000

"""
Handlers for Content-Encoding.

See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding
"""
import codecs
import io
import typing
import zlib

from ._compat import brotli
from ._exceptions import DecodingError


class ContentDecoder:
    def decode(self, data: bytes) -> bytes:
        raise NotImplementedError()  # pragma: no cover

    def flush(self) -> bytes:
        raise NotImplementedError()  # pragma: no cover


class IdentityDecoder(ContentDecoder):
    """
    Handle unencoded data.
    """

    def decode(self, data: bytes) -> bytes:
        return data

    def flush(self) -> bytes:
        return b""


class DeflateDecoder(ContentDecoder):
    """
    Handle 'deflate' decoding.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        self.first_attempt = True
        self.decompressor = zlib.decompressobj()

    def decode(self, data: bytes) -> bytes:
        was_first_attempt = self.first_attempt
        self.first_attempt = False
        try:
            return self.decompressor.decompress(data)
        except zlib.error as exc:
            if was_first_attempt:
                self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
                return self.decode(data)
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        try:
            return self.decompressor.flush()
        except zlib.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc

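# Usage sketch (illustrative only, not part of httpx's public API): the
# decoder first assumes a zlib-wrapped stream and retries as raw deflate on
# the first error, so both server variants decode transparently.
#
#     zlib_wrapped = zlib.compress(b"hello world")
#     raw = zlib.compressobj(wbits=-zlib.MAX_WBITS)
#     raw_deflate = raw.compress(b"hello world") + raw.flush()
#     for payload in (zlib_wrapped, raw_deflate):
#         decoder = DeflateDecoder()
#         assert decoder.decode(payload) + decoder.flush() == b"hello world"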

class GZipDecoder(ContentDecoder):
    """
    Handle 'gzip' decoding.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)

    def decode(self, data: bytes) -> bytes:
        try:
            return self.decompressor.decompress(data)
        except zlib.error as exc:
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        try:
            return self.decompressor.flush()
        except zlib.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc

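# Usage sketch (illustrative only): `zlib.MAX_WBITS | 16` tells zlib to expect
# a gzip header and trailer, so a stdlib `gzip.compress()` payload round-trips.
#
#     import gzip
#     decoder = GZipDecoder()
#     assert decoder.decode(gzip.compress(b"hello")) + decoder.flush() == b"hello"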

class BrotliDecoder(ContentDecoder):
    """
    Handle 'brotli' decoding.

    Requires `pip install brotlipy` (see https://brotlipy.readthedocs.io/)
    or `pip install brotli` (see https://github.com/google/brotli).
    Supports both the 'brotlipy' and 'Brotli' packages, since they share an
    import name. The top branches are for 'brotlipy', the bottom branches
    for 'Brotli'.
    """

    def __init__(self) -> None:
        if brotli is None:  # pragma: no cover
            raise ImportError(
                "Using 'BrotliDecoder', but neither of the 'brotlicffi' or 'brotli' "
                "packages have been installed. "
                "Make sure to install httpx using `pip install httpx[brotli]`."
            ) from None

        self.decompressor = brotli.Decompressor()
        self.seen_data = False
        self._decompress: typing.Callable[[bytes], bytes]
        if hasattr(self.decompressor, "decompress"):
            # The 'brotlicffi' package.
            self._decompress = self.decompressor.decompress  # pragma: no cover
        else:
            # The 'brotli' package.
            self._decompress = self.decompressor.process  # pragma: no cover

    def decode(self, data: bytes) -> bytes:
        if not data:
            return b""
        self.seen_data = True
        try:
            return self._decompress(data)
        except brotli.error as exc:
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        if not self.seen_data:
            return b""
        try:
            if hasattr(self.decompressor, "finish"):
                # Only available in the 'brotlicffi' package.

                # As the decompressor decompresses eagerly, this will never
                # actually emit any data. However, it will potentially throw
                # errors if a truncated or damaged data stream has been used.
                self.decompressor.finish()  # pragma: no cover
            return b""
        except brotli.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc

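# Usage sketch (illustrative only; assumes one of the optional 'brotli' or
# 'brotlicffi' packages is installed, e.g. via `pip install httpx[brotli]`):
#
#     decoder = BrotliDecoder()
#     plaintext = decoder.decode(brotli.compress(b"hello")) + decoder.flush()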

class MultiDecoder(ContentDecoder):
    """
    Handle the case where multiple encodings have been applied.
    """

    def __init__(self, children: typing.Sequence[ContentDecoder]) -> None:
        """
        'children' should be a sequence of decoders in the order in which
        each was applied.
        """
        # Note that we reverse the order for decoding.
        self.children = list(reversed(children))

    def decode(self, data: bytes) -> bytes:
        for child in self.children:
            data = child.decode(data)
        return data

    def flush(self) -> bytes:
        data = b""
        for child in self.children:
            data = child.decode(data) + child.flush()
        return data

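# Usage sketch (illustrative only): for `Content-Encoding: gzip, br` the
# children are passed in the order the encodings were applied, and decoding
# runs in reverse, undoing brotli before gzip. `network_chunk` below is just
# a placeholder for bytes read off the wire.
#
#     decoder = MultiDecoder([GZipDecoder(), BrotliDecoder()])
#     plaintext = decoder.decode(network_chunk) + decoder.flush()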

class ByteChunker:
    """
    Handles returning byte content in fixed-size chunks.
    """

    def __init__(self, chunk_size: typing.Optional[int] = None) -> None:
        self._buffer = io.BytesIO()
        self._chunk_size = chunk_size

    def decode(self, content: bytes) -> typing.List[bytes]:
        if self._chunk_size is None:
            return [content] if content else []

        self._buffer.write(content)
        if self._buffer.tell() >= self._chunk_size:
            value = self._buffer.getvalue()
            chunks = [
                value[i : i + self._chunk_size]
                for i in range(0, len(value), self._chunk_size)
            ]
            if len(chunks[-1]) == self._chunk_size:
                self._buffer.seek(0)
                self._buffer.truncate()
                return chunks
            else:
                self._buffer.seek(0)
                self._buffer.write(chunks[-1])
                self._buffer.truncate()
                return chunks[:-1]
        else:
            return []

    def flush(self) -> typing.List[bytes]:
        value = self._buffer.getvalue()
        self._buffer.seek(0)
        self._buffer.truncate()
        return [value] if value else []

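# Behaviour sketch (illustrative only): with chunk_size=4, input is buffered
# until at least one full chunk is available, and flush() drains any remainder.
#
#     chunker = ByteChunker(chunk_size=4)
#     chunker.decode(b"ab")     # -> []          (only 2 bytes buffered)
#     chunker.decode(b"cdefg")  # -> [b"abcd"]   (b"efg" stays buffered)
#     chunker.flush()           # -> [b"efg"]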

class TextChunker:
    """
    Handles returning text content in fixed-size chunks.
    """

    def __init__(self, chunk_size: typing.Optional[int] = None) -> None:
        self._buffer = io.StringIO()
        self._chunk_size = chunk_size

    def decode(self, content: str) -> typing.List[str]:
        if self._chunk_size is None:
            return [content]

        self._buffer.write(content)
        if self._buffer.tell() >= self._chunk_size:
            value = self._buffer.getvalue()
            chunks = [
                value[i : i + self._chunk_size]
                for i in range(0, len(value), self._chunk_size)
            ]
            if len(chunks[-1]) == self._chunk_size:
                self._buffer.seek(0)
                self._buffer.truncate()
                return chunks
            else:
                self._buffer.seek(0)
                self._buffer.write(chunks[-1])
                self._buffer.truncate()
                return chunks[:-1]
        else:
            return []

    def flush(self) -> typing.List[str]:
        value = self._buffer.getvalue()
        self._buffer.seek(0)
        self._buffer.truncate()
        return [value] if value else []


class TextDecoder:
    """
    Handles incrementally decoding bytes into text.
    """

    def __init__(self, encoding: str = "utf-8"):
        self.decoder = codecs.getincrementaldecoder(encoding)(errors="replace")

    def decode(self, data: bytes) -> str:
        return self.decoder.decode(data)

    def flush(self) -> str:
        return self.decoder.decode(b"", True)

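# Usage sketch (illustrative only): the incremental decoder buffers a partial
# multi-byte sequence across chunk boundaries rather than emitting U+FFFD.
#
#     decoder = TextDecoder("utf-8")
#     decoder.decode(b"caf\xc3")  # -> "caf"  (the lone 0xC3 byte is held back)
#     decoder.decode(b"\xa9")     # -> "é"
#     decoder.flush()             # -> ""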

class LineDecoder:
    """
    Handles incrementally reading lines from text.

    Has the same behaviour as the stdlib splitlines, but handles the input
    iteratively.
    """

    def __init__(self) -> None:
        self.buffer: typing.List[str] = []
        self.trailing_cr: bool = False

    def decode(self, text: str) -> typing.List[str]:
        # See https://docs.python.org/3/library/stdtypes.html#str.splitlines
        NEWLINE_CHARS = "\n\r\x0b\x0c\x1c\x1d\x1e\x85\u2028\u2029"

        # We always push a trailing `\r` into the next decode iteration.
        if self.trailing_cr:
            text = "\r" + text
            self.trailing_cr = False
        if text.endswith("\r"):
            self.trailing_cr = True
            text = text[:-1]

        if not text:
            return []

        trailing_newline = text[-1] in NEWLINE_CHARS
        lines = text.splitlines()

        if len(lines) == 1 and not trailing_newline:
            # No new lines, buffer the input and continue.
            self.buffer.append(lines[0])
            return []

        if self.buffer:
            # Include any existing buffer in the first portion of the
            # splitlines result.
            lines = ["".join(self.buffer) + lines[0]] + lines[1:]
            self.buffer = []

        if not trailing_newline:
            # If the last segment of splitlines is not newline terminated,
            # then drop it from our output and start a new buffer.
            self.buffer = [lines.pop()]

        return lines

    def flush(self) -> typing.List[str]:
        if not self.buffer and not self.trailing_cr:
            return []

        lines = ["".join(self.buffer)]
        self.buffer = []
        self.trailing_cr = False
        return lines

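# Behaviour sketch (illustrative only): lines are only emitted once a line
# terminator has been seen, and a trailing "\r" is held back in case the next
# chunk starts with "\n", so a "\r\n" split across chunks still counts once.
#
#     decoder = LineDecoder()
#     decoder.decode("hello\r")  # -> []         (could still become "\r\n")
#     decoder.decode("\nworld")  # -> ["hello"]
#     decoder.flush()            # -> ["world"]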

SUPPORTED_DECODERS = {
    "identity": IdentityDecoder,
    "gzip": GZipDecoder,
    "deflate": DeflateDecoder,
    "br": BrotliDecoder,
}


if brotli is None:
    SUPPORTED_DECODERS.pop("br")  # pragma: no cover

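# Usage sketch (illustrative only; the fallback to IdentityDecoder below is an
# assumption for demonstration, not necessarily httpx's exact dispatch logic):
#
#     encoding = "gzip"  # taken from a Content-Encoding header value
#     decoder = SUPPORTED_DECODERS.get(encoding.strip().lower(), IdentityDecoder)()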