1from __future__ import annotations
2
3import calendar
4import codecs
5import collections
6import mmap
7import os
8import re
9import time
10import zlib
11from typing import IO, TYPE_CHECKING, Any, NamedTuple, Union
12
13
14# see 7.9.2.2 Text String Type on page 86 and D.3 PDFDocEncoding Character Set
15# on page 656
def encode_text(s: str) -> bytes:
    """Encode *s* as UTF-16BE bytes prefixed with the UTF-16BE byte-order mark."""
    payload = s.encode("utf_16_be")
    return b"".join((codecs.BOM_UTF16_BE, payload))
18
19
# Byte value => Unicode character for the PDFDocEncoding codes listed here.
# decode_text() falls back to chr(byte) for every byte that is not a key of
# this table.
PDFDocEncoding = {
    0x16: "\u0017",
    0x18: "\u02D8",
    0x19: "\u02C7",
    0x1A: "\u02C6",
    0x1B: "\u02D9",
    0x1C: "\u02DD",
    0x1D: "\u02DB",
    0x1E: "\u02DA",
    0x1F: "\u02DC",
    0x80: "\u2022",
    0x81: "\u2020",
    0x82: "\u2021",
    0x83: "\u2026",
    0x84: "\u2014",
    0x85: "\u2013",
    0x86: "\u0192",
    0x87: "\u2044",
    0x88: "\u2039",
    0x89: "\u203A",
    0x8A: "\u2212",
    0x8B: "\u2030",
    0x8C: "\u201E",
    0x8D: "\u201C",
    0x8E: "\u201D",
    0x8F: "\u2018",
    0x90: "\u2019",
    0x91: "\u201A",
    0x92: "\u2122",
    0x93: "\uFB01",
    0x94: "\uFB02",
    0x95: "\u0141",
    0x96: "\u0152",
    0x97: "\u0160",
    0x98: "\u0178",
    0x99: "\u017D",
    0x9A: "\u0131",
    0x9B: "\u0142",
    0x9C: "\u0153",
    0x9D: "\u0161",
    0x9E: "\u017E",
    0xA0: "\u20AC",
}
63
64
def decode_text(b: bytes) -> str:
    """Decode a PDF text string.

    Data starting with the UTF-16BE byte-order mark is decoded as UTF-16BE
    (the mark itself is dropped); anything else is translated byte by byte
    through PDFDocEncoding, falling back to chr() for unlisted bytes.
    """
    bom = codecs.BOM_UTF16_BE
    if b[: len(bom)] != bom:
        return "".join([PDFDocEncoding.get(code, chr(code)) for code in b])
    return b[len(bom) :].decode("utf_16_be")
70
71
class PdfFormatError(RuntimeError):
    """Raised for what is probably a syntactic or semantic error in the
    structure of the PDF file being processed."""
77
78
def check_format_condition(condition: bool, error_message: str) -> None:
    """Raise PdfFormatError(error_message) unless *condition* is truthy."""
    if condition:
        return
    raise PdfFormatError(error_message)
82
83
class IndirectReferenceTuple(NamedTuple):
    # Plain (object_id, generation) pair; IndirectReference builds on it and
    # adds the "<object_id> <generation> R" text form, equality and hashing.
    object_id: int  # first number of the reference
    generation: int  # second number of the reference
87
88
class IndirectReference(IndirectReferenceTuple):
    """An (object_id, generation) pair rendered as ``"<id> <gen> R"``.

    Two references compare equal only when they are of exactly the same class
    and carry the same object ID and generation.
    """

    def __str__(self) -> str:
        return "{} {} R".format(self.object_id, self.generation)

    def __bytes__(self) -> bytes:
        text = self.__str__()
        return text.encode("us-ascii")

    def __eq__(self, other: object) -> bool:
        if other.__class__ is self.__class__:
            assert isinstance(other, IndirectReference)
            return (
                other.object_id == self.object_id
                and other.generation == self.generation
            )
        return False

    def __ne__(self, other: object) -> bool:
        equal = self == other
        return not equal

    def __hash__(self) -> int:
        identity = (self.object_id, self.generation)
        return hash(identity)
107
108
class IndirectObjectDef(IndirectReference):
    """Same pair as IndirectReference, rendered as ``"<id> <gen> obj"``."""

    def __str__(self) -> str:
        return "{} {} obj".format(self.object_id, self.generation)
