Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/jbig2.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import math
2import os
3from struct import calcsize, pack, unpack
4from typing import BinaryIO, Dict, Iterable, List, Optional, Tuple, Union, cast
6from pdfminer.pdfexceptions import PDFValueError
# Fixed part of a segment header, in on-disk order. Each entry is a
# (struct format, field name) pair; the reader and the writer look up an
# optional ``parse_<name>`` / ``encode_<name>`` hook for every field.
SEG_STRUCT = [
    (">L", "number"),
    (">B", "flags"),
    (">B", "retention_flags"),
    (">B", "page_assoc"),
    (">L", "data_length"),
]

# Bits of the segment header "flags" byte.
HEADER_FLAG_DEFERRED = 0x80  # bit 7
HEADER_FLAG_PAGE_ASSOC_LONG = 0x40  # bit 6

SEG_TYPE_MASK = 0x3F  # bits 0-5 hold the segment type

# Referred-to segment count: the short form lives in the top three bits of
# the first retention byte; the value REF_COUNT_LONG in that position
# selects the long form, whose count is the low 29 bits of a 32-bit word.
REF_COUNT_SHORT_MASK = 0xE0
REF_COUNT_LONG_MASK = 0x1FFFFFFF
REF_COUNT_LONG = 7

# Sentinel stored in the data length field when the length is not known.
DATA_LEN_UNKNOWN = 0xFFFFFFFF

# Segment type codes used by this module.
SEG_TYPE_IMMEDIATE_GEN_REGION = 38
SEG_TYPE_END_OF_PAGE = 49
SEG_TYPE_END_OF_FILE = 51

# File header literals.
FILE_HEADER_ID = b"\x97\x4a\x42\x32\x0d\x0a\x1a\x0a"
FILE_HEAD_FLAG_SEQUENTIAL = 0x01
def bit_set(bit_pos: int, value: int) -> bool:
    """Return True when bit number ``bit_pos`` (0 = least significant) of ``value`` is 1."""
    shifted = value >> bit_pos
    return (shifted & 1) == 1
def check_flag(flag: int, value: int) -> bool:
    """Return True when ``value`` shares at least one set bit with ``flag``."""
    return (flag & value) != 0
def masked_value(mask: int, value: int) -> int:
    """Extract the field selected by ``mask`` from ``value``.

    The bits of ``value`` covered by ``mask`` are kept and shifted right so
    that the lowest set bit of ``mask`` lands on bit 0.

    The previous implementation only inspected bits 0..30 of the mask, so a
    valid 32-bit mask whose lowest set bit is bit 31 (e.g. ``0x80000000``)
    was rejected. The lowest set bit is now located directly.

    Raises:
        PDFValueError: if ``mask`` has no bit set.
    """
    # ``mask & -mask`` isolates the lowest set bit (0 when mask is 0).
    lowest_bit = mask & -mask
    if not lowest_bit:
        raise PDFValueError("Invalid mask or value")

    shift = lowest_bit.bit_length() - 1
    return (value & mask) >> shift
def mask_value(mask: int, value: int) -> int:
    """Place ``value`` into the field described by ``mask``.

    ``value`` is truncated to the width of the field and shifted left so
    that its bit 0 lands on the lowest set bit of ``mask``.

    The previous implementation only inspected bits 0..30 of the mask, so a
    valid 32-bit mask whose lowest set bit is bit 31 (e.g. ``0x80000000``)
    was rejected. The lowest set bit is now located directly.

    Raises:
        PDFValueError: if ``mask`` has no bit set.
    """
    # ``mask & -mask`` isolates the lowest set bit (0 when mask is 0).
    lowest_bit = mask & -mask
    if not lowest_bit:
        raise PDFValueError("Invalid mask or value")

    shift = lowest_bit.bit_length() - 1
    return (value & (mask >> shift)) << shift
def unpack_int(format: str, buffer: bytes) -> int:
    """Decode ``buffer`` as a single big-endian unsigned integer.

    ``format`` must be one of the struct formats ``">B"``, ``">I"`` or
    ``">L"``; ``buffer`` must be exactly as long as that format requires,
    otherwise ``struct.unpack`` raises ``struct.error``.
    """
    assert format in {">B", ">I", ">L"}
    unpacked = cast(Tuple[int], unpack(format, buffer))
    (number,) = unpacked
    return number
# Decoded "flags" header byte: the reader stores the keys "deferred",
# "page_assoc_long" (booleans) and "type" (int).
JBIG2SegmentFlags = Dict[str, Union[int, bool]]

