1from __future__ import annotations
2
3import calendar
4import codecs
5import collections
6import mmap
7import os
8import re
9import time
10import zlib
11from typing import TYPE_CHECKING, Any, List, NamedTuple, Union
12
13
14# see 7.9.2.2 Text String Type on page 86 and D.3 PDFDocEncoding Character Set
15# on page 656
def encode_text(s: str) -> bytes:
    """Encode *s* as UTF-16BE bytes preceded by the UTF-16BE byte order mark."""
    payload = s.encode("utf_16_be")
    return codecs.BOM_UTF16_BE + payload
18
19
# Byte value => Unicode character for the bytes where PDFDocEncoding is
# treated differently from a plain chr(byte) conversion.  decode_text() looks
# every byte up here and falls back to chr(byte) for values that are absent.
PDFDocEncoding = {
    0x16: "\u0017",
    0x18: "\u02D8",
    0x19: "\u02C7",
    0x1A: "\u02C6",
    0x1B: "\u02D9",
    0x1C: "\u02DD",
    0x1D: "\u02DB",
    0x1E: "\u02DA",
    0x1F: "\u02DC",
    0x80: "\u2022",
    0x81: "\u2020",
    0x82: "\u2021",
    0x83: "\u2026",
    0x84: "\u2014",
    0x85: "\u2013",
    0x86: "\u0192",
    0x87: "\u2044",
    0x88: "\u2039",
    0x89: "\u203A",
    0x8A: "\u2212",
    0x8B: "\u2030",
    0x8C: "\u201E",
    0x8D: "\u201C",
    0x8E: "\u201D",
    0x8F: "\u2018",
    0x90: "\u2019",
    0x91: "\u201A",
    0x92: "\u2122",
    0x93: "\uFB01",
    0x94: "\uFB02",
    0x95: "\u0141",
    0x96: "\u0152",
    0x97: "\u0160",
    0x98: "\u0178",
    0x99: "\u017D",
    0x9A: "\u0131",
    0x9B: "\u0142",
    0x9C: "\u0153",
    0x9D: "\u0161",
    0x9E: "\u017E",
    0xA0: "\u20AC",
}
63
64
def decode_text(b):
    """Decode a PDF text string to ``str``.

    Input starting with the UTF-16BE byte order mark is decoded as UTF-16BE
    (the mark itself is dropped).  Anything else is converted byte by byte
    through ``PDFDocEncoding``, using ``chr(byte)`` for unmapped values.
    """
    bom = codecs.BOM_UTF16_BE
    if b[: len(bom)] == bom:
        return b[len(bom) :].decode("utf_16_be")
    return "".join(PDFDocEncoding.get(code, chr(code)) for code in b)
70
71
class PdfFormatError(RuntimeError):
    """An error that probably indicates a syntactic or semantic error in the
    PDF file structure"""
77
78
def check_format_condition(condition: bool, error_message: str) -> None:
    """Raise ``PdfFormatError(error_message)`` unless *condition* is truthy."""
    if condition:
        return
    raise PdfFormatError(error_message)
82
83
class IndirectReferenceTuple(NamedTuple):
    """Plain ``(object_id, generation)`` pair used as the tuple base of
    ``IndirectReference``."""

    # First element of the pair: the object ID.
    object_id: int
    # Second element of the pair: the generation.
    generation: int
87
88
class IndirectReference(IndirectReferenceTuple):
    """An ``(object_id, generation)`` pair rendered as ``"<id> <gen> R"``.

    Equality is stricter than plain tuple equality: two references compare
    equal only when they are instances of exactly the same class.
    """

    def __str__(self) -> str:
        return f"{self.object_id} {self.generation} R"

    def __bytes__(self) -> bytes:
        text = self.__str__()
        return text.encode("us-ascii")

    def __eq__(self, other: object) -> bool:
        # Different classes (including subclasses) never compare equal.
        if other.__class__ is not self.__class__:
            return False
        assert isinstance(other, IndirectReference)
        same_id = other.object_id == self.object_id
        return same_id and other.generation == self.generation

    def __ne__(self, other):
        return not self == other

    def __hash__(self) -> int:
        key = (self.object_id, self.generation)
        return hash(key)
107
108
class IndirectObjectDef(IndirectReference):
    """An ``IndirectReference`` whose text form is ``"<id> <gen> obj"``."""

    def __str__(self) -> str:
        object_id, generation = self.object_id, self.generation
        return f"{object_id} {generation} obj"