112
113
class XrefTable:
    """Cross-reference table split into existing, new and deleted entries.

    While ``reading_finished`` is False, assignments are recorded in
    ``existing_entries``; afterwards they go to ``new_entries``. ``write``
    emits only the new and deleted entries.
    """

    def __init__(self) -> None:
        # object ID => (offset, generation)
        self.existing_entries: dict[int, tuple[int, int]] = {}
        # object ID => (offset, generation)
        self.new_entries: dict[int, tuple[int, int]] = {}
        # object ID => generation. Object 0 is always free and, per the PDF
        # specification, carries generation 65535 (the maximum value).
        self.deleted_entries = {0: 65535}
        self.reading_finished = False

    def __setitem__(self, key: int, value: tuple[int, int]) -> None:
        if self.reading_finished:
            self.new_entries[key] = value
        else:
            self.existing_entries[key] = value
        # an object that is (re)defined is no longer free
        if key in self.deleted_entries:
            del self.deleted_entries[key]

    def __getitem__(self, key: int) -> tuple[int, int]:
        # new entries shadow existing ones
        try:
            return self.new_entries[key]
        except KeyError:
            return self.existing_entries[key]

    def __delitem__(self, key: int) -> None:
        """Mark *key* as deleted, recording its generation incremented by one.

        Raises IndexError if the object ID is not known at all.
        """
        if key in self.new_entries:
            generation = self.new_entries[key][1] + 1
            del self.new_entries[key]
            self.deleted_entries[key] = generation
        elif key in self.existing_entries:
            generation = self.existing_entries[key][1] + 1
            self.deleted_entries[key] = generation
        elif key in self.deleted_entries:
            # already deleted: nothing to do
            pass
        else:
            msg = f"object ID {key} cannot be deleted because it doesn't exist"
            raise IndexError(msg)

    def __contains__(self, key: int) -> bool:
        return key in self.existing_entries or key in self.new_entries

    def __len__(self) -> int:
        # number of distinct object IDs across all three dictionaries
        return len(
            set(self.existing_entries.keys())
            | set(self.new_entries.keys())
            | set(self.deleted_entries.keys())
        )

    def keys(self) -> set[int]:
        """Return the IDs of existing entries that are not deleted, plus new ones."""
        return (
            set(self.existing_entries.keys()) - set(self.deleted_entries.keys())
        ) | set(self.new_entries.keys())

    def write(self, f: IO[bytes]) -> int:
        """Write an ``xref`` section for the new and deleted entries to *f*.

        Entries are grouped into subsections of contiguous object IDs. Free
        entries form a linked list: each one stores the ID of the next deleted
        object, the last one points back to 0.

        Returns the offset in *f* at which the section starts.
        """
        keys = sorted(set(self.new_entries.keys()) | set(self.deleted_entries.keys()))
        deleted_keys = sorted(self.deleted_entries.keys())
        # index of the next free entry to emit; avoids O(n) list.pop(0) calls
        next_deleted = 0
        startxref = f.tell()
        f.write(b"xref\n")
        start = 0
        while start < len(keys):
            # extend the run while the object IDs stay contiguous
            end = start + 1
            while end < len(keys) and keys[end] == keys[end - 1] + 1:
                end += 1
            contiguous_keys = keys[start:end]
            start = end
            f.write(b"%d %d\n" % (contiguous_keys[0], len(contiguous_keys)))
            for object_id in contiguous_keys:
                if object_id in self.new_entries:
                    f.write(b"%010d %05d n \n" % self.new_entries[object_id])
                else:
                    this_deleted_object_id = deleted_keys[next_deleted]
                    next_deleted += 1
                    check_format_condition(
                        object_id == this_deleted_object_id,
                        f"expected the next deleted object ID to be {object_id}, "
                        f"instead found {this_deleted_object_id}",
                    )
                    if next_deleted < len(deleted_keys):
                        next_in_linked_list = deleted_keys[next_deleted]
                    else:
                        next_in_linked_list = 0
                    f.write(
                        b"%010d %05d f \n"
                        % (next_in_linked_list, self.deleted_entries[object_id])
                    )
        return startxref
206
207
class PdfName:
    """A PDF name object, stored as the raw bytes of the name without "/".

    Instances compare equal to other PdfName objects with the same bytes and
    to the bytes themselves, and hash like those bytes.
    """

    name: bytes

    def __init__(self, name: PdfName | bytes | str) -> None:
        if isinstance(name, bytes):
            self.name = name
        elif isinstance(name, PdfName):
            self.name = name.name
        else:
            self.name = name.encode("us-ascii")

    def name_as_str(self) -> str:
        """Return the name decoded as US-ASCII text."""
        return self.name.decode("us-ascii")

    def __eq__(self, other: object) -> bool:
        if isinstance(other, PdfName) and other.name == self.name:
            return True
        return other == self.name

    def __hash__(self) -> int:
        return hash(self.name)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.name!r})"

    @classmethod
    def from_pdf_stream(cls, data: bytes) -> PdfName:
        """Build a name from its raw form, resolving "#xx" escapes."""
        return cls(PdfParser.interpret_name(data))

    # byte values written verbatim by __bytes__; all others become "#XX"
    allowed_chars = set(range(33, 127)).difference(b"#%/()<>[]{}")

    def __bytes__(self) -> bytes:
        pieces = [
            bytes((code,)) if code in self.allowed_chars else b"#%02X" % code
            for code in self.name
        ]
        return b"/" + b"".join(pieces)
247
248
class PdfArray(list[Any]):
    """A list whose bytes() form is its items' pdf_repr() joined by spaces
    between "[ " and " ]"."""

    def __bytes__(self) -> bytes:
        body = b" ".join([pdf_repr(item) for item in self])
        return b"[ %s ]" % body
252
253
# UserDict is only subscripted for the type checker; at runtime the plain
# class is used as the base.
if TYPE_CHECKING:
    _DictBase = collections.UserDict[Union[str, bytes], Any]
else:
    _DictBase = collections.UserDict


