Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/aiohttp/compression_utils.py: 47%

70 statements  

« prev     ^ index     » next       coverage.py v7.3.2, created at 2023-12-08 06:40 +0000

import asyncio
import zlib
from concurrent.futures import Executor
from typing import Optional, cast

# Optional Brotli support: try ``brotlicffi`` first and fall back to
# ``brotli``. HAS_BROTLI records whether either import succeeded.
try:
    try:
        import brotlicffi as brotli
    except ImportError:
        import brotli

    HAS_BROTLI = True
except ImportError:  # pragma: no cover
    HAS_BROTLI = False

# Default ``max_sync_chunk_size`` of the zlib handlers in this module: their
# async methods hand payloads longer than this many bytes to an executor.
MAX_SYNC_CHUNK_SIZE = 1024

17 

18 

def encoding_to_mode(
    encoding: Optional[str] = None,
    suppress_deflate_header: bool = False,
) -> int:
    """Return the zlib ``wbits`` value to use for *encoding*.

    Args:
        encoding: Content encoding name. Only ``"gzip"`` is treated
            specially; every other value, including ``None``, takes the
            deflate path.
        suppress_deflate_header: On the deflate path, return the negated
            window size instead of the positive one. Ignored for
            ``"gzip"``.

    Returns:
        ``16 + zlib.MAX_WBITS`` for gzip, ``-zlib.MAX_WBITS`` for deflate
        with the header suppressed, otherwise ``zlib.MAX_WBITS``.
    """
    if encoding == "gzip":
        # Per the zlib docs, adding 16 selects a gzip header and trailer.
        return 16 + zlib.MAX_WBITS
    if suppress_deflate_header:
        # Per the zlib docs, a negative wbits means a raw stream with no
        # header or trailing checksum.
        return -zlib.MAX_WBITS
    return zlib.MAX_WBITS

27 

28 

class ZlibBaseHandler:
    """Store the settings shared by the zlib compressor and decompressor.

    Args:
        mode: ``wbits`` value the subclass passes to zlib.
        executor: Executor given to ``run_in_executor`` for large payloads;
            ``None`` is forwarded unchanged.
        max_sync_chunk_size: Payloads longer than this many bytes are
            offloaded to the executor by the subclasses' async methods;
            ``None`` disables offloading.
    """

    def __init__(
        self,
        mode: int,
        executor: Optional[Executor] = None,
        max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE,
    ):
        self._mode = mode
        self._executor = executor
        self._max_sync_chunk_size = max_sync_chunk_size

39 

40 

class ZLibCompressor(ZlibBaseHandler):
    """Incremental compressor built on ``zlib.compressobj``.

    Args:
        encoding: Passed to ``encoding_to_mode`` when *wbits* is ``None``.
        suppress_deflate_header: Passed to ``encoding_to_mode`` when
            *wbits* is ``None``.
        level: Compression level; when ``None`` the argument is omitted so
            zlib applies its own default.
        wbits: Explicit ``wbits`` value; overrides the value derived from
            *encoding* when not ``None``.
        strategy: zlib compression strategy.
        executor: Executor used for payloads above *max_sync_chunk_size*.
        max_sync_chunk_size: Size threshold in bytes above which
            ``compress`` runs in the executor; ``None`` disables that.
    """

    def __init__(
        self,
        encoding: Optional[str] = None,
        suppress_deflate_header: bool = False,
        level: Optional[int] = None,
        wbits: Optional[int] = None,
        strategy: int = zlib.Z_DEFAULT_STRATEGY,
        executor: Optional[Executor] = None,
        max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE,
    ):
        super().__init__(
            mode=encoding_to_mode(encoding, suppress_deflate_header)
            if wbits is None
            else wbits,
            executor=executor,
            max_sync_chunk_size=max_sync_chunk_size,
        )
        if level is None:
            self._compressor = zlib.compressobj(wbits=self._mode, strategy=strategy)
        else:
            self._compressor = zlib.compressobj(
                wbits=self._mode, strategy=strategy, level=level
            )
        self._compress_lock = asyncio.Lock()

    def compress_sync(self, data: bytes) -> bytes:
        """Feed *data* to the compressor and return the bytes it emits."""
        return self._compressor.compress(data)

    async def compress(self, data: bytes) -> bytes:
        """Compress *data*, using the executor for large payloads.

        Payloads longer than ``max_sync_chunk_size`` are compressed via
        ``run_in_executor``; shorter ones are compressed inline.
        """
        async with self._compress_lock:
            # To ensure the stream is consistent in the event
            # there are multiple writers, we need to lock
            # the compressor so that only one writer can
            # compress at a time.
            if (
                self._max_sync_chunk_size is not None
                and len(data) > self._max_sync_chunk_size
            ):
                # We are inside a coroutine, so a loop is always running.
                return await asyncio.get_running_loop().run_in_executor(
                    self._executor, self.compress_sync, data
                )
            return self.compress_sync(data)

    def flush(self, mode: int = zlib.Z_FINISH) -> bytes:
        """Flush the compressor with *mode* and return the pending output.

        This does not acquire the compress lock.
        """
        return self._compressor.flush(mode)