# Decoded retention information: "ref_count" (int), "retain_segments"
# (list of booleans) and "ref_segments" (list of segment numbers).
JBIG2RetentionFlags = Dict[str, Union[int, List[int], List[bool]]]

# One segment: header fields keyed by their SEG_STRUCT names plus extra
# entries such as "raw_data".
JBIG2Segment = Dict[
    str,
    Union[bool, int, bytes, JBIG2SegmentFlags, JBIG2RetentionFlags],
]
class JBIG2StreamReader:
    """Read segments from a JBIG2 byte stream"""

    def __init__(self, stream: BinaryIO) -> None:
        # The stream must support ``seek`` because ``is_eof`` peeks one byte.
        self.stream = stream

    def get_segments(self) -> List[JBIG2Segment]:
        """Parse every segment up to the end of the stream.

        Each header field listed in ``SEG_STRUCT`` is read in order and, when
        a ``parse_<name>`` method exists, post-processed by it. A segment
        whose fixed header fields are cut short by the end of the stream is
        dropped instead of being returned.
        """
        segments: List[JBIG2Segment] = []
        while not self.is_eof():
            segment: JBIG2Segment = {}
            for field_format, name in SEG_STRUCT:
                field_len = calcsize(field_format)
                field = self.stream.read(field_len)
                if len(field) < field_len:
                    # Truncated header: mark the segment so it is skipped.
                    segment["_error"] = True
                    break
                value = unpack_int(field_format, field)
                parser = getattr(self, "parse_%s" % name, None)
                if callable(parser):
                    value = parser(segment, value, field)
                segment[name] = value

            if not segment.get("_error"):
                segments.append(segment)
        return segments

    def is_eof(self) -> bool:
        """Return True when no byte is left; otherwise restore the position."""
        if self.stream.read(1) == b"":
            return True
        # Undo the one-byte peek.
        self.stream.seek(-1, os.SEEK_CUR)
        return False

    def parse_flags(
        self,
        segment: JBIG2Segment,
        flags: int,
        field: bytes,
    ) -> JBIG2SegmentFlags:
        """Split the header flags byte into its three components."""
        return {
            "deferred": check_flag(HEADER_FLAG_DEFERRED, flags),
            "page_assoc_long": check_flag(HEADER_FLAG_PAGE_ASSOC_LONG, flags),
            "type": masked_value(SEG_TYPE_MASK, flags),
        }

    def parse_retention_flags(
        self,
        segment: JBIG2Segment,
        flags: int,
        field: bytes,
    ) -> JBIG2RetentionFlags:
        """Decode the referred-to segment count, retain bits and references.

        ``flags``/``field`` hold the first byte of the structure; any
        additional bytes (long-form count, retain-bit bytes, referred-to
        segment numbers) are consumed from the stream here.

        Raises:
            struct.error: if the stream ends inside the structure.
        """
        ref_count = masked_value(REF_COUNT_SHORT_MASK, flags)
        retain_segments: List[bool] = []
        ref_segments: List[int] = []

        if ref_count < REF_COUNT_LONG:
            # Short form: the low five bits of the same byte are retain bits.
            retain_segments = [bit_set(bit_pos, flags) for bit_pos in range(5)]
        else:
            # Long form: the count is the low 29 bits of a 32-bit word that
            # starts with the byte already read.
            field += self.stream.read(3)
            ref_count = masked_value(REF_COUNT_LONG_MASK, unpack_int(">L", field))
            # One retain bit per referred-to segment plus one for the segment
            # itself, i.e. ceil((ref_count + 1) / 8) bytes.
            ret_bytes_count = ref_count // 8 + 1
            for _ in range(ret_bytes_count):
                ret_byte = unpack_int(">B", self.stream.read(1))
                # Every retain byte carries eight flags (the writer packs
                # eight per byte as well).
                for bit_pos in range(8):
                    retain_segments.append(bit_set(bit_pos, ret_byte))

        seg_num = segment["number"]
        assert isinstance(seg_num, int)
        # The width of each referred-to segment number depends on the number
        # of the current segment: 1, 2 or 4 bytes.
        if seg_num <= 256:
            ref_format = ">B"
        elif seg_num <= 65536:
            ref_format = ">H"
        else:
            ref_format = ">L"

        ref_size = calcsize(ref_format)

        for _ in range(ref_count):
            ref_data = self.stream.read(ref_size)
            # ``unpack`` is used directly because ``unpack_int`` does not
            # accept the two-byte format.
            (ref,) = unpack(ref_format, ref_data)
            ref_segments.append(ref)

        return {
            "ref_count": ref_count,
            "retain_segments": retain_segments,
            "ref_segments": ref_segments,
        }

    def parse_page_assoc(self, segment: JBIG2Segment, page: int, field: bytes) -> int:
        """Return the page association, reading three more bytes if it is long."""
        if cast(JBIG2SegmentFlags, segment["flags"])["page_assoc_long"]:
            field += self.stream.read(3)
            page = unpack_int(">L", field)
        return page

    def parse_data_length(
        self,
        segment: JBIG2Segment,
        length: int,
        field: bytes,
    ) -> int:
        """Read the segment payload into ``segment["raw_data"]``.

        Nothing is stored when ``length`` is zero.

        Raises:
            NotImplementedError: for an immediate generic region segment whose
                length is ``DATA_LEN_UNKNOWN``.
        """
        if not length:
            return length

        seg_type = cast(JBIG2SegmentFlags, segment["flags"])["type"]
        if seg_type == SEG_TYPE_IMMEDIATE_GEN_REGION and length == DATA_LEN_UNKNOWN:
            raise NotImplementedError(
                "Working with unknown segment length is not implemented yet",
            )

        segment["raw_data"] = self.stream.read(length)
        return length