class PdfDict(_DictBase):
    """Dictionary whose entries are also reachable as attributes.

    ``d.Name`` reads and ``d.Name = value`` writes the entry stored under the
    US-ASCII encoded key ``b"Name"``. The attribute ``data`` is the underlying
    UserDict storage and is handled as a normal attribute.
    """

    def __setattr__(self, key: str, value: Any) -> None:
        if key == "data":
            collections.UserDict.__setattr__(self, key, value)
        else:
            self[key.encode("us-ascii")] = value

    def __getattr__(self, key: str) -> str | time.struct_time:
        """Return the entry for *key*, decoded for convenience.

        Bytes values are passed through decode_text(). For keys ending in
        "Date" the value is parsed as a PDF date
        (``D:YYYYMMDDHHmmSSOHH'mm``, trailing fields optional, a closing
        apostrophe tolerated) and returned as a UTC ``time.struct_time``.

        Raises AttributeError if there is no such entry.
        """
        try:
            value = self[key.encode("us-ascii")]
        except KeyError as e:
            raise AttributeError(key) from e
        if isinstance(value, bytes):
            value = decode_text(value)
        if key.endswith("Date"):
            if value.startswith("D:"):
                value = value[2:]

            relationship = "Z"
            # 14 date/time digits + relationship character + 2 hour digits
            if len(value) >= 17:
                relationship = value[14]
                offset = int(value[15:17]) * 60
                # ... + apostrophe + 2 minute digits; the closing apostrophe
                # is optional, so 20 characters are enough
                if len(value) >= 20:
                    offset += int(value[18:20])

            # every directive is two characters long and consumes two digits,
            # except %Y which consumes four
            fmt = "%Y%m%d%H%M%S"[: len(value) - 2]
            value = time.strptime(value[: len(fmt) + 2], fmt)
            if relationship in ["+", "-"]:
                offset *= 60
                if relationship == "+":
                    # local time is ahead of UTC, so go back
                    offset *= -1
                value = time.gmtime(calendar.timegm(value) + offset)
        return value

    def __bytes__(self) -> bytes:
        """Serialise as "<<" ... ">>", skipping entries whose value is None."""
        out = bytearray(b"<<")
        for key, value in self.items():
            if value is None:
                continue
            value = pdf_repr(value)
            out.extend(b"\n")
            out.extend(bytes(PdfName(key)))
            out.extend(b" ")
            out.extend(value)
        out.extend(b"\n>>")
        return bytes(out)
306
307
class PdfBinary:
    """Byte values whose bytes() form is an uppercase hexadecimal string
    enclosed in "<" and ">"."""

    def __init__(self, data: list[int] | bytes) -> None:
        self.data = data

    def __bytes__(self) -> bytes:
        hex_digits = [b"%02X" % octet for octet in self.data]
        return b"<" + b"".join(hex_digits) + b">"
314
315
class PdfStream:
    """A stream object: its dictionary plus the raw (still encoded) data."""

    def __init__(self, dictionary: PdfDict, buf: bytes) -> None:
        self.dictionary = dictionary
        self.buf = buf

    def decode(self) -> bytes:
        """Return the stream data with its filter undone.

        Without a Filter entry the raw data is returned as is. FlateDecode is
        the only supported filter; DL (or, failing that, Length) is passed to
        zlib as the initial output buffer size.

        Raises NotImplementedError for any other filter.
        """
        try:
            stream_filter = self.dictionary[b"Filter"]
        except KeyError:
            return self.buf
        if not (stream_filter == b"FlateDecode"):
            msg = f"stream filter {stream_filter!r} unknown/unsupported"
            raise NotImplementedError(msg)
        try:
            expected_length = self.dictionary[b"DL"]
        except KeyError:
            expected_length = self.dictionary[b"Length"]
        return zlib.decompress(self.buf, bufsize=int(expected_length))
335
336
def pdf_repr(x: Any) -> bytes:
    """Serialise a Python value to PDF syntax.

    Handles booleans, None, numbers, time.struct_time (as a "D:" date
    string), str (encoded with encode_text), bytes (literal string),
    bytearray (hexadecimal string), the Pdf* wrapper types, and plain dicts
    and lists. Anything else is converted with bytes(x).

    Built-in types are tested before the project types; none of the project
    types derives from them, so the order does not affect the result.
    """
    if x is True:
        return b"true"
    if x is False:
        return b"false"
    if x is None:
        return b"null"
    if isinstance(x, float):
        text = str(x)
        if "e" in text or "E" in text:
            # PDF does not allow exponent notation for real numbers, so
            # expand it exactly into plain decimal digits.
            from decimal import Decimal

            text = format(Decimal(text), "f")
        return text.encode("us-ascii")
    if isinstance(x, int):
        return str(x).encode("us-ascii")
    if isinstance(x, time.struct_time):
        return b"(D:" + time.strftime("%Y%m%d%H%M%SZ", x).encode("us-ascii") + b")"
    if isinstance(x, str):
        return pdf_repr(encode_text(x))
    if isinstance(x, bytes):
        # XXX escape more chars? handle binary garbage
        x = x.replace(b"\\", b"\\\\")
        x = x.replace(b"(", b"\\(")
        x = x.replace(b")", b"\\)")
        return b"(" + x + b")"
    if isinstance(x, bytearray):
        # The parser returns hexadecimal strings as bytearray; write them back
        # as a hexadecimal string instead of dumping the raw bytes.
        return b"<" + x.hex().upper().encode("us-ascii") + b">"
    if isinstance(x, (PdfName, PdfDict, PdfArray, PdfBinary)):
        return bytes(x)
    if isinstance(x, dict):
        return bytes(PdfDict(x))
    if isinstance(x, list):
        return bytes(PdfArray(x))
    return bytes(x)
364
365
class PdfParser:
    """Reads the trailer, cross-reference table and page tree of a PDF file
    and can append new objects, an xref section and a trailer to it.

    Based on
    https://www.adobe.com/content/dam/acom/en/devnet/acrobat/pdfs/PDF32000_2008.pdf
    Supports PDF up to 1.4
    """