112
113
class XrefTable:
    """Cross-reference bookkeeping split into existing, new and deleted entries.

    While ``reading_finished`` is false, assignments are recorded in
    ``existing_entries``; afterwards they go to ``new_entries``.  Deleting an
    ID records the generation to use next in ``deleted_entries``.  An entry
    deleted from ``existing_entries`` is *not* removed from that dict, so
    ``in`` and ``[]`` still find it while ``keys()`` leaves it out.
    """

    def __init__(self):
        self.existing_entries = {}  # object ID => (offset, generation)
        self.new_entries = {}  # object ID => (offset, generation)
        self.deleted_entries = {0: 65536}  # object ID => generation
        self.reading_finished = False

    def __setitem__(self, key, value):
        """Store ``(offset, generation)`` for *key* and mark it as not deleted."""
        if self.reading_finished:
            self.new_entries[key] = value
        else:
            self.existing_entries[key] = value
        self.deleted_entries.pop(key, None)

    def __getitem__(self, key):
        """Return the entry for *key*, preferring ``new_entries``.

        Raises:
            KeyError: *key* is in neither ``new_entries`` nor
                ``existing_entries``.
        """
        try:
            return self.new_entries[key]
        except KeyError:
            return self.existing_entries[key]

    def __delitem__(self, key):
        """Mark *key* as deleted, recording its generation incremented by one.

        Deleting an ID that is already only in ``deleted_entries`` is a no-op.

        Raises:
            IndexError: *key* is unknown to all three dictionaries.
        """
        if key in self.new_entries:
            generation = self.new_entries[key][1] + 1
            del self.new_entries[key]
            self.deleted_entries[key] = generation
        elif key in self.existing_entries:
            self.deleted_entries[key] = self.existing_entries[key][1] + 1
        elif key not in self.deleted_entries:
            msg = f"object ID {key} cannot be deleted because it doesn't exist"
            raise IndexError(msg)

    def __contains__(self, key):
        return key in self.existing_entries or key in self.new_entries

    def __len__(self) -> int:
        """Count the distinct IDs across all three dictionaries."""
        return len(
            self.existing_entries.keys()
            | self.new_entries.keys()
            | self.deleted_entries.keys()
        )

    def keys(self):
        """Return the set of existing-but-not-deleted IDs plus all new IDs."""
        return (
            self.existing_entries.keys() - self.deleted_entries.keys()
        ) | self.new_entries.keys()

    def write(self, f):
        """Write an ``xref`` section for the new and deleted entries to *f*.

        One subsection is emitted per run of consecutive object IDs.  Each
        deleted entry stores the next deleted ID in ascending order, with the
        last one pointing to 0.

        Returns:
            The value of ``f.tell()`` before anything was written.

        Raises:
            PdfFormatError: a key that is not in ``new_entries`` does not line
                up with the next deleted ID.
        """
        keys = sorted(self.new_entries.keys() | self.deleted_entries.keys())
        deleted_keys = sorted(self.deleted_entries)
        # Cursor into deleted_keys; avoids repeatedly popping from the front.
        next_deleted = 0
        startxref = f.tell()
        f.write(b"xref\n")
        start = 0
        while start < len(keys):
            # find a contiguous sequence of object IDs
            end = start + 1
            while end < len(keys) and keys[end] == keys[end - 1] + 1:
                end += 1
            contiguous_keys = keys[start:end]
            start = end
            f.write(b"%d %d\n" % (contiguous_keys[0], len(contiguous_keys)))
            for object_id in contiguous_keys:
                if object_id in self.new_entries:
                    f.write(b"%010d %05d n \n" % self.new_entries[object_id])
                    continue
                this_deleted_object_id = deleted_keys[next_deleted]
                next_deleted += 1
                check_format_condition(
                    object_id == this_deleted_object_id,
                    f"expected the next deleted object ID to be {object_id}, "
                    f"instead found {this_deleted_object_id}",
                )
                if next_deleted < len(deleted_keys):
                    next_in_linked_list = deleted_keys[next_deleted]
                else:
                    next_in_linked_list = 0
                f.write(
                    b"%010d %05d f \n"
                    % (next_in_linked_list, self.deleted_entries[object_id])
                )
        return startxref
202
203
class PdfName:
    """A PDF name held as raw bytes (without the leading ``/``).

    Accepts another ``PdfName``, ``bytes``, or text that is encoded as
    US-ASCII.  Instances hash like their bytes and also compare equal to those
    bytes.
    """

    def __init__(self, name):
        if isinstance(name, PdfName):
            name = name.name
        elif not isinstance(name, bytes):
            name = name.encode("us-ascii")
        self.name = name

    def name_as_str(self) -> str:
        """Return the name decoded as US-ASCII text."""
        return self.name.decode("us-ascii")

    def __eq__(self, other):
        if isinstance(other, PdfName) and other.name == self.name:
            return True
        # Fall back to comparing the other object with the raw bytes.
        return other == self.name

    def __hash__(self) -> int:
        return hash(self.name)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.name!r})"

    @classmethod
    def from_pdf_stream(cls, data):
        """Build a name from raw data, decoding it with
        ``PdfParser.interpret_name``."""
        decoded = PdfParser.interpret_name(data)
        return cls(decoded)

    # Byte values 33..126 that may be written verbatim; everything else is
    # written as "#XX" by __bytes__.
    allowed_chars = set(range(33, 127)).difference(b"#%/()<>[]{}")

    def __bytes__(self) -> bytes:
        encoded = bytearray(b"/")
        for code in self.name:
            if code not in self.allowed_chars:
                encoded.extend(b"#%02X" % code)
            else:
                encoded.append(code)
        return bytes(encoded)
241
242
class PdfArray(List[Any]):
    """A list whose ``bytes()`` form is its items' ``pdf_repr`` output,
    space-separated inside ``[ `` and `` ]``."""

    def __bytes__(self) -> bytes:
        rendered_items = [pdf_repr(item) for item in self]
        return b"[ " + b" ".join(rendered_items) + b" ]"
246
247
# UserDict is only subscripted for the benefit of static type checkers; at
# runtime the plain class is used as the base.
if TYPE_CHECKING:
    _DictBase = collections.UserDict[Union[str, bytes], Any]
else:
    _DictBase = collections.UserDict


