Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/httpx/_decoders.py: 26%

178 statements  

coverage.py v7.2.2, created at 2023-03-26 06:12 +0000

1""" 

2Handlers for Content-Encoding. 

3 

4See: https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Content-Encoding 

5""" 

6import codecs 

7import io 

8import typing 

9import zlib 

10 

11from ._compat import brotli 

12from ._exceptions import DecodingError 

13 

14 

class ContentDecoder:
    def decode(self, data: bytes) -> bytes:
        raise NotImplementedError()  # pragma: no cover

    def flush(self) -> bytes:
        raise NotImplementedError()  # pragma: no cover

class IdentityDecoder(ContentDecoder):
    """
    Handle unencoded data.
    """

    def decode(self, data: bytes) -> bytes:
        return data

    def flush(self) -> bytes:
        return b""

class DeflateDecoder(ContentDecoder):
    """
    Handle 'deflate' decoding.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        self.first_attempt = True
        self.decompressor = zlib.decompressobj()

    def decode(self, data: bytes) -> bytes:
        was_first_attempt = self.first_attempt
        self.first_attempt = False
        try:
            return self.decompressor.decompress(data)
        except zlib.error as exc:
            if was_first_attempt:
                self.decompressor = zlib.decompressobj(-zlib.MAX_WBITS)
                return self.decode(data)
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        try:
            return self.decompressor.flush()
        except zlib.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
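
# --- Usage sketch (not part of httpx; illustrative only) ---------------------
# DeflateDecoder's first-attempt fallback handles both zlib-wrapped deflate and
# raw deflate streams, which some servers send without the zlib header.
import zlib

wrapped = zlib.compress(b"hello world")                    # zlib-wrapped deflate
decoder = DeflateDecoder()
assert decoder.decode(wrapped) + decoder.flush() == b"hello world"

raw_obj = zlib.compressobj(wbits=-zlib.MAX_WBITS)          # raw deflate, no header
raw = raw_obj.compress(b"hello world") + raw_obj.flush()
decoder = DeflateDecoder()
assert decoder.decode(raw) + decoder.flush() == b"hello world"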

class GZipDecoder(ContentDecoder):
    """
    Handle 'gzip' decoding.

    See: https://stackoverflow.com/questions/1838699
    """

    def __init__(self) -> None:
        self.decompressor = zlib.decompressobj(zlib.MAX_WBITS | 16)

    def decode(self, data: bytes) -> bytes:
        try:
            return self.decompressor.decompress(data)
        except zlib.error as exc:
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        try:
            return self.decompressor.flush()
        except zlib.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
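
# --- Usage sketch (not part of httpx; illustrative only) ---------------------
# The gzip decoder works incrementally, so a response body can be fed to it
# chunk by chunk as it arrives.
import gzip

decoder = GZipDecoder()
compressed = gzip.compress(b"streamed body")
out = decoder.decode(compressed[:10]) + decoder.decode(compressed[10:]) + decoder.flush()
assert out == b"streamed body"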

class BrotliDecoder(ContentDecoder):
    """
    Handle 'brotli' decoding.

    Requires `pip install brotlipy`. See: https://brotlipy.readthedocs.io/
    or `pip install brotli`. See https://github.com/google/brotli
    Supports both 'brotlipy' and 'Brotli' packages since they share an import
    name. The top branches are for 'brotlipy' and bottom branches for 'Brotli'
    """

    def __init__(self) -> None:
        if brotli is None:  # pragma: no cover
            raise ImportError(
                "Using 'BrotliDecoder', but neither of the 'brotlicffi' or 'brotli' "
                "packages have been installed. "
                "Make sure to install httpx using `pip install httpx[brotli]`."
            ) from None

        self.decompressor = brotli.Decompressor()
        self.seen_data = False
        self._decompress: typing.Callable[[bytes], bytes]
        if hasattr(self.decompressor, "decompress"):
            # The 'brotlicffi' package.
            self._decompress = self.decompressor.decompress  # pragma: no cover
        else:
            # The 'brotli' package.
            self._decompress = self.decompressor.process  # pragma: no cover

    def decode(self, data: bytes) -> bytes:
        if not data:
            return b""
        self.seen_data = True
        try:
            return self._decompress(data)
        except brotli.error as exc:
            raise DecodingError(str(exc)) from exc

    def flush(self) -> bytes:
        if not self.seen_data:
            return b""
        try:
            if hasattr(self.decompressor, "finish"):
                # Only available in the 'brotlicffi' package.

                # As the decompressor decompresses eagerly, this
                # will never actually emit any data. However, it will potentially throw
                # errors if a truncated or damaged data stream has been used.
                self.decompressor.finish()  # pragma: no cover
            return b""
        except brotli.error as exc:  # pragma: no cover
            raise DecodingError(str(exc)) from exc
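
# --- Usage sketch (not part of httpx; illustrative only) ---------------------
# Only meaningful when one of the optional packages ('brotlicffi' or 'brotli',
# e.g. via `pip install httpx[brotli]`) is installed; both expose a module-level
# compress() helper used here purely to produce test input.
if brotli is not None:
    decoder = BrotliDecoder()
    compressed = brotli.compress(b"hello brotli")
    assert decoder.decode(compressed) + decoder.flush() == b"hello brotli"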

class MultiDecoder(ContentDecoder):
    """
    Handle the case where multiple encodings have been applied.
    """

    def __init__(self, children: typing.Sequence[ContentDecoder]) -> None:
        """
        'children' should be a sequence of decoders in the order in which
        each was applied.
        """
        # Note that we reverse the order for decoding.
        self.children = list(reversed(children))

    def decode(self, data: bytes) -> bytes:
        for child in self.children:
            data = child.decode(data)
        return data

    def flush(self) -> bytes:
        data = b""
        for child in self.children:
            data = child.decode(data) + child.flush()
        return data
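
# --- Usage sketch (not part of httpx; illustrative only) ---------------------
# For a hypothetical response sent with `Content-Encoding: deflate, gzip` the
# children are listed in application order; MultiDecoder reverses them so the
# last-applied encoding (gzip) is undone first.
import gzip
import zlib

body = gzip.compress(zlib.compress(b"doubly encoded"))
decoder = MultiDecoder([DeflateDecoder(), GZipDecoder()])
assert decoder.decode(body) + decoder.flush() == b"doubly encoded"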

class ByteChunker:
    """
    Handles returning byte content in fixed-size chunks.
    """

    def __init__(self, chunk_size: typing.Optional[int] = None) -> None:
        self._buffer = io.BytesIO()
        self._chunk_size = chunk_size

    def decode(self, content: bytes) -> typing.List[bytes]:
        if self._chunk_size is None:
            return [content] if content else []

        self._buffer.write(content)
        if self._buffer.tell() >= self._chunk_size:
            value = self._buffer.getvalue()
            chunks = [
                value[i : i + self._chunk_size]
                for i in range(0, len(value), self._chunk_size)
            ]
            if len(chunks[-1]) == self._chunk_size:
                self._buffer.seek(0)
                self._buffer.truncate()
                return chunks
            else:
                self._buffer.seek(0)
                self._buffer.write(chunks[-1])
                self._buffer.truncate()
                return chunks[:-1]
        else:
            return []

    def flush(self) -> typing.List[bytes]:
        value = self._buffer.getvalue()
        self._buffer.seek(0)
        self._buffer.truncate()
        return [value] if value else []
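
# --- Usage sketch (not part of httpx; illustrative only) ---------------------
# Full chunks are emitted as soon as enough bytes have been buffered; a partial
# trailing chunk is held back until more data arrives or flush() is called.
chunker = ByteChunker(chunk_size=4)
assert chunker.decode(b"abcdef") == [b"abcd"]   # b"ef" stays buffered
assert chunker.decode(b"gh") == [b"efgh"]
assert chunker.decode(b"xyz") == []             # not yet a full chunk
assert chunker.flush() == [b"xyz"]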

class TextChunker:
    """
    Handles returning text content in fixed-size chunks.
    """

    def __init__(self, chunk_size: typing.Optional[int] = None) -> None:
        self._buffer = io.StringIO()
        self._chunk_size = chunk_size

    def decode(self, content: str) -> typing.List[str]:
        if self._chunk_size is None:
            return [content]

        self._buffer.write(content)
        if self._buffer.tell() >= self._chunk_size:
            value = self._buffer.getvalue()
            chunks = [
                value[i : i + self._chunk_size]
                for i in range(0, len(value), self._chunk_size)
            ]
            if len(chunks[-1]) == self._chunk_size:
                self._buffer.seek(0)
                self._buffer.truncate()
                return chunks
            else:
                self._buffer.seek(0)
                self._buffer.write(chunks[-1])
                self._buffer.truncate()
                return chunks[:-1]
        else:
            return []

    def flush(self) -> typing.List[str]:
        value = self._buffer.getvalue()
        self._buffer.seek(0)
        self._buffer.truncate()
        return [value] if value else []
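
# --- Usage sketch (not part of httpx; illustrative only) ---------------------
# Same buffering scheme as ByteChunker, but over str. Note that when
# chunk_size is None, decode() returns [content] for every call, even when
# content is the empty string.
chunker = TextChunker(chunk_size=3)
assert chunker.decode("abcde") == ["abc"]
assert chunker.flush() == ["de"]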

class TextDecoder:
    """
    Handles incrementally decoding bytes into text
    """

    def __init__(self, encoding: str = "utf-8"):
        self.decoder = codecs.getincrementaldecoder(encoding)(errors="replace")

    def decode(self, data: bytes) -> str:
        return self.decoder.decode(data)

    def flush(self) -> str:
        return self.decoder.decode(b"", True)
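
# --- Usage sketch (not part of httpx; illustrative only) ---------------------
# The incremental codec holds back a multi-byte UTF-8 sequence that is split
# across chunks instead of emitting a replacement character.
decoder = TextDecoder(encoding="utf-8")
data = "café".encode("utf-8")                    # b"caf\xc3\xa9"
assert decoder.decode(data[:4]) == "caf"         # trailing 0xc3 is held back
assert decoder.decode(data[4:]) + decoder.flush() == "é"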

class LineDecoder:
    """
    Handles incrementally reading lines from text.

    Uses universal line decoding, supporting any of `\n`, `\r`, or `\r\n`
    as line endings, normalizing to `\n`.
    """

    def __init__(self) -> None:
        self.buffer = ""

    def decode(self, text: str) -> typing.List[str]:
        lines = []

        if text and self.buffer and self.buffer[-1] == "\r":
            if text.startswith("\n"):
                # Handle the case where we have an "\r\n" split across
                # our previous input, and our new chunk.
                lines.append(self.buffer[:-1] + "\n")
                self.buffer = ""
                text = text[1:]
            else:
                # Handle the case where we have "\r" at the end of our
                # previous input.
                lines.append(self.buffer[:-1] + "\n")
                self.buffer = ""

        while text:
            num_chars = len(text)
            for idx in range(num_chars):
                char = text[idx]
                next_char = None if idx + 1 == num_chars else text[idx + 1]
                if char == "\n":
                    lines.append(self.buffer + text[: idx + 1])
                    self.buffer = ""
                    text = text[idx + 1 :]
                    break
                elif char == "\r" and next_char == "\n":
                    lines.append(self.buffer + text[:idx] + "\n")
                    self.buffer = ""
                    text = text[idx + 2 :]
                    break
                elif char == "\r" and next_char is not None:
                    lines.append(self.buffer + text[:idx] + "\n")
                    self.buffer = ""
                    text = text[idx + 1 :]
                    break
                elif next_char is None:
                    self.buffer += text
                    text = ""
                    break

        return lines

    def flush(self) -> typing.List[str]:
        if self.buffer.endswith("\r"):
            # Handle the case where we had a trailing '\r', which could have
            # been a '\r\n' pair.
            lines = [self.buffer[:-1] + "\n"]
        elif self.buffer:
            lines = [self.buffer]
        else:
            lines = []
        self.buffer = ""
        return lines
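
# --- Usage sketch (not part of httpx; illustrative only) ---------------------
# A "\r\n" pair split across two chunks still yields a single line, and any
# trailing partial line is only reported on flush().
decoder = LineDecoder()
assert decoder.decode("first\r") == []
assert decoder.decode("\nsecond\nthird") == ["first\n", "second\n"]
assert decoder.flush() == ["third"]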

SUPPORTED_DECODERS = {
    "identity": IdentityDecoder,
    "gzip": GZipDecoder,
    "deflate": DeflateDecoder,
    "br": BrotliDecoder,
}


if brotli is None:
    SUPPORTED_DECODERS.pop("br")  # pragma: no cover
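
# --- Usage sketch (not part of httpx; illustrative only) ---------------------
# One way a Content-Encoding header value could be mapped onto these decoders.
# The helper name `_decoder_for` is hypothetical and not defined by this module.
def _decoder_for(content_encoding: str) -> ContentDecoder:
    names = [part.strip() for part in content_encoding.split(",") if part.strip()]
    children = [SUPPORTED_DECODERS[name]() for name in names]
    return children[0] if len(children) == 1 else MultiDecoder(children)

assert isinstance(_decoder_for("gzip"), GZipDecoder)
assert isinstance(_decoder_for("deflate, gzip"), MultiDecoder)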