Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/jbig2.py: 22%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import math
2import os
3from collections.abc import Iterable
4from struct import calcsize, pack, unpack
5from typing import BinaryIO, ClassVar, cast
7from pdfminer.pdfexceptions import PDFValueError
# segment structure base
# Ordered (struct format, field name) pairs describing the fixed part of a
# segment header.  The reader and writer below walk this list in order and
# dispatch to an optional ``parse_<name>`` / ``encode_<name>`` method for
# each field, so the order here is the on-stream field order.
SEG_STRUCT = [
    (">L", "number"),
    (">B", "flags"),
    (">B", "retention_flags"),
    (">B", "page_assoc"),
    (">L", "data_length"),
]
# segment header literals
# Bits of the one-byte segment header "flags" field.
HEADER_FLAG_DEFERRED = 0b1000_0000
HEADER_FLAG_PAGE_ASSOC_LONG = 0b0100_0000

# Low six bits of the flags byte carry the segment type.
SEG_TYPE_MASK = 0b0011_1111

# Referred-to segment count: top three bits of the retention byte in the
# short form, low 29 bits of a 32-bit word in the long form.  A short count
# equal to REF_COUNT_LONG signals that the long form is in use.
REF_COUNT_SHORT_MASK = 0b1110_0000
REF_COUNT_LONG_MASK = 0x1FFF_FFFF
REF_COUNT_LONG = 7

# Sentinel data length meaning "length not known up front".
DATA_LEN_UNKNOWN = 0xFFFF_FFFF

# segment types
SEG_TYPE_IMMEDIATE_GEN_REGION = 38
SEG_TYPE_END_OF_PAGE = 49
SEG_TYPE_END_OF_FILE = 51

# file literals
# Eight-byte file signature: 0x97, "JB2", CR, LF, 0x1A, LF.
FILE_HEADER_ID = b"\x97JB2\r\n\x1a\n"
FILE_HEAD_FLAG_SEQUENTIAL = 0b0000_0001
def bit_set(bit_pos: int, value: int) -> bool:
    """Return True when bit number ``bit_pos`` (0 = least significant) of ``value`` is 1."""
    shifted = value >> bit_pos
    return (shifted & 1) == 1
def check_flag(flag: int, value: int) -> bool:
    """Return True when ``value`` has at least one bit in common with ``flag``."""
    common_bits = flag & value
    return common_bits != 0
def masked_value(mask: int, value: int) -> int:
    """Extract the bit field selected by ``mask`` from ``value``.

    The bits of ``value`` covered by ``mask`` are kept and shifted right so
    that the lowest set bit of ``mask`` ends up at bit 0.

    The lowest set bit is computed directly rather than scanned for over a
    fixed range, so a mask whose lowest set bit is bit 31 or higher (for
    example ``0x80000000``) is handled instead of being rejected.

    Raises:
        PDFValueError: if ``mask`` is zero and therefore selects nothing.
    """
    if mask == 0:
        raise PDFValueError("Invalid mask or value")

    # ``mask & -mask`` isolates the lowest set bit; its bit_length minus one
    # is that bit's position.
    shift = (mask & -mask).bit_length() - 1
    return (value & mask) >> shift
def mask_value(mask: int, value: int) -> int:
    """Place ``value`` into the bit field described by ``mask``.

    ``value`` is truncated to the width of the field and shifted left so
    that it lines up with the lowest set bit of ``mask``.

    The lowest set bit is computed directly rather than scanned for over a
    fixed range, so a mask whose lowest set bit is bit 31 or higher (for
    example ``0x80000000``) is handled instead of being rejected.

    Raises:
        PDFValueError: if ``mask`` is zero and therefore describes no field.
    """
    if mask == 0:
        raise PDFValueError("Invalid mask or value")

    # ``mask & -mask`` isolates the lowest set bit; its bit_length minus one
    # is that bit's position.
    shift = (mask & -mask).bit_length() - 1
    return (value & (mask >> shift)) << shift
def unpack_int(format: str, buffer: bytes) -> int:
    """Decode ``buffer`` as a single big-endian unsigned integer.

    ``format`` must be one of the struct formats ``">B"``, ``">I"`` or
    ``">L"``; ``buffer`` must be exactly the size that format requires.
    """
    assert format in {">B", ">I", ">L"}
    decoded = cast(tuple[int], unpack(format, buffer))
    (number,) = decoded
    return number