class PdfDict(_DictBase):
    """A dictionary whose entries can also be accessed as attributes.

    Attribute names are encoded as US-ASCII and used as ``bytes`` keys.  When
    an attribute is read, ``bytes`` values are passed through ``decode_text``
    and textual values of attributes whose name ends in ``Date`` are parsed
    into ``time.struct_time``.
    """

    def __setattr__(self, key, value):
        # "data" is the UserDict storage itself; everything else is an entry.
        if key == "data":
            collections.UserDict.__setattr__(self, key, value)
        else:
            self[key.encode("us-ascii")] = value

    def __getattr__(self, key):
        """Return the entry named *key*, decoded and date-parsed as needed.

        Raises:
            AttributeError: there is no such entry.
            ValueError: a ``*Date`` string cannot be parsed by
                ``time.strptime``.
        """
        if key == "data":
            # Only reached while the instance has no storage yet (for example
            # while copy/pickle rebuild it).  Looking the name up as an entry
            # would need self.data again and recurse without end.
            raise AttributeError(key)
        try:
            value = self[key.encode("us-ascii")]
        except KeyError as e:
            raise AttributeError(key) from e
        if isinstance(value, bytes):
            value = decode_text(value)
        # Values that are not text (e.g. an already built struct_time) are
        # returned unchanged.
        if key.endswith("Date") and isinstance(value, str):
            if value.startswith("D:"):
                value = value[2:]

            # Split "<digits><O><zone>" at the first UTC-relationship
            # character instead of assuming it sits at a fixed index, so
            # dates with omitted trailing fields still parse.
            relationship = "Z"
            zone = ""
            for index, char in enumerate(value):
                if char in "+-Z":
                    relationship = char
                    zone = value[index + 1 :]
                    value = value[:index]
                    break

            # "%Y" consumes four digits, every later directive two.
            date_format = "%Y%m%d%H%M%S"[: len(value) - 2]
            value = time.strptime(value[: len(date_format) + 2], date_format)

            # HH, optionally followed by mm; apostrophes are optional so that
            # both "05'30'" and "05'30" are accepted.
            zone_match = re.match(r"([0-9]{2})'?([0-9]{2})?", zone)
            if relationship in ("+", "-") and zone_match:
                offset = int(zone_match.group(1)) * 60
                offset += int(zone_match.group(2) or 0)
                offset *= 60
                if relationship == "+":
                    # Local time is ahead of UTC, so subtract to get UTC.
                    offset *= -1
                value = time.gmtime(calendar.timegm(value) + offset)
        return value

    def __bytes__(self) -> bytes:
        """Serialise as ``<<`` ... ``>>``, skipping entries whose value is
        ``None``."""
        out = bytearray(b"<<")
        for key, value in self.items():
            if value is None:
                continue
            value = pdf_repr(value)
            out.extend(b"\n")
            out.extend(bytes(PdfName(key)))
            out.extend(b" ")
            out.extend(value)
        out.extend(b"\n>>")
        return bytes(out)
300
301
class PdfBinary:
    """Wraps an iterable of byte values; ``bytes()`` gives them as uppercase
    two-digit hex between ``<`` and ``>``."""

    def __init__(self, data):
        self.data = data

    def __bytes__(self) -> bytes:
        hex_digits = b"".join([b"%02X" % octet for octet in self.data])
        return b"<" + hex_digits + b">"
308
309
class PdfStream:
    """A stream dictionary paired with its raw (possibly encoded) data."""

    def __init__(self, dictionary, buf):
        self.dictionary = dictionary
        self.buf = buf

    def decode(self):
        """Return the stream data with its filter undone.

        Without a ``Filter`` attribute on the dictionary the raw buffer is
        returned.  ``FlateDecode`` data is inflated with zlib, using ``DL``
        (or ``Length`` when ``DL`` is absent) as the initial output buffer
        size.

        Raises:
            NotImplementedError: the filter is anything other than
                ``FlateDecode``.
        """
        stream_dict = self.dictionary
        try:
            filter_name = stream_dict.Filter
        except AttributeError:
            return self.buf
        if not (filter_name == b"FlateDecode"):
            msg = f"stream filter {repr(self.dictionary.Filter)} unknown/unsupported"
            raise NotImplementedError(msg)
        try:
            size_hint = stream_dict.DL
        except AttributeError:
            size_hint = stream_dict.Length
        return zlib.decompress(self.buf, bufsize=int(size_hint))
329
330
def pdf_repr(x):
    """Return the PDF source form of *x* as ``bytes``.

    Handles ``True``/``False``/``None``, numbers, ``time.struct_time`` (written
    as a ``D:...Z`` date string), ``str`` (encoded with ``encode_text``),
    ``bytes`` (written as a literal string), the ``Pdf*`` wrapper classes and
    plain ``dict``/``list``.  Anything else is converted with ``bytes(x)``.

    The checks on built-in types come first; none of the ``Pdf*`` wrappers
    derive from those types, so the dispatch result is the same in any order.
    """
    if x is True:
        return b"true"
    if x is False:
        return b"false"
    if x is None:
        return b"null"
    if isinstance(x, (int, float)):
        text = str(x)
        if isinstance(x, float) and "e" in text:
            # PDF numbers may not use exponent notation; expand values such as
            # 1e-07 into plain decimal digits without losing precision.
            from decimal import Decimal

            text = format(Decimal(repr(float(x))), "f")
        return text.encode("us-ascii")
    if isinstance(x, time.struct_time):
        return b"(D:" + time.strftime("%Y%m%d%H%M%SZ", x).encode("us-ascii") + b")"
    if isinstance(x, str):
        return pdf_repr(encode_text(x))
    if isinstance(x, bytes):
        # The backslash must be escaped first so the escapes added below are
        # not escaped again.
        x = x.replace(b"\\", b"\\\\")
        x = x.replace(b"(", b"\\(")
        x = x.replace(b")", b"\\)")
        # An unescaped CR inside a literal string is read back as LF (see
        # PdfParser.get_literal_string), so it has to be written as "\r".
        x = x.replace(b"\r", b"\\r")
        return b"(" + x + b")"
    if isinstance(x, (PdfName, PdfDict, PdfArray, PdfBinary)):
        return bytes(x)
    if isinstance(x, dict):
        return bytes(PdfDict(x))
    if isinstance(x, list):
        return bytes(PdfArray(x))
    return bytes(x)
