Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/compression_utils.py: 65%
Shortcuts on this page:
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import asyncio
2import sys
3import zlib
4from abc import ABC, abstractmethod
5from concurrent.futures import Executor
6from typing import Any, Final, Protocol, TypedDict, cast
8if sys.version_info >= (3, 12):
9 from collections.abc import Buffer
10else:
11 from typing import Union
13 Buffer = Union[bytes, bytearray, "memoryview[int]", "memoryview[bytes]"]
15try:
16 try:
17 import brotlicffi as brotli
18 except ImportError:
19 import brotli
21 HAS_BROTLI = True
22except ImportError:
23 HAS_BROTLI = False
25try:
26 if sys.version_info >= (3, 14):
27 from compression.zstd import ZstdDecompressor # noqa: I900
28 else: # TODO(PY314): Remove mentions of backports.zstd across codebase
29 from backports.zstd import ZstdDecompressor
31 HAS_ZSTD = True
32except ImportError:
33 HAS_ZSTD = False
# Payloads larger than this are (de)compressed in an executor rather than on
# the event loop (see DecompressionBaseHandler.decompress / ZLibCompressor.compress).
MAX_SYNC_CHUNK_SIZE: Final = 4096
# NOTE(review): not referenced in this chunk; presumably an upper bound on
# decompressed output used elsewhere in the codebase — confirm before relying on it.
DEFAULT_MAX_DECOMPRESS_SIZE: Final = 2**25  # 32MiB

# Unlimited decompression constants - different libraries use different conventions
ZLIB_MAX_LENGTH_UNLIMITED: Final = 0  # zlib uses 0 to mean unlimited
ZSTD_MAX_LENGTH_UNLIMITED: Final = -1  # zstd uses -1 to mean unlimited
class ZLibCompressObjProtocol(Protocol):
    """Structural type of a zlib-style streaming compression object.

    Matches the object returned by ``zlib.compressobj()`` (and API-compatible
    backends): incremental ``compress()`` plus a final ``flush()``.
    """

    def compress(self, data: Buffer) -> bytes: ...
    def flush(self, mode: int = ..., /) -> bytes: ...
class ZLibDecompressObjProtocol(Protocol):
    """Structural type of a zlib-style streaming decompression object.

    Matches the object returned by ``zlib.decompressobj()``: incremental
    ``decompress()`` with an output cap, a final ``flush()``, and an ``eof``
    flag that becomes true at end of stream.
    """

    def decompress(self, data: Buffer, max_length: int = ...) -> bytes: ...
    def flush(self, length: int = ..., /) -> bytes: ...

    @property
    def eof(self) -> bool: ...
class ZLibBackendProtocol(Protocol):
    """Structural type of a zlib-compatible compression module.

    Any module exposing these constants and the one-shot/streaming entry
    points can be substituted for the stdlib ``zlib`` via
    ``set_zlib_backend()``.
    """

    # Required zlib flush-mode / tuning constants.
    MAX_WBITS: int
    Z_FULL_FLUSH: int
    Z_SYNC_FLUSH: int
    Z_BEST_SPEED: int
    Z_FINISH: int

    def compressobj(
        self,
        level: int = ...,
        method: int = ...,
        wbits: int = ...,
        memLevel: int = ...,
        strategy: int = ...,
        zdict: Buffer | None = ...,
    ) -> ZLibCompressObjProtocol: ...
    def decompressobj(
        self, wbits: int = ..., zdict: Buffer = ...
    ) -> ZLibDecompressObjProtocol: ...

    def compress(
        self, data: Buffer, /, level: int = ..., wbits: int = ...
    ) -> bytes: ...
    def decompress(
        self, data: Buffer, /, wbits: int = ..., bufsize: int = ...
    ) -> bytes: ...
class CompressObjArgs(TypedDict, total=False):
    """Optional keyword arguments forwarded to ``compressobj()``.

    ``total=False``: every key may be omitted so only explicitly provided
    options are passed through to the backend.
    """

    wbits: int
    strategy: int
    level: int