371
372 def __init__(
373 self,
374 filename: str | None = None,
375 f: IO[bytes] | None = None,
376 buf: bytes | bytearray | None = None,
377 start_offset: int = 0,
378 mode: str = "rb",
379 ) -> None:
380 if buf and f:
381 msg = "specify buf or f or filename, but not both buf and f"
382 raise RuntimeError(msg)
383 self.filename = filename
384 self.buf: bytes | bytearray | mmap.mmap | None = buf
385 self.f = f
386 self.start_offset = start_offset
387 self.should_close_buf = False
388 self.should_close_file = False
389 if filename is not None and f is None:
390 self.f = f = open(filename, mode)
391 self.should_close_file = True
392 if f is not None:
393 self.buf = self.get_buf_from_file(f)
394 self.should_close_buf = True
395 if not filename and hasattr(f, "name"):
396 self.filename = f.name
397 self.cached_objects: dict[IndirectReference, Any] = {}
398 self.root_ref: IndirectReference | None
399 self.info_ref: IndirectReference | None
400 self.pages_ref: IndirectReference | None
401 self.last_xref_section_offset: int | None
402 if self.buf:
403 self.read_pdf_info()
404 else:
405 self.file_size_total = self.file_size_this = 0
406 self.root = PdfDict()
407 self.root_ref = None
408 self.info = PdfDict()
409 self.info_ref = None
410 self.page_tree_root = PdfDict()
411 self.pages: list[IndirectReference] = []
412 self.orig_pages: list[IndirectReference] = []
413 self.pages_ref = None
414 self.last_xref_section_offset = None
415 self.trailer_dict: dict[bytes, Any] = {}
416 self.xref_table = XrefTable()
417 self.xref_table.reading_finished = True
418 if f:
419 self.seek_end()
420
421 def __enter__(self) -> PdfParser:
422 return self
423
424 def __exit__(self, *args: object) -> None:
425 self.close()
426
427 def start_writing(self) -> None:
428 self.close_buf()
429 self.seek_end()
430
431 def close_buf(self) -> None:
432 if isinstance(self.buf, mmap.mmap):
433 self.buf.close()
434 self.buf = None
435
436 def close(self) -> None:
437 if self.should_close_buf:
438 self.close_buf()
439 if self.f is not None and self.should_close_file:
440 self.f.close()
441 self.f = None
442
443 def seek_end(self) -> None:
444 assert self.f is not None
445 self.f.seek(0, os.SEEK_END)
446
447 def write_header(self) -> None:
448 assert self.f is not None
449 self.f.write(b"%PDF-1.4\n")
450
451 def write_comment(self, s: str) -> None:
452 assert self.f is not None
453 self.f.write(f"% {s}\n".encode())
454
455 def write_catalog(self) -> IndirectReference:
456 assert self.f is not None
457 self.del_root()
458 self.root_ref = self.next_object_id(self.f.tell())
459 self.pages_ref = self.next_object_id(0)
460 self.rewrite_pages()
461 self.write_obj(self.root_ref, Type=PdfName(b"Catalog"), Pages=self.pages_ref)
462 self.write_obj(
463 self.pages_ref,
464 Type=PdfName(b"Pages"),
465 Count=len(self.pages),
466 Kids=self.pages,
467 )
468 return self.root_ref
469
470 def rewrite_pages(self) -> None:
471 pages_tree_nodes_to_delete = []
472 for i, page_ref in enumerate(self.orig_pages):
473 page_info = self.cached_objects[page_ref]
474 del self.xref_table[page_ref.object_id]
475 pages_tree_nodes_to_delete.append(page_info[PdfName(b"Parent")])
476 if page_ref not in self.pages:
477 # the page has been deleted
478 continue
479 # make dict keys into strings for passing to write_page
480 stringified_page_info = {}
481 for key, value in page_info.items():
482 # key should be a PdfName
483 stringified_page_info[key.name_as_str()] = value
484 stringified_page_info["Parent"] = self.pages_ref
485 new_page_ref = self.write_page(None, **stringified_page_info)
486 for j, cur_page_ref in enumerate(self.pages):
487 if cur_page_ref == page_ref:
488 # replace the page reference with the new one
489 self.pages[j] = new_page_ref
490 # delete redundant Pages tree nodes from xref table
491 for pages_tree_node_ref in pages_tree_nodes_to_delete:
492 while pages_tree_node_ref:
493 pages_tree_node = self.cached_objects[pages_tree_node_ref]
494 if pages_tree_node_ref.object_id in self.xref_table:
495 del self.xref_table[pages_tree_node_ref.object_id]
496 pages_tree_node_ref = pages_tree_node.get(b"Parent", None)
497 self.orig_pages = []
498
499 def write_xref_and_trailer(
500 self, new_root_ref: IndirectReference | None = None
501 ) -> None:
502 assert self.f is not None
503 if new_root_ref:
504 self.del_root()
505 self.root_ref = new_root_ref
506 if self.info:
507 self.info_ref = self.write_obj(None, self.info)
508 start_xref = self.xref_table.write(self.f)
509 num_entries = len(self.xref_table)
510 trailer_dict: dict[str | bytes, Any] = {
511 b"Root": self.root_ref,
512 b"Size": num_entries,
513 }
514 if self.last_xref_section_offset is not None:
515 trailer_dict[b"Prev"] = self.last_xref_section_offset
516 if self.info:
517 trailer_dict[b"Info"] = self.info_ref
518 self.last_xref_section_offset = start_xref
519 self.f.write(
520 b"trailer\n"
521 + bytes(PdfDict(trailer_dict))
522 + b"\nstartxref\n%d\n%%%%EOF" % start_xref
523 )
524
525 def write_page(
526 self, ref: int | IndirectReference | None, *objs: Any, **dict_obj: Any
527 ) -> IndirectReference:
528 obj_ref = self.pages[ref] if isinstance(ref, int) else ref
529 if "Type" not in dict_obj:
530 dict_obj["Type"] = PdfName(b"Page")
531 if "Parent" not in dict_obj:
532 dict_obj["Parent"] = self.pages_ref
533 return self.write_obj(obj_ref, *objs, **dict_obj)
534
535 def write_obj(
536 self, ref: IndirectReference | None, *objs: Any, **dict_obj: Any
537 ) -> IndirectReference:
538 assert self.f is not None
539 f = self.f
540 if ref is None:
541 ref = self.next_object_id(f.tell())
542 else:
543 self.xref_table[ref.object_id] = (f.tell(), ref.generation)
544 f.write(bytes(IndirectObjectDef(*ref)))
545 stream = dict_obj.pop("stream", None)
546 if stream is not None:
547 dict_obj["Length"] = len(stream)
548 if dict_obj:
549 f.write(pdf_repr(dict_obj))
550 for obj in objs:
551 f.write(pdf_repr(obj))
552 if stream is not None:
553 f.write(b"stream\n")
554 f.write(stream)
555 f.write(b"\nendstream\n")
556 f.write(b"endobj\n")
557 return ref
558
559 def del_root(self) -> None:
560 if self.root_ref is None:
561 return
562 del self.xref_table[self.root_ref.object_id]
563 del self.xref_table[self.root[b"Pages"].object_id]
564
565 @staticmethod
566 def get_buf_from_file(f: IO[bytes]) -> bytes | mmap.mmap:
567 if hasattr(f, "getbuffer"):
568 return f.getbuffer()
569 elif hasattr(f, "getvalue"):
570 return f.getvalue()
571 else:
572 try:
573 return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
574 except ValueError: # cannot mmap an empty file
575 return b""
576
577 def read_pdf_info(self) -> None:
578 assert self.buf is not None
579 self.file_size_total = len(self.buf)
580 self.file_size_this = self.file_size_total - self.start_offset
581 self.read_trailer()
582 check_format_condition(
583 self.trailer_dict.get(b"Root") is not None, "Root is missing"
584 )
585 self.root_ref = self.trailer_dict[b"Root"]
586 assert self.root_ref is not None
587 self.info_ref = self.trailer_dict.get(b"Info", None)
588 self.root = PdfDict(self.read_indirect(self.root_ref))
589 if self.info_ref is None:
590 self.info = PdfDict()
591 else:
592 self.info = PdfDict(self.read_indirect(self.info_ref))
593 check_format_condition(b"Type" in self.root, "/Type missing in Root")
594 check_format_condition(
595 self.root[b"Type"] == b"Catalog", "/Type in Root is not /Catalog"
596 )
597 check_format_condition(
598 self.root.get(b"Pages") is not None, "/Pages missing in Root"
599 )
600 check_format_condition(
601 isinstance(self.root[b"Pages"], IndirectReference),
602 "/Pages in Root is not an indirect reference",
603 )
604 self.pages_ref = self.root[b"Pages"]
605 assert self.pages_ref is not None
606 self.page_tree_root = self.read_indirect(self.pages_ref)
607 self.pages = self.linearize_page_tree(self.page_tree_root)
608 # save the original list of page references
609 # in case the user modifies, adds or deletes some pages
610 # and we need to rewrite the pages and their list
611 self.orig_pages = self.pages[:]
612
613 def next_object_id(self, offset: int | None = None) -> IndirectReference:
614 try:
615 # TODO: support reuse of deleted objects
616 reference = IndirectReference(max(self.xref_table.keys()) + 1, 0)
617 except ValueError:
618 reference = IndirectReference(1, 0)
619 if offset is not None:
620 self.xref_table[reference.object_id] = (offset, 0)
621 return reference
622
    # Building blocks (regular expression source, as bytes) shared by the
    # patterns below. The character classes spell the whitespace bytes in
    # octal: \000 NUL, \011 TAB, \012 LF, \014 FF, \015 CR, \040 SPACE.
    delimiter = rb"[][()<>{}/%]"
    delimiter_or_ws = rb"[][()<>{}/%\000\011\012\014\015\040]"
    whitespace = rb"[\000\011\012\014\015\040]"
    whitespace_or_hex = rb"[\000\011\012\014\015\0400-9a-fA-F]"
    whitespace_optional = whitespace + b"*"
    whitespace_mandatory = whitespace + b"+"
    # No "\012" aka "\n" or "\015" aka "\r":
    whitespace_optional_no_nl = rb"[\000\011\014\040]*"
    newline_only = rb"[\r\n]+"
    newline = whitespace_optional_no_nl + newline_only + whitespace_optional_no_nl
    # Trailer that ends the data ("$" anchor). Group 1 is the dictionary
    # content including the closing ">>" (greedy), group 2 the startxref
    # offset.
    re_trailer_end = re.compile(
        whitespace_mandatory
        + rb"trailer"
        + whitespace_optional
        + rb"<<(.*>>)"
        + newline
        + rb"startxref"
        + newline
        + rb"([0-9]+)"
        + newline
        + rb"%%EOF"
        + whitespace_optional
        + rb"$",
        re.DOTALL,
    )
    # Same groups for a trailer anywhere in the data: no end anchor, and the
    # dictionary content is matched non-greedily.
    re_trailer_prev = re.compile(
        whitespace_optional
        + rb"trailer"
        + whitespace_optional
        + rb"<<(.*?>>)"
        + newline
        + rb"startxref"
        + newline
        + rb"([0-9]+)"
        + newline
        + rb"%%EOF"
        + whitespace_optional,
        re.DOTALL,
    )