358
359
360class PdfParser:
361 """Based on
362 https://www.adobe.com/content/dam/acom/en/devnet/acrobat/pdfs/PDF32000_2008.pdf
363 Supports PDF up to 1.4
364 """
365
366 def __init__(self, filename=None, f=None, buf=None, start_offset=0, mode="rb"):
367 if buf and f:
368 msg = "specify buf or f or filename, but not both buf and f"
369 raise RuntimeError(msg)
370 self.filename = filename
371 self.buf = buf
372 self.f = f
373 self.start_offset = start_offset
374 self.should_close_buf = False
375 self.should_close_file = False
376 if filename is not None and f is None:
377 self.f = f = open(filename, mode)
378 self.should_close_file = True
379 if f is not None:
380 self.buf = buf = self.get_buf_from_file(f)
381 self.should_close_buf = True
382 if not filename and hasattr(f, "name"):
383 self.filename = f.name
384 self.cached_objects = {}
385 if buf:
386 self.read_pdf_info()
387 else:
388 self.file_size_total = self.file_size_this = 0
389 self.root = PdfDict()
390 self.root_ref = None
391 self.info = PdfDict()
392 self.info_ref = None
393 self.page_tree_root = {}
394 self.pages = []
395 self.orig_pages = []
396 self.pages_ref = None
397 self.last_xref_section_offset = None
398 self.trailer_dict = {}
399 self.xref_table = XrefTable()
400 self.xref_table.reading_finished = True
401 if f:
402 self.seek_end()
403
404 def __enter__(self) -> PdfParser:
405 return self
406
407 def __exit__(self, *args: object) -> None:
408 self.close()
409
410 def start_writing(self) -> None:
411 self.close_buf()
412 self.seek_end()
413
414 def close_buf(self) -> None:
415 try:
416 self.buf.close()
417 except AttributeError:
418 pass
419 self.buf = None
420
421 def close(self) -> None:
422 if self.should_close_buf:
423 self.close_buf()
424 if self.f is not None and self.should_close_file:
425 self.f.close()
426 self.f = None
427
428 def seek_end(self) -> None:
429 self.f.seek(0, os.SEEK_END)
430
431 def write_header(self) -> None:
432 self.f.write(b"%PDF-1.4\n")
433
434 def write_comment(self, s):
435 self.f.write(f"% {s}\n".encode())
436
437 def write_catalog(self) -> IndirectReference:
438 self.del_root()
439 self.root_ref = self.next_object_id(self.f.tell())
440 self.pages_ref = self.next_object_id(0)
441 self.rewrite_pages()
442 self.write_obj(self.root_ref, Type=PdfName(b"Catalog"), Pages=self.pages_ref)
443 self.write_obj(
444 self.pages_ref,
445 Type=PdfName(b"Pages"),
446 Count=len(self.pages),
447 Kids=self.pages,
448 )
449 return self.root_ref
450
451 def rewrite_pages(self) -> None:
452 pages_tree_nodes_to_delete = []
453 for i, page_ref in enumerate(self.orig_pages):
454 page_info = self.cached_objects[page_ref]
455 del self.xref_table[page_ref.object_id]
456 pages_tree_nodes_to_delete.append(page_info[PdfName(b"Parent")])
457 if page_ref not in self.pages:
458 # the page has been deleted
459 continue
460 # make dict keys into strings for passing to write_page
461 stringified_page_info = {}
462 for key, value in page_info.items():
463 # key should be a PdfName
464 stringified_page_info[key.name_as_str()] = value
465 stringified_page_info["Parent"] = self.pages_ref
466 new_page_ref = self.write_page(None, **stringified_page_info)
467 for j, cur_page_ref in enumerate(self.pages):
468 if cur_page_ref == page_ref:
469 # replace the page reference with the new one
470 self.pages[j] = new_page_ref
471 # delete redundant Pages tree nodes from xref table
472 for pages_tree_node_ref in pages_tree_nodes_to_delete:
473 while pages_tree_node_ref:
474 pages_tree_node = self.cached_objects[pages_tree_node_ref]
475 if pages_tree_node_ref.object_id in self.xref_table:
476 del self.xref_table[pages_tree_node_ref.object_id]
477 pages_tree_node_ref = pages_tree_node.get(b"Parent", None)
478 self.orig_pages = []
479
480 def write_xref_and_trailer(self, new_root_ref=None):
481 if new_root_ref:
482 self.del_root()
483 self.root_ref = new_root_ref
484 if self.info:
485 self.info_ref = self.write_obj(None, self.info)
486 start_xref = self.xref_table.write(self.f)
487 num_entries = len(self.xref_table)
488 trailer_dict = {b"Root": self.root_ref, b"Size": num_entries}
489 if self.last_xref_section_offset is not None:
490 trailer_dict[b"Prev"] = self.last_xref_section_offset
491 if self.info:
492 trailer_dict[b"Info"] = self.info_ref
493 self.last_xref_section_offset = start_xref
494 self.f.write(
495 b"trailer\n"
496 + bytes(PdfDict(trailer_dict))
497 + b"\nstartxref\n%d\n%%%%EOF" % start_xref
498 )
499
500 def write_page(self, ref, *objs, **dict_obj):
501 if isinstance(ref, int):
502 ref = self.pages[ref]
503 if "Type" not in dict_obj:
504 dict_obj["Type"] = PdfName(b"Page")
505 if "Parent" not in dict_obj:
506 dict_obj["Parent"] = self.pages_ref
507 return self.write_obj(ref, *objs, **dict_obj)
508
509 def write_obj(self, ref, *objs, **dict_obj):
510 f = self.f
511 if ref is None:
512 ref = self.next_object_id(f.tell())
513 else:
514 self.xref_table[ref.object_id] = (f.tell(), ref.generation)
515 f.write(bytes(IndirectObjectDef(*ref)))
516 stream = dict_obj.pop("stream", None)
517 if stream is not None:
518 dict_obj["Length"] = len(stream)
519 if dict_obj:
520 f.write(pdf_repr(dict_obj))
521 for obj in objs:
522 f.write(pdf_repr(obj))
523 if stream is not None:
524 f.write(b"stream\n")
525 f.write(stream)
526 f.write(b"\nendstream\n")
527 f.write(b"endobj\n")
528 return ref
529
530 def del_root(self) -> None:
531 if self.root_ref is None:
532 return
533 del self.xref_table[self.root_ref.object_id]
534 del self.xref_table[self.root[b"Pages"].object_id]
535
536 @staticmethod
537 def get_buf_from_file(f):
538 if hasattr(f, "getbuffer"):
539 return f.getbuffer()
540 elif hasattr(f, "getvalue"):
541 return f.getvalue()
542 else:
543 try:
544 return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
545 except ValueError: # cannot mmap an empty file
546 return b""
547
548 def read_pdf_info(self) -> None:
549 self.file_size_total = len(self.buf)
550 self.file_size_this = self.file_size_total - self.start_offset
551 self.read_trailer()
552 self.root_ref = self.trailer_dict[b"Root"]
553 self.info_ref = self.trailer_dict.get(b"Info", None)
554 self.root = PdfDict(self.read_indirect(self.root_ref))
555 if self.info_ref is None:
556 self.info = PdfDict()
557 else:
558 self.info = PdfDict(self.read_indirect(self.info_ref))
559 check_format_condition(b"Type" in self.root, "/Type missing in Root")
560 check_format_condition(
561 self.root[b"Type"] == b"Catalog", "/Type in Root is not /Catalog"
562 )
563 check_format_condition(b"Pages" in self.root, "/Pages missing in Root")
564 check_format_condition(
565 isinstance(self.root[b"Pages"], IndirectReference),
566 "/Pages in Root is not an indirect reference",
567 )
568 self.pages_ref = self.root[b"Pages"]
569 self.page_tree_root = self.read_indirect(self.pages_ref)
570 self.pages = self.linearize_page_tree(self.page_tree_root)
571 # save the original list of page references
572 # in case the user modifies, adds or deletes some pages
573 # and we need to rewrite the pages and their list
574 self.orig_pages = self.pages[:]
575
576 def next_object_id(self, offset=None):
577 try:
578 # TODO: support reuse of deleted objects
579 reference = IndirectReference(max(self.xref_table.keys()) + 1, 0)
580 except ValueError:
581 reference = IndirectReference(1, 0)
582 if offset is not None:
583 self.xref_table[reference.object_id] = (offset, 0)
584 return reference
585
    # Building blocks for the byte regexes below.  The octal escapes are
    # \000 NUL, \011 TAB, \012 LF, \014 FF, \015 CR and \040 space.
    delimiter = rb"[][()<>{}/%]"
    delimiter_or_ws = rb"[][()<>{}/%\000\011\012\014\015\040]"
    whitespace = rb"[\000\011\012\014\015\040]"
    whitespace_or_hex = rb"[\000\011\012\014\015\0400-9a-fA-F]"
    whitespace_optional = whitespace + b"*"
    whitespace_mandatory = whitespace + b"+"
    # No "\012" aka "\n" or "\015" aka "\r":
    whitespace_optional_no_nl = rb"[\000\011\014\040]*"
    newline_only = rb"[\r\n]+"
    # One or more CR/LF characters, optionally padded by other whitespace.
    newline = whitespace_optional_no_nl + newline_only + whitespace_optional_no_nl
    # Trailer anchored at the end of the data ("$").  Group 1 is the trailer
    # dictionary body including the closing ">>" (greedy), group 2 the
    # startxref offset.
    re_trailer_end = re.compile(
        whitespace_mandatory
        + rb"trailer"
        + whitespace_optional
        + rb"<<(.*>>)"
        + newline
        + rb"startxref"
        + newline
        + rb"([0-9]+)"
        + newline
        + rb"%%EOF"
        + whitespace_optional
        + rb"$",
        re.DOTALL,
    )
    # Same layout without the end anchor and with a non-greedy dictionary
    # body; read_prev_trailer() searches it right after an xref table.
    re_trailer_prev = re.compile(
        whitespace_optional
        + rb"trailer"
        + whitespace_optional
        + rb"<<(.*?>>)"
        + newline
        + rb"startxref"
        + newline
        + rb"([0-9]+)"
        + newline
        + rb"%%EOF"
        + whitespace_optional,
        re.DOTALL,
    )