class ZLibBackendWrapper:
    """Delegating facade over a zlib-compatible backend module.

    The constants and the four entry points used by this module are exposed
    explicitly; any other attribute access falls through to the wrapped
    backend via ``__getattr__``.
    """

    def __init__(self, _zlib_backend: ZLibBackendProtocol):
        self._zlib_backend: ZLibBackendProtocol = _zlib_backend

    @property
    def name(self) -> str:
        """The backend module's ``__name__``, or ``"undefined"`` if absent."""
        return getattr(self._zlib_backend, "__name__", "undefined")

    @property
    def MAX_WBITS(self) -> int:
        return self._zlib_backend.MAX_WBITS

    @property
    def Z_FULL_FLUSH(self) -> int:
        return self._zlib_backend.Z_FULL_FLUSH

    @property
    def Z_SYNC_FLUSH(self) -> int:
        return self._zlib_backend.Z_SYNC_FLUSH

    @property
    def Z_BEST_SPEED(self) -> int:
        return self._zlib_backend.Z_BEST_SPEED

    @property
    def Z_FINISH(self) -> int:
        return self._zlib_backend.Z_FINISH

    def compressobj(self, *args: Any, **kwargs: Any) -> ZLibCompressObjProtocol:
        """Create a streaming compression object on the backend."""
        backend = self._zlib_backend
        return backend.compressobj(*args, **kwargs)

    def decompressobj(self, *args: Any, **kwargs: Any) -> ZLibDecompressObjProtocol:
        """Create a streaming decompression object on the backend."""
        backend = self._zlib_backend
        return backend.decompressobj(*args, **kwargs)

    def compress(self, data: Buffer, *args: Any, **kwargs: Any) -> bytes:
        """One-shot compression on the backend."""
        backend = self._zlib_backend
        return backend.compress(data, *args, **kwargs)

    def decompress(self, data: Buffer, *args: Any, **kwargs: Any) -> bytes:
        """One-shot decompression on the backend."""
        backend = self._zlib_backend
        return backend.decompress(data, *args, **kwargs)

    # Anything not listed above is passed straight through to the backend.
    def __getattr__(self, attrname: str) -> Any:
        return getattr(self._zlib_backend, attrname)
# Module-wide default backend: the stdlib zlib, swappable via set_zlib_backend().
ZLibBackend: ZLibBackendWrapper = ZLibBackendWrapper(zlib)
def set_zlib_backend(new_zlib_backend: ZLibBackendProtocol) -> None:
    """Replace the module-wide zlib backend used by future (de)compressors.

    The shared ``ZLibBackend`` wrapper is mutated in place, so every reference
    to it observes the change. Compressor/decompressor instances snapshot the
    backend at construction time and are therefore unaffected.
    """
    ZLibBackend._zlib_backend = new_zlib_backend
def encoding_to_mode(
    encoding: str | None = None,
    suppress_deflate_header: bool = False,
) -> int:
    """Translate a content-encoding into a zlib ``wbits`` mode value.

    ``"gzip"`` selects gzip framing (``16 + MAX_WBITS``). Otherwise a raw
    deflate stream (negative wbits) is selected when *suppress_deflate_header*
    is true, and the standard zlib container when it is false.
    """
    max_wbits = ZLibBackend.MAX_WBITS
    if encoding == "gzip":
        return 16 + max_wbits
    if suppress_deflate_header:
        return -max_wbits
    return max_wbits
class DecompressionBaseHandler(ABC):
    """Base class for decompression handlers.

    Small chunks are decompressed synchronously on the event loop; chunks
    larger than *max_sync_chunk_size* are offloaded to *executor* so the
    CPU-bound work does not block the loop.
    """

    def __init__(
        self,
        executor: Executor | None = None,
        max_sync_chunk_size: int | None = MAX_SYNC_CHUNK_SIZE,
    ):
        """Base class for decompression handlers."""
        self._executor = executor
        self._max_sync_chunk_size = max_sync_chunk_size

    @abstractmethod
    def decompress_sync(
        self, data: bytes, max_length: int = ZLIB_MAX_LENGTH_UNLIMITED
    ) -> bytes:
        """Decompress the given data."""

    async def decompress(
        self, data: bytes, max_length: int = ZLIB_MAX_LENGTH_UNLIMITED
    ) -> bytes:
        """Decompress the given data."""
        if (
            self._max_sync_chunk_size is not None
            and len(data) > self._max_sync_chunk_size
        ):
            # get_running_loop() instead of the (deprecated-in-coroutines)
            # get_event_loop(); this also matches ZLibCompressor.compress.
            return await asyncio.get_running_loop().run_in_executor(
                self._executor, self.decompress_sync, data, max_length
            )
        return self.decompress_sync(data, max_length)