# Decoded form of the segment header flags byte; the reader fills in the
# keys "deferred", "page_assoc_long" and "type".
JBIG2SegmentFlags = dict[str, int | bool]
# Decoded form of the retention flags; the reader fills in the keys
# "ref_count", "retain_segments" and "ref_segments".
JBIG2RetentionFlags = dict[str, int | list[int] | list[bool]]
# One parsed segment: header fields keyed by their SEG_STRUCT names, plus
# "raw_data" for the payload and "_error" when the header was truncated.
JBIG2Segment = dict[
    str,
    bool | int | bytes | JBIG2SegmentFlags | JBIG2RetentionFlags,
]
class JBIG2StreamReader:
    """Read segments from a JBIG2 byte stream.

    The stream is expected to be positioned at the first segment header
    (no file header) and must be seekable, because end-of-stream detection
    reads one byte ahead and seeks back.
    """

    def __init__(self, stream: BinaryIO) -> None:
        self.stream = stream

    def get_segments(self) -> list[JBIG2Segment]:
        """Parse every segment up to the end of the stream.

        Each header field listed in ``SEG_STRUCT`` is read in order and, if
        a ``parse_<field>`` method exists, handed to it for decoding (those
        methods may consume additional bytes from the stream).

        A segment whose fixed header field is cut short by the end of the
        stream is dropped rather than returned.
        """
        segments: list[JBIG2Segment] = []
        while not self.is_eof():
            segment: JBIG2Segment = {}
            for field_format, name in SEG_STRUCT:
                field_len = calcsize(field_format)
                field = self.stream.read(field_len)
                if len(field) < field_len:
                    # Truncated header: mark it so it is not returned.
                    segment["_error"] = True
                    break
                value = unpack_int(field_format, field)
                parser = getattr(self, f"parse_{name}", None)
                if callable(parser):
                    value = parser(segment, value, field)
                segment[name] = value

            if not segment.get("_error"):
                segments.append(segment)
        return segments

    def is_eof(self) -> bool:
        """Return True when no more bytes are available, without consuming any."""
        if self.stream.read(1) == b"":
            return True
        # Not at the end: undo the one-byte look-ahead.
        self.stream.seek(-1, os.SEEK_CUR)
        return False

    def parse_flags(
        self,
        segment: JBIG2Segment,
        flags: int,
        field: bytes,
    ) -> JBIG2SegmentFlags:
        """Split the header flags byte into its three components."""
        return {
            "deferred": check_flag(HEADER_FLAG_DEFERRED, flags),
            "page_assoc_long": check_flag(HEADER_FLAG_PAGE_ASSOC_LONG, flags),
            "type": masked_value(SEG_TYPE_MASK, flags),
        }

    def parse_retention_flags(
        self,
        segment: JBIG2Segment,
        flags: int,
        field: bytes,
    ) -> JBIG2RetentionFlags:
        """Decode the referred-to segment count, retain bits and references.

        ``flags`` is the first retention byte and ``field`` its raw form.
        In the short form the count sits in the top three bits and the low
        five bits are retain bits.  A count of ``REF_COUNT_LONG`` switches
        to the long form: three more bytes are read to build a 32-bit word
        holding the count, followed by ``ceil((count + 1) / 8)`` bytes of
        retain bits.

        The referred-to segment numbers follow; their width depends on this
        segment's own number (1, 2 or 4 bytes).
        """
        ref_count = masked_value(REF_COUNT_SHORT_MASK, flags)
        retain_segments: list[bool] = []
        ref_segments: list[int] = []

        if ref_count < REF_COUNT_LONG:
            retain_segments.extend(bit_set(bit_pos, flags) for bit_pos in range(5))
        else:
            field += self.stream.read(3)
            ref_count = masked_value(REF_COUNT_LONG_MASK, unpack_int(">L", field))
            ret_bytes_count = math.ceil((ref_count + 1) / 8)
            for _ in range(ret_bytes_count):
                ret_byte = unpack_int(">B", self.stream.read(1))
                # Every retention byte carries eight retain bits; decoding
                # only seven would drop bit 7 and shift all later entries.
                retain_segments.extend(
                    bit_set(bit_pos, ret_byte) for bit_pos in range(8)
                )

        seg_num = segment["number"]
        assert isinstance(seg_num, int)
        if seg_num <= 256:
            ref_format = ">B"
        elif seg_num <= 65536:
            # Two-byte references; ">I" would consume four bytes per entry.
            ref_format = ">H"
        else:
            ref_format = ">L"

        ref_size = calcsize(ref_format)

        for _ in range(ref_count):
            # struct.unpack is used directly because unpack_int does not
            # accept the two-byte format.
            (ref,) = unpack(ref_format, self.stream.read(ref_size))
            ref_segments.append(ref)

        return {
            "ref_count": ref_count,
            "retain_segments": retain_segments,
            "ref_segments": ref_segments,
        }

    def parse_page_assoc(self, segment: JBIG2Segment, page: int, field: bytes) -> int:
        """Return the page association, widening to four bytes when flagged.

        Relies on ``segment["flags"]`` having been parsed already.
        """
        if cast(JBIG2SegmentFlags, segment["flags"])["page_assoc_long"]:
            field += self.stream.read(3)
            page = unpack_int(">L", field)
        return page

    def parse_data_length(
        self,
        segment: JBIG2Segment,
        length: int,
        field: bytes,
    ) -> int:
        """Read the segment payload into ``segment["raw_data"]``.

        Nothing is read (and ``"raw_data"`` is not set) when ``length`` is
        zero.

        Raises:
            NotImplementedError: for an immediate generic region segment
                whose length is the "unknown" sentinel.
        """
        if not length:
            return length

        seg_type = cast(JBIG2SegmentFlags, segment["flags"])["type"]
        if seg_type == SEG_TYPE_IMMEDIATE_GEN_REGION and length == DATA_LEN_UNKNOWN:
            raise NotImplementedError(
                "Working with unknown segment length is not implemented yet",
            )

        segment["raw_data"] = self.stream.read(length)
        return length