625
626 def read_trailer(self):
627 search_start_offset = len(self.buf) - 16384
628 if search_start_offset < self.start_offset:
629 search_start_offset = self.start_offset
630 m = self.re_trailer_end.search(self.buf, search_start_offset)
631 check_format_condition(m, "trailer end not found")
632 # make sure we found the LAST trailer
633 last_match = m
634 while m:
635 last_match = m
636 m = self.re_trailer_end.search(self.buf, m.start() + 16)
637 if not m:
638 m = last_match
639 trailer_data = m.group(1)
640 self.last_xref_section_offset = int(m.group(2))
641 self.trailer_dict = self.interpret_trailer(trailer_data)
642 self.xref_table = XrefTable()
643 self.read_xref_table(xref_section_offset=self.last_xref_section_offset)
644 if b"Prev" in self.trailer_dict:
645 self.read_prev_trailer(self.trailer_dict[b"Prev"])
646
647 def read_prev_trailer(self, xref_section_offset):
648 trailer_offset = self.read_xref_table(xref_section_offset=xref_section_offset)
649 m = self.re_trailer_prev.search(
650 self.buf[trailer_offset : trailer_offset + 16384]
651 )
652 check_format_condition(m, "previous trailer not found")
653 trailer_data = m.group(1)
654 check_format_condition(
655 int(m.group(2)) == xref_section_offset,
656 "xref section offset in previous trailer doesn't match what was expected",
657 )
658 trailer_dict = self.interpret_trailer(trailer_data)
659 if b"Prev" in trailer_dict:
660 self.read_prev_trailer(trailer_dict[b"Prev"])
661
    re_whitespace_optional = re.compile(whitespace_optional)
    # "/" followed by name characters (captured without the slash); a
    # delimiter or whitespace must follow but is not consumed.
    re_name = re.compile(
        whitespace_optional
        + rb"/([!-$&'*-.0-;=?-Z\\^-z|~]+)(?="
        + delimiter_or_ws
        + rb")"
    )
    # Dictionary brackets; the closing form also consumes trailing whitespace.
    re_dict_start = re.compile(whitespace_optional + rb"<<")
    re_dict_end = re.compile(whitespace_optional + rb">>" + whitespace_optional)