class ZLibCompressor:
    """Incremental zlib/gzip/deflate compressor with optional executor offload."""

    def __init__(
        self,
        encoding: str | None = None,
        suppress_deflate_header: bool = False,
        level: int | None = None,
        wbits: int | None = None,
        strategy: int | None = None,
        executor: Executor | None = None,
        max_sync_chunk_size: int | None = MAX_SYNC_CHUNK_SIZE,
    ):
        self._executor = executor
        self._max_sync_chunk_size = max_sync_chunk_size
        # An explicit wbits value wins over the encoding-derived mode.
        if wbits is None:
            self._mode = encoding_to_mode(encoding, suppress_deflate_header)
        else:
            self._mode = wbits
        # Snapshot the backend now so a later set_zlib_backend() call cannot
        # change this instance mid-stream.
        self._zlib_backend: Final = ZLibBackendWrapper(ZLibBackend._zlib_backend)

        # Only pass options the caller actually provided.
        opts: CompressObjArgs = {"wbits": self._mode}
        if strategy is not None:
            opts["strategy"] = strategy
        if level is not None:
            opts["level"] = level
        self._compressor = self._zlib_backend.compressobj(**opts)

    def compress_sync(self, data: Buffer) -> bytes:
        """Compress *data* synchronously on the calling thread."""
        return self._compressor.compress(data)

    async def compress(self, data: Buffer) -> bytes:
        """Compress *data* and return the compressed bytes.

        flush() must still be called after the final compress() call.

        Payloads larger than max_sync_chunk_size are compressed in the
        executor; smaller ones are compressed directly on the event loop.

        **WARNING: NOT cancellation-safe when combined with flush().**
        Cancelling this coroutine may corrupt the compressor state; the
        connection MUST be closed after a cancellation, or subsequent
        compress operations may emit corrupt data.

        Callers that need cancellation safety (e.g. WebSocket) MUST wrap the
        compress() + flush() + send sequence in a shield and a lock so the
        whole unit is atomic.
        """
        threshold = self._max_sync_chunk_size
        # Offload large payloads so the event loop is not blocked.
        if threshold is not None and len(data) > threshold:
            return await asyncio.get_running_loop().run_in_executor(
                self._executor, self._compressor.compress, data
            )
        return self.compress_sync(data)

    def flush(self, mode: int | None = None) -> bytes:
        """Flush the compressor synchronously (Z_FINISH when *mode* is None).

        **WARNING: NOT cancellation-safe when called after compress().**
        flush() touches the same compressor state as compress(); if a
        compress() call was cancelled, flushing may yield corrupted data and
        the connection MUST be closed instead.

        Callers that need cancellation safety (e.g. WebSocket) MUST wrap the
        compress() + flush() + send sequence in a shield and a lock so the
        whole unit is atomic.
        """
        if mode is None:
            mode = self._zlib_backend.Z_FINISH
        return self._compressor.flush(mode)
class ZLibDecompressor(DecompressionBaseHandler):
    """Streaming zlib/gzip/deflate decompressor."""

    def __init__(
        self,
        encoding: str | None = None,
        suppress_deflate_header: bool = False,
        executor: Executor | None = None,
        max_sync_chunk_size: int | None = MAX_SYNC_CHUNK_SIZE,
    ):
        super().__init__(executor=executor, max_sync_chunk_size=max_sync_chunk_size)
        self._mode = encoding_to_mode(encoding, suppress_deflate_header)
        # Snapshot the backend so a later set_zlib_backend() call does not
        # affect this instance.
        self._zlib_backend: Final = ZLibBackendWrapper(ZLibBackend._zlib_backend)
        self._decompressor = self._zlib_backend.decompressobj(wbits=self._mode)

    def decompress_sync(
        self, data: Buffer, max_length: int = ZLIB_MAX_LENGTH_UNLIMITED
    ) -> bytes:
        """Decompress *data*, producing at most *max_length* bytes (0 = unlimited)."""
        return self._decompressor.decompress(data, max_length)

    def flush(self, length: int = 0) -> bytes:
        """Return any remaining buffered output (capped at *length* if > 0)."""
        if length > 0:
            return self._decompressor.flush(length)
        return self._decompressor.flush()

    @property
    def eof(self) -> bool:
        """True once the end of the compressed stream has been reached."""
        return self._decompressor.eof
