# aiohttp/compression_utils.py
import asyncio
import zlib
from concurrent.futures import Executor
from typing import Optional, cast

try:
    try:
        import brotlicffi as brotli
    except ImportError:
        import brotli

    HAS_BROTLI = True
except ImportError:  # pragma: no cover
    HAS_BROTLI = False

MAX_SYNC_CHUNK_SIZE = 1024
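
# Payloads larger than MAX_SYNC_CHUNK_SIZE bytes are handed off to an
# executor in ZLibCompressor.compress() / ZLibDecompressor.decompress()
# below, so large (de)compression calls do not block the event loop.

# wbits selection follows the standard zlib semantics: zlib.MAX_WBITS keeps
# the zlib container, -zlib.MAX_WBITS produces a raw deflate stream without
# header or trailer, and 16 + zlib.MAX_WBITS selects the gzip container.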
def encoding_to_mode(
    encoding: Optional[str] = None,
    suppress_deflate_header: bool = False,
) -> int:
    if encoding == "gzip":
        return 16 + zlib.MAX_WBITS

    return -zlib.MAX_WBITS if suppress_deflate_header else zlib.MAX_WBITS


class ZlibBaseHandler:
    def __init__(
        self,
        mode: int,
        executor: Optional[Executor] = None,
        max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE,
    ):
        self._mode = mode
        self._executor = executor
        self._max_sync_chunk_size = max_sync_chunk_size

class ZLibCompressor(ZlibBaseHandler):
    def __init__(
        self,
        encoding: Optional[str] = None,
        suppress_deflate_header: bool = False,
        level: Optional[int] = None,
        wbits: Optional[int] = None,
        strategy: int = zlib.Z_DEFAULT_STRATEGY,
        executor: Optional[Executor] = None,
        max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE,
    ):
        super().__init__(
            mode=encoding_to_mode(encoding, suppress_deflate_header)
            if wbits is None
            else wbits,
            executor=executor,
            max_sync_chunk_size=max_sync_chunk_size,
        )
        if level is None:
            self._compressor = zlib.compressobj(wbits=self._mode, strategy=strategy)
        else:
            self._compressor = zlib.compressobj(
                wbits=self._mode, strategy=strategy, level=level
            )
        self._compress_lock = asyncio.Lock()

    def compress_sync(self, data: bytes) -> bytes:
        return self._compressor.compress(data)

    async def compress(self, data: bytes) -> bytes:
        async with self._compress_lock:
            # To ensure the stream is consistent in the event
            # there are multiple writers, we need to lock
            # the compressor so that only one writer can
            # compress at a time.
            if (
                self._max_sync_chunk_size is not None
                and len(data) > self._max_sync_chunk_size
            ):
                return await asyncio.get_event_loop().run_in_executor(
                    self._executor, self.compress_sync, data
                )
            return self.compress_sync(data)
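
    # Per the zlib documentation, flushing with the default Z_FINISH mode
    # finalizes the stream and the compressor cannot be reused afterwards;
    # callers that want to keep streaming should pass Z_SYNC_FLUSH or
    # Z_FULL_FLUSH instead.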
    def flush(self, mode: int = zlib.Z_FINISH) -> bytes:
        return self._compressor.flush(mode)

class ZLibDecompressor(ZlibBaseHandler):
    def __init__(
        self,
        encoding: Optional[str] = None,
        suppress_deflate_header: bool = False,
        executor: Optional[Executor] = None,
        max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE,
    ):
        super().__init__(
            mode=encoding_to_mode(encoding, suppress_deflate_header),
            executor=executor,
            max_sync_chunk_size=max_sync_chunk_size,
        )
        self._decompressor = zlib.decompressobj(wbits=self._mode)

    def decompress_sync(self, data: bytes, max_length: int = 0) -> bytes:
        return self._decompressor.decompress(data, max_length)

    async def decompress(self, data: bytes, max_length: int = 0) -> bytes:
        if (
            self._max_sync_chunk_size is not None
            and len(data) > self._max_sync_chunk_size
        ):
            return await asyncio.get_event_loop().run_in_executor(
                self._executor, self.decompress_sync, data, max_length
            )
        return self.decompress_sync(data, max_length)

    def flush(self, length: int = 0) -> bytes:
        return (
            self._decompressor.flush(length)
            if length > 0
            else self._decompressor.flush()
        )

    @property
    def eof(self) -> bool:
        return self._decompressor.eof

    @property
    def unconsumed_tail(self) -> bytes:
        return self._decompressor.unconsumed_tail

    @property
    def unused_data(self) -> bytes:
        return self._decompressor.unused_data

class BrotliDecompressor:
    # Supports both the 'brotlipy' and 'Brotli' packages, since they share an
    # import name. The top branches are for 'brotlipy' and the bottom branches
    # are for 'Brotli'.
    def __init__(self) -> None:
        if not HAS_BROTLI:
            raise RuntimeError(
                "Brotli decompression is not available. "
                "Please install the `Brotli` module"
            )
        self._obj = brotli.Decompressor()

    def decompress_sync(self, data: bytes) -> bytes:
        if hasattr(self._obj, "decompress"):
            return cast(bytes, self._obj.decompress(data))
        return cast(bytes, self._obj.process(data))

    def flush(self) -> bytes:
        if hasattr(self._obj, "flush"):
            return cast(bytes, self._obj.flush())
        return b""
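

# A minimal usage sketch, not part of the original module: round-trip a
# payload through ZLibCompressor / ZLibDecompressor, and through
# BrotliDecompressor when a brotli backend is installed. The gzip encoding
# and the sample payload below are illustrative assumptions.
if __name__ == "__main__":

    async def _demo() -> None:
        payload = b"hello world " * 1000

        # Large input (> MAX_SYNC_CHUNK_SIZE) is compressed in an executor.
        compressor = ZLibCompressor(encoding="gzip")
        compressed = await compressor.compress(payload) + compressor.flush()

        decompressor = ZLibDecompressor(encoding="gzip")
        assert await decompressor.decompress(compressed) == payload

        if HAS_BROTLI:
            # brotli.compress() is provided by both brotlicffi and Brotli.
            bd = BrotliDecompressor()
            assert bd.decompress_sync(brotli.compress(payload)) + bd.flush() == payload

    asyncio.run(_demo())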