class JBIG2StreamWriter:
    """Write JBIG2 segments to a file in JBIG2 format.

    Segments are dictionaries keyed by the ``SEG_STRUCT`` field names, as
    produced by ``JBIG2StreamReader``.
    """

    # Retention flags used for the synthetic end-of-page / end-of-file
    # segments: no referred-to segments and no retain bits.
    EMPTY_RETENTION_FLAGS: ClassVar[JBIG2RetentionFlags] = {
        "ref_count": 0,
        "ref_segments": cast(list[int], []),
        "retain_segments": cast(list[bool], []),
    }

    def __init__(self, stream: BinaryIO) -> None:
        self.stream = stream

    def write_segments(
        self,
        segments: Iterable[JBIG2Segment],
        fix_last_page: bool = True,
    ) -> int:
        """Encode and write ``segments``; return the number of bytes written.

        With ``fix_last_page`` set, an end-of-page segment is appended when
        the last page seen was not closed by one.  It is numbered one past
        the last written segment.
        """
        data_len = 0
        current_page: int | None = None
        seg_num: int | None = None

        for segment in segments:
            data = self.encode_segment(segment)
            self.stream.write(data)
            data_len += len(data)

            seg_num = cast(int | None, segment["number"])

            if fix_last_page:
                seg_page = cast(int, segment.get("page_assoc"))
                seg_type = cast(JBIG2SegmentFlags, segment["flags"])["type"]
                if seg_type == SEG_TYPE_END_OF_PAGE:
                    current_page = None
                elif seg_page:
                    current_page = seg_page

        if fix_last_page and current_page and (seg_num is not None):
            eop_segment = self.get_eop_segment(seg_num + 1, current_page)
            data = self.encode_segment(eop_segment)
            self.stream.write(data)
            data_len += len(data)

        return data_len

    def write_file(
        self,
        segments: Iterable[JBIG2Segment],
        fix_last_page: bool = True,
    ) -> int:
        """Write a complete JBIG2 file; return the number of bytes written.

        The output is the file header, the segments, and a trailing
        end-of-file segment.  The end-of-file segment is numbered two past
        the last input segment when ``fix_last_page`` is set (leaving room
        for a possible end-of-page segment) and one past it otherwise.
        """
        # The segments are walked once to write them and consulted again for
        # the last segment number, so a one-shot iterable (e.g. a generator)
        # must be materialised first.
        segments = list(segments)

        header = FILE_HEADER_ID
        header_flags = FILE_HEAD_FLAG_SEQUENTIAL
        header += pack(">B", header_flags)
        # The embedded JBIG2 files in a PDF always
        # only have one page
        number_of_pages = pack(">L", 1)
        header += number_of_pages
        self.stream.write(header)
        data_len = len(header)

        data_len += self.write_segments(segments, fix_last_page)

        seg_num = cast(int, segments[-1]["number"]) if segments else 0

        seg_num_offset = 2 if fix_last_page else 1
        eof_segment = self.get_eof_segment(seg_num + seg_num_offset)
        data = self.encode_segment(eof_segment)

        self.stream.write(data)
        data_len += len(data)

        return data_len

    def encode_segment(self, segment: JBIG2Segment) -> bytes:
        """Serialise one segment (header fields and payload) to bytes.

        Each ``SEG_STRUCT`` field is encoded by ``encode_<field>`` when such
        a method exists, otherwise packed with its default struct format.
        """
        parts: list[bytes] = []
        for field_format, name in SEG_STRUCT:
            value = segment.get(name)
            encoder = getattr(self, f"encode_{name}", None)
            if callable(encoder):
                parts.append(encoder(value, segment))
            else:
                parts.append(pack(field_format, value))
        return b"".join(parts)

    @staticmethod
    def _page_assoc_is_long(flags: JBIG2SegmentFlags, segment: JBIG2Segment) -> bool:
        """Decide whether the page association needs the four-byte form.

        True when the flags request it explicitly, or when the segment's
        ``"page_assoc"`` value does not fit in a single byte.
        """
        if flags.get("page_assoc_long"):
            return True
        page = segment.get("page_assoc")
        return isinstance(page, int) and page > 255

    def encode_flags(self, value: JBIG2SegmentFlags, segment: JBIG2Segment) -> bytes:
        """Build the one-byte header flags field."""
        flags = 0
        if value.get("deferred"):
            flags |= HEADER_FLAG_DEFERRED

        # The page number lives under "page_assoc"; looking it up under any
        # other key would make the automatic long-form detection dead code.
        if self._page_assoc_is_long(value, segment):
            flags |= HEADER_FLAG_PAGE_ASSOC_LONG

        flags |= mask_value(SEG_TYPE_MASK, value["type"])

        return pack(">B", flags)

    def encode_retention_flags(
        self,
        value: JBIG2RetentionFlags,
        segment: JBIG2Segment,
    ) -> bytes:
        """Encode the referred-to count, retain bits and segment references.

        Up to four references use the short form: one byte holding the count
        in the top three bits and retain bits in the low five.  More use the
        long form: a 32-bit word holding the long-form marker and the count,
        followed by the retain bits packed eight per byte.
        """
        flags: list[int] = []
        flags_format = ">B"
        ref_count = value["ref_count"]
        assert isinstance(ref_count, int)
        retain_segments = cast(list[bool], value.get("retain_segments", []))

        if ref_count <= 4:
            flags_byte = mask_value(REF_COUNT_SHORT_MASK, ref_count)
            # Only five retain bits exist in the short form; anything beyond
            # would spill into the count bits.
            for ref_index, ref_retain in enumerate(retain_segments[:5]):
                if ref_retain:
                    flags_byte |= 1 << ref_index
            flags.append(flags_byte)
        else:
            bytes_count = math.ceil((ref_count + 1) / 8)
            flags_format = ">L" + ("B" * bytes_count)
            flags_dword = mask_value(REF_COUNT_SHORT_MASK, REF_COUNT_LONG) << 24
            # The low 29 bits carry the actual count; without them a reader
            # would see zero referred-to segments.
            flags_dword |= ref_count & REF_COUNT_LONG_MASK
            flags.append(flags_dword)

            for byte_index in range(bytes_count):
                ret_byte = 0
                ret_part = retain_segments[byte_index * 8 : byte_index * 8 + 8]
                for bit_pos, ret_seg in enumerate(ret_part):
                    if ret_seg:
                        ret_byte |= 1 << bit_pos

                flags.append(ret_byte)

        ref_segments = cast(list[int], value.get("ref_segments", []))

        seg_num = cast(int, segment["number"])
        if seg_num <= 256:
            ref_format = "B"
        elif seg_num <= 65536:
            # Two-byte references; "I" would emit four bytes per entry.
            ref_format = "H"
        else:
            ref_format = "L"

        for ref in ref_segments:
            flags_format += ref_format
            flags.append(ref)

        return pack(flags_format, *flags)

    def encode_page_assoc(self, value: int, segment: JBIG2Segment) -> bytes:
        """Encode the page association as one or four bytes.

        The width follows the same decision as the long-page bit written by
        ``encode_flags``, so the flag and the field always agree.
        """
        flags = cast(JBIG2SegmentFlags, segment.get("flags") or {})
        page_format = ">L" if self._page_assoc_is_long(flags, segment) else ">B"
        return pack(page_format, value)

    def encode_data_length(self, value: int, segment: JBIG2Segment) -> bytes:
        """Encode the data length followed by the segment payload.

        A segment without ``"raw_data"`` (the reader does not set it for
        zero-length segments) is written with an empty payload.
        """
        data = pack(">L", value)
        data += cast(bytes, segment.get("raw_data", b""))
        return data

    def get_eop_segment(self, seg_number: int, page_number: int) -> JBIG2Segment:
        """Build an empty end-of-page segment for ``page_number``."""
        return {
            "data_length": 0,
            "flags": {"deferred": False, "type": SEG_TYPE_END_OF_PAGE},
            "number": seg_number,
            "page_assoc": page_number,
            "raw_data": b"",
            "retention_flags": JBIG2StreamWriter.EMPTY_RETENTION_FLAGS,
        }

    def get_eof_segment(self, seg_number: int) -> JBIG2Segment:
        """Build an empty end-of-file segment (page association 0)."""
        return {
            "data_length": 0,
            "flags": {"deferred": False, "type": SEG_TYPE_END_OF_FILE},
            "number": seg_number,
            "page_assoc": 0,
            "raw_data": b"",
            "retention_flags": JBIG2StreamWriter.EMPTY_RETENTION_FLAGS,
        }