Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/PdfParser.py: 19%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1from __future__ import annotations
3import calendar
4import codecs
5import collections
6import mmap
7import os
8import re
9import time
10import zlib
11from typing import Any, NamedTuple
13TYPE_CHECKING = False
14if TYPE_CHECKING:
15 from typing import IO
17 _DictBase = collections.UserDict[str | bytes, Any]
18else:
19 _DictBase = collections.UserDict
22# see 7.9.2.2 Text String Type on page 86 and D.3 PDFDocEncoding Character Set
23# on page 656
24def encode_text(s: str) -> bytes:
25 return codecs.BOM_UTF16_BE + s.encode("utf_16_be")
28PDFDocEncoding = {
29 0x16: "\u0017",
30 0x18: "\u02d8",
31 0x19: "\u02c7",
32 0x1A: "\u02c6",
33 0x1B: "\u02d9",
34 0x1C: "\u02dd",
35 0x1D: "\u02db",
36 0x1E: "\u02da",
37 0x1F: "\u02dc",
38 0x80: "\u2022",
39 0x81: "\u2020",
40 0x82: "\u2021",
41 0x83: "\u2026",
42 0x84: "\u2014",
43 0x85: "\u2013",
44 0x86: "\u0192",
45 0x87: "\u2044",
46 0x88: "\u2039",
47 0x89: "\u203a",
48 0x8A: "\u2212",
49 0x8B: "\u2030",
50 0x8C: "\u201e",
51 0x8D: "\u201c",
52 0x8E: "\u201d",
53 0x8F: "\u2018",
54 0x90: "\u2019",
55 0x91: "\u201a",
56 0x92: "\u2122",
57 0x93: "\ufb01",
58 0x94: "\ufb02",
59 0x95: "\u0141",
60 0x96: "\u0152",
61 0x97: "\u0160",
62 0x98: "\u0178",
63 0x99: "\u017d",
64 0x9A: "\u0131",
65 0x9B: "\u0142",
66 0x9C: "\u0153",
67 0x9D: "\u0161",
68 0x9E: "\u017e",
69 0xA0: "\u20ac",
70}
73def decode_text(b: bytes) -> str:
74 if b[: len(codecs.BOM_UTF16_BE)] == codecs.BOM_UTF16_BE:
75 return b[len(codecs.BOM_UTF16_BE) :].decode("utf_16_be")
76 else:
77 return "".join(PDFDocEncoding.get(byte, chr(byte)) for byte in b)
80class PdfFormatError(RuntimeError):
81 """An error that probably indicates a syntactic or semantic error in the
82 PDF file structure"""
84 pass
87def check_format_condition(condition: bool, error_message: str) -> None:
88 if not condition:
89 raise PdfFormatError(error_message)
92class IndirectReferenceTuple(NamedTuple):
93 object_id: int
94 generation: int
97class IndirectReference(IndirectReferenceTuple):
98 def __str__(self) -> str:
99 return f"{self.object_id} {self.generation} R"
101 def __bytes__(self) -> bytes:
102 return self.__str__().encode("us-ascii")
104 def __eq__(self, other: object) -> bool:
105 if self.__class__ is not other.__class__:
106 return False
107 assert isinstance(other, IndirectReference)
108 return other.object_id == self.object_id and other.generation == self.generation
110 def __ne__(self, other: object) -> bool:
111 return not (self == other)
113 def __hash__(self) -> int:
114 return hash((self.object_id, self.generation))
117class IndirectObjectDef(IndirectReference):
118 def __str__(self) -> str:
119 return f"{self.object_id} {self.generation} obj"
122class XrefTable:
123 def __init__(self) -> None:
124 self.existing_entries: dict[int, tuple[int, int]] = (
125 {}
126 ) # object ID => (offset, generation)
127 self.new_entries: dict[int, tuple[int, int]] = (
128 {}
129 ) # object ID => (offset, generation)
130 self.deleted_entries = {0: 65536} # object ID => generation
131 self.reading_finished = False
133 def __setitem__(self, key: int, value: tuple[int, int]) -> None:
134 if self.reading_finished:
135 self.new_entries[key] = value
136 else:
137 self.existing_entries[key] = value
138 if key in self.deleted_entries:
139 del self.deleted_entries[key]
141 def __getitem__(self, key: int) -> tuple[int, int]:
142 try:
143 return self.new_entries[key]
144 except KeyError:
145 return self.existing_entries[key]
147 def __delitem__(self, key: int) -> None:
148 if key in self.new_entries:
149 generation = self.new_entries[key][1] + 1
150 del self.new_entries[key]
151 self.deleted_entries[key] = generation
152 elif key in self.existing_entries:
153 generation = self.existing_entries[key][1] + 1
154 self.deleted_entries[key] = generation
155 elif key in self.deleted_entries:
156 generation = self.deleted_entries[key]
157 else:
158 msg = f"object ID {key} cannot be deleted because it doesn't exist"
159 raise IndexError(msg)
161 def __contains__(self, key: int) -> bool:
162 return key in self.existing_entries or key in self.new_entries
164 def __len__(self) -> int:
165 return len(
166 set(self.existing_entries.keys())
167 | set(self.new_entries.keys())
168 | set(self.deleted_entries.keys())
169 )
171 def keys(self) -> set[int]:
172 return (
173 set(self.existing_entries.keys()) - set(self.deleted_entries.keys())
174 ) | set(self.new_entries.keys())
176 def write(self, f: IO[bytes]) -> int:
177 keys = sorted(set(self.new_entries.keys()) | set(self.deleted_entries.keys()))
178 deleted_keys = sorted(set(self.deleted_entries.keys()))
179 startxref = f.tell()
180 f.write(b"xref\n")
181 while keys:
182 # find a contiguous sequence of object IDs
183 prev: int | None = None
184 for index, key in enumerate(keys):
185 if prev is None or prev + 1 == key:
186 prev = key
187 else:
188 contiguous_keys = keys[:index]
189 keys = keys[index:]
190 break
191 else:
192 contiguous_keys = keys
193 keys = []
194 f.write(b"%d %d\n" % (contiguous_keys[0], len(contiguous_keys)))
195 for object_id in contiguous_keys:
196 if object_id in self.new_entries:
197 f.write(b"%010d %05d n \n" % self.new_entries[object_id])
198 else:
199 this_deleted_object_id = deleted_keys.pop(0)
200 check_format_condition(
201 object_id == this_deleted_object_id,
202 f"expected the next deleted object ID to be {object_id}, "
203 f"instead found {this_deleted_object_id}",
204 )
205 try:
206 next_in_linked_list = deleted_keys[0]
207 except IndexError:
208 next_in_linked_list = 0
209 f.write(
210 b"%010d %05d f \n"
211 % (next_in_linked_list, self.deleted_entries[object_id])
212 )
213 return startxref
216class PdfName:
217 name: bytes
219 def __init__(self, name: PdfName | bytes | str) -> None:
220 if isinstance(name, PdfName):
221 self.name = name.name
222 elif isinstance(name, bytes):
223 self.name = name
224 else:
225 self.name = name.encode("us-ascii")
227 def name_as_str(self) -> str:
228 return self.name.decode("us-ascii")
230 def __eq__(self, other: object) -> bool:
231 return (
232 isinstance(other, PdfName) and other.name == self.name
233 ) or other == self.name
235 def __hash__(self) -> int:
236 return hash(self.name)
238 def __repr__(self) -> str:
239 return f"{self.__class__.__name__}({repr(self.name)})"
241 @classmethod
242 def from_pdf_stream(cls, data: bytes) -> PdfName:
243 return cls(PdfParser.interpret_name(data))
245 allowed_chars = set(range(33, 127)) - {ord(c) for c in "#%/()<>[]{}"}
247 def __bytes__(self) -> bytes:
248 result = bytearray(b"/")
249 for b in self.name:
250 if b in self.allowed_chars:
251 result.append(b)
252 else:
253 result.extend(b"#%02X" % b)
254 return bytes(result)
257class PdfArray(list[Any]):
258 def __bytes__(self) -> bytes:
259 return b"[ " + b" ".join(pdf_repr(x) for x in self) + b" ]"
262class PdfDict(_DictBase):
263 def __setattr__(self, key: str, value: Any) -> None:
264 if key == "data":
265 collections.UserDict.__setattr__(self, key, value)
266 else:
267 self[key.encode("us-ascii")] = value
269 def __getattr__(self, key: str) -> str | time.struct_time:
270 try:
271 value = self[key.encode("us-ascii")]
272 except KeyError as e:
273 raise AttributeError(key) from e
274 if isinstance(value, bytes):
275 value = decode_text(value)
276 if key.endswith("Date"):
277 if value.startswith("D:"):
278 value = value[2:]
280 relationship = "Z"
281 if len(value) > 17:
282 relationship = value[14]
283 offset = int(value[15:17]) * 60
284 if len(value) > 20:
285 offset += int(value[18:20])
287 format = "%Y%m%d%H%M%S"[: len(value) - 2]
288 value = time.strptime(value[: len(format) + 2], format)
289 if relationship in ["+", "-"]:
290 offset *= 60
291 if relationship == "+":
292 offset *= -1
293 value = time.gmtime(calendar.timegm(value) + offset)
294 return value
296 def __bytes__(self) -> bytes:
297 out = bytearray(b"<<")
298 for key, value in self.items():
299 if value is None:
300 continue
301 value = pdf_repr(value)
302 out.extend(b"\n")
303 out.extend(bytes(PdfName(key)))
304 out.extend(b" ")
305 out.extend(value)
306 out.extend(b"\n>>")
307 return bytes(out)
310class PdfBinary:
311 def __init__(self, data: list[int] | bytes) -> None:
312 self.data = data
314 def __bytes__(self) -> bytes:
315 return b"<%s>" % b"".join(b"%02X" % b for b in self.data)
318class PdfStream:
319 def __init__(self, dictionary: PdfDict, buf: bytes) -> None:
320 self.dictionary = dictionary
321 self.buf = buf
323 def decode(self) -> bytes:
324 try:
325 filter = self.dictionary[b"Filter"]
326 except KeyError:
327 return self.buf
328 if filter == b"FlateDecode":
329 try:
330 expected_length = self.dictionary[b"DL"]
331 except KeyError:
332 expected_length = self.dictionary[b"Length"]
333 return zlib.decompress(self.buf, bufsize=int(expected_length))
334 else:
335 msg = f"stream filter {repr(filter)} unknown/unsupported"
336 raise NotImplementedError(msg)
339def pdf_repr(x: Any) -> bytes:
340 if x is True:
341 return b"true"
342 elif x is False:
343 return b"false"
344 elif x is None:
345 return b"null"
346 elif isinstance(x, (PdfName, PdfDict, PdfArray, PdfBinary)):
347 return bytes(x)
348 elif isinstance(x, (int, float)):
349 return str(x).encode("us-ascii")
350 elif isinstance(x, time.struct_time):
351 return b"(D:" + time.strftime("%Y%m%d%H%M%SZ", x).encode("us-ascii") + b")"
352 elif isinstance(x, dict):
353 return bytes(PdfDict(x))
354 elif isinstance(x, list):
355 return bytes(PdfArray(x))
356 elif isinstance(x, str):
357 return pdf_repr(encode_text(x))
358 elif isinstance(x, bytes):
359 # XXX escape more chars? handle binary garbage
360 x = x.replace(b"\\", b"\\\\")
361 x = x.replace(b"(", b"\\(")
362 x = x.replace(b")", b"\\)")
363 return b"(" + x + b")"
364 else:
365 return bytes(x)
368class PdfParser:
369 """Based on
370 https://www.adobe.com/content/dam/acom/en/devnet/acrobat/pdfs/PDF32000_2008.pdf
371 Supports PDF up to 1.4
372 """
374 def __init__(
375 self,
376 filename: str | None = None,
377 f: IO[bytes] | None = None,
378 buf: bytes | bytearray | None = None,
379 start_offset: int = 0,
380 mode: str = "rb",
381 ) -> None:
382 if buf and f:
383 msg = "specify buf or f or filename, but not both buf and f"
384 raise RuntimeError(msg)
385 self.filename = filename
386 self.buf: bytes | bytearray | mmap.mmap | None = buf
387 self.f = f
388 self.start_offset = start_offset
389 self.should_close_buf = False
390 self.should_close_file = False
391 if filename is not None and f is None:
392 self.f = f = open(filename, mode)
393 self.should_close_file = True
394 if f is not None:
395 self.buf = self.get_buf_from_file(f)
396 self.should_close_buf = True
397 if not filename and hasattr(f, "name"):
398 self.filename = f.name
399 self.cached_objects: dict[IndirectReference, Any] = {}
400 self.root_ref: IndirectReference | None
401 self.info_ref: IndirectReference | None
402 self.pages_ref: IndirectReference | None
403 self.last_xref_section_offset: int | None
404 if self.buf:
405 self.read_pdf_info()
406 else:
407 self.file_size_total = self.file_size_this = 0
408 self.root = PdfDict()
409 self.root_ref = None
410 self.info = PdfDict()
411 self.info_ref = None
412 self.page_tree_root = PdfDict()
413 self.pages: list[IndirectReference] = []
414 self.orig_pages: list[IndirectReference] = []
415 self.pages_ref = None
416 self.last_xref_section_offset = None
417 self.trailer_dict: dict[bytes, Any] = {}
418 self.xref_table = XrefTable()
419 self.xref_table.reading_finished = True
420 if f:
421 self.seek_end()
423 def __enter__(self) -> PdfParser:
424 return self
426 def __exit__(self, *args: object) -> None:
427 self.close()
429 def start_writing(self) -> None:
430 self.close_buf()
431 self.seek_end()
433 def close_buf(self) -> None:
434 if isinstance(self.buf, mmap.mmap):
435 self.buf.close()
436 self.buf = None
438 def close(self) -> None:
439 if self.should_close_buf:
440 self.close_buf()
441 if self.f is not None and self.should_close_file:
442 self.f.close()
443 self.f = None
445 def seek_end(self) -> None:
446 assert self.f is not None
447 self.f.seek(0, os.SEEK_END)
449 def write_header(self) -> None:
450 assert self.f is not None
451 self.f.write(b"%PDF-1.4\n")
453 def write_comment(self, s: str) -> None:
454 assert self.f is not None
455 self.f.write(f"% {s}\n".encode())
457 def write_catalog(self) -> IndirectReference:
458 assert self.f is not None
459 self.del_root()
460 self.root_ref = self.next_object_id(self.f.tell())
461 self.pages_ref = self.next_object_id(0)
462 self.rewrite_pages()
463 self.write_obj(self.root_ref, Type=PdfName(b"Catalog"), Pages=self.pages_ref)
464 self.write_obj(
465 self.pages_ref,
466 Type=PdfName(b"Pages"),
467 Count=len(self.pages),
468 Kids=self.pages,
469 )
470 return self.root_ref
472 def rewrite_pages(self) -> None:
473 pages_tree_nodes_to_delete = []
474 for i, page_ref in enumerate(self.orig_pages):
475 page_info = self.cached_objects[page_ref]
476 del self.xref_table[page_ref.object_id]
477 pages_tree_nodes_to_delete.append(page_info[PdfName(b"Parent")])
478 if page_ref not in self.pages:
479 # the page has been deleted
480 continue
481 # make dict keys into strings for passing to write_page
482 stringified_page_info = {}
483 for key, value in page_info.items():
484 # key should be a PdfName
485 stringified_page_info[key.name_as_str()] = value
486 stringified_page_info["Parent"] = self.pages_ref
487 new_page_ref = self.write_page(None, **stringified_page_info)
488 for j, cur_page_ref in enumerate(self.pages):
489 if cur_page_ref == page_ref:
490 # replace the page reference with the new one
491 self.pages[j] = new_page_ref
492 # delete redundant Pages tree nodes from xref table
493 for pages_tree_node_ref in pages_tree_nodes_to_delete:
494 while pages_tree_node_ref:
495 pages_tree_node = self.cached_objects[pages_tree_node_ref]
496 if pages_tree_node_ref.object_id in self.xref_table:
497 del self.xref_table[pages_tree_node_ref.object_id]
498 pages_tree_node_ref = pages_tree_node.get(b"Parent", None)
499 self.orig_pages = []
501 def write_xref_and_trailer(
502 self, new_root_ref: IndirectReference | None = None
503 ) -> None:
504 assert self.f is not None
505 if new_root_ref:
506 self.del_root()
507 self.root_ref = new_root_ref
508 if self.info:
509 self.info_ref = self.write_obj(None, self.info)
510 start_xref = self.xref_table.write(self.f)
511 num_entries = len(self.xref_table)
512 trailer_dict: dict[str | bytes, Any] = {
513 b"Root": self.root_ref,
514 b"Size": num_entries,
515 }
516 if self.last_xref_section_offset is not None:
517 trailer_dict[b"Prev"] = self.last_xref_section_offset
518 if self.info:
519 trailer_dict[b"Info"] = self.info_ref
520 self.last_xref_section_offset = start_xref
521 self.f.write(
522 b"trailer\n"
523 + bytes(PdfDict(trailer_dict))
524 + b"\nstartxref\n%d\n%%%%EOF" % start_xref
525 )
527 def write_page(
528 self, ref: int | IndirectReference | None, *objs: Any, **dict_obj: Any
529 ) -> IndirectReference:
530 obj_ref = self.pages[ref] if isinstance(ref, int) else ref
531 if "Type" not in dict_obj:
532 dict_obj["Type"] = PdfName(b"Page")
533 if "Parent" not in dict_obj:
534 dict_obj["Parent"] = self.pages_ref
535 return self.write_obj(obj_ref, *objs, **dict_obj)
537 def write_obj(
538 self, ref: IndirectReference | None, *objs: Any, **dict_obj: Any
539 ) -> IndirectReference:
540 assert self.f is not None
541 f = self.f
542 if ref is None:
543 ref = self.next_object_id(f.tell())
544 else:
545 self.xref_table[ref.object_id] = (f.tell(), ref.generation)
546 f.write(bytes(IndirectObjectDef(*ref)))
547 stream = dict_obj.pop("stream", None)
548 if stream is not None:
549 dict_obj["Length"] = len(stream)
550 if dict_obj:
551 f.write(pdf_repr(dict_obj))
552 for obj in objs:
553 f.write(pdf_repr(obj))
554 if stream is not None:
555 f.write(b"stream\n")
556 f.write(stream)
557 f.write(b"\nendstream\n")
558 f.write(b"endobj\n")
559 return ref
561 def del_root(self) -> None:
562 if self.root_ref is None:
563 return
564 del self.xref_table[self.root_ref.object_id]
565 del self.xref_table[self.root[b"Pages"].object_id]
567 @staticmethod
568 def get_buf_from_file(f: IO[bytes]) -> bytes | mmap.mmap:
569 if hasattr(f, "getbuffer"):
570 return f.getbuffer()
571 elif hasattr(f, "getvalue"):
572 return f.getvalue()
573 else:
574 try:
575 return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
576 except ValueError: # cannot mmap an empty file
577 return b""
579 def read_pdf_info(self) -> None:
580 assert self.buf is not None
581 self.file_size_total = len(self.buf)
582 self.file_size_this = self.file_size_total - self.start_offset
583 self.read_trailer()
584 check_format_condition(
585 self.trailer_dict.get(b"Root") is not None, "Root is missing"
586 )
587 self.root_ref = self.trailer_dict[b"Root"]
588 assert self.root_ref is not None
589 self.info_ref = self.trailer_dict.get(b"Info", None)
590 self.root = PdfDict(self.read_indirect(self.root_ref))
591 if self.info_ref is None:
592 self.info = PdfDict()
593 else:
594 self.info = PdfDict(self.read_indirect(self.info_ref))
595 check_format_condition(b"Type" in self.root, "/Type missing in Root")
596 check_format_condition(
597 self.root[b"Type"] == b"Catalog", "/Type in Root is not /Catalog"
598 )
599 check_format_condition(
600 self.root.get(b"Pages") is not None, "/Pages missing in Root"
601 )
602 check_format_condition(
603 isinstance(self.root[b"Pages"], IndirectReference),
604 "/Pages in Root is not an indirect reference",
605 )
606 self.pages_ref = self.root[b"Pages"]
607 assert self.pages_ref is not None
608 self.page_tree_root = self.read_indirect(self.pages_ref)
609 self.pages = self.linearize_page_tree(self.page_tree_root)
610 # save the original list of page references
611 # in case the user modifies, adds or deletes some pages
612 # and we need to rewrite the pages and their list
613 self.orig_pages = self.pages[:]
615 def next_object_id(self, offset: int | None = None) -> IndirectReference:
616 try:
617 # TODO: support reuse of deleted objects
618 reference = IndirectReference(max(self.xref_table.keys()) + 1, 0)
619 except ValueError:
620 reference = IndirectReference(1, 0)
621 if offset is not None:
622 self.xref_table[reference.object_id] = (offset, 0)
623 return reference
625 delimiter = rb"[][()<>{}/%]"
626 delimiter_or_ws = rb"[][()<>{}/%\000\011\012\014\015\040]"
627 whitespace = rb"[\000\011\012\014\015\040]"
628 whitespace_or_hex = rb"[\000\011\012\014\015\0400-9a-fA-F]"
629 whitespace_optional = whitespace + b"*"
630 whitespace_mandatory = whitespace + b"+"
631 # No "\012" aka "\n" or "\015" aka "\r":
632 whitespace_optional_no_nl = rb"[\000\011\014\040]*"
633 newline_only = rb"[\r\n]+"
634 newline = whitespace_optional_no_nl + newline_only + whitespace_optional_no_nl
635 re_trailer_end = re.compile(
636 whitespace_mandatory
637 + rb"trailer"
638 + whitespace_optional
639 + rb"<<(.*>>)"
640 + newline
641 + rb"startxref"
642 + newline
643 + rb"([0-9]+)"
644 + newline
645 + rb"%%EOF"
646 + whitespace_optional
647 + rb"$",
648 re.DOTALL,
649 )
650 re_trailer_prev = re.compile(
651 whitespace_optional
652 + rb"trailer"
653 + whitespace_optional
654 + rb"<<(.*?>>)"
655 + newline
656 + rb"startxref"
657 + newline
658 + rb"([0-9]+)"
659 + newline
660 + rb"%%EOF"
661 + whitespace_optional,
662 re.DOTALL,
663 )
665 def read_trailer(self) -> None:
666 assert self.buf is not None
667 search_start_offset = len(self.buf) - 16384
668 if search_start_offset < self.start_offset:
669 search_start_offset = self.start_offset
670 m = self.re_trailer_end.search(self.buf, search_start_offset)
671 check_format_condition(m is not None, "trailer end not found")
672 # make sure we found the LAST trailer
673 last_match = m
674 while m:
675 last_match = m
676 m = self.re_trailer_end.search(self.buf, m.start() + 16)
677 if not m:
678 m = last_match
679 assert m is not None
680 trailer_data = m.group(1)
681 self.last_xref_section_offset = int(m.group(2))
682 self.trailer_dict = self.interpret_trailer(trailer_data)
683 self.xref_table = XrefTable()
684 self.read_xref_table(xref_section_offset=self.last_xref_section_offset)
685 if b"Prev" in self.trailer_dict:
686 self.read_prev_trailer(self.trailer_dict[b"Prev"])
688 def read_prev_trailer(self, xref_section_offset: int) -> None:
689 assert self.buf is not None
690 trailer_offset = self.read_xref_table(xref_section_offset=xref_section_offset)
691 m = self.re_trailer_prev.search(
692 self.buf[trailer_offset : trailer_offset + 16384]
693 )
694 check_format_condition(m is not None, "previous trailer not found")
695 assert m is not None
696 trailer_data = m.group(1)
697 check_format_condition(
698 int(m.group(2)) == xref_section_offset,
699 "xref section offset in previous trailer doesn't match what was expected",
700 )
701 trailer_dict = self.interpret_trailer(trailer_data)
702 if b"Prev" in trailer_dict:
703 self.read_prev_trailer(trailer_dict[b"Prev"])
705 re_whitespace_optional = re.compile(whitespace_optional)
706 re_name = re.compile(
707 whitespace_optional
708 + rb"/([!-$&'*-.0-;=?-Z\\^-z|~]+)(?="
709 + delimiter_or_ws
710 + rb")"
711 )
712 re_dict_start = re.compile(whitespace_optional + rb"<<")
713 re_dict_end = re.compile(whitespace_optional + rb">>" + whitespace_optional)
715 @classmethod
716 def interpret_trailer(cls, trailer_data: bytes) -> dict[bytes, Any]:
717 trailer = {}
718 offset = 0
719 while True:
720 m = cls.re_name.match(trailer_data, offset)
721 if not m:
722 m = cls.re_dict_end.match(trailer_data, offset)
723 check_format_condition(
724 m is not None and m.end() == len(trailer_data),
725 "name not found in trailer, remaining data: "
726 + repr(trailer_data[offset:]),
727 )
728 break
729 key = cls.interpret_name(m.group(1))
730 assert isinstance(key, bytes)
731 value, value_offset = cls.get_value(trailer_data, m.end())
732 trailer[key] = value
733 if value_offset is None:
734 break
735 offset = value_offset
736 check_format_condition(
737 b"Size" in trailer and isinstance(trailer[b"Size"], int),
738 "/Size not in trailer or not an integer",
739 )
740 check_format_condition(
741 b"Root" in trailer and isinstance(trailer[b"Root"], IndirectReference),
742 "/Root not in trailer or not an indirect reference",
743 )
744 return trailer
746 re_hashes_in_name = re.compile(rb"([^#]*)(#([0-9a-fA-F]{2}))?")
748 @classmethod
749 def interpret_name(cls, raw: bytes, as_text: bool = False) -> str | bytes:
750 name = b""
751 for m in cls.re_hashes_in_name.finditer(raw):
752 if m.group(3):
753 name += m.group(1) + bytearray.fromhex(m.group(3).decode("us-ascii"))
754 else:
755 name += m.group(1)
756 if as_text:
757 return name.decode("utf-8")
758 else:
759 return bytes(name)
761 re_null = re.compile(whitespace_optional + rb"null(?=" + delimiter_or_ws + rb")")
762 re_true = re.compile(whitespace_optional + rb"true(?=" + delimiter_or_ws + rb")")
763 re_false = re.compile(whitespace_optional + rb"false(?=" + delimiter_or_ws + rb")")
764 re_int = re.compile(
765 whitespace_optional + rb"([-+]?[0-9]+)(?=" + delimiter_or_ws + rb")"
766 )
767 re_real = re.compile(
768 whitespace_optional
769 + rb"([-+]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+))(?="
770 + delimiter_or_ws
771 + rb")"
772 )
773 re_array_start = re.compile(whitespace_optional + rb"\[")
774 re_array_end = re.compile(whitespace_optional + rb"]")
775 re_string_hex = re.compile(
776 whitespace_optional + rb"<(" + whitespace_or_hex + rb"*)>"
777 )
778 re_string_lit = re.compile(whitespace_optional + rb"\(")
779 re_indirect_reference = re.compile(
780 whitespace_optional
781 + rb"([-+]?[0-9]+)"
782 + whitespace_mandatory
783 + rb"([-+]?[0-9]+)"
784 + whitespace_mandatory
785 + rb"R(?="
786 + delimiter_or_ws
787 + rb")"
788 )
789 re_indirect_def_start = re.compile(
790 whitespace_optional
791 + rb"([-+]?[0-9]+)"
792 + whitespace_mandatory
793 + rb"([-+]?[0-9]+)"
794 + whitespace_mandatory
795 + rb"obj(?="
796 + delimiter_or_ws
797 + rb")"
798 )
799 re_indirect_def_end = re.compile(
800 whitespace_optional + rb"endobj(?=" + delimiter_or_ws + rb")"
801 )
802 re_comment = re.compile(
803 rb"(" + whitespace_optional + rb"%[^\r\n]*" + newline + rb")*"
804 )
805 re_stream_start = re.compile(whitespace_optional + rb"stream\r?\n")
806 re_stream_end = re.compile(
807 whitespace_optional + rb"endstream(?=" + delimiter_or_ws + rb")"
808 )
810 @classmethod
811 def get_value(
812 cls,
813 data: bytes | bytearray | mmap.mmap,
814 offset: int,
815 expect_indirect: IndirectReference | None = None,
816 max_nesting: int = -1,
817 ) -> tuple[Any, int | None]:
818 if max_nesting == 0:
819 return None, None
820 m = cls.re_comment.match(data, offset)
821 if m:
822 offset = m.end()
823 m = cls.re_indirect_def_start.match(data, offset)
824 if m:
825 check_format_condition(
826 int(m.group(1)) > 0,
827 "indirect object definition: object ID must be greater than 0",
828 )
829 check_format_condition(
830 int(m.group(2)) >= 0,
831 "indirect object definition: generation must be non-negative",
832 )
833 check_format_condition(
834 expect_indirect is None
835 or expect_indirect
836 == IndirectReference(int(m.group(1)), int(m.group(2))),
837 "indirect object definition different than expected",
838 )
839 object, object_offset = cls.get_value(
840 data, m.end(), max_nesting=max_nesting - 1
841 )
842 if object_offset is None:
843 return object, None
844 m = cls.re_indirect_def_end.match(data, object_offset)
845 check_format_condition(
846 m is not None, "indirect object definition end not found"
847 )
848 assert m is not None
849 return object, m.end()
850 check_format_condition(
851 not expect_indirect, "indirect object definition not found"
852 )
853 m = cls.re_indirect_reference.match(data, offset)
854 if m:
855 check_format_condition(
856 int(m.group(1)) > 0,
857 "indirect object reference: object ID must be greater than 0",
858 )
859 check_format_condition(
860 int(m.group(2)) >= 0,
861 "indirect object reference: generation must be non-negative",
862 )
863 return IndirectReference(int(m.group(1)), int(m.group(2))), m.end()
864 m = cls.re_dict_start.match(data, offset)
865 if m:
866 offset = m.end()
867 result: dict[Any, Any] = {}
868 m = cls.re_dict_end.match(data, offset)
869 current_offset: int | None = offset
870 while not m:
871 assert current_offset is not None
872 key, current_offset = cls.get_value(
873 data, current_offset, max_nesting=max_nesting - 1
874 )
875 if current_offset is None:
876 return result, None
877 value, current_offset = cls.get_value(
878 data, current_offset, max_nesting=max_nesting - 1
879 )
880 result[key] = value
881 if current_offset is None:
882 return result, None
883 m = cls.re_dict_end.match(data, current_offset)
884 current_offset = m.end()
885 m = cls.re_stream_start.match(data, current_offset)
886 if m:
887 stream_len = result.get(b"Length")
888 if stream_len is None or not isinstance(stream_len, int):
889 msg = f"bad or missing Length in stream dict ({stream_len})"
890 raise PdfFormatError(msg)
891 stream_data = data[m.end() : m.end() + stream_len]
892 m = cls.re_stream_end.match(data, m.end() + stream_len)
893 check_format_condition(m is not None, "stream end not found")
894 assert m is not None
895 current_offset = m.end()
896 return PdfStream(PdfDict(result), stream_data), current_offset
897 return PdfDict(result), current_offset
898 m = cls.re_array_start.match(data, offset)
899 if m:
900 offset = m.end()
901 results = []
902 m = cls.re_array_end.match(data, offset)
903 current_offset = offset
904 while not m:
905 assert current_offset is not None
906 value, current_offset = cls.get_value(
907 data, current_offset, max_nesting=max_nesting - 1
908 )
909 results.append(value)
910 if current_offset is None:
911 return results, None
912 m = cls.re_array_end.match(data, current_offset)
913 return results, m.end()
914 m = cls.re_null.match(data, offset)
915 if m:
916 return None, m.end()
917 m = cls.re_true.match(data, offset)
918 if m:
919 return True, m.end()
920 m = cls.re_false.match(data, offset)
921 if m:
922 return False, m.end()
923 m = cls.re_name.match(data, offset)
924 if m:
925 return PdfName(cls.interpret_name(m.group(1))), m.end()
926 m = cls.re_int.match(data, offset)
927 if m:
928 return int(m.group(1)), m.end()
929 m = cls.re_real.match(data, offset)
930 if m:
931 # XXX Decimal instead of float???
932 return float(m.group(1)), m.end()
933 m = cls.re_string_hex.match(data, offset)
934 if m:
935 # filter out whitespace
936 hex_string = bytearray(
937 b for b in m.group(1) if b in b"0123456789abcdefABCDEF"
938 )
939 if len(hex_string) % 2 == 1:
940 # append a 0 if the length is not even - yes, at the end
941 hex_string.append(ord(b"0"))
942 return bytearray.fromhex(hex_string.decode("us-ascii")), m.end()
943 m = cls.re_string_lit.match(data, offset)
944 if m:
945 return cls.get_literal_string(data, m.end())
946 # return None, offset # fallback (only for debugging)
947 msg = f"unrecognized object: {repr(data[offset : offset + 32])}"
948 raise PdfFormatError(msg)
950 re_lit_str_token = re.compile(
951 rb"(\\[nrtbf()\\])|(\\[0-9]{1,3})|(\\(\r\n|\r|\n))|(\r\n|\r|\n)|(\()|(\))"
952 )
953 escaped_chars = {
954 b"n": b"\n",
955 b"r": b"\r",
956 b"t": b"\t",
957 b"b": b"\b",
958 b"f": b"\f",
959 b"(": b"(",
960 b")": b")",
961 b"\\": b"\\",
962 ord(b"n"): b"\n",
963 ord(b"r"): b"\r",
964 ord(b"t"): b"\t",
965 ord(b"b"): b"\b",
966 ord(b"f"): b"\f",
967 ord(b"("): b"(",
968 ord(b")"): b")",
969 ord(b"\\"): b"\\",
970 }
972 @classmethod
973 def get_literal_string(
974 cls, data: bytes | bytearray | mmap.mmap, offset: int
975 ) -> tuple[bytes, int]:
976 nesting_depth = 0
977 result = bytearray()
978 for m in cls.re_lit_str_token.finditer(data, offset):
979 result.extend(data[offset : m.start()])
980 if m.group(1):
981 result.extend(cls.escaped_chars[m.group(1)[1]])
982 elif m.group(2):
983 result.append(int(m.group(2)[1:], 8))
984 elif m.group(3):
985 pass
986 elif m.group(5):
987 result.extend(b"\n")
988 elif m.group(6):
989 result.extend(b"(")
990 nesting_depth += 1
991 elif m.group(7):
992 if nesting_depth == 0:
993 return bytes(result), m.end()
994 result.extend(b")")
995 nesting_depth -= 1
996 offset = m.end()
997 msg = "unfinished literal string"
998 raise PdfFormatError(msg)
1000 re_xref_section_start = re.compile(whitespace_optional + rb"xref" + newline)
1001 re_xref_subsection_start = re.compile(
1002 whitespace_optional
1003 + rb"([0-9]+)"
1004 + whitespace_mandatory
1005 + rb"([0-9]+)"
1006 + whitespace_optional
1007 + newline_only
1008 )
1009 re_xref_entry = re.compile(rb"([0-9]{10}) ([0-9]{5}) ([fn])( \r| \n|\r\n)")
1011 def read_xref_table(self, xref_section_offset: int) -> int:
1012 assert self.buf is not None
1013 subsection_found = False
1014 m = self.re_xref_section_start.match(
1015 self.buf, xref_section_offset + self.start_offset
1016 )
1017 check_format_condition(m is not None, "xref section start not found")
1018 assert m is not None
1019 offset = m.end()
1020 while True:
1021 m = self.re_xref_subsection_start.match(self.buf, offset)
1022 if not m:
1023 check_format_condition(
1024 subsection_found, "xref subsection start not found"
1025 )
1026 break
1027 subsection_found = True
1028 offset = m.end()
1029 first_object = int(m.group(1))
1030 num_objects = int(m.group(2))
1031 for i in range(first_object, first_object + num_objects):
1032 m = self.re_xref_entry.match(self.buf, offset)
1033 check_format_condition(m is not None, "xref entry not found")
1034 assert m is not None
1035 offset = m.end()
1036 is_free = m.group(3) == b"f"
1037 if not is_free:
1038 generation = int(m.group(2))
1039 new_entry = (int(m.group(1)), generation)
1040 if i not in self.xref_table:
1041 self.xref_table[i] = new_entry
1042 return offset
1044 def read_indirect(self, ref: IndirectReference, max_nesting: int = -1) -> Any:
1045 offset, generation = self.xref_table[ref[0]]
1046 check_format_condition(
1047 generation == ref[1],
1048 f"expected to find generation {ref[1]} for object ID {ref[0]} in xref "
1049 f"table, instead found generation {generation} at offset {offset}",
1050 )
1051 assert self.buf is not None
1052 value = self.get_value(
1053 self.buf,
1054 offset + self.start_offset,
1055 expect_indirect=IndirectReference(*ref),
1056 max_nesting=max_nesting,
1057 )[0]
1058 self.cached_objects[ref] = value
1059 return value
1061 def linearize_page_tree(
1062 self, node: PdfDict | None = None
1063 ) -> list[IndirectReference]:
1064 page_node = node if node is not None else self.page_tree_root
1065 check_format_condition(
1066 page_node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages"
1067 )
1068 pages = []
1069 for kid in page_node[b"Kids"]:
1070 kid_object = self.read_indirect(kid)
1071 if kid_object[b"Type"] == b"Page":
1072 pages.append(kid)
1073 else:
1074 pages.extend(self.linearize_page_tree(node=kid_object))
1075 return pages