Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/backports/zstd/_streams.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

103 statements  

1"""Internal classes used by compression modules""" 

2 

3import io 

4import sys 

5 

# Chunk size (in bytes) used when pulling compressed data from the
# underlying file object; reuses io's platform default buffer size.
BUFFER_SIZE = io.DEFAULT_BUFFER_SIZE  # Compressed data read chunk size

7 

8 

class BaseStream(io.BufferedIOBase):
    """Mode-checking helper functions."""

    def _check_not_closed(self):
        """Raise ValueError if this stream has been closed."""
        if not self.closed:
            return
        raise ValueError("I/O operation on closed file")

    def _check_can_read(self):
        """Raise io.UnsupportedOperation unless the stream is readable."""
        if self.readable():
            return
        raise io.UnsupportedOperation("File not open for reading")

    def _check_can_write(self):
        """Raise io.UnsupportedOperation unless the stream is writable."""
        if self.writable():
            return
        raise io.UnsupportedOperation("File not open for writing")

    def _check_can_seek(self):
        """Raise io.UnsupportedOperation unless seeking is possible.

        Seeking requires both a readable stream (seeks are emulated by
        re-reading) and a seekable underlying file object.
        """
        if not self.readable():
            raise io.UnsupportedOperation(
                "Seeking is only supported on files open for reading")
        if not self.seekable():
            raise io.UnsupportedOperation(
                "The underlying file object does not support seeking")

31 

32 

class DecompressReader(io.RawIOBase):
    """Adapts the decompressor API to a RawIOBase reader API"""

    def readable(self):
        # A DecompressReader always supports reading.
        return True

    def __init__(self, fp, decomp_factory, trailing_error=(), **decomp_args):
        """Wrap file object *fp* with an on-the-fly decompressor.

        fp: underlying file object containing the compressed data.
        decomp_factory: callable producing a new decompressor object
            (must expose .decompress(), .eof, .unused_data, .needs_input).
        trailing_error: exception type(s) from the decompressor that mark
            invalid trailing data, which is silently ignored.
        decomp_args: keyword arguments passed to decomp_factory.
        """
        self._fp = fp
        self._eof = False
        self._pos = 0  # Current offset in decompressed stream

        # Set to size of decompressed stream once it is known, for SEEK_END
        self._size = -1

        # Save the decompressor factory and arguments.
        # If the file contains multiple compressed streams, each
        # stream will need a separate decompressor object. A new decompressor
        # object is also needed when implementing a backwards seek().
        self._decomp_factory = decomp_factory
        self._decomp_args = decomp_args
        self._decompressor = self._decomp_factory(**self._decomp_args)

        # Exception class to catch from decompressor signifying invalid
        # trailing data to ignore
        self._trailing_error = trailing_error

    def close(self):
        # Drop the decompressor before closing so its buffers are released.
        self._decompressor = None
        return super().close()

    def seekable(self):
        # Seeking is emulated (see seek()); it only works when the
        # underlying file object itself is seekable.
        return self._fp.seekable()

    def readinto(self, b):
        """Read decompressed bytes into buffer *b*; return the count read."""
        # Cast to a flat byte view so any buffer-protocol object works.
        with memoryview(b) as view, view.cast("B") as byte_view:
            data = self.read(len(byte_view))
            byte_view[:len(data)] = data
        return len(data)

    def read(self, size=-1):
        """Read and return up to *size* decompressed bytes.

        A negative *size* reads to EOF. Returns b"" at end of stream.
        Raises EOFError if the compressed stream is truncated.
        """
        if size < 0:
            return self.readall()

        if not size or self._eof:
            return b""
        data = None  # Default if EOF is encountered
        # Depending on the input data, our call to the decompressor may not
        # return any data. In this case, try again after reading another block.
        while True:
            if self._decompressor.eof:
                # Current stream finished; any leftover bytes may start a
                # new concatenated stream.
                rawblock = (self._decompressor.unused_data or
                            self._fp.read(BUFFER_SIZE))
                if not rawblock:
                    break
                # Continue to next stream.
                self._decompressor = self._decomp_factory(
                    **self._decomp_args)
                try:
                    data = self._decompressor.decompress(rawblock, size)
                except self._trailing_error:
                    # Trailing data isn't a valid compressed stream; ignore it.
                    break
            else:
                if self._decompressor.needs_input:
                    rawblock = self._fp.read(BUFFER_SIZE)
                    if not rawblock:
                        # Compressed data ran out mid-stream: truncated file.
                        raise EOFError("Compressed file ended before the "
                                       "end-of-stream marker was reached")
                else:
                    rawblock = b""
                data = self._decompressor.decompress(rawblock, size)
            if data:
                break
        if not data:
            # Reached true EOF: remember the total decompressed size so a
            # later SEEK_END does not need to re-scan the file.
            self._eof = True
            self._size = self._pos
            return b""
        self._pos += len(data)
        return data

    def readall(self):
        """Read and return all remaining decompressed bytes."""
        chunks = []
        # sys.maxsize means the max length of output buffer is unlimited,
        # so that the whole input buffer can be decompressed within one
        # .decompress() call.
        while data := self.read(sys.maxsize):
            chunks.append(data)

        return b"".join(chunks)

    # Rewind the file to the beginning of the data stream.
    def _rewind(self):
        self._fp.seek(0)
        self._eof = False
        self._pos = 0
        # A fresh decompressor is required to restart from offset 0.
        self._decompressor = self._decomp_factory(**self._decomp_args)

    def seek(self, offset, whence=io.SEEK_SET):
        """Emulate seeking by rewinding and/or reading-and-discarding.

        Returns the new absolute position in the decompressed stream.
        Backward seeks restart decompression from the beginning, so they
        can be expensive.
        """
        # Recalculate offset as an absolute file position.
        if whence == io.SEEK_SET:
            pass
        elif whence == io.SEEK_CUR:
            offset = self._pos + offset
        elif whence == io.SEEK_END:
            # Seeking relative to EOF - we need to know the file's size.
            if self._size < 0:
                # Read to EOF once; read() records the size in self._size.
                while self.read(io.DEFAULT_BUFFER_SIZE):
                    pass
            offset = self._size + offset
        else:
            raise ValueError("Invalid value for whence: {}".format(whence))

        # Make it so that offset is the number of bytes to skip forward.
        if offset < self._pos:
            self._rewind()
        else:
            offset -= self._pos

        # Read and discard data until we reach the desired position.
        while offset > 0:
            data = self.read(min(io.DEFAULT_BUFFER_SIZE, offset))
            if not data:
                break
            offset -= len(data)

        return self._pos

    def tell(self):
        """Return the current file position."""
        return self._pos