Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/compression_utils.py: 74%
65 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:52 +0000
1import asyncio
2import zlib
3from concurrent.futures import Executor
4from typing import Optional, cast
# Brotli support is optional: both the 'Brotli' and 'brotlipy' packages
# install under the import name `brotli`.  HAS_BROTLI records whether one
# of them is importable so callers can fail with a clear error message.
try:
    import brotli

    HAS_BROTLI = True
except ImportError:  # pragma: no cover
    HAS_BROTLI = False

# Payloads at or below this size (in bytes) are (de)compressed synchronously
# on the event loop; larger ones are offloaded to an executor.
MAX_SYNC_CHUNK_SIZE = 1024
def encoding_to_mode(
    encoding: Optional[str] = None,
    suppress_deflate_header: bool = False,
) -> int:
    """Translate a content-encoding name into a zlib ``wbits`` mode.

    ``"gzip"`` selects the gzip container (wbits offset by +16); any other
    encoding selects deflate, with a negative wbits when the zlib header
    must be suppressed (raw deflate stream).
    """
    if encoding == "gzip":
        # Per the zlib docs, adding 16 to MAX_WBITS requests gzip framing.
        return 16 + zlib.MAX_WBITS
    if suppress_deflate_header:
        # Negative wbits => raw deflate, no zlib header/trailer.
        return -zlib.MAX_WBITS
    return zlib.MAX_WBITS
class ZlibBaseHandler:
    """Common state shared by the zlib compressor and decompressor wrappers.

    Holds the zlib wbits mode, the optional executor used for offloading
    large payloads, and the synchronous-chunk-size threshold.
    """

    def __init__(
        self,
        mode: int,
        executor: Optional[Executor] = None,
        max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE,
    ):
        # Stored verbatim; subclasses read these when building their
        # zlib objects and when deciding whether to offload work.
        self._max_sync_chunk_size = max_sync_chunk_size
        self._executor = executor
        self._mode = mode
class ZLibCompressor(ZlibBaseHandler):
    """Async-friendly wrapper around :func:`zlib.compressobj`.

    Small payloads are compressed inline; payloads larger than
    ``max_sync_chunk_size`` are compressed in ``executor`` (or the event
    loop's default executor when ``executor`` is ``None``) so the event
    loop is not blocked by CPU-bound compression.
    """

    def __init__(
        self,
        encoding: Optional[str] = None,
        suppress_deflate_header: bool = False,
        level: Optional[int] = None,
        wbits: Optional[int] = None,
        strategy: int = zlib.Z_DEFAULT_STRATEGY,
        executor: Optional[Executor] = None,
        max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE,
    ):
        # An explicit wbits wins over the encoding-derived mode.
        super().__init__(
            mode=(
                encoding_to_mode(encoding, suppress_deflate_header)
                if wbits is None
                else wbits
            ),
            executor=executor,
            max_sync_chunk_size=max_sync_chunk_size,
        )
        # zlib.compressobj() does not accept level=None, so only pass the
        # level keyword when the caller supplied one.
        if level is None:
            self._compressor = zlib.compressobj(wbits=self._mode, strategy=strategy)
        else:
            self._compressor = zlib.compressobj(
                wbits=self._mode, strategy=strategy, level=level
            )

    def compress_sync(self, data: bytes) -> bytes:
        """Compress *data* synchronously; output may be empty until flushed."""
        return self._compressor.compress(data)

    async def compress(self, data: bytes) -> bytes:
        """Compress *data*, offloading to the executor when it is large.

        Payloads larger than ``self._max_sync_chunk_size`` are handed to
        ``run_in_executor``; smaller ones (or all, when the threshold is
        ``None``) are compressed inline.
        """
        if (
            self._max_sync_chunk_size is not None
            and len(data) > self._max_sync_chunk_size
        ):
            # get_running_loop() is the correct API inside a coroutine:
            # get_event_loop() is deprecated for this use (3.10+) and may
            # create a brand-new loop when called off the main thread.
            return await asyncio.get_running_loop().run_in_executor(
                self._executor, self.compress_sync, data
            )
        return self.compress_sync(data)

    def flush(self, mode: int = zlib.Z_FINISH) -> bytes:
        """Flush buffered output; ``Z_FINISH`` (default) ends the stream."""
        return self._compressor.flush(mode)
class ZLibDecompressor(ZlibBaseHandler):
    """Async-friendly wrapper around :func:`zlib.decompressobj`.

    Mirrors :class:`ZLibCompressor`: small payloads are decompressed
    inline, larger ones in an executor so the event loop stays responsive.
    """

    def __init__(
        self,
        encoding: Optional[str] = None,
        suppress_deflate_header: bool = False,
        executor: Optional[Executor] = None,
        max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE,
    ):
        super().__init__(
            mode=encoding_to_mode(encoding, suppress_deflate_header),
            executor=executor,
            max_sync_chunk_size=max_sync_chunk_size,
        )
        self._decompressor = zlib.decompressobj(wbits=self._mode)

    def decompress_sync(self, data: bytes, max_length: int = 0) -> bytes:
        """Decompress *data* synchronously; ``max_length`` caps the output
        size (0 means unlimited), leftover input lands in unconsumed_tail."""
        return self._decompressor.decompress(data, max_length)

    async def decompress(self, data: bytes, max_length: int = 0) -> bytes:
        """Decompress *data*, offloading to the executor when it is large."""
        if (
            self._max_sync_chunk_size is not None
            and len(data) > self._max_sync_chunk_size
        ):
            # get_running_loop() is the correct API inside a coroutine:
            # get_event_loop() is deprecated for this use (3.10+) and may
            # create a brand-new loop when called off the main thread.
            return await asyncio.get_running_loop().run_in_executor(
                self._executor, self.decompress_sync, data, max_length
            )
        return self.decompress_sync(data, max_length)

    def flush(self, length: int = 0) -> bytes:
        """Return any remaining decompressed data.

        ``length`` > 0 bounds the size of the returned buffer; otherwise
        zlib's default flush is used.
        """
        return (
            self._decompressor.flush(length)
            if length > 0
            else self._decompressor.flush()
        )

    @property
    def eof(self) -> bool:
        # True once the end of the compressed stream has been reached.
        return self._decompressor.eof

    @property
    def unconsumed_tail(self) -> bytes:
        # Input left over when a max_length-limited decompress stopped early.
        return self._decompressor.unconsumed_tail

    @property
    def unused_data(self) -> bytes:
        # Bytes that followed the end of the compressed stream.
        return self._decompressor.unused_data
class BrotliDecompressor:
    """Streaming brotli decompressor.

    Works with both the 'brotlipy' and 'Brotli' packages, which share the
    import name ``brotli`` but expose different decompressor APIs:
    'brotlipy' provides ``decompress()``/``flush()``, while 'Brotli'
    provides ``process()`` and has no flush method.
    """

    def __init__(self) -> None:
        if not HAS_BROTLI:
            raise RuntimeError(
                "The brotli decompression is not available. "
                "Please install `Brotli` module"
            )
        self._obj = brotli.Decompressor()

    def decompress_sync(self, data: bytes) -> bytes:
        """Feed *data* to the decompressor and return the decoded bytes."""
        decompress = getattr(self._obj, "decompress", None)
        if decompress is not None:  # 'brotlipy' API
            return cast(bytes, decompress(data))
        return cast(bytes, self._obj.process(data))  # 'Brotli' API

    def flush(self) -> bytes:
        """Flush pending output ('brotlipy' only; 'Brotli' has no flush)."""
        flush = getattr(self._obj, "flush", None)
        if flush is None:
            return b""
        return cast(bytes, flush())