662
663 def read_trailer(self) -> None:
664 assert self.buf is not None
665 search_start_offset = len(self.buf) - 16384
666 if search_start_offset < self.start_offset:
667 search_start_offset = self.start_offset
668 m = self.re_trailer_end.search(self.buf, search_start_offset)
669 check_format_condition(m is not None, "trailer end not found")
670 # make sure we found the LAST trailer
671 last_match = m
672 while m:
673 last_match = m
674 m = self.re_trailer_end.search(self.buf, m.start() + 16)
675 if not m:
676 m = last_match
677 assert m is not None
678 trailer_data = m.group(1)
679 self.last_xref_section_offset = int(m.group(2))
680 self.trailer_dict = self.interpret_trailer(trailer_data)
681 self.xref_table = XrefTable()
682 self.read_xref_table(xref_section_offset=self.last_xref_section_offset)
683 if b"Prev" in self.trailer_dict:
684 self.read_prev_trailer(self.trailer_dict[b"Prev"])
685
686 def read_prev_trailer(self, xref_section_offset: int) -> None:
687 assert self.buf is not None
688 trailer_offset = self.read_xref_table(xref_section_offset=xref_section_offset)
689 m = self.re_trailer_prev.search(
690 self.buf[trailer_offset : trailer_offset + 16384]
691 )
692 check_format_condition(m is not None, "previous trailer not found")
693 assert m is not None
694 trailer_data = m.group(1)
695 check_format_condition(
696 int(m.group(2)) == xref_section_offset,
697 "xref section offset in previous trailer doesn't match what was expected",
698 )
699 trailer_dict = self.interpret_trailer(trailer_data)
700 if b"Prev" in trailer_dict:
701 self.read_prev_trailer(trailer_dict[b"Prev"])
702
    re_whitespace_optional = re.compile(whitespace_optional)
    # "/" followed by one or more name characters (group 1, without the "/");
    # the lookahead requires a delimiter or whitespace byte right after it.
    re_name = re.compile(
        whitespace_optional
        + rb"/([!-$&'*-.0-;=?-Z\\^-z|~]+)(?="
        + delimiter_or_ws
        + rb")"
    )
    # Dictionary delimiters; the end pattern also consumes trailing whitespace.
    re_dict_start = re.compile(whitespace_optional + rb"<<")
    re_dict_end = re.compile(whitespace_optional + rb">>" + whitespace_optional)
