1"""
2This module is for codecs only.
3
4While the codec implementation can contain details of the PDF specification,
5the module should not do any PDF parsing.
6"""
7
8import io
9from abc import ABC, abstractmethod
10
11from pypdf._utils import logger_warning
12
13
class Codec(ABC):
    """Common interface which every codec of this module has to implement."""

    @abstractmethod
    def encode(self, data: bytes) -> bytes:
        """
        Transform raw data into its encoded representation.

        Args:
            data: The raw bytes which should be encoded.

        Returns:
            The encoded bytes.

        """

    @abstractmethod
    def decode(self, data: bytes) -> bytes:
        """
        Transform encoded data back into its raw representation.

        Args:
            data: The encoded bytes which should be decoded.

        Returns:
            The decoded bytes.

        """
42
43
class LzwCodec(Codec):
    """
    Lempel-Ziv-Welch (LZW) adaptive compression codec.

    Codes are between ``INITIAL_BITS_PER_CODE`` and ``MAX_BITS_PER_CODE`` bits
    wide. The codes 0-255 stand for the corresponding single bytes, while
    ``CLEAR_TABLE_MARKER`` and ``EOD_MARKER`` are reserved control codes.

    The codec keeps its working state in instance attributes, which are reset
    at the beginning of every :meth:`encode` and :meth:`decode` call.
    """

    CLEAR_TABLE_MARKER = 256  # Special code to indicate table reset
    EOD_MARKER = 257  # End-of-data marker
    INITIAL_BITS_PER_CODE = 9  # Initial code bit width
    MAX_BITS_PER_CODE = 12  # Maximum code bit width

    def _initialize_encoding_table(self) -> None:
        """Initialize the encoding table and state to initial conditions."""
        self.encoding_table: dict[bytes, int] = {bytes([i]): i for i in range(256)}
        # The first free code follows directly after the two control codes.
        self.next_code = self.EOD_MARKER + 1
        self.bits_per_code = self.INITIAL_BITS_PER_CODE
        self.max_code_value = (1 << self.bits_per_code) - 1

    def _increase_next_code(self) -> None:
        """
        Advance ``next_code`` by one.

        If the new value does not fit into the current code width anymore,
        ``bits_per_code`` and ``max_code_value`` are widened by one bit, up to
        ``MAX_BITS_PER_CODE``.
        """
        self.next_code += 1
        if (
            self.next_code > self.max_code_value
            and self.bits_per_code < self.MAX_BITS_PER_CODE
        ):
            self.bits_per_code += 1
            self.max_code_value = (1 << self.bits_per_code) - 1

    def encode(self, data: bytes) -> bytes:
        """
        Encode data using the LZW compression algorithm.

        Taken from PDF 1.7 specs, "7.4.4.2 Details of LZW Encoding".

        Args:
            data: Data to encode.

        Returns:
            The packed code stream. It always starts with a clear-table code
            and ends with an end-of-data code.

        """
        result_codes: list[int] = []

        # The encoder shall begin by issuing a clear-table code
        result_codes.append(self.CLEAR_TABLE_MARKER)
        self._initialize_encoding_table()

        # Largest code which can be expressed with the maximum code width.
        highest_code = (1 << self.MAX_BITS_PER_CODE) - 1

        current_sequence = b""
        for byte in data:
            next_sequence = current_sequence + bytes([byte])

            if next_sequence in self.encoding_table:
                # Extend current sequence if already in the table
                current_sequence = next_sequence
                continue

            # Output code for the current sequence
            result_codes.append(self.encoding_table[current_sequence])

            if self.next_code <= highest_code:
                # Add the new sequence to the table as there is room
                self.encoding_table[next_sequence] = self.next_code
                self._increase_next_code()
            else:
                # If the table is full, emit a clear-table command
                result_codes.append(self.CLEAR_TABLE_MARKER)
                self._initialize_encoding_table()

            # Start new sequence
            current_sequence = bytes([byte])

        # Ensure everything actually is encoded
        if current_sequence:
            result_codes.append(self.encoding_table[current_sequence])
        result_codes.append(self.EOD_MARKER)

        return self._pack_codes_into_bytes(result_codes)

    def _pack_codes_into_bytes(self, codes: list[int]) -> bytes:
        """
        Convert the list of result codes into a continuous byte stream, with codes packed as per the code bit-width.
        The bit-width starts at 9 bits and expands as needed.

        The code width is tracked by replaying the table growth of the
        encoder: every regular code advances ``next_code``, a clear-table
        code resets the state.

        Args:
            codes: The codes to pack, in output order.

        Returns:
            The packed bytes. Unused bits of the last byte are zero.

        """
        self._initialize_encoding_table()
        buffer = 0
        bits_in_buffer = 0
        output = bytearray()

        for code in codes:
            buffer = (buffer << self.bits_per_code) | code
            bits_in_buffer += self.bits_per_code

            # Codes shall be packed into a continuous bit stream, high-order bit
            # first. This stream shall then be divided into bytes, high-order bit
            # first.
            while bits_in_buffer >= 8:
                bits_in_buffer -= 8
                output.append((buffer >> bits_in_buffer) & 0xFF)

            # Only keep the bits which have not been written yet. Otherwise,
            # the buffer would grow with every code and each shift would get
            # more expensive, leading to quadratic runtime on large inputs.
            buffer &= (1 << bits_in_buffer) - 1

            if code == self.CLEAR_TABLE_MARKER:
                self._initialize_encoding_table()
            elif code != self.EOD_MARKER:
                self._increase_next_code()

        # Flush any remaining bits in the buffer
        if bits_in_buffer > 0:
            output.append((buffer << (8 - bits_in_buffer)) & 0xFF)

        return bytes(output)

    def _initialize_decoding_table(self) -> None:
        """
        Reset the decoding table and the code width to initial conditions.

        The table is allocated with its maximum size: The first 256 entries
        are the single bytes, all remaining entries (including the two
        control codes) are empty until they get assigned.
        """
        self.max_code_value = (1 << self.MAX_BITS_PER_CODE) - 1
        self.decoding_table = [bytes([i]) for i in range(self.CLEAR_TABLE_MARKER)] + [
            b""
        ] * (self.max_code_value - self.CLEAR_TABLE_MARKER + 1)
        self._table_index = self.EOD_MARKER + 1
        self._bits_to_get = self.INITIAL_BITS_PER_CODE

    def _next_code_decode(self, data: bytes) -> int:
        """
        Read the next code with the current code width from the data.

        Args:
            data: The complete encoded data.

        Returns:
            The next code, or ``EOD_MARKER`` if the data ends before a
            complete code could be read.

        """
        try:
            while self._next_bits < self._bits_to_get:
                self._next_data = (self._next_data << 8) | data[self._byte_pointer]
                self._byte_pointer += 1
                self._next_bits += 8
        except IndexError:
            # Truncated data is treated like a regular end of data.
            return self.EOD_MARKER

        self._next_bits -= self._bits_to_get
        code = (self._next_data >> self._next_bits) & self._and_table[
            self._bits_to_get - 9
        ]

        # Reduce data to get rid of the overhead,
        # which increases performance on large streams significantly.
        self._next_data &= 0xFFFFF

        return code

    # The following method has been converted to Python from PDFsharp:
    # https://github.com/empira/PDFsharp/blob/5fbf6ed14740bc4e16786816882d32e43af3ff5d/src/foundation/src/PDFsharp/src/PdfSharp/Pdf.Filters/LzwDecode.cs
    #
    # Original license:
    #
    # -------------------------------------------------------------------------
    # Copyright (c) 2001-2024 empira Software GmbH, Troisdorf (Cologne Area),
    # Germany
    #
    # http://docs.pdfsharp.net
    #
    # MIT License
    #
    # Permission is hereby granted, free of charge, to any person obtaining a
    # copy of this software and associated documentation files (the "Software"),
    # to deal in the Software without restriction, including without limitation
    # the rights to use, copy, modify, merge, publish, distribute, sublicense,
    # and/or sell copies of the Software, and to permit persons to whom the
    # Software is furnished to do so, subject to the following conditions:
    #
    # The above copyright notice and this permission notice shall be included
    # in all copies or substantial portions of the Software.
    #
    # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
    # IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
    # FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
    # THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
    # LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
    # FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
    # DEALINGS IN THE SOFTWARE.
    # --------------------------------------------------------------------------
    def decode(self, data: bytes) -> bytes:
        """
        Decode LZW-compressed data.

        The following code was converted to Python from the following code:
        https://github.com/empira/PDFsharp/blob/master/src/foundation/src/PDFsharp/src/PdfSharp/Pdf.Filters/LzwDecode.cs

        Args:
            data: Data to decode.

        Returns:
            Decoded data. Decoding stops at the end-of-data code or when the
            input is exhausted, whichever comes first.

        Raises:
            IndexError: If a malformed stream refers to a table entry which
                is still empty, thus no first byte is available for the new
                table entry.

        """
        # Bit masks for extracting codes with 9, 10, 11 and 12 bits.
        self._and_table = [511, 1023, 2047, 4095]
        self._byte_pointer = 0
        self._next_data = 0
        self._next_bits = 0
        self._initialize_decoding_table()

        output_stream = io.BytesIO()
        # No previous code is available yet, which is the same situation as
        # directly after a clear-table code.
        old_code = self.CLEAR_TABLE_MARKER

        while True:
            code = self._next_code_decode(data)
            if code == self.EOD_MARKER:
                break

            if code == self.CLEAR_TABLE_MARKER:
                self._initialize_decoding_table()
                # The code after a reset is written as is and does not
                # create a table entry.
                code = self._next_code_decode(data)
                if code == self.EOD_MARKER:
                    break
                output_stream.write(self.decoding_table[code])
            elif code < self._table_index:
                # Known code: write its string and register the previous
                # string extended by the first byte of the current one.
                string = self.decoding_table[code]
                output_stream.write(string)
                if old_code != self.CLEAR_TABLE_MARKER:
                    self._add_entry_decode(self.decoding_table[old_code], string[0])
            else:
                # The code is not in the table and not one of the special codes
                previous = self.decoding_table[old_code]
                string = previous + previous[:1]
                output_stream.write(string)
                self._add_entry_decode(previous, string[0])

            old_code = code

        return output_stream.getvalue()

    def _add_entry_decode(self, old_string: bytes, new_char: int) -> None:
        """
        Append ``old_string`` extended by ``new_char`` to the decoding table.

        If the table is full already, a warning is logged and the entry is
        dropped.

        Args:
            old_string: The string of the previously decoded code.
            new_char: The byte value to append to it.

        """
        if self._table_index > self.max_code_value:
            logger_warning("Ignoring too large LZW table index.", __name__)
            return
        self.decoding_table[self._table_index] = old_string + bytes([new_char])
        self._table_index += 1

        # Update the number of bits to get based on the table index
        if self._table_index == 511:
            self._bits_to_get = 10
        elif self._table_index == 1023:
            self._bits_to_get = 11
        elif self._table_index == 2047:
            self._bits_to_get = 12