1# Copyright (c) 2006, Mathieu Fenniak
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright notice,
9# this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above copyright notice,
11# this list of conditions and the following disclaimer in the documentation
12# and/or other materials provided with the distribution.
13# * The name of the author may not be used to endorse or promote products
14# derived from this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26# POSSIBILITY OF SUCH DAMAGE.
27
28
29__author__ = "Mathieu Fenniak"
30__author_email__ = "biziqe@mathieu.fenniak.net"
31
32import logging
33import re
34import sys
35from collections.abc import Iterable, Sequence
36from io import BytesIO
37from math import ceil
38from typing import (
39 Any,
40 Callable,
41 Optional,
42 Union,
43 cast,
44)
45
46from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol
47from .._utils import (
48 WHITESPACES,
49 StreamType,
50 deprecation_no_replacement,
51 logger_warning,
52 read_non_whitespace,
53 read_until_regex,
54 read_until_whitespace,
55 skip_over_comment,
56)
57from ..constants import (
58 CheckboxRadioButtonAttributes,
59 FieldDictionaryAttributes,
60 OutlineFontFlag,
61)
62from ..constants import FilterTypes as FT
63from ..constants import StreamAttributes as SA
64from ..constants import TypArguments as TA
65from ..constants import TypFitArguments as TF
66from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfReadError, PdfStreamError
67from ._base import (
68 BooleanObject,
69 ByteStringObject,
70 FloatObject,
71 IndirectObject,
72 NameObject,
73 NullObject,
74 NumberObject,
75 PdfObject,
76 TextStringObject,
77 is_null_or_none,
78)
79from ._fit import Fit
80from ._image_inline import (
81 extract_inline_A85,
82 extract_inline_AHx,
83 extract_inline_DCT,
84 extract_inline_default,
85 extract_inline_RL,
86)
87from ._utils import read_hex_string_from_stream, read_string_from_stream
88
89if sys.version_info >= (3, 11):
90 from typing import Self
91else:
92 from typing_extensions import Self
93
94logger = logging.getLogger(__name__)
95
96IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]")
97
98
99class ArrayObject(list[Any], PdfObject):
100 def replicate(
101 self,
102 pdf_dest: PdfWriterProtocol,
103 ) -> "ArrayObject":
104 arr = cast(
105 "ArrayObject",
106 self._reference_clone(ArrayObject(), pdf_dest, False),
107 )
108 for data in self:
109 if hasattr(data, "replicate"):
110 arr.append(data.replicate(pdf_dest))
111 else:
112 arr.append(data)
113 return arr
114
115 def clone(
116 self,
117 pdf_dest: PdfWriterProtocol,
118 force_duplicate: bool = False,
119 ignore_fields: Optional[Sequence[Union[str, int]]] = (),
120 ) -> "ArrayObject":
121 """Clone object into pdf_dest."""
122 try:
123 if self.indirect_reference.pdf == pdf_dest and not force_duplicate: # type: ignore
124 return self
125 except Exception:
126 pass
127 arr = cast(
128 "ArrayObject",
129 self._reference_clone(ArrayObject(), pdf_dest, force_duplicate),
130 )
131 for data in self:
132 if isinstance(data, StreamObject):
133 dup = data._reference_clone(
134 data.clone(pdf_dest, force_duplicate, ignore_fields),
135 pdf_dest,
136 force_duplicate,
137 )
138 arr.append(dup.indirect_reference)
139 elif hasattr(data, "clone"):
140 arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields))
141 else:
142 arr.append(data)
143 return arr
144
145 def hash_bin(self) -> int:
146 """
147 Used to detect modified object.
148
149 Returns:
150 Hash considering type and value.
151
152 """
153 return hash((self.__class__, tuple(x.hash_bin() for x in self)))
154
155 def items(self) -> Iterable[Any]:
156 """Emulate DictionaryObject.items for a list (index, object)."""
157 return enumerate(self)
158
159 def _to_lst(self, lst: Any) -> list[Any]:
160 # Convert to list, internal
161 if isinstance(lst, (list, tuple, set)):
162 pass
163 elif isinstance(lst, PdfObject):
164 lst = [lst]
165 elif isinstance(lst, str):
166 if lst[0] == "/":
167 lst = [NameObject(lst)]
168 else:
169 lst = [TextStringObject(lst)]
170 elif isinstance(lst, bytes):
171 lst = [ByteStringObject(lst)]
172 else: # for numbers,...
173 lst = [lst]
174 return lst
175
176 def __add__(self, lst: Any) -> "ArrayObject":
177 """
178 Allow extension by adding list or add one element only
179
180 Args:
181 lst: any list, tuples are extended the list.
182 other types(numbers,...) will be appended.
183 if str is passed it will be converted into TextStringObject
184 or NameObject (if starting with "/")
185 if bytes is passed it will be converted into ByteStringObject
186
187 Returns:
188 ArrayObject with all elements
189
190 """
191 temp = ArrayObject(self)
192 temp.extend(self._to_lst(lst))
193 return temp
194
195 def __iadd__(self, lst: Any) -> Self:
196 """
197 Allow extension by adding list or add one element only
198
199 Args:
200 lst: any list, tuples are extended the list.
201 other types(numbers,...) will be appended.
202 if str is passed it will be converted into TextStringObject
203 or NameObject (if starting with "/")
204 if bytes is passed it will be converted into ByteStringObject
205
206 """
207 self.extend(self._to_lst(lst))
208 return self
209
210 def __isub__(self, lst: Any) -> Self:
211 """Allow to remove items"""
212 for x in self._to_lst(lst):
213 try:
214 index = self.index(x)
215 del self[index]
216 except ValueError:
217 pass
218 return self
219
220 def write_to_stream(
221 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
222 ) -> None:
223 if encryption_key is not None: # deprecated
224 deprecation_no_replacement(
225 "the encryption_key parameter of write_to_stream", "5.0.0"
226 )
227 stream.write(b"[")
228 for data in self:
229 stream.write(b" ")
230 data.write_to_stream(stream)
231 stream.write(b" ]")
232
233 @staticmethod
234 def read_from_stream(
235 stream: StreamType,
236 pdf: Optional[PdfReaderProtocol],
237 forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
238 ) -> "ArrayObject":
239 arr = ArrayObject()
240 tmp = stream.read(1)
241 if tmp != b"[":
242 raise PdfReadError("Could not read array")
243 while True:
244 # skip leading whitespace
245 tok = stream.read(1)
246 while tok.isspace():
247 tok = stream.read(1)
248 if tok == b"":
249 break
250 if tok == b"%":
251 stream.seek(-1, 1)
252 skip_over_comment(stream)
253 continue
254 stream.seek(-1, 1)
255 # check for array ending
256 peek_ahead = stream.read(1)
257 if peek_ahead == b"]":
258 break
259 stream.seek(-1, 1)
260 # read and append object
261 arr.append(read_object(stream, pdf, forced_encoding))
262 return arr
263
264
265class DictionaryObject(dict[Any, Any], PdfObject):
266 def replicate(
267 self,
268 pdf_dest: PdfWriterProtocol,
269 ) -> "DictionaryObject":
270 d__ = cast(
271 "DictionaryObject",
272 self._reference_clone(self.__class__(), pdf_dest, False),
273 )
274 for k, v in self.items():
275 d__[k.replicate(pdf_dest)] = (
276 v.replicate(pdf_dest) if hasattr(v, "replicate") else v
277 )
278 return d__
279
280 def clone(
281 self,
282 pdf_dest: PdfWriterProtocol,
283 force_duplicate: bool = False,
284 ignore_fields: Optional[Sequence[Union[str, int]]] = (),
285 ) -> "DictionaryObject":
286 """Clone object into pdf_dest."""
287 try:
288 if self.indirect_reference.pdf == pdf_dest and not force_duplicate: # type: ignore
289 return self
290 except Exception:
291 pass
292
293 visited: set[tuple[int, int]] = set() # (idnum, generation)
294 d__ = cast(
295 "DictionaryObject",
296 self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
297 )
298 if ignore_fields is None:
299 ignore_fields = []
300 if len(d__.keys()) == 0:
301 d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
302 return d__
303
304 def _clone(
305 self,
306 src: "DictionaryObject",
307 pdf_dest: PdfWriterProtocol,
308 force_duplicate: bool,
309 ignore_fields: Optional[Sequence[Union[str, int]]],
310 visited: set[tuple[int, int]], # (idnum, generation)
311 ) -> None:
312 """
313 Update the object from src.
314
315 Args:
316 src: "DictionaryObject":
317 pdf_dest:
318 force_duplicate:
319 ignore_fields:
320
321 """
322 # First we remove the ignore_fields
323 # that are for a limited number of levels
324 assert ignore_fields is not None
325 ignore_fields = list(ignore_fields)
326 x = 0
327 while x < len(ignore_fields):
328 if isinstance(ignore_fields[x], int):
329 if cast(int, ignore_fields[x]) <= 0:
330 del ignore_fields[x]
331 del ignore_fields[x]
332 continue
333 ignore_fields[x] -= 1 # type:ignore
334 x += 1
335 # Check if this is a chain list, we need to loop to prevent recur
336 if any(
337 field not in ignore_fields
338 and field in src
339 and isinstance(src.raw_get(field), IndirectObject)
340 and isinstance(src[field], DictionaryObject)
341 and (
342 src.get("/Type", None) is None
343 or cast(DictionaryObject, src[field]).get("/Type", None) is None
344 or src.get("/Type", None)
345 == cast(DictionaryObject, src[field]).get("/Type", None)
346 )
347 for field in ["/Next", "/Prev", "/N", "/V"]
348 ):
349 ignore_fields = list(ignore_fields)
350 for lst in (("/Next", "/Prev"), ("/N", "/V")):
351 for k in lst:
352 objs = []
353 if (
354 k in src
355 and k not in self
356 and isinstance(src.raw_get(k), IndirectObject)
357 and isinstance(src[k], DictionaryObject)
358 # If need to go further the idea is to check
359 # that the types are the same
360 and (
361 src.get("/Type", None) is None
362 or cast(DictionaryObject, src[k]).get("/Type", None) is None
363 or src.get("/Type", None)
364 == cast(DictionaryObject, src[k]).get("/Type", None)
365 )
366 ):
367 cur_obj: Optional[DictionaryObject] = cast(
368 "DictionaryObject", src[k]
369 )
370 prev_obj: Optional[DictionaryObject] = self
371 while cur_obj is not None:
372 clon = cast(
373 "DictionaryObject",
374 cur_obj._reference_clone(
375 cur_obj.__class__(), pdf_dest, force_duplicate
376 ),
377 )
378 # Check to see if we've previously processed our item
379 if clon.indirect_reference is not None:
380 idnum = clon.indirect_reference.idnum
381 generation = clon.indirect_reference.generation
382 if (idnum, generation) in visited:
383 cur_obj = None
384 break
385 visited.add((idnum, generation))
386 objs.append((cur_obj, clon))
387 assert prev_obj is not None
388 prev_obj[NameObject(k)] = clon.indirect_reference
389 prev_obj = clon
390 try:
391 if cur_obj == src:
392 cur_obj = None
393 else:
394 cur_obj = cast("DictionaryObject", cur_obj[k])
395 except Exception:
396 cur_obj = None
397 for s, c in objs:
398 c._clone(
399 s, pdf_dest, force_duplicate, ignore_fields, visited
400 )
401
402 for k, v in src.items():
403 if k not in ignore_fields:
404 if isinstance(v, StreamObject):
405 if not hasattr(v, "indirect_reference"):
406 v.indirect_reference = None
407 vv = v.clone(pdf_dest, force_duplicate, ignore_fields)
408 assert vv.indirect_reference is not None
409 self[k.clone(pdf_dest)] = vv.indirect_reference
410 elif k not in self:
411 self[NameObject(k)] = (
412 v.clone(pdf_dest, force_duplicate, ignore_fields)
413 if hasattr(v, "clone")
414 else v
415 )
416
417 def hash_bin(self) -> int:
418 """
419 Used to detect modified object.
420
421 Returns:
422 Hash considering type and value.
423
424 """
425 return hash(
426 (self.__class__, tuple(((k, v.hash_bin()) for k, v in self.items())))
427 )
428
429 def raw_get(self, key: Any) -> Any:
430 return dict.__getitem__(self, key)
431
432 def get_inherited(self, key: str, default: Any = None) -> Any:
433 """
434 Returns the value of a key or from the parent if not found.
435 If not found returns default.
436
437 Args:
438 key: string identifying the field to return
439
440 default: default value to return
441
442 Returns:
443 Current key or inherited one, otherwise default value.
444
445 """
446 if key in self:
447 return self[key]
448 try:
449 if "/Parent" not in self:
450 return default
451 raise KeyError("Not present")
452 except KeyError:
453 return cast("DictionaryObject", self["/Parent"].get_object()).get_inherited(
454 key, default
455 )
456
457 def __setitem__(self, key: Any, value: Any) -> Any:
458 if not isinstance(key, PdfObject):
459 raise ValueError("Key must be a PdfObject")
460 if not isinstance(value, PdfObject):
461 raise ValueError("Value must be a PdfObject")
462 return dict.__setitem__(self, key, value)
463
464 def setdefault(self, key: Any, value: Optional[Any] = None) -> Any:
465 if not isinstance(key, PdfObject):
466 raise ValueError("Key must be a PdfObject")
467 if not isinstance(value, PdfObject):
468 raise ValueError("Value must be a PdfObject")
469 return dict.setdefault(self, key, value)
470
471 def __getitem__(self, key: Any) -> PdfObject:
472 return dict.__getitem__(self, key).get_object()
473
474 @property
475 def xmp_metadata(self) -> Optional[XmpInformationProtocol]:
476 """
477 Retrieve XMP (Extensible Metadata Platform) data relevant to this
478 object, if available.
479
480 See Table 347 — Additional entries in a metadata stream dictionary.
481
482 Returns:
483 Returns a :class:`~pypdf.xmp.XmpInformation` instance
484 that can be used to access XMP metadata from the document. Can also
485 return None if no metadata was found on the document root.
486
487 """
488 from ..xmp import XmpInformation # noqa: PLC0415
489
490 metadata = self.get("/Metadata", None)
491 if is_null_or_none(metadata):
492 return None
493 assert metadata is not None, "mypy"
494 metadata = metadata.get_object()
495 return XmpInformation(metadata)
496
497 def write_to_stream(
498 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
499 ) -> None:
500 if encryption_key is not None: # deprecated
501 deprecation_no_replacement(
502 "the encryption_key parameter of write_to_stream", "5.0.0"
503 )
504 stream.write(b"<<\n")
505 for key, value in self.items():
506 if len(key) > 2 and key[1] == "%" and key[-1] == "%":
507 continue
508 key.write_to_stream(stream, encryption_key)
509 stream.write(b" ")
510 value.write_to_stream(stream)
511 stream.write(b"\n")
512 stream.write(b">>")
513
514 @staticmethod
515 def read_from_stream(
516 stream: StreamType,
517 pdf: Optional[PdfReaderProtocol],
518 forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
519 ) -> "DictionaryObject":
520 def get_next_obj_pos(
521 p: int, p1: int, rem_gens: list[int], pdf: PdfReaderProtocol
522 ) -> int:
523 out = p1
524 for gen in rem_gens:
525 loc = pdf.xref[gen]
526 try:
527 values = [x for x in loc.values() if p < x <= p1]
528 if values:
529 out = min(out, *values)
530 except ValueError:
531 pass
532 return out
533
534 def read_unsized_from_stream(
535 stream: StreamType, pdf: PdfReaderProtocol
536 ) -> bytes:
537 # we are just pointing at beginning of the stream
538 eon = get_next_obj_pos(stream.tell(), 2**32, list(pdf.xref), pdf) - 1
539 curr = stream.tell()
540 rw = stream.read(eon - stream.tell())
541 p = rw.find(b"endstream")
542 if p < 0:
543 raise PdfReadError(
544 f"Unable to find 'endstream' marker for obj starting at {curr}."
545 )
546 stream.seek(curr + p + 9)
547 return rw[: p - 1]
548
549 tmp = stream.read(2)
550 if tmp != b"<<":
551 raise PdfReadError(
552 f"Dictionary read error at byte {hex(stream.tell())}: "
553 "stream must begin with '<<'"
554 )
555 data: dict[Any, Any] = {}
556 while True:
557 tok = read_non_whitespace(stream)
558 if tok == b"\x00":
559 continue
560 if tok == b"%":
561 stream.seek(-1, 1)
562 skip_over_comment(stream)
563 continue
564 if not tok:
565 raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)
566
567 if tok == b">":
568 stream.read(1)
569 break
570 stream.seek(-1, 1)
571 try:
572 try:
573 key = read_object(stream, pdf)
574 if isinstance(key, NullObject):
575 break
576 if not isinstance(key, NameObject):
577 raise PdfReadError(
578 f"Expecting a NameObject for key but found {key!r}"
579 )
580 except PdfReadError as exc:
581 if pdf is not None and pdf.strict:
582 raise
583 logger_warning(exc.__repr__(), __name__)
584 continue
585 tok = read_non_whitespace(stream)
586 stream.seek(-1, 1)
587 value = read_object(stream, pdf, forced_encoding)
588 except Exception as exc:
589 if pdf is not None and pdf.strict:
590 raise PdfReadError(exc.__repr__())
591 logger_warning(exc.__repr__(), __name__)
592 retval = DictionaryObject()
593 retval.update(data)
594 return retval # return partial data
595
596 if not data.get(key):
597 data[key] = value
598 else:
599 # multiple definitions of key not permitted
600 msg = (
601 f"Multiple definitions in dictionary at byte "
602 f"{hex(stream.tell())} for key {key}"
603 )
604 if pdf is not None and pdf.strict:
605 raise PdfReadError(msg)
606 logger_warning(msg, __name__)
607
608 pos = stream.tell()
609 s = read_non_whitespace(stream)
610 if s == b"s" and stream.read(5) == b"tream":
611 eol = stream.read(1)
612 # Occasional PDF file output has spaces after 'stream' keyword but before EOL.
613 # patch provided by Danial Sandler
614 while eol == b" ":
615 eol = stream.read(1)
616 if eol not in (b"\n", b"\r"):
617 raise PdfStreamError("Stream data must be followed by a newline")
618 if eol == b"\r" and stream.read(1) != b"\n":
619 stream.seek(-1, 1)
620 # this is a stream object, not a dictionary
621 if SA.LENGTH not in data:
622 if pdf is not None and pdf.strict:
623 raise PdfStreamError("Stream length not defined")
624 logger_warning(
625 f"Stream length not defined @pos={stream.tell()}", __name__
626 )
627 data[NameObject(SA.LENGTH)] = NumberObject(-1)
628 length = data[SA.LENGTH]
629 if isinstance(length, IndirectObject):
630 t = stream.tell()
631 assert pdf is not None, "mypy"
632 length = pdf.get_object(length)
633 stream.seek(t, 0)
634 if length is None: # if the PDF is damaged
635 length = -1
636 pstart = stream.tell()
637 if length > 0:
638 data["__streamdata__"] = stream.read(length)
639 else:
640 data["__streamdata__"] = read_until_regex(
641 stream, re.compile(b"endstream")
642 )
643 e = read_non_whitespace(stream)
644 ndstream = stream.read(8)
645 if (e + ndstream) != b"endstream":
646 # the odd PDF file has a length that is too long, so
647 # we need to read backwards to find the "endstream" ending.
648 # ReportLab (unknown version) generates files with this bug,
649 # and Python users into PDF files tend to be our audience.
650 # we need to do this to correct the streamdata and chop off
651 # an extra character.
652 pos = stream.tell()
653 stream.seek(-10, 1)
654 end = stream.read(9)
655 if end == b"endstream":
656 # we found it by looking back one character further.
657 data["__streamdata__"] = data["__streamdata__"][:-1]
658 elif pdf is not None and not pdf.strict:
659 stream.seek(pstart, 0)
660 data["__streamdata__"] = read_unsized_from_stream(stream, pdf)
661 pos = stream.tell()
662 else:
663 stream.seek(pos, 0)
664 raise PdfReadError(
665 "Unable to find 'endstream' marker after stream at byte "
666 f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')."
667 )
668 else:
669 stream.seek(pos, 0)
670 if "__streamdata__" in data:
671 return StreamObject.initialize_from_dictionary(data)
672 retval = DictionaryObject()
673 retval.update(data)
674 return retval
675
676
677class TreeObject(DictionaryObject):
678 def __init__(self, dct: Optional[DictionaryObject] = None) -> None:
679 DictionaryObject.__init__(self)
680 if dct:
681 self.update(dct)
682
683 def has_children(self) -> bool:
684 return "/First" in self
685
686 def __iter__(self) -> Any:
687 return self.children()
688
689 def children(self) -> Iterable[Any]:
690 if not self.has_children():
691 return
692
693 child_ref = self[NameObject("/First")]
694 child = child_ref.get_object()
695 while True:
696 yield child
697 if child == self[NameObject("/Last")]:
698 return
699 child_ref = child.get(NameObject("/Next")) # type: ignore
700 if is_null_or_none(child_ref):
701 return
702 child = child_ref.get_object()
703
704 def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None:
705 self.insert_child(child, None, pdf)
706
707 def inc_parent_counter_default(
708 self, parent: Union[None, IndirectObject, "TreeObject"], n: int
709 ) -> None:
710 if is_null_or_none(parent):
711 return
712 assert parent is not None, "mypy"
713 parent = cast("TreeObject", parent.get_object())
714 if "/Count" in parent:
715 parent[NameObject("/Count")] = NumberObject(
716 max(0, cast(int, parent[NameObject("/Count")]) + n)
717 )
718 self.inc_parent_counter_default(parent.get("/Parent", None), n)
719
720 def inc_parent_counter_outline(
721 self, parent: Union[None, IndirectObject, "TreeObject"], n: int
722 ) -> None:
723 if is_null_or_none(parent):
724 return
725 assert parent is not None, "mypy"
726 parent = cast("TreeObject", parent.get_object())
727 # BooleanObject requires comparison with == not is
728 opn = parent.get("/%is_open%", True) == True # noqa: E712
729 c = cast(int, parent.get("/Count", 0))
730 if c < 0:
731 c = abs(c)
732 parent[NameObject("/Count")] = NumberObject((c + n) * (1 if opn else -1))
733 if not opn:
734 return
735 self.inc_parent_counter_outline(parent.get("/Parent", None), n)
736
737 def insert_child(
738 self,
739 child: Any,
740 before: Any,
741 pdf: PdfWriterProtocol,
742 inc_parent_counter: Optional[Callable[..., Any]] = None,
743 ) -> IndirectObject:
744 if inc_parent_counter is None:
745 inc_parent_counter = self.inc_parent_counter_default
746 child_obj = child.get_object()
747 child = child.indirect_reference # get_reference(child_obj)
748
749 prev: Optional[DictionaryObject]
750 if "/First" not in self: # no child yet
751 self[NameObject("/First")] = child
752 self[NameObject("/Count")] = NumberObject(0)
753 self[NameObject("/Last")] = child
754 child_obj[NameObject("/Parent")] = self.indirect_reference
755 inc_parent_counter(self, child_obj.get("/Count", 1))
756 if "/Next" in child_obj:
757 del child_obj["/Next"]
758 if "/Prev" in child_obj:
759 del child_obj["/Prev"]
760 return child
761 prev = cast("DictionaryObject", self["/Last"])
762
763 while prev.indirect_reference != before:
764 if "/Next" in prev:
765 prev = cast("TreeObject", prev["/Next"])
766 else: # append at the end
767 prev[NameObject("/Next")] = cast("TreeObject", child)
768 child_obj[NameObject("/Prev")] = prev.indirect_reference
769 child_obj[NameObject("/Parent")] = self.indirect_reference
770 if "/Next" in child_obj:
771 del child_obj["/Next"]
772 self[NameObject("/Last")] = child
773 inc_parent_counter(self, child_obj.get("/Count", 1))
774 return child
775 try: # insert as first or in the middle
776 assert isinstance(prev["/Prev"], DictionaryObject)
777 prev["/Prev"][NameObject("/Next")] = child
778 child_obj[NameObject("/Prev")] = prev["/Prev"]
779 except Exception: # it means we are inserting in first position
780 del child_obj["/Next"]
781 child_obj[NameObject("/Next")] = prev
782 prev[NameObject("/Prev")] = child
783 child_obj[NameObject("/Parent")] = self.indirect_reference
784 inc_parent_counter(self, child_obj.get("/Count", 1))
785 return child
786
787 def _remove_node_from_tree(
788 self, prev: Any, prev_ref: Any, cur: Any, last: Any
789 ) -> None:
790 """
791 Adjust the pointers of the linked list and tree node count.
792
793 Args:
794 prev:
795 prev_ref:
796 cur:
797 last:
798
799 """
800 next_ref = cur.get(NameObject("/Next"), None)
801 if prev is None:
802 if next_ref:
803 # Removing first tree node
804 next_obj = next_ref.get_object()
805 del next_obj[NameObject("/Prev")]
806 self[NameObject("/First")] = next_ref
807 self[NameObject("/Count")] = NumberObject(
808 self[NameObject("/Count")] - 1 # type: ignore
809 )
810
811 else:
812 # Removing only tree node
813 self[NameObject("/Count")] = NumberObject(0)
814 del self[NameObject("/First")]
815 if NameObject("/Last") in self:
816 del self[NameObject("/Last")]
817 else:
818 if next_ref:
819 # Removing middle tree node
820 next_obj = next_ref.get_object()
821 next_obj[NameObject("/Prev")] = prev_ref
822 prev[NameObject("/Next")] = next_ref
823 else:
824 # Removing last tree node
825 assert cur == last
826 del prev[NameObject("/Next")]
827 self[NameObject("/Last")] = prev_ref
828 self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1) # type: ignore
829
830 def remove_child(self, child: Any) -> None:
831 child_obj = child.get_object()
832 child = child_obj.indirect_reference
833
834 if NameObject("/Parent") not in child_obj:
835 raise ValueError("Removed child does not appear to be a tree item")
836 if child_obj[NameObject("/Parent")] != self:
837 raise ValueError("Removed child is not a member of this tree")
838
839 found = False
840 prev_ref = None
841 prev = None
842 cur_ref: Optional[Any] = self[NameObject("/First")]
843 cur: Optional[dict[str, Any]] = cur_ref.get_object() # type: ignore
844 last_ref = self[NameObject("/Last")]
845 last = last_ref.get_object()
846 while cur is not None:
847 if cur == child_obj:
848 self._remove_node_from_tree(prev, prev_ref, cur, last)
849 found = True
850 break
851
852 # Go to the next node
853 prev_ref = cur_ref
854 prev = cur
855 if NameObject("/Next") in cur:
856 cur_ref = cur[NameObject("/Next")]
857 cur = cur_ref.get_object()
858 else:
859 cur_ref = None
860 cur = None
861
862 if not found:
863 raise ValueError("Removal couldn't find item in tree")
864
865 _reset_node_tree_relationship(child_obj)
866
867 def remove_from_tree(self) -> None:
868 """Remove the object from the tree it is in."""
869 if NameObject("/Parent") not in self:
870 raise ValueError("Removed child does not appear to be a tree item")
871 cast("TreeObject", self["/Parent"]).remove_child(self)
872
873 def empty_tree(self) -> None:
874 for child in self:
875 child_obj = child.get_object()
876 _reset_node_tree_relationship(child_obj)
877
878 if NameObject("/Count") in self:
879 del self[NameObject("/Count")]
880 if NameObject("/First") in self:
881 del self[NameObject("/First")]
882 if NameObject("/Last") in self:
883 del self[NameObject("/Last")]
884
885
886def _reset_node_tree_relationship(child_obj: Any) -> None:
887 """
888 Call this after a node has been removed from a tree.
889
890 This resets the nodes attributes in respect to that tree.
891
892 Args:
893 child_obj:
894
895 """
896 del child_obj[NameObject("/Parent")]
897 if NameObject("/Next") in child_obj:
898 del child_obj[NameObject("/Next")]
899 if NameObject("/Prev") in child_obj:
900 del child_obj[NameObject("/Prev")]
901
902
903class StreamObject(DictionaryObject):
904 def __init__(self) -> None:
905 self._data: bytes = b""
906 self.decoded_self: Optional[DecodedStreamObject] = None
907
908 def replicate(
909 self,
910 pdf_dest: PdfWriterProtocol,
911 ) -> "StreamObject":
912 d__ = cast(
913 "StreamObject",
914 self._reference_clone(self.__class__(), pdf_dest, False),
915 )
916 d__._data = self._data
917 try:
918 decoded_self = self.decoded_self
919 if decoded_self is None:
920 self.decoded_self = None
921 else:
922 self.decoded_self = cast(
923 "DecodedStreamObject", decoded_self.replicate(pdf_dest)
924 )
925 except Exception:
926 pass
927 for k, v in self.items():
928 d__[k.replicate(pdf_dest)] = (
929 v.replicate(pdf_dest) if hasattr(v, "replicate") else v
930 )
931 return d__
932
933 def _clone(
934 self,
935 src: DictionaryObject,
936 pdf_dest: PdfWriterProtocol,
937 force_duplicate: bool,
938 ignore_fields: Optional[Sequence[Union[str, int]]],
939 visited: set[tuple[int, int]],
940 ) -> None:
941 """
942 Update the object from src.
943
944 Args:
945 src:
946 pdf_dest:
947 force_duplicate:
948 ignore_fields:
949
950 """
951 self._data = cast("StreamObject", src)._data
952 try:
953 decoded_self = cast("StreamObject", src).decoded_self
954 if decoded_self is None:
955 self.decoded_self = None
956 else:
957 self.decoded_self = cast(
958 "DecodedStreamObject",
959 decoded_self.clone(pdf_dest, force_duplicate, ignore_fields),
960 )
961 except Exception:
962 pass
963 super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)
964
965 def hash_bin(self) -> int:
966 """
967 Used to detect modified object.
968
969 Returns:
970 Hash considering type and value.
971
972 """
973 # Use _data to prevent errors on non-decoded streams.
974 return hash((super().hash_bin(), self._data))
975
976 def get_data(self) -> bytes:
977 return self._data
978
979 def set_data(self, data: bytes) -> None:
980 self._data = data
981
982 def hash_value_data(self) -> bytes:
983 data = super().hash_value_data()
984 data += self.get_data()
985 return data
986
987 def write_to_stream(
988 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
989 ) -> None:
990 if encryption_key is not None: # deprecated
991 deprecation_no_replacement(
992 "the encryption_key parameter of write_to_stream", "5.0.0"
993 )
994 self[NameObject(SA.LENGTH)] = NumberObject(len(self._data))
995 DictionaryObject.write_to_stream(self, stream)
996 del self[SA.LENGTH]
997 stream.write(b"\nstream\n")
998 stream.write(self._data)
999 stream.write(b"\nendstream")
1000
1001 @staticmethod
1002 def initialize_from_dictionary(
1003 data: dict[str, Any]
1004 ) -> Union["EncodedStreamObject", "DecodedStreamObject"]:
1005 retval: Union[EncodedStreamObject, DecodedStreamObject]
1006 if SA.FILTER in data:
1007 retval = EncodedStreamObject()
1008 else:
1009 retval = DecodedStreamObject()
1010 retval._data = data["__streamdata__"]
1011 del data["__streamdata__"]
1012 if SA.LENGTH in data:
1013 del data[SA.LENGTH]
1014 retval.update(data)
1015 return retval
1016
1017 def flate_encode(self, level: int = -1) -> "EncodedStreamObject":
1018 from ..filters import FlateDecode # noqa: PLC0415
1019
1020 if SA.FILTER in self:
1021 f = self[SA.FILTER]
1022 if isinstance(f, ArrayObject):
1023 f = ArrayObject([NameObject(FT.FLATE_DECODE), *f])
1024 try:
1025 params = ArrayObject(
1026 [NullObject(), *self.get(SA.DECODE_PARMS, ArrayObject())]
1027 )
1028 except TypeError:
1029 # case of error where the * operator is not working (not an array
1030 params = ArrayObject(
1031 [NullObject(), self.get(SA.DECODE_PARMS, ArrayObject())]
1032 )
1033 else:
1034 f = ArrayObject([NameObject(FT.FLATE_DECODE), f])
1035 params = ArrayObject(
1036 [NullObject(), self.get(SA.DECODE_PARMS, NullObject())]
1037 )
1038 else:
1039 f = NameObject(FT.FLATE_DECODE)
1040 params = None
1041 retval = EncodedStreamObject()
1042 retval.update(self)
1043 retval[NameObject(SA.FILTER)] = f
1044 if params is not None:
1045 retval[NameObject(SA.DECODE_PARMS)] = params
1046 retval._data = FlateDecode.encode(self._data, level)
1047 return retval
1048
1049 def decode_as_image(self) -> Any:
1050 """
1051 Try to decode the stream object as an image
1052
1053 Returns:
1054 a PIL image if proper decoding has been found
1055 Raises:
1056 Exception: Errors during decoding will be reported.
1057 It is recommended to catch exceptions to prevent
1058 stops in your program.
1059
1060 """
1061 from ..filters import _xobj_to_image # noqa: PLC0415
1062
1063 if self.get("/Subtype", "") != "/Image":
1064 try:
1065 msg = f"{self.indirect_reference} does not seem to be an Image" # pragma: no cover
1066 except AttributeError:
1067 msg = f"{self.__repr__()} object does not seem to be an Image" # pragma: no cover
1068 logger_warning(msg, __name__)
1069 extension, _, img = _xobj_to_image(self)
1070 if extension is None:
1071 return None # pragma: no cover
1072 return img
1073
1074
1075class DecodedStreamObject(StreamObject):
1076 pass
1077
1078
1079class EncodedStreamObject(StreamObject):
1080 def __init__(self) -> None:
1081 self.decoded_self: Optional[DecodedStreamObject] = None
1082
1083 # This overrides the parent method
1084 def get_data(self) -> bytes:
1085 from ..filters import decode_stream_data # noqa: PLC0415
1086
1087 if self.decoded_self is not None:
1088 # Cached version of decoded object
1089 return self.decoded_self.get_data()
1090
1091 # Create decoded object
1092 decoded = DecodedStreamObject()
1093 decoded.set_data(decode_stream_data(self))
1094 for key, value in self.items():
1095 if key not in (SA.LENGTH, SA.FILTER, SA.DECODE_PARMS):
1096 decoded[key] = value
1097 self.decoded_self = decoded
1098 return decoded.get_data()
1099
1100 # This overrides the parent method:
1101 def set_data(self, data: bytes) -> None:
1102 from ..filters import FlateDecode # noqa: PLC0415
1103
1104 if self.get(SA.FILTER, "") in (FT.FLATE_DECODE, [FT.FLATE_DECODE]):
1105 if not isinstance(data, bytes):
1106 raise TypeError("Data must be bytes")
1107 if self.decoded_self is None:
1108 self.get_data() # to create self.decoded_self
1109 assert self.decoded_self is not None, "mypy"
1110 self.decoded_self.set_data(data)
1111 super().set_data(FlateDecode.encode(data))
1112 else:
1113 raise PdfReadError(
1114 "Streams encoded with a filter different from FlateDecode are not supported"
1115 )
1116
1117
1118class ContentStream(DecodedStreamObject):
1119 """
1120 In order to be fast, this data structure can contain either:
1121
1122 * raw data in ._data
1123 * parsed stream operations in ._operations.
1124
1125 At any time, ContentStream object can either have both of those fields defined,
1126 or one field defined and the other set to None.
1127
1128 These fields are "rebuilt" lazily, when accessed:
1129
1130 * when .get_data() is called, if ._data is None, it is rebuilt from ._operations.
1131 * when .operations is called, if ._operations is None, it is rebuilt from ._data.
1132
1133 Conversely, these fields can be invalidated:
1134
1135 * when .set_data() is called, ._operations is set to None.
1136 * when .operations is set, ._data is set to None.
1137 """
1138
1139 def __init__(
1140 self,
1141 stream: Any,
1142 pdf: Any,
1143 forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
1144 ) -> None:
1145 self.pdf = pdf
1146 self._operations: list[tuple[Any, bytes]] = []
1147
1148 # stream may be a StreamObject or an ArrayObject containing
1149 # StreamObjects to be concatenated together.
1150 if stream is None:
1151 super().set_data(b"")
1152 else:
1153 stream = stream.get_object()
1154 if isinstance(stream, ArrayObject):
1155 data = b""
1156 for s in stream:
1157 s_resolved = s.get_object()
1158 if isinstance(s_resolved, NullObject):
1159 continue
1160 if not isinstance(s_resolved, StreamObject):
1161 # No need to emit an exception here for now - the PDF structure
1162 # seems to already be broken beforehand in these cases.
1163 logger_warning(
1164 f"Expected StreamObject, got {type(s_resolved).__name__} instead. Data might be wrong.",
1165 __name__
1166 )
1167 else:
1168 data += s_resolved.get_data()
1169 if len(data) == 0 or data[-1] != b"\n":
1170 data += b"\n"
1171 super().set_data(bytes(data))
1172 else:
1173 stream_data = stream.get_data()
1174 assert stream_data is not None
1175 super().set_data(stream_data)
1176 self.forced_encoding = forced_encoding
1177
1178 def replicate(
1179 self,
1180 pdf_dest: PdfWriterProtocol,
1181 ) -> "ContentStream":
1182 d__ = cast(
1183 "ContentStream",
1184 self._reference_clone(self.__class__(None, None), pdf_dest, False),
1185 )
1186 d__._data = self._data
1187 try:
1188 decoded_self = self.decoded_self
1189 if decoded_self is None:
1190 self.decoded_self = None
1191 else:
1192 self.decoded_self = cast(
1193 "DecodedStreamObject", decoded_self.replicate(pdf_dest)
1194 )
1195 except Exception:
1196 pass
1197 for k, v in self.items():
1198 d__[k.replicate(pdf_dest)] = (
1199 v.replicate(pdf_dest) if hasattr(v, "replicate") else v
1200 )
1201 return d__
1202 d__.set_data(self._data)
1203 d__.pdf = pdf_dest
1204 d__._operations = list(self._operations)
1205 d__.forced_encoding = self.forced_encoding
1206 return d__
1207
1208 def clone(
1209 self,
1210 pdf_dest: Any,
1211 force_duplicate: bool = False,
1212 ignore_fields: Optional[Sequence[Union[str, int]]] = (),
1213 ) -> "ContentStream":
1214 """
1215 Clone object into pdf_dest.
1216
1217 Args:
1218 pdf_dest:
1219 force_duplicate:
1220 ignore_fields:
1221
1222 Returns:
1223 The cloned ContentStream
1224
1225 """
1226 try:
1227 if self.indirect_reference.pdf == pdf_dest and not force_duplicate: # type: ignore
1228 return self
1229 except Exception:
1230 pass
1231
1232 visited: set[tuple[int, int]] = set()
1233 d__ = cast(
1234 "ContentStream",
1235 self._reference_clone(
1236 self.__class__(None, None), pdf_dest, force_duplicate
1237 ),
1238 )
1239 if ignore_fields is None:
1240 ignore_fields = []
1241 d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
1242 return d__
1243
1244 def _clone(
1245 self,
1246 src: DictionaryObject,
1247 pdf_dest: PdfWriterProtocol,
1248 force_duplicate: bool,
1249 ignore_fields: Optional[Sequence[Union[str, int]]],
1250 visited: set[tuple[int, int]],
1251 ) -> None:
1252 """
1253 Update the object from src.
1254
1255 Args:
1256 src:
1257 pdf_dest:
1258 force_duplicate:
1259 ignore_fields:
1260
1261 """
1262 src_cs = cast("ContentStream", src)
1263 super().set_data(src_cs._data)
1264 self.pdf = pdf_dest
1265 self._operations = list(src_cs._operations)
1266 self.forced_encoding = src_cs.forced_encoding
1267 # no need to call DictionaryObjection or anything
1268 # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)
1269
1270 def _parse_content_stream(self, stream: StreamType) -> None:
1271 # 7.8.2 Content Streams
1272 stream.seek(0, 0)
1273 operands: list[Union[int, str, PdfObject]] = []
1274 while True:
1275 peek = read_non_whitespace(stream)
1276 if peek in (b"", 0):
1277 break
1278 stream.seek(-1, 1)
1279 if peek.isalpha() or peek in (b"'", b'"'):
1280 operator = read_until_regex(stream, NameObject.delimiter_pattern)
1281 if operator == b"BI":
1282 # begin inline image - a completely different parsing
1283 # mechanism is required, of course... thanks buddy...
1284 assert operands == []
1285 ii = self._read_inline_image(stream)
1286 self._operations.append((ii, b"INLINE IMAGE"))
1287 else:
1288 self._operations.append((operands, operator))
1289 operands = []
1290 elif peek == b"%":
1291 # If we encounter a comment in the content stream, we have to
1292 # handle it here. Typically, read_object will handle
1293 # encountering a comment -- but read_object assumes that
1294 # following the comment must be the object we're trying to
1295 # read. In this case, it could be an operator instead.
1296 while peek not in (b"\r", b"\n", b""):
1297 peek = stream.read(1)
1298 else:
1299 operands.append(read_object(stream, None, self.forced_encoding))
1300
1301 def _read_inline_image(self, stream: StreamType) -> dict[str, Any]:
1302 # begin reading just after the "BI" - begin image
1303 # first read the dictionary of settings.
1304 settings = DictionaryObject()
1305 while True:
1306 tok = read_non_whitespace(stream)
1307 stream.seek(-1, 1)
1308 if tok == b"I":
1309 # "ID" - begin of image data
1310 break
1311 key = read_object(stream, self.pdf)
1312 tok = read_non_whitespace(stream)
1313 stream.seek(-1, 1)
1314 value = read_object(stream, self.pdf)
1315 settings[key] = value
1316 # left at beginning of ID
1317 tmp = stream.read(3)
1318 assert tmp[:2] == b"ID"
1319 filtr = settings.get("/F", settings.get("/Filter", "not set"))
1320 savpos = stream.tell()
1321 if isinstance(filtr, list):
1322 filtr = filtr[0] # used forencoding
1323 if "AHx" in filtr or "ASCIIHexDecode" in filtr:
1324 data = extract_inline_AHx(stream)
1325 elif "A85" in filtr or "ASCII85Decode" in filtr:
1326 data = extract_inline_A85(stream)
1327 elif "RL" in filtr or "RunLengthDecode" in filtr:
1328 data = extract_inline_RL(stream)
1329 elif "DCT" in filtr or "DCTDecode" in filtr:
1330 data = extract_inline_DCT(stream)
1331 elif filtr == "not set":
1332 cs = settings.get("/CS", "")
1333 if isinstance(cs, list):
1334 cs = cs[0]
1335 if "RGB" in cs:
1336 lcs = 3
1337 elif "CMYK" in cs:
1338 lcs = 4
1339 else:
1340 bits = settings.get(
1341 "/BPC",
1342 8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1,
1343 )
1344 if bits > 0:
1345 lcs = bits / 8.0
1346 else:
1347 data = extract_inline_default(stream)
1348 lcs = -1
1349 if lcs > 0:
1350 data = stream.read(
1351 ceil(cast(int, settings["/W"]) * lcs) * cast(int, settings["/H"])
1352 )
1353 # Move to the `EI` if possible.
1354 ei = read_non_whitespace(stream)
1355 stream.seek(-1, 1)
1356 else:
1357 data = extract_inline_default(stream)
1358
1359 ei = stream.read(3)
1360 stream.seek(-1, 1)
1361 if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:
1362 # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above.
1363 stream.seek(savpos, 0)
1364 data = extract_inline_default(stream)
1365 ei = stream.read(3)
1366 stream.seek(-1, 1)
1367 if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES: # pragma: no cover
1368 # Check the same condition again. This should never fail as
1369 # edge cases are covered by `extract_inline_default` above,
1370 # but check this ot make sure that we are behind the `EI` afterwards.
1371 raise PdfStreamError(
1372 f"Could not extract inline image, even using fallback. Expected 'EI', got {ei!r}"
1373 )
1374 return {"settings": settings, "data": data}
1375
1376 # This overrides the parent method
1377 def get_data(self) -> bytes:
1378 if not self._data:
1379 new_data = BytesIO()
1380 for operands, operator in self._operations:
1381 if operator == b"INLINE IMAGE":
1382 new_data.write(b"BI")
1383 dict_text = BytesIO()
1384 operands["settings"].write_to_stream(dict_text)
1385 new_data.write(dict_text.getvalue()[2:-2])
1386 new_data.write(b"ID ")
1387 new_data.write(operands["data"])
1388 new_data.write(b"EI")
1389 else:
1390 for op in operands:
1391 op.write_to_stream(new_data)
1392 new_data.write(b" ")
1393 new_data.write(operator)
1394 new_data.write(b"\n")
1395 self._data = new_data.getvalue()
1396 return self._data
1397
1398 # This overrides the parent method
1399 def set_data(self, data: bytes) -> None:
1400 super().set_data(data)
1401 self._operations = []
1402
1403 @property
1404 def operations(self) -> list[tuple[Any, bytes]]:
1405 if not self._operations and self._data:
1406 self._parse_content_stream(BytesIO(self._data))
1407 self._data = b""
1408 return self._operations
1409
1410 @operations.setter
1411 def operations(self, operations: list[tuple[Any, bytes]]) -> None:
1412 self._operations = operations
1413 self._data = b""
1414
1415 def isolate_graphics_state(self) -> None:
1416 if self._operations:
1417 self._operations.insert(0, ([], b"q"))
1418 self._operations.append(([], b"Q"))
1419 elif self._data:
1420 self._data = b"q\n" + self._data + b"\nQ\n"
1421
1422 # This overrides the parent method
1423 def write_to_stream(
1424 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
1425 ) -> None:
1426 if not self._data and self._operations:
1427 self.get_data() # this ensures ._data is rebuilt
1428 super().write_to_stream(stream, encryption_key)
1429
1430
1431def read_object(
1432 stream: StreamType,
1433 pdf: Optional[PdfReaderProtocol],
1434 forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
1435) -> Union[PdfObject, int, str, ContentStream]:
1436 tok = stream.read(1)
1437 stream.seek(-1, 1) # reset to start
1438 if tok == b"/":
1439 return NameObject.read_from_stream(stream, pdf)
1440 if tok == b"<":
1441 # hexadecimal string OR dictionary
1442 peek = stream.read(2)
1443 stream.seek(-2, 1) # reset to start
1444 if peek == b"<<":
1445 return DictionaryObject.read_from_stream(stream, pdf, forced_encoding)
1446 return read_hex_string_from_stream(stream, forced_encoding)
1447 if tok == b"[":
1448 return ArrayObject.read_from_stream(stream, pdf, forced_encoding)
1449 if tok in (b"t", b"f"):
1450 return BooleanObject.read_from_stream(stream)
1451 if tok == b"(":
1452 return read_string_from_stream(stream, forced_encoding)
1453 if tok == b"e" and stream.read(6) == b"endobj":
1454 return NullObject()
1455 if tok == b"n":
1456 return NullObject.read_from_stream(stream)
1457 if tok == b"%":
1458 # comment
1459 skip_over_comment(stream)
1460 tok = read_non_whitespace(stream)
1461 stream.seek(-1, 1)
1462 return read_object(stream, pdf, forced_encoding)
1463 if tok in b"0123456789+-.":
1464 # number object OR indirect reference
1465 peek = stream.read(20)
1466 stream.seek(-len(peek), 1) # reset to start
1467 if IndirectPattern.match(peek) is not None:
1468 assert pdf is not None, "mypy"
1469 return IndirectObject.read_from_stream(stream, pdf)
1470 return NumberObject.read_from_stream(stream)
1471 pos = stream.tell()
1472 stream.seek(-20, 1)
1473 stream_extract = stream.read(80)
1474 stream.seek(pos)
1475 read_until_whitespace(stream)
1476 raise PdfReadError(
1477 f"Invalid Elementary Object starting with {tok!r} @{pos}: {stream_extract!r}"
1478 )
1479
1480
1481class Field(TreeObject):
1482 """
1483 A class representing a field dictionary.
1484
1485 This class is accessed through
1486 :meth:`get_fields()<pypdf.PdfReader.get_fields>`
1487 """
1488
1489 def __init__(self, data: DictionaryObject) -> None:
1490 DictionaryObject.__init__(self)
1491 field_attributes = (
1492 FieldDictionaryAttributes.attributes()
1493 + CheckboxRadioButtonAttributes.attributes()
1494 )
1495 self.indirect_reference = data.indirect_reference
1496 for attr in field_attributes:
1497 try:
1498 self[NameObject(attr)] = data[attr]
1499 except KeyError:
1500 pass
1501 if isinstance(self.get("/V"), EncodedStreamObject):
1502 d = cast(EncodedStreamObject, self[NameObject("/V")]).get_data()
1503 if isinstance(d, bytes):
1504 d_str = d.decode()
1505 elif d is None:
1506 d_str = ""
1507 else:
1508 raise Exception("Should never happen")
1509 self[NameObject("/V")] = TextStringObject(d_str)
1510
1511 # TABLE 8.69 Entries common to all field dictionaries
1512 @property
1513 def field_type(self) -> Optional[NameObject]:
1514 """Read-only property accessing the type of this field."""
1515 return self.get(FieldDictionaryAttributes.FT)
1516
1517 @property
1518 def parent(self) -> Optional[DictionaryObject]:
1519 """Read-only property accessing the parent of this field."""
1520 return self.get(FieldDictionaryAttributes.Parent)
1521
1522 @property
1523 def kids(self) -> Optional["ArrayObject"]:
1524 """Read-only property accessing the kids of this field."""
1525 return self.get(FieldDictionaryAttributes.Kids)
1526
1527 @property
1528 def name(self) -> Optional[str]:
1529 """Read-only property accessing the name of this field."""
1530 return self.get(FieldDictionaryAttributes.T)
1531
1532 @property
1533 def alternate_name(self) -> Optional[str]:
1534 """Read-only property accessing the alternate name of this field."""
1535 return self.get(FieldDictionaryAttributes.TU)
1536
1537 @property
1538 def mapping_name(self) -> Optional[str]:
1539 """
1540 Read-only property accessing the mapping name of this field.
1541
1542 This name is used by pypdf as a key in the dictionary returned by
1543 :meth:`get_fields()<pypdf.PdfReader.get_fields>`
1544 """
1545 return self.get(FieldDictionaryAttributes.TM)
1546
1547 @property
1548 def flags(self) -> Optional[int]:
1549 """
1550 Read-only property accessing the field flags, specifying various
1551 characteristics of the field (see Table 8.70 of the PDF 1.7 reference).
1552 """
1553 return self.get(FieldDictionaryAttributes.Ff)
1554
1555 @property
1556 def value(self) -> Optional[Any]:
1557 """
1558 Read-only property accessing the value of this field.
1559
1560 Format varies based on field type.
1561 """
1562 return self.get(FieldDictionaryAttributes.V)
1563
1564 @property
1565 def default_value(self) -> Optional[Any]:
1566 """Read-only property accessing the default value of this field."""
1567 return self.get(FieldDictionaryAttributes.DV)
1568
1569 @property
1570 def additional_actions(self) -> Optional[DictionaryObject]:
1571 """
1572 Read-only property accessing the additional actions dictionary.
1573
1574 This dictionary defines the field's behavior in response to trigger
1575 events. See Section 8.5.2 of the PDF 1.7 reference.
1576 """
1577 return self.get(FieldDictionaryAttributes.AA)
1578
1579
1580class Destination(TreeObject):
1581 """
1582 A class representing a destination within a PDF file.
1583
1584 See section 12.3.2 of the PDF 2.0 reference.
1585
1586 Args:
1587 title: Title of this destination.
1588 page: Reference to the page of this destination. Should
1589 be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`.
1590 fit: How the destination is displayed.
1591
1592 Raises:
1593 PdfReadError: If destination type is invalid.
1594
1595 """
1596
1597 node: Optional[
1598 DictionaryObject
1599 ] = None # node provide access to the original Object
1600
1601 def __init__(
1602 self,
1603 title: Union[str, bytes],
1604 page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject],
1605 fit: Fit,
1606 ) -> None:
1607 self._filtered_children: list[Any] = [] # used in PdfWriter
1608
1609 typ = fit.fit_type
1610 args = fit.fit_args
1611
1612 DictionaryObject.__init__(self)
1613 self[NameObject("/Title")] = TextStringObject(title)
1614 self[NameObject("/Page")] = page
1615 self[NameObject("/Type")] = typ
1616
1617 # from table 8.2 of the PDF 1.7 reference.
1618 if typ == "/XYZ":
1619 if len(args) < 1: # left is missing : should never occur
1620 args.append(NumberObject(0.0))
1621 if len(args) < 2: # top is missing
1622 args.append(NumberObject(0.0))
1623 if len(args) < 3: # zoom is missing
1624 args.append(NumberObject(0.0))
1625 (
1626 self[NameObject(TA.LEFT)],
1627 self[NameObject(TA.TOP)],
1628 self[NameObject("/Zoom")],
1629 ) = args
1630 elif len(args) == 0:
1631 pass
1632 elif typ == TF.FIT_R:
1633 (
1634 self[NameObject(TA.LEFT)],
1635 self[NameObject(TA.BOTTOM)],
1636 self[NameObject(TA.RIGHT)],
1637 self[NameObject(TA.TOP)],
1638 ) = args
1639 elif typ in [TF.FIT_H, TF.FIT_BH]:
1640 try: # Prefer to be more robust not only to null parameters
1641 (self[NameObject(TA.TOP)],) = args
1642 except Exception:
1643 (self[NameObject(TA.TOP)],) = (NullObject(),)
1644 elif typ in [TF.FIT_V, TF.FIT_BV]:
1645 try: # Prefer to be more robust not only to null parameters
1646 (self[NameObject(TA.LEFT)],) = args
1647 except Exception:
1648 (self[NameObject(TA.LEFT)],) = (NullObject(),)
1649 elif typ in [TF.FIT, TF.FIT_B]:
1650 pass
1651 else:
1652 raise PdfReadError(f"Unknown Destination Type: {typ!r}")
1653
1654 @property
1655 def dest_array(self) -> "ArrayObject":
1656 return ArrayObject(
1657 [self.raw_get("/Page"), self["/Type"]]
1658 + [
1659 self[x]
1660 for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"]
1661 if x in self
1662 ]
1663 )
1664
1665 def write_to_stream(
1666 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
1667 ) -> None:
1668 if encryption_key is not None: # deprecated
1669 deprecation_no_replacement(
1670 "the encryption_key parameter of write_to_stream", "5.0.0"
1671 )
1672 stream.write(b"<<\n")
1673 key = NameObject("/D")
1674 key.write_to_stream(stream)
1675 stream.write(b" ")
1676 value = self.dest_array
1677 value.write_to_stream(stream)
1678
1679 key = NameObject("/S")
1680 key.write_to_stream(stream)
1681 stream.write(b" ")
1682 value_s = NameObject("/GoTo")
1683 value_s.write_to_stream(stream)
1684
1685 stream.write(b"\n")
1686 stream.write(b">>")
1687
1688 @property
1689 def title(self) -> Optional[str]:
1690 """Read-only property accessing the destination title."""
1691 return self.get("/Title")
1692
1693 @property
1694 def page(self) -> Optional[IndirectObject]:
1695 """Read-only property accessing the IndirectObject of the destination page."""
1696 return self.get("/Page")
1697
1698 @property
1699 def typ(self) -> Optional[str]:
1700 """Read-only property accessing the destination type."""
1701 return self.get("/Type")
1702
1703 @property
1704 def zoom(self) -> Optional[int]:
1705 """Read-only property accessing the zoom factor."""
1706 return self.get("/Zoom", None)
1707
1708 @property
1709 def left(self) -> Optional[FloatObject]:
1710 """Read-only property accessing the left horizontal coordinate."""
1711 return self.get("/Left", None)
1712
1713 @property
1714 def right(self) -> Optional[FloatObject]:
1715 """Read-only property accessing the right horizontal coordinate."""
1716 return self.get("/Right", None)
1717
1718 @property
1719 def top(self) -> Optional[FloatObject]:
1720 """Read-only property accessing the top vertical coordinate."""
1721 return self.get("/Top", None)
1722
1723 @property
1724 def bottom(self) -> Optional[FloatObject]:
1725 """Read-only property accessing the bottom vertical coordinate."""
1726 return self.get("/Bottom", None)
1727
1728 @property
1729 def color(self) -> Optional["ArrayObject"]:
1730 """Read-only property accessing the color in (R, G, B) with values 0.0-1.0."""
1731 return self.get(
1732 "/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)])
1733 )
1734
1735 @property
1736 def font_format(self) -> Optional[OutlineFontFlag]:
1737 """
1738 Read-only property accessing the font type.
1739
1740 1=italic, 2=bold, 3=both
1741 """
1742 return self.get("/F", 0)
1743
1744 @property
1745 def outline_count(self) -> Optional[int]:
1746 """
1747 Read-only property accessing the outline count.
1748
1749 positive = expanded
1750 negative = collapsed
1751 absolute value = number of visible descendants at all levels
1752 """
1753 return self.get("/Count", None)