712
713 @classmethod
714 def interpret_trailer(cls, trailer_data: bytes) -> dict[bytes, Any]:
715 trailer = {}
716 offset = 0
717 while True:
718 m = cls.re_name.match(trailer_data, offset)
719 if not m:
720 m = cls.re_dict_end.match(trailer_data, offset)
721 check_format_condition(
722 m is not None and m.end() == len(trailer_data),
723 "name not found in trailer, remaining data: "
724 + repr(trailer_data[offset:]),
725 )
726 break
727 key = cls.interpret_name(m.group(1))
728 assert isinstance(key, bytes)
729 value, value_offset = cls.get_value(trailer_data, m.end())
730 trailer[key] = value
731 if value_offset is None:
732 break
733 offset = value_offset
734 check_format_condition(
735 b"Size" in trailer and isinstance(trailer[b"Size"], int),
736 "/Size not in trailer or not an integer",
737 )
738 check_format_condition(
739 b"Root" in trailer and isinstance(trailer[b"Root"], IndirectReference),
740 "/Root not in trailer or not an indirect reference",
741 )
742 return trailer
743
    # A run of non-"#" bytes (group 1), optionally followed by a "#xx" escape
    # whose two hexadecimal digits are group 3.
    re_hashes_in_name = re.compile(rb"([^#]*)(#([0-9a-fA-F]{2}))?")
745
746 @classmethod
747 def interpret_name(cls, raw: bytes, as_text: bool = False) -> str | bytes:
748 name = b""
749 for m in cls.re_hashes_in_name.finditer(raw):
750 if m.group(3):
751 name += m.group(1) + bytearray.fromhex(m.group(3).decode("us-ascii"))
752 else:
753 name += m.group(1)
754 if as_text:
755 return name.decode("utf-8")
756 else:
757 return bytes(name)
758
    # Token patterns used by get_value(). All of them skip leading whitespace;
    # the keyword, number and reference patterns use a lookahead to require a
    # delimiter or whitespace byte after the token without consuming it.
    re_null = re.compile(whitespace_optional + rb"null(?=" + delimiter_or_ws + rb")")
    re_true = re.compile(whitespace_optional + rb"true(?=" + delimiter_or_ws + rb")")
    re_false = re.compile(whitespace_optional + rb"false(?=" + delimiter_or_ws + rb")")
    re_int = re.compile(
        whitespace_optional + rb"([-+]?[0-9]+)(?=" + delimiter_or_ws + rb")"
    )
    # digits on at least one side of the decimal point
    re_real = re.compile(
        whitespace_optional
        + rb"([-+]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+))(?="
        + delimiter_or_ws
        + rb")"
    )
    re_array_start = re.compile(whitespace_optional + rb"\[")
    re_array_end = re.compile(whitespace_optional + rb"]")
    # group 1: hexadecimal digits, possibly interspersed with whitespace
    re_string_hex = re.compile(
        whitespace_optional + rb"<(" + whitespace_or_hex + rb"*)>"
    )
    # only the opening parenthesis; the rest is read by get_literal_string()
    re_string_lit = re.compile(whitespace_optional + rb"\(")
    # "<number> <number> R"; groups 1 and 2 are the two numbers
    re_indirect_reference = re.compile(
        whitespace_optional
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"R(?="
        + delimiter_or_ws
        + rb")"
    )
    # "<number> <number> obj"; groups 1 and 2 are the two numbers
    re_indirect_def_start = re.compile(
        whitespace_optional
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"obj(?="
        + delimiter_or_ws
        + rb")"
    )
    re_indirect_def_end = re.compile(
        whitespace_optional + rb"endobj(?=" + delimiter_or_ws + rb")"
    )
    # zero or more comment lines: "%" up to and including the line break
    re_comment = re.compile(
        rb"(" + whitespace_optional + rb"%[^\r\n]*" + newline + rb")*"
    )
    # "stream" followed by LF or CR LF
    re_stream_start = re.compile(whitespace_optional + rb"stream\r?\n")
    re_stream_end = re.compile(
        whitespace_optional + rb"endstream(?=" + delimiter_or_ws + rb")"
    )