class BrotliDecompressor(DecompressionBaseHandler):
    """Streaming Brotli decompressor.

    Supports both the 'brotlipy' and 'Brotli' packages, which share the
    ``brotli`` import name: 'brotlipy' exposes ``decompress``/``flush``,
    while 'Brotli' exposes ``process`` and has no flush.
    """

    def __init__(
        self,
        executor: Executor | None = None,
        max_sync_chunk_size: int | None = MAX_SYNC_CHUNK_SIZE,
    ) -> None:
        """Decompress data using the Brotli library."""
        if not HAS_BROTLI:
            raise RuntimeError(
                "The brotli decompression is not available. "
                "Please install `Brotli` module"
            )
        self._obj = brotli.Decompressor()
        super().__init__(executor=executor, max_sync_chunk_size=max_sync_chunk_size)

    def decompress_sync(
        self, data: Buffer, max_length: int = ZLIB_MAX_LENGTH_UNLIMITED
    ) -> bytes:
        """Decompress the given data."""
        # 'brotlipy' objects have decompress(); 'Brotli' objects have process().
        decompress = getattr(self._obj, "decompress", None)
        if decompress is not None:
            return cast(bytes, decompress(data, max_length))
        return cast(bytes, self._obj.process(data, max_length))

    def flush(self) -> bytes:
        """Flush the decompressor."""
        # Only 'brotlipy' provides flush(); 'Brotli' has nothing to flush.
        flush = getattr(self._obj, "flush", None)
        if flush is not None:
            return cast(bytes, flush())
        return b""
class ZSTDDecompressor(DecompressionBaseHandler):
    """Streaming zstd decompressor that transparently handles multi-frame input."""

    def __init__(
        self,
        executor: Executor | None = None,
        max_sync_chunk_size: int | None = MAX_SYNC_CHUNK_SIZE,
    ) -> None:
        if not HAS_ZSTD:
            raise RuntimeError(
                "The zstd decompression is not available. "
                "Please install `backports.zstd` module"
            )
        self._obj = ZstdDecompressor()
        # Carries trailing frame data that could not be decompressed yet
        # because the max_length budget was exhausted; prepended on next call.
        self._pending_unused_data: bytes | None = None
        super().__init__(executor=executor, max_sync_chunk_size=max_sync_chunk_size)

    def decompress_sync(
        self, data: bytes, max_length: int = ZLIB_MAX_LENGTH_UNLIMITED
    ) -> bytes:
        """Decompress *data*, producing at most *max_length* bytes (0 = unlimited)."""
        # zstd uses -1 for unlimited, while zlib uses 0 for unlimited
        # Convert the zlib convention (0=unlimited) to zstd convention (-1=unlimited)
        zstd_max_length = (
            ZSTD_MAX_LENGTH_UNLIMITED
            if max_length == ZLIB_MAX_LENGTH_UNLIMITED
            else max_length
        )
        # Re-queue data deferred from a previous budget-limited call.
        if self._pending_unused_data is not None:
            data = self._pending_unused_data + data
            self._pending_unused_data = None
        result = self._obj.decompress(data, zstd_max_length)

        # Handle multi-frame zstd streams.
        # https://datatracker.ietf.org/doc/html/rfc8878#section-3.1.1
        # ZstdDecompressor handles one frame only. When a frame ends,
        # eof becomes True and any trailing data goes to unused_data.
        # We create a fresh decompressor to continue with the next frame.
        while self._obj.eof and self._obj.unused_data:
            unused_data = self._obj.unused_data
            self._obj = ZstdDecompressor()
            if zstd_max_length != ZSTD_MAX_LENGTH_UNLIMITED:
                # NOTE(review): this subtracts the *cumulative* len(result)
                # on every iteration, so across 3+ frames the remaining
                # budget shrinks faster than bytes actually produced
                # (over-conservative, never over-produces) — confirm intended.
                zstd_max_length -= len(result)
                if zstd_max_length <= 0:
                    # Budget exhausted: park the remainder for the next call.
                    self._pending_unused_data = unused_data
                    break
            result += self._obj.decompress(unused_data, zstd_max_length)

        # Frame ended exactly at chunk boundary — no unused_data, but the
        # next feed_data() call would fail on the spent decompressor.
        # Prepare a fresh one for the next chunk.
        if self._obj.eof:
            self._obj = ZstdDecompressor()

        return result

    def flush(self) -> bytes:
        """No-op: nothing is buffered for flushing; always returns ``b""``."""
        return b""