671
672 @classmethod
673 def interpret_trailer(cls, trailer_data):
674 trailer = {}
675 offset = 0
676 while True:
677 m = cls.re_name.match(trailer_data, offset)
678 if not m:
679 m = cls.re_dict_end.match(trailer_data, offset)
680 check_format_condition(
681 m and m.end() == len(trailer_data),
682 "name not found in trailer, remaining data: "
683 + repr(trailer_data[offset:]),
684 )
685 break
686 key = cls.interpret_name(m.group(1))
687 value, offset = cls.get_value(trailer_data, m.end())
688 trailer[key] = value
689 check_format_condition(
690 b"Size" in trailer and isinstance(trailer[b"Size"], int),
691 "/Size not in trailer or not an integer",
692 )
693 check_format_condition(
694 b"Root" in trailer and isinstance(trailer[b"Root"], IndirectReference),
695 "/Root not in trailer or not an indirect reference",
696 )
697 return trailer
698
    # A run of non-"#" bytes (group 1) optionally followed by "#" and two hex
    # digits (group 3); interpret_name() iterates over the matches.
    re_hashes_in_name = re.compile(rb"([^#]*)(#([0-9a-fA-F]{2}))?")
700
701 @classmethod
702 def interpret_name(cls, raw, as_text=False):
703 name = b""
704 for m in cls.re_hashes_in_name.finditer(raw):
705 if m.group(3):
706 name += m.group(1) + bytearray.fromhex(m.group(3).decode("us-ascii"))
707 else:
708 name += m.group(1)
709 if as_text:
710 return name.decode("utf-8")
711 else:
712 return bytes(name)
713
    # Token patterns used by get_value().  All of them skip leading
    # whitespace; keywords and numbers additionally require a delimiter or
    # whitespace to follow (lookahead, not consumed).
    re_null = re.compile(whitespace_optional + rb"null(?=" + delimiter_or_ws + rb")")
    re_true = re.compile(whitespace_optional + rb"true(?=" + delimiter_or_ws + rb")")
    re_false = re.compile(whitespace_optional + rb"false(?=" + delimiter_or_ws + rb")")
    re_int = re.compile(
        whitespace_optional + rb"([-+]?[0-9]+)(?=" + delimiter_or_ws + rb")"
    )
    # Optional sign, then digits with a decimal point on either side.
    re_real = re.compile(
        whitespace_optional
        + rb"([-+]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+))(?="
        + delimiter_or_ws
        + rb")"
    )
    re_array_start = re.compile(whitespace_optional + rb"\[")
    re_array_end = re.compile(whitespace_optional + rb"]")
    # "<...>" containing only hex digits and whitespace (group 1).
    re_string_hex = re.compile(
        whitespace_optional + rb"<(" + whitespace_or_hex + rb"*)>"
    )
    # Only the opening parenthesis; the body is read by get_literal_string().
    re_string_lit = re.compile(whitespace_optional + rb"\(")
    # "<id> <generation> R"; group 1 is the ID, group 2 the generation.
    re_indirect_reference = re.compile(
        whitespace_optional
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"R(?="
        + delimiter_or_ws
        + rb")"
    )
    # "<id> <generation> obj"; same groups as above.
    re_indirect_def_start = re.compile(
        whitespace_optional
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"obj(?="
        + delimiter_or_ws
        + rb")"
    )
    re_indirect_def_end = re.compile(
        whitespace_optional + rb"endobj(?=" + delimiter_or_ws + rb")"
    )
    # Zero or more "%..." comment lines; this can match the empty string.
    re_comment = re.compile(
        rb"(" + whitespace_optional + rb"%[^\r\n]*" + newline + rb")*"
    )
    # The "stream" keyword must be followed directly by LF or CR LF.
    re_stream_start = re.compile(whitespace_optional + rb"stream\r?\n")
    re_stream_end = re.compile(
        whitespace_optional + rb"endstream(?=" + delimiter_or_ws + rb")"
    )