807
    @classmethod
    def get_value(
        cls,
        data: bytes | bytearray | mmap.mmap,
        offset: int,
        expect_indirect: IndirectReference | None = None,
        max_nesting: int = -1,
    ) -> tuple[Any, int | None]:
        """Parse one PDF object from *data*, starting at *offset*.

        :param data: Buffer to read from.
        :param offset: Position at which the object (or comments preceding
            it) starts.
        :param expect_indirect: If given, the data must be an indirect object
            definition with exactly this object ID and generation.
        :param max_nesting: Every nested call receives this value minus one
            and parsing stops when it reaches 0. The default of -1 never
            reaches 0 by counting down, so nesting is not limited.
        :returns: ``(value, end_offset)``. The value is an IndirectReference,
            PdfDict, PdfStream, list, None, bool, PdfName, int, float,
            bytearray (hexadecimal string) or bytes (literal string).
            ``end_offset`` is None when parsing stopped at the nesting limit;
            containers returned in that case hold what was read so far.
        :raises PdfFormatError: If the data is malformed or not recognised.
        """
        if max_nesting == 0:
            return None, None
        # skip comments in front of the object
        m = cls.re_comment.match(data, offset)
        if m:
            offset = m.end()
        # indirect object definition: "<id> <gen> obj" <object> "endobj"
        m = cls.re_indirect_def_start.match(data, offset)
        if m:
            check_format_condition(
                int(m.group(1)) > 0,
                "indirect object definition: object ID must be greater than 0",
            )
            check_format_condition(
                int(m.group(2)) >= 0,
                "indirect object definition: generation must be non-negative",
            )
            check_format_condition(
                expect_indirect is None
                or expect_indirect
                == IndirectReference(int(m.group(1)), int(m.group(2))),
                "indirect object definition different than expected",
            )
            object, object_offset = cls.get_value(
                data, m.end(), max_nesting=max_nesting - 1
            )
            if object_offset is None:
                return object, None
            m = cls.re_indirect_def_end.match(data, object_offset)
            check_format_condition(
                m is not None, "indirect object definition end not found"
            )
            assert m is not None
            return object, m.end()
        # anything else is an error if a definition was asked for
        check_format_condition(
            not expect_indirect, "indirect object definition not found"
        )
        # indirect reference: "<id> <gen> R"
        m = cls.re_indirect_reference.match(data, offset)
        if m:
            check_format_condition(
                int(m.group(1)) > 0,
                "indirect object reference: object ID must be greater than 0",
            )
            check_format_condition(
                int(m.group(2)) >= 0,
                "indirect object reference: generation must be non-negative",
            )
            return IndirectReference(int(m.group(1)), int(m.group(2))), m.end()
        # dictionary, optionally followed by a stream
        m = cls.re_dict_start.match(data, offset)
        if m:
            offset = m.end()
            result: dict[Any, Any] = {}
            m = cls.re_dict_end.match(data, offset)
            current_offset: int | None = offset
            # read key/value pairs until the closing ">>" is next
            while not m:
                assert current_offset is not None
                key, current_offset = cls.get_value(
                    data, current_offset, max_nesting=max_nesting - 1
                )
                if current_offset is None:
                    return result, None
                value, current_offset = cls.get_value(
                    data, current_offset, max_nesting=max_nesting - 1
                )
                result[key] = value
                if current_offset is None:
                    return result, None
                m = cls.re_dict_end.match(data, current_offset)
            current_offset = m.end()
            m = cls.re_stream_start.match(data, current_offset)
            if m:
                # the stream data is exactly Length bytes long, which must be
                # given directly as an integer
                stream_len = result.get(b"Length")
                if stream_len is None or not isinstance(stream_len, int):
                    msg = f"bad or missing Length in stream dict ({stream_len})"
                    raise PdfFormatError(msg)
                stream_data = data[m.end() : m.end() + stream_len]
                m = cls.re_stream_end.match(data, m.end() + stream_len)
                check_format_condition(m is not None, "stream end not found")
                assert m is not None
                current_offset = m.end()
                return PdfStream(PdfDict(result), stream_data), current_offset
            return PdfDict(result), current_offset
        # array; returned as a plain list
        m = cls.re_array_start.match(data, offset)
        if m:
            offset = m.end()
            results = []
            m = cls.re_array_end.match(data, offset)
            current_offset = offset
            while not m:
                assert current_offset is not None
                value, current_offset = cls.get_value(
                    data, current_offset, max_nesting=max_nesting - 1
                )
                results.append(value)
                if current_offset is None:
                    return results, None
                m = cls.re_array_end.match(data, current_offset)
            return results, m.end()
        m = cls.re_null.match(data, offset)
        if m:
            return None, m.end()
        m = cls.re_true.match(data, offset)
        if m:
            return True, m.end()
        m = cls.re_false.match(data, offset)
        if m:
            return False, m.end()
        m = cls.re_name.match(data, offset)
        if m:
            return PdfName(cls.interpret_name(m.group(1))), m.end()
        # the integer pattern needs a delimiter after the digits, so "1.5"
        # does not match here and is picked up as a real number below
        m = cls.re_int.match(data, offset)
        if m:
            return int(m.group(1)), m.end()
        m = cls.re_real.match(data, offset)
        if m:
            # XXX Decimal instead of float???
            return float(m.group(1)), m.end()
        m = cls.re_string_hex.match(data, offset)
        if m:
            # filter out whitespace
            hex_string = bytearray(
                b for b in m.group(1) if b in b"0123456789abcdefABCDEF"
            )
            if len(hex_string) % 2 == 1:
                # append a 0 if the length is not even - yes, at the end
                hex_string.append(ord(b"0"))
            return bytearray.fromhex(hex_string.decode("us-ascii")), m.end()
        m = cls.re_string_lit.match(data, offset)
        if m:
            return cls.get_literal_string(data, m.end())
        # return None, offset  # fallback (only for debugging)
        msg = f"unrecognized object: {repr(data[offset : offset + 32])}"
        raise PdfFormatError(msg)