class JBIG2StreamWriter:
    """Write JBIG2 segments to a file in JBIG2 format"""

    # Shared retention structure for generated segments that refer to no
    # other segment. It is never mutated by this class.
    EMPTY_RETENTION_FLAGS: JBIG2RetentionFlags = {
        "ref_count": 0,
        "ref_segments": cast(List[int], []),
        "retain_segments": cast(List[bool], []),
    }

    def __init__(self, stream: BinaryIO) -> None:
        self.stream = stream

    def write_segments(
        self,
        segments: Iterable[JBIG2Segment],
        fix_last_page: bool = True,
    ) -> int:
        """Encode and write ``segments``; return the number of bytes written.

        With ``fix_last_page`` enabled, an end-of-page segment is appended
        when the last page seen was not closed by one.
        """
        data_len = 0
        current_page: Optional[int] = None
        seg_num: Optional[int] = None

        for segment in segments:
            data = self.encode_segment(segment)
            self.stream.write(data)
            data_len += len(data)

            seg_num = cast(Optional[int], segment["number"])

            if fix_last_page:
                seg_page = cast(int, segment.get("page_assoc"))

                if (
                    cast(JBIG2SegmentFlags, segment["flags"])["type"]
                    == SEG_TYPE_END_OF_PAGE
                ):
                    current_page = None
                elif seg_page:
                    current_page = seg_page

        if fix_last_page and current_page and (seg_num is not None):
            segment = self.get_eop_segment(seg_num + 1, current_page)
            data = self.encode_segment(segment)
            self.stream.write(data)
            data_len += len(data)

        return data_len

    def write_file(
        self,
        segments: Iterable[JBIG2Segment],
        fix_last_page: bool = True,
    ) -> int:
        """Write a complete JBIG2 file; return the number of bytes written.

        The output consists of the file header, the segments (see
        ``write_segments``) and a trailing end-of-file segment.
        """
        # ``segments`` is needed twice (to write it and to find the last
        # segment number), so materialise it once; a generator would
        # otherwise be exhausted after the first pass.
        segment_list = list(segments)

        header = FILE_HEADER_ID
        header_flags = FILE_HEAD_FLAG_SEQUENTIAL
        header += pack(">B", header_flags)
        # The embedded JBIG2 files in a PDF always
        # only have one page
        number_of_pages = pack(">L", 1)
        header += number_of_pages
        self.stream.write(header)
        data_len = len(header)

        data_len += self.write_segments(segment_list, fix_last_page)

        seg_num = cast(int, segment_list[-1]["number"]) if segment_list else 0

        # Leave room for the end-of-page segment that ``write_segments`` may
        # have appended.
        seg_num_offset = 2 if fix_last_page else 1
        eof_segment = self.get_eof_segment(seg_num + seg_num_offset)
        data = self.encode_segment(eof_segment)

        self.stream.write(data)
        data_len += len(data)

        return data_len

    def encode_segment(self, segment: JBIG2Segment) -> bytes:
        """Serialise one segment (header fields followed by its data).

        Every field of ``SEG_STRUCT`` is encoded by the matching
        ``encode_<name>`` method when one exists, otherwise packed with the
        field's struct format.
        """
        parts: List[bytes] = []
        for field_format, name in SEG_STRUCT:
            value = segment.get(name)
            encoder = getattr(self, "encode_%s" % name, None)
            if callable(encoder):
                field = encoder(value, segment)
            else:
                field = pack(field_format, value)
            parts.append(field)
        return b"".join(parts)

    def _page_assoc_is_long(
        self,
        flags: JBIG2SegmentFlags,
        segment: JBIG2Segment,
    ) -> bool:
        """Tell whether the page association needs the four-byte form."""
        page = cast(int, segment.get("page_assoc") or 0)
        # An explicit flag is honoured; a page number that does not fit in
        # one byte forces the long form regardless.
        return bool(flags.get("page_assoc_long")) or page > 255

    def encode_flags(self, value: JBIG2SegmentFlags, segment: JBIG2Segment) -> bytes:
        """Build the header flags byte from the decoded flags dictionary."""
        flags = 0
        if value.get("deferred"):
            flags |= HEADER_FLAG_DEFERRED

        if self._page_assoc_is_long(value, segment):
            flags |= HEADER_FLAG_PAGE_ASSOC_LONG

        flags |= mask_value(SEG_TYPE_MASK, value["type"])

        return pack(">B", flags)

    def encode_retention_flags(
        self,
        value: JBIG2RetentionFlags,
        segment: JBIG2Segment,
    ) -> bytes:
        """Encode the referred-to segment count, retain bits and references."""
        flags = []
        flags_format = ">B"
        ref_count = value["ref_count"]
        assert isinstance(ref_count, int)
        retain_segments = cast(List[bool], value.get("retain_segments", []))

        if ref_count <= 4:
            # Short form: count in the top three bits, retain bits below.
            flags_byte = mask_value(REF_COUNT_SHORT_MASK, ref_count)
            for ref_index, ref_retain in enumerate(retain_segments):
                if ref_retain:
                    flags_byte |= 1 << ref_index
            flags.append(flags_byte)
        else:
            # Long form: a 32-bit word whose top three bits are the long-form
            # marker and whose low 29 bits are the count, followed by
            # ceil((ref_count + 1) / 8) bytes of retain bits.
            bytes_count = ref_count // 8 + 1
            flags_format = ">L" + ("B" * bytes_count)
            flags_dword = mask_value(REF_COUNT_SHORT_MASK, REF_COUNT_LONG) << 24
            flags_dword |= ref_count & REF_COUNT_LONG_MASK
            flags.append(flags_dword)

            for byte_index in range(bytes_count):
                ret_byte = 0
                ret_part = retain_segments[byte_index * 8 : byte_index * 8 + 8]
                for bit_pos, ret_seg in enumerate(ret_part):
                    if ret_seg:
                        ret_byte |= 1 << bit_pos

                flags.append(ret_byte)

        ref_segments = cast(List[int], value.get("ref_segments", []))

        seg_num = cast(int, segment["number"])
        # Referred-to segment numbers take 1, 2 or 4 bytes depending on the
        # number of the current segment.
        if seg_num <= 256:
            ref_format = "B"
        elif seg_num <= 65536:
            ref_format = "H"
        else:
            ref_format = "L"

        for ref in ref_segments:
            flags_format += ref_format
            flags.append(ref)

        return pack(flags_format, *flags)

    def encode_page_assoc(self, value: int, segment: JBIG2Segment) -> bytes:
        """Encode the page association in one or four bytes.

        The width matches the long-page flag emitted by ``encode_flags``.
        """
        flags = cast(JBIG2SegmentFlags, segment["flags"])
        page_format = ">L" if self._page_assoc_is_long(flags, segment) else ">B"
        return pack(page_format, value)

    def encode_data_length(self, value: int, segment: JBIG2Segment) -> bytes:
        """Encode the data length field followed by the segment payload."""
        data = pack(">L", value)
        # Segments without a payload may carry no "raw_data" entry at all.
        data += cast(bytes, segment.get("raw_data", b""))
        return data

    def get_eop_segment(self, seg_number: int, page_number: int) -> JBIG2Segment:
        """Build an empty end-of-page segment for ``page_number``."""
        return {
            "data_length": 0,
            "flags": {"deferred": False, "type": SEG_TYPE_END_OF_PAGE},
            "number": seg_number,
            "page_assoc": page_number,
            "raw_data": b"",
            "retention_flags": JBIG2StreamWriter.EMPTY_RETENTION_FLAGS,
        }

    def get_eof_segment(self, seg_number: int) -> JBIG2Segment:
        """Build an empty end-of-file segment."""
        return {
            "data_length": 0,
            "flags": {"deferred": False, "type": SEG_TYPE_END_OF_FILE},
            "number": seg_number,
            "page_assoc": 0,
            "raw_data": b"",
            "retention_flags": JBIG2StreamWriter.EMPTY_RETENTION_FLAGS,
        }