762
    @classmethod
    def get_value(cls, data, offset, expect_indirect=None, max_nesting=-1):
        """Parse one PDF object from *data* starting at *offset*.

        Args:
            data: the bytes to parse.
            offset: position at which the object is expected (leading
                comments and whitespace are skipped).
            expect_indirect: when given, the object must be an indirect
                object definition with exactly this reference.
            max_nesting: remaining nesting budget.  Each nested call receives
                ``max_nesting - 1`` and a call with ``0`` returns
                ``(None, None)``.  The default ``-1`` never reaches ``0`` by
                decrementing, so nesting is not limited.

        Returns:
            ``(value, end_offset)``.  ``end_offset`` is ``None`` when parsing
            stopped early because the nesting budget ran out; in that case
            *value* is whatever was collected so far.

        Raises:
            PdfFormatError: the data is malformed or of an unrecognized kind.
        """
        if max_nesting == 0:
            return None, None
        # Skip any comment lines in front of the object.
        m = cls.re_comment.match(data, offset)
        if m:
            offset = m.end()
        # "<id> <gen> obj ... endobj"
        m = cls.re_indirect_def_start.match(data, offset)
        if m:
            check_format_condition(
                int(m.group(1)) > 0,
                "indirect object definition: object ID must be greater than 0",
            )
            check_format_condition(
                int(m.group(2)) >= 0,
                "indirect object definition: generation must be non-negative",
            )
            check_format_condition(
                expect_indirect is None
                or expect_indirect
                == IndirectReference(int(m.group(1)), int(m.group(2))),
                "indirect object definition different than expected",
            )
            object, offset = cls.get_value(data, m.end(), max_nesting=max_nesting - 1)
            if offset is None:
                return object, None
            m = cls.re_indirect_def_end.match(data, offset)
            check_format_condition(m, "indirect object definition end not found")
            return object, m.end()
        # Not a definition: that is an error if the caller required one.
        check_format_condition(
            not expect_indirect, "indirect object definition not found"
        )
        # "<id> <gen> R" -- tried before plain integers, which it starts with.
        m = cls.re_indirect_reference.match(data, offset)
        if m:
            check_format_condition(
                int(m.group(1)) > 0,
                "indirect object reference: object ID must be greater than 0",
            )
            check_format_condition(
                int(m.group(2)) >= 0,
                "indirect object reference: generation must be non-negative",
            )
            return IndirectReference(int(m.group(1)), int(m.group(2))), m.end()
        # Dictionary, optionally followed by a stream.
        m = cls.re_dict_start.match(data, offset)
        if m:
            offset = m.end()
            result = {}
            m = cls.re_dict_end.match(data, offset)
            while not m:
                # Keys and values are both read as generic objects.
                key, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1)
                if offset is None:
                    return result, None
                value, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1)
                result[key] = value
                if offset is None:
                    return result, None
                m = cls.re_dict_end.match(data, offset)
            offset = m.end()
            m = cls.re_stream_start.match(data, offset)
            if m:
                try:
                    stream_len_str = result.get(b"Length")
                    stream_len = int(stream_len_str)
                except (TypeError, ValueError) as e:
                    msg = f"bad or missing Length in stream dict ({stream_len_str})"
                    raise PdfFormatError(msg) from e
                # The stream body is exactly /Length bytes after the keyword.
                stream_data = data[m.end() : m.end() + stream_len]
                m = cls.re_stream_end.match(data, m.end() + stream_len)
                check_format_condition(m, "stream end not found")
                offset = m.end()
                result = PdfStream(PdfDict(result), stream_data)
            else:
                result = PdfDict(result)
            return result, offset
        # Array: returned as a plain list.
        m = cls.re_array_start.match(data, offset)
        if m:
            offset = m.end()
            result = []
            m = cls.re_array_end.match(data, offset)
            while not m:
                value, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1)
                result.append(value)
                if offset is None:
                    return result, None
                m = cls.re_array_end.match(data, offset)
            return result, m.end()
        m = cls.re_null.match(data, offset)
        if m:
            return None, m.end()
        m = cls.re_true.match(data, offset)
        if m:
            return True, m.end()
        m = cls.re_false.match(data, offset)
        if m:
            return False, m.end()
        m = cls.re_name.match(data, offset)
        if m:
            return PdfName(cls.interpret_name(m.group(1))), m.end()
        m = cls.re_int.match(data, offset)
        if m:
            return int(m.group(1)), m.end()
        m = cls.re_real.match(data, offset)
        if m:
            # XXX Decimal instead of float???
            return float(m.group(1)), m.end()
        # Hex string: returned as a bytearray.
        m = cls.re_string_hex.match(data, offset)
        if m:
            # filter out whitespace
            hex_string = bytearray(
                b for b in m.group(1) if b in b"0123456789abcdefABCDEF"
            )
            if len(hex_string) % 2 == 1:
                # append a 0 if the length is not even - yes, at the end
                hex_string.append(ord(b"0"))
            return bytearray.fromhex(hex_string.decode("us-ascii")), m.end()
        m = cls.re_string_lit.match(data, offset)
        if m:
            return cls.get_literal_string(data, m.end())
        # return None, offset  # fallback (only for debugging)
        msg = f"unrecognized object: {repr(data[offset : offset + 32])}"
        raise PdfFormatError(msg)