947
    # Tokens with a special meaning inside a literal string. Groups:
    #   1: backslash + one of n r t b f ( ) \
    #   2: backslash + one to three decimal digits
    #   3: backslash + line break (group 4 is the line break itself)
    #   5: bare line break
    #   6: opening parenthesis
    #   7: closing parenthesis
    re_lit_str_token = re.compile(
        rb"(\\[nrtbf()\\])|(\\[0-9]{1,3})|(\\(\r\n|\r|\n))|(\r\n|\r|\n)|(\()|(\))"
    )
    # Character after the backslash => byte it stands for. Keyed both by
    # one-byte bytes objects and by integer byte values; indexing a bytes
    # match yields an int, which is the form get_literal_string() looks up.
    escaped_chars = {
        b"n": b"\n",
        b"r": b"\r",
        b"t": b"\t",
        b"b": b"\b",
        b"f": b"\f",
        b"(": b"(",
        b")": b")",
        b"\\": b"\\",
        ord(b"n"): b"\n",
        ord(b"r"): b"\r",
        ord(b"t"): b"\t",
        ord(b"b"): b"\b",
        ord(b"f"): b"\f",
        ord(b"("): b"(",
        ord(b")"): b")",
        ord(b"\\"): b"\\",
    }
969
970 @classmethod
971 def get_literal_string(
972 cls, data: bytes | bytearray | mmap.mmap, offset: int
973 ) -> tuple[bytes, int]:
974 nesting_depth = 0
975 result = bytearray()
976 for m in cls.re_lit_str_token.finditer(data, offset):
977 result.extend(data[offset : m.start()])
978 if m.group(1):
979 result.extend(cls.escaped_chars[m.group(1)[1]])
980 elif m.group(2):
981 result.append(int(m.group(2)[1:], 8))
982 elif m.group(3):
983 pass
984 elif m.group(5):
985 result.extend(b"\n")
986 elif m.group(6):
987 result.extend(b"(")
988 nesting_depth += 1
989 elif m.group(7):
990 if nesting_depth == 0:
991 return bytes(result), m.end()
992 result.extend(b")")
993 nesting_depth -= 1
994 offset = m.end()
995 msg = "unfinished literal string"
996 raise PdfFormatError(msg)
997
    # "xref" keyword followed by a line break
    re_xref_section_start = re.compile(whitespace_optional + rb"xref" + newline)
    # subsection header; group 1 and group 2 are read by read_xref_table() as
    # the first object ID and the number of entries
    re_xref_subsection_start = re.compile(
        whitespace_optional
        + rb"([0-9]+)"
        + whitespace_mandatory
        + rb"([0-9]+)"
        + whitespace_optional
        + newline_only
    )
    # fixed-width entry: 10 digits (group 1), 5 digits (group 2), "f" or "n"
    # (group 3) and a two-byte end of line
    re_xref_entry = re.compile(rb"([0-9]{10}) ([0-9]{5}) ([fn])( \r| \n|\r\n)")
1008
1009 def read_xref_table(self, xref_section_offset: int) -> int:
1010 assert self.buf is not None
1011 subsection_found = False
1012 m = self.re_xref_section_start.match(
1013 self.buf, xref_section_offset + self.start_offset
1014 )
1015 check_format_condition(m is not None, "xref section start not found")
1016 assert m is not None
1017 offset = m.end()
1018 while True:
1019 m = self.re_xref_subsection_start.match(self.buf, offset)
1020 if not m:
1021 check_format_condition(
1022 subsection_found, "xref subsection start not found"
1023 )
1024 break
1025 subsection_found = True
1026 offset = m.end()
1027 first_object = int(m.group(1))
1028 num_objects = int(m.group(2))
1029 for i in range(first_object, first_object + num_objects):
1030 m = self.re_xref_entry.match(self.buf, offset)
1031 check_format_condition(m is not None, "xref entry not found")
1032 assert m is not None
1033 offset = m.end()
1034 is_free = m.group(3) == b"f"
1035 if not is_free:
1036 generation = int(m.group(2))
1037 new_entry = (int(m.group(1)), generation)
1038 if i not in self.xref_table:
1039 self.xref_table[i] = new_entry
1040 return offset
1041
1042 def read_indirect(self, ref: IndirectReference, max_nesting: int = -1) -> Any:
1043 offset, generation = self.xref_table[ref[0]]
1044 check_format_condition(
1045 generation == ref[1],
1046 f"expected to find generation {ref[1]} for object ID {ref[0]} in xref "
1047 f"table, instead found generation {generation} at offset {offset}",
1048 )
1049 assert self.buf is not None
1050 value = self.get_value(
1051 self.buf,
1052 offset + self.start_offset,
1053 expect_indirect=IndirectReference(*ref),
1054 max_nesting=max_nesting,
1055 )[0]
1056 self.cached_objects[ref] = value
1057 return value
1058
1059 def linearize_page_tree(
1060 self, node: PdfDict | None = None
1061 ) -> list[IndirectReference]:
1062 page_node = node if node is not None else self.page_tree_root
1063 check_format_condition(
1064 page_node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages"
1065 )
1066 pages = []
1067 for kid in page_node[b"Kids"]:
1068 kid_object = self.read_indirect(kid)
1069 if kid_object[b"Type"] == b"Page":
1070 pages.append(kid)
1071 else:
1072 pages.extend(self.linearize_page_tree(node=kid_object))
1073 return pages