Coverage for /pythoncovmergedfiles/medio/medio/src/aiohttp/aiohttp/compression_utils.py: 74%

65 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:52 +0000

1import asyncio 

2import zlib 

3from concurrent.futures import Executor 

4from typing import Optional, cast 

5 

6try: 

7 import brotli 

8 

9 HAS_BROTLI = True 

10except ImportError: # pragma: no cover 

11 HAS_BROTLI = False 

12 

13MAX_SYNC_CHUNK_SIZE = 1024 

14 

15 

16def encoding_to_mode( 

17 encoding: Optional[str] = None, 

18 suppress_deflate_header: bool = False, 

19) -> int: 

20 if encoding == "gzip": 

21 return 16 + zlib.MAX_WBITS 

22 

23 return -zlib.MAX_WBITS if suppress_deflate_header else zlib.MAX_WBITS 

24 

25 

26class ZlibBaseHandler: 

27 def __init__( 

28 self, 

29 mode: int, 

30 executor: Optional[Executor] = None, 

31 max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE, 

32 ): 

33 self._mode = mode 

34 self._executor = executor 

35 self._max_sync_chunk_size = max_sync_chunk_size 

36 

37 

38class ZLibCompressor(ZlibBaseHandler): 

39 def __init__( 

40 self, 

41 encoding: Optional[str] = None, 

42 suppress_deflate_header: bool = False, 

43 level: Optional[int] = None, 

44 wbits: Optional[int] = None, 

45 strategy: int = zlib.Z_DEFAULT_STRATEGY, 

46 executor: Optional[Executor] = None, 

47 max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE, 

48 ): 

49 super().__init__( 

50 mode=encoding_to_mode(encoding, suppress_deflate_header) 

51 if wbits is None 

52 else wbits, 

53 executor=executor, 

54 max_sync_chunk_size=max_sync_chunk_size, 

55 ) 

56 if level is None: 

57 self._compressor = zlib.compressobj(wbits=self._mode, strategy=strategy) 

58 else: 

59 self._compressor = zlib.compressobj( 

60 wbits=self._mode, strategy=strategy, level=level 

61 ) 

62 

63 def compress_sync(self, data: bytes) -> bytes: 

64 return self._compressor.compress(data) 

65 

66 async def compress(self, data: bytes) -> bytes: 

67 if ( 

68 self._max_sync_chunk_size is not None 

69 and len(data) > self._max_sync_chunk_size 

70 ): 

71 return await asyncio.get_event_loop().run_in_executor( 

72 self._executor, self.compress_sync, data 

73 ) 

74 return self.compress_sync(data) 

75 

76 def flush(self, mode: int = zlib.Z_FINISH) -> bytes: 

77 return self._compressor.flush(mode) 

78 

79 

80class ZLibDecompressor(ZlibBaseHandler): 

81 def __init__( 

82 self, 

83 encoding: Optional[str] = None, 

84 suppress_deflate_header: bool = False, 

85 executor: Optional[Executor] = None, 

86 max_sync_chunk_size: Optional[int] = MAX_SYNC_CHUNK_SIZE, 

87 ): 

88 super().__init__( 

89 mode=encoding_to_mode(encoding, suppress_deflate_header), 

90 executor=executor, 

91 max_sync_chunk_size=max_sync_chunk_size, 

92 ) 

93 self._decompressor = zlib.decompressobj(wbits=self._mode) 

94 

95 def decompress_sync(self, data: bytes, max_length: int = 0) -> bytes: 

96 return self._decompressor.decompress(data, max_length) 

97 

98 async def decompress(self, data: bytes, max_length: int = 0) -> bytes: 

99 if ( 

100 self._max_sync_chunk_size is not None 

101 and len(data) > self._max_sync_chunk_size 

102 ): 

103 return await asyncio.get_event_loop().run_in_executor( 

104 self._executor, self.decompress_sync, data, max_length 

105 ) 

106 return self.decompress_sync(data, max_length) 

107 

108 def flush(self, length: int = 0) -> bytes: 

109 return ( 

110 self._decompressor.flush(length) 

111 if length > 0 

112 else self._decompressor.flush() 

113 ) 

114 

115 @property 

116 def eof(self) -> bool: 

117 return self._decompressor.eof 

118 

119 @property 

120 def unconsumed_tail(self) -> bytes: 

121 return self._decompressor.unconsumed_tail 

122 

123 @property 

124 def unused_data(self) -> bytes: 

125 return self._decompressor.unused_data 

126 

127 

128class BrotliDecompressor: 

129 # Supports both 'brotlipy' and 'Brotli' packages 

130 # since they share an import name. The top branches 

131 # are for 'brotlipy' and bottom branches for 'Brotli' 

132 def __init__(self) -> None: 

133 if not HAS_BROTLI: 

134 raise RuntimeError( 

135 "The brotli decompression is not available. " 

136 "Please install `Brotli` module" 

137 ) 

138 self._obj = brotli.Decompressor() 

139 

140 def decompress_sync(self, data: bytes) -> bytes: 

141 if hasattr(self._obj, "decompress"): 

142 return cast(bytes, self._obj.decompress(data)) 

143 return cast(bytes, self._obj.process(data)) 

144 

145 def flush(self) -> bytes: 

146 if hasattr(self._obj, "flush"): 

147 return cast(bytes, self._obj.flush()) 

148 return b""