884
    # Tokens with special meaning inside a literal string.  Groups:
    #   1: backslash + one of n r t b f ( ) \
    #   2: backslash + one to three digits
    #   3: backslash + end of line (group 4 is that end of line)
    #   5: bare end of line
    #   6: "("        7: ")"
    re_lit_str_token = re.compile(
        rb"(\\[nrtbf()\\])|(\\[0-9]{1,3})|(\\(\r\n|\r|\n))|(\r\n|\r|\n)|(\()|(\))"
    )
    # Escape letter => byte it stands for.  Every entry exists twice, keyed by
    # a one-byte bytes object and by the integer byte value, so that either
    # form of lookup key works.
    escaped_chars = {
        b"n": b"\n",
        b"r": b"\r",
        b"t": b"\t",
        b"b": b"\b",
        b"f": b"\f",
        b"(": b"(",
        b")": b")",
        b"\\": b"\\",
        ord(b"n"): b"\n",
        ord(b"r"): b"\r",
        ord(b"t"): b"\t",
        ord(b"b"): b"\b",
        ord(b"f"): b"\f",
        ord(b"("): b"(",
        ord(b")"): b")",
        ord(b"\\"): b"\\",
    }
906
907 @classmethod
908 def get_literal_string(cls, data, offset):
909 nesting_depth = 0
910 result = bytearray()
911 for m in cls.re_lit_str_token.finditer(data, offset):
912 result.extend(data[offset : m.start()])
913 if m.group(1):
914 result.extend(cls.escaped_chars[m.group(1)[1]])
915 elif m.group(2):
916 result.append(int(m.group(2)[1:], 8))
917 elif m.group(3):
918 pass
919 elif m.group(5):
920 result.extend(b"\n")
921 elif m.group(6):
922 result.extend(b"(")
923 nesting_depth += 1
924 elif m.group(7):
925 if nesting_depth == 0:
926 return bytes(result), m.end()
927 result.extend(b")")
928 nesting_depth -= 1
929 offset = m.end()
930 msg = "unfinished literal string"
931 raise PdfFormatError(msg)
932
    # The "xref" keyword line that opens a cross-reference section.
    re_xref_section_start = re.compile(whitespace_optional + rb"xref" + newline)
    # Subsection header: group 1 is the first object ID, group 2 the number
    # of entries that follow.
    re_xref_subsection_start = re.compile(
        whitespace_optional
        + rb"([0-9]+)"
        + whitespace_mandatory
        + rb"([0-9]+)"
        + whitespace_optional
        + newline_only
    )
    # One entry: 10-digit number (group 1), 5-digit generation (group 2),
    # "f" or "n" (group 3) and a two-byte line ending.
    re_xref_entry = re.compile(rb"([0-9]{10}) ([0-9]{5}) ([fn])( \r| \n|\r\n)")
943
944 def read_xref_table(self, xref_section_offset):
945 subsection_found = False
946 m = self.re_xref_section_start.match(
947 self.buf, xref_section_offset + self.start_offset
948 )
949 check_format_condition(m, "xref section start not found")
950 offset = m.end()
951 while True:
952 m = self.re_xref_subsection_start.match(self.buf, offset)
953 if not m:
954 check_format_condition(
955 subsection_found, "xref subsection start not found"
956 )
957 break
958 subsection_found = True
959 offset = m.end()
960 first_object = int(m.group(1))
961 num_objects = int(m.group(2))
962 for i in range(first_object, first_object + num_objects):
963 m = self.re_xref_entry.match(self.buf, offset)
964 check_format_condition(m, "xref entry not found")
965 offset = m.end()
966 is_free = m.group(3) == b"f"
967 if not is_free:
968 generation = int(m.group(2))
969 new_entry = (int(m.group(1)), generation)
970 if i not in self.xref_table:
971 self.xref_table[i] = new_entry
972 return offset
973
974 def read_indirect(self, ref, max_nesting=-1):
975 offset, generation = self.xref_table[ref[0]]
976 check_format_condition(
977 generation == ref[1],
978 f"expected to find generation {ref[1]} for object ID {ref[0]} in xref "
979 f"table, instead found generation {generation} at offset {offset}",
980 )
981 value = self.get_value(
982 self.buf,
983 offset + self.start_offset,
984 expect_indirect=IndirectReference(*ref),
985 max_nesting=max_nesting,
986 )[0]
987 self.cached_objects[ref] = value
988 return value
989
990 def linearize_page_tree(self, node=None):
991 if node is None:
992 node = self.page_tree_root
993 check_format_condition(
994 node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages"
995 )
996 pages = []
997 for kid in node[b"Kids"]:
998 kid_object = self.read_indirect(kid)
999 if kid_object[b"Type"] == b"Page":
1000 pages.append(kid)
1001 else:
1002 pages.extend(self.linearize_page_tree(node=kid_object))
1003 return pages