87 

88 

class ZLibDecompressor(ZlibBaseHandler):
    """Incremental decompressor built on ``zlib.decompressobj``.

    Args:
        encoding: Passed to ``encoding_to_mode`` to pick the ``wbits``.
        suppress_deflate_header: Passed to ``encoding_to_mode``.
        executor: Executor used for payloads above *max_sync_chunk_size*.
        max_sync_chunk_size: Size threshold in bytes above which
            ``decompress`` runs in the executor; ``None`` disables that.
    """

    def __init__(
        self,
        encoding: Optional[str] = None,
        suppress_deflate_header: bool = False,
        executor: Optional[Executor] = None,
        max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE,
    ):
        super().__init__(
            mode=encoding_to_mode(encoding, suppress_deflate_header),
            executor=executor,
            max_sync_chunk_size=max_sync_chunk_size,
        )
        self._decompressor = zlib.decompressobj(wbits=self._mode)

    def decompress_sync(self, data: bytes, max_length: int = 0) -> bytes:
        """Decompress *data*, forwarding *max_length* to zlib."""
        return self._decompressor.decompress(data, max_length)

    async def decompress(self, data: bytes, max_length: int = 0) -> bytes:
        """Decompress *data*, using the executor for large payloads.

        Payloads longer than ``max_sync_chunk_size`` are decompressed via
        ``run_in_executor``; shorter ones are decompressed inline.
        """
        if (
            self._max_sync_chunk_size is not None
            and len(data) > self._max_sync_chunk_size
        ):
            # We are inside a coroutine, so a loop is always running.
            return await asyncio.get_running_loop().run_in_executor(
                self._executor, self.decompress_sync, data, max_length
            )
        return self.decompress_sync(data, max_length)

    def flush(self, length: int = 0) -> bytes:
        """Return remaining output from the decompressor.

        *length* is forwarded to zlib only when it is positive; otherwise
        zlib's ``flush`` is called with no argument.
        """
        if length > 0:
            return self._decompressor.flush(length)
        return self._decompressor.flush()

    @property
    def eof(self) -> bool:
        """The underlying decompressor's ``eof`` attribute."""
        return self._decompressor.eof

    @property
    def unconsumed_tail(self) -> bytes:
        """The underlying decompressor's ``unconsumed_tail`` attribute."""
        return self._decompressor.unconsumed_tail

    @property
    def unused_data(self) -> bytes:
        """The underlying decompressor's ``unused_data`` attribute."""
        return self._decompressor.unused_data

135 

136 

class BrotliDecompressor:
    """Wrapper around a ``brotli.Decompressor`` instance.

    Supports both the 'brotlipy' and 'Brotli' packages, since they share
    an import name. In each method the ``hasattr`` branch is for
    'brotlipy' and the fallback branch is for 'Brotli'.

    Raises:
        RuntimeError: On construction, when no Brotli module could be
            imported (``HAS_BROTLI`` is false).
    """

    def __init__(self) -> None:
        if not HAS_BROTLI:
            raise RuntimeError(
                "The brotli decompression is not available. "
                "Please install `Brotli` module"
            )
        self._obj = brotli.Decompressor()

    def decompress_sync(self, data: bytes) -> bytes:
        """Decompress *data* with whichever method the backend exposes."""
        backend = self._obj
        if hasattr(backend, "decompress"):
            return cast(bytes, backend.decompress(data))
        return cast(bytes, backend.process(data))

    def flush(self) -> bytes:
        """Return the backend's flushed output, or ``b""`` if it has no ``flush``."""
        backend = self._obj
        if hasattr(backend, "flush"):
            return cast(bytes, backend.flush())
        return b""