Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_codecs/_codecs.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

130 statements  

1""" 

2This module is for codecs only. 

3 

4While the codec implementation can contain details of the PDF specification, 

5the module should not do any PDF parsing. 

6""" 

7 

8import io 

9from abc import ABC, abstractmethod 

10from typing import Dict, List 

11 

12from pypdf._utils import logger_warning 

13 

14 

class Codec(ABC):
    """Common interface that every codec in this module implements."""

    @abstractmethod
    def encode(self, data: bytes) -> bytes:
        """
        Transform raw data into its encoded representation.

        Args:
            data: The bytes which should be encoded.

        Returns:
            The encoded bytes.

        """

    @abstractmethod
    def decode(self, data: bytes) -> bytes:
        """
        Transform encoded data back into its raw representation.

        Args:
            data: The bytes which should be decoded.

        Returns:
            The decoded bytes.

        """

43 

44 

class LzwCodec(Codec):
    """
    Lempel-Ziv-Welch (LZW) adaptive compression codec.

    Codes 0-255 stand for the single byte values, code 256 resets the table
    and code 257 marks the end of the data. Codes are between 9 and 12 bits
    wide and are packed into the byte stream high-order bit first.
    """

    CLEAR_TABLE_MARKER = 256  # Special code to indicate table reset
    EOD_MARKER = 257  # End-of-data marker
    INITIAL_BITS_PER_CODE = 9  # Initial code bit width
    MAX_BITS_PER_CODE = 12  # Maximum code bit width

    def _initialize_encoding_table(self) -> None:
        """Initialize the encoding table and state to initial conditions."""
        self.encoding_table: Dict[bytes, int] = {bytes([i]): i for i in range(256)}
        self.next_code = self.EOD_MARKER + 1
        self.bits_per_code = self.INITIAL_BITS_PER_CODE
        self.max_code_value = (1 << self.bits_per_code) - 1

    def _increase_next_code(self) -> None:
        """Advance next_code; widen bits_per_code and max_code_value if necessary."""
        self.next_code += 1
        if (
            self.next_code > self.max_code_value
            and self.bits_per_code < self.MAX_BITS_PER_CODE
        ):
            self.bits_per_code += 1
            self.max_code_value = (1 << self.bits_per_code) - 1

    def encode(self, data: bytes) -> bytes:
        """
        Encode data using the LZW compression algorithm.

        Taken from PDF 1.7 specs, "7.4.4.2 Details of LZW Encoding".

        Args:
            data: Data to encode.

        Returns:
            The packed code stream. It always starts with a clear-table code
            and ends with the end-of-data marker.

        """
        result_codes: List[int] = []

        # The encoder shall begin by issuing a clear-table code
        result_codes.append(self.CLEAR_TABLE_MARKER)
        self._initialize_encoding_table()

        current_sequence = b""
        for byte in data:
            next_sequence = current_sequence + bytes([byte])

            if next_sequence in self.encoding_table:
                # Extend current sequence if already in the table
                current_sequence = next_sequence
            else:
                # Output code for the current sequence
                result_codes.append(self.encoding_table[current_sequence])

                # Add the new sequence to the table if there's room
                if self.next_code <= (1 << self.MAX_BITS_PER_CODE) - 1:
                    self.encoding_table[next_sequence] = self.next_code
                    self._increase_next_code()
                else:
                    # If the table is full, emit a clear-table command
                    result_codes.append(self.CLEAR_TABLE_MARKER)
                    self._initialize_encoding_table()

                # Start new sequence
                current_sequence = bytes([byte])

        # Ensure everything actually is encoded
        if current_sequence:
            result_codes.append(self.encoding_table[current_sequence])
        result_codes.append(self.EOD_MARKER)

        return self._pack_codes_into_bytes(result_codes)

    def _pack_codes_into_bytes(self, codes: List[int]) -> bytes:
        """
        Convert the list of result codes into a continuous byte stream, with
        codes packed as per the code bit-width.

        The bit-width starts at 9 bits and expands as needed. The width state
        is replayed here by counting the emitted codes in the same way as
        ``encode`` counted its table entries.

        Args:
            codes: The codes to pack, including clear-table and end-of-data
                markers.

        Returns:
            The packed bytes. The last byte is padded with zero bits.

        """
        self._initialize_encoding_table()
        buffer = 0
        bits_in_buffer = 0
        output = bytearray()

        for code in codes:
            buffer = (buffer << self.bits_per_code) | code
            bits_in_buffer += self.bits_per_code

            # Codes shall be packed into a continuous bit stream, high-order bit
            # first. This stream shall then be divided into bytes, high-order bit
            # first.
            while bits_in_buffer >= 8:
                bits_in_buffer -= 8
                output.append((buffer >> bits_in_buffer) & 0xFF)

            # Keep only the bits which have not been written yet. Without this,
            # the buffer would grow with every code and each shift would get
            # slower, making the packing quadratic on large inputs.
            buffer &= (1 << bits_in_buffer) - 1

            if code == self.CLEAR_TABLE_MARKER:
                self._initialize_encoding_table()
            elif code != self.EOD_MARKER:
                self._increase_next_code()

        # Flush any remaining bits in the buffer
        if bits_in_buffer > 0:
            output.append((buffer << (8 - bits_in_buffer)) & 0xFF)

        return bytes(output)

    def _initialize_decoding_table(self) -> None:
        """Reset the decoding table, the next free index and the code width."""
        self.max_code_value = (1 << self.MAX_BITS_PER_CODE) - 1
        # Entries 0-255 are the single bytes; all remaining slots (including
        # the two marker codes) start out empty.
        self.decoding_table = [bytes([i]) for i in range(self.CLEAR_TABLE_MARKER)] + [
            b""
        ] * (self.max_code_value - self.CLEAR_TABLE_MARKER + 1)
        self._table_index = self.EOD_MARKER + 1
        self._bits_to_get = 9

    def _next_code_decode(self, data: bytes) -> int:
        """
        Read the next code of the current bit width from the input.

        Args:
            data: The complete encoded input.

        Returns:
            The next code, or the end-of-data marker if the input is exhausted.

        """
        self._next_data: int
        try:
            while self._next_bits < self._bits_to_get:
                self._next_data = (self._next_data << 8) | (
                    data[self._byte_pointer]
                )
                self._byte_pointer += 1
                self._next_bits += 8

            code = (
                self._next_data >> (self._next_bits - self._bits_to_get)
            ) & self._and_table[self._bits_to_get - 9]
            self._next_bits -= self._bits_to_get

            # Reduce data to get rid of the overhead,
            # which increases performance on large streams significantly.
            self._next_data = self._next_data & 0xFFFFF

            return code
        except IndexError:
            # Running out of input is treated like an end-of-data marker.
            return self.EOD_MARKER

    # The following method has been converted to Python from PDFsharp:
    # https://github.com/empira/PDFsharp/blob/5fbf6ed14740bc4e16786816882d32e43af3ff5d/src/foundation/src/PDFsharp/src/PdfSharp/Pdf.Filters/LzwDecode.cs
    #
    # Original license:
    #
    # -------------------------------------------------------------------------
    # Copyright (c) 2001-2024 empira Software GmbH, Troisdorf (Cologne Area),
    # Germany
    #
    # http://docs.pdfsharp.net
    #
    # MIT License
    #
    # Permission is hereby granted, free of charge, to any person obtaining a
    # copy of this software and associated documentation files (the "Software"),
    # to deal in the Software without restriction, including without limitation
    # the rights to use, copy, modify, merge, publish, distribute, sublicense,
    # and/or sell copies of the Software, and to permit persons to whom the
    # Software is furnished to do so, subject to the following conditions:
    #
    # The above copyright notice and this permission notice shall be included
    # in all copies or substantial portions of the Software.
    #
    # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
    # THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
    # DEALINGS IN THE SOFTWARE.
    # --------------------------------------------------------------------------
    def decode(self, data: bytes) -> bytes:
        """
        Decode LZW-compressed data.

        The following code was converted to Python from the following code:
        https://github.com/empira/PDFsharp/blob/master/src/foundation/src/PDFsharp/src/PdfSharp/Pdf.Filters/LzwDecode.cs

        Decoding stops at the end-of-data marker or when the input is
        exhausted. A code which refers to a not yet defined table entry while
        no previous sequence is available (malformed stream) is skipped.

        Args:
            data: LZW-encoded data.

        Returns:
            The decoded data.

        """
        self._and_table = [511, 1023, 2047, 4095]
        self._byte_pointer = 0
        self._next_data = 0
        self._next_bits = 0
        # Sets up decoding_table, _table_index and _bits_to_get.
        self._initialize_decoding_table()

        output_stream = io.BytesIO()
        old_code = self.CLEAR_TABLE_MARKER

        while True:
            code = self._next_code_decode(data)
            if code == self.EOD_MARKER:
                break

            if code == self.CLEAR_TABLE_MARKER:
                self._initialize_decoding_table()
                code = self._next_code_decode(data)
                if code == self.EOD_MARKER:
                    break
                output_stream.write(self.decoding_table[code])
                old_code = code
            elif code < self._table_index:
                string = self.decoding_table[code]
                output_stream.write(string)
                if old_code != self.CLEAR_TABLE_MARKER:
                    self._add_entry_decode(self.decoding_table[old_code], string[0])
                old_code = code
            else:
                # The code is not in the table and not one of the special codes
                previous = self.decoding_table[old_code]
                if not previous:
                    # Malformed stream: there is no previous sequence which
                    # could be repeated (for example the very first code is
                    # already an undefined one). Skip the code instead of
                    # failing with an IndexError on the empty sequence.
                    continue
                string = previous + previous[:1]
                output_stream.write(string)
                self._add_entry_decode(previous, string[0])
                old_code = code

        return output_stream.getvalue()

    def _add_entry_decode(self, old_string: bytes, new_char: int) -> None:
        """
        Append ``old_string + new_char`` to the decoding table.

        Args:
            old_string: The previously decoded sequence.
            new_char: The byte value to append to it.

        """
        if self._table_index > self.max_code_value:
            logger_warning("Ignoring too large LZW table index.", __name__)
            return
        self.decoding_table[self._table_index] = old_string + bytes([new_char])
        self._table_index += 1

        # Update the number of bits to get based on the table index
        if self._table_index == 511:
            self._bits_to_get = 10
        elif self._table_index == 1023:
            self._bits_to_get = 11
        elif self._table_index == 2047:
            self._bits_to_get = 12