1# Copyright (c) 2006, Mathieu Fenniak
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright notice,
9# this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above copyright notice,
11# this list of conditions and the following disclaimer in the documentation
12# and/or other materials provided with the distribution.
13# * The name of the author may not be used to endorse or promote products
14# derived from this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26# POSSIBILITY OF SUCH DAMAGE.
27
28
# Module authorship metadata (plain string constants).
__author__ = "Mathieu Fenniak"
__author_email__ = "biziqe@mathieu.fenniak.net"
31
32import logging
33import re
34import sys
35from collections.abc import Iterable, Sequence
36from io import BytesIO
37from math import ceil
38from typing import (
39 Any,
40 Callable,
41 Optional,
42 Union,
43 cast,
44)
45
46from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol
47from .._utils import (
48 WHITESPACES,
49 StreamType,
50 deprecation_no_replacement,
51 logger_warning,
52 read_non_whitespace,
53 read_until_regex,
54 read_until_whitespace,
55 skip_over_comment,
56)
57from ..constants import (
58 CheckboxRadioButtonAttributes,
59 FieldDictionaryAttributes,
60 OutlineFontFlag,
61)
62from ..constants import FilterTypes as FT
63from ..constants import StreamAttributes as SA
64from ..constants import TypArguments as TA
65from ..constants import TypFitArguments as TF
66from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfReadError, PdfStreamError
67from ._base import (
68 BooleanObject,
69 ByteStringObject,
70 FloatObject,
71 IndirectObject,
72 NameObject,
73 NullObject,
74 NumberObject,
75 PdfObject,
76 TextStringObject,
77 is_null_or_none,
78)
79from ._fit import Fit
80from ._image_inline import (
81 extract_inline_A85,
82 extract_inline_AHx,
83 extract_inline_DCT,
84 extract_inline_default,
85 extract_inline_RL,
86)
87from ._utils import read_hex_string_from_stream, read_string_from_stream
88
# ``Self`` is taken from the standard ``typing`` module on Python 3.11+ and
# from the ``typing_extensions`` backport on older interpreters.
if sys.version_info >= (3, 11):
    from typing import Self
else:
    from typing_extensions import Self

# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Bytes pattern: an optional sign, two whitespace-separated runs of digits
# (both captured), whitespace, then ``R`` followed by one non-letter byte.
IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]")
96
97
class ArrayObject(list[Any], PdfObject):
    """
    PDF array: a Python ``list`` that is also a ``PdfObject``.

    On top of the regular list behaviour it provides copying into a writer
    (``replicate`` / ``clone``), ``+``, ``+=`` and ``-=`` operators that also
    accept single values, and reading/writing of the ``[ ... ]`` syntax.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ArrayObject":
        """
        Copy this array into ``pdf_dest`` element by element.

        Elements exposing a ``replicate`` method are replicated into
        ``pdf_dest``; all other elements are appended unchanged.

        Args:
            pdf_dest: writer the copy is registered with.

        Returns:
            The array obtained from ``_reference_clone``, filled with the
            replicated elements.

        """
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, False),
        )
        for data in self:
            if hasattr(data, "replicate"):
                arr.append(data.replicate(pdf_dest))
            else:
                arr.append(data)
        return arr

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ArrayObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: writer receiving the clone.
            force_duplicate: when False and this array already belongs to
                ``pdf_dest``, the array itself is returned.
            ignore_fields: forwarded unchanged to the ``clone`` call of
                every element.

        Returns:
            ``self`` or the cloned array.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: the reference may be missing or None, in which case
            # a regular clone is performed.
            pass
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, force_duplicate),
        )
        for data in self:
            if isinstance(data, StreamObject):
                # Streams are stored through the indirect reference of their clone.
                dup = data._reference_clone(
                    data.clone(pdf_dest, force_duplicate, ignore_fields),
                    pdf_dest,
                    force_duplicate,
                )
                arr.append(dup.indirect_reference)
            elif hasattr(data, "clone"):
                arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields))
            else:
                arr.append(data)
        return arr

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash((self.__class__, tuple(x.hash_bin() for x in self)))

    def items(self) -> Iterable[Any]:
        """Emulate DictionaryObject.items for a list (index, object)."""
        return enumerate(self)

    def _to_lst(self, lst: Any) -> list[Any]:
        """
        Normalise an operand of ``+``, ``+=`` or ``-=`` into an iterable.

        Lists, tuples and sets are returned as they are. Any other value is
        wrapped in a one-element list; ``str`` is first converted to a
        ``NameObject`` (when it starts with "/") or a ``TextStringObject``,
        and ``bytes`` to a ``ByteStringObject``.
        """
        if isinstance(lst, (list, tuple, set)):
            pass
        elif isinstance(lst, PdfObject):
            lst = [lst]
        elif isinstance(lst, str):
            # startswith() instead of lst[0] so that an empty string becomes an
            # empty TextStringObject rather than raising IndexError.
            if lst.startswith("/"):
                lst = [NameObject(lst)]
            else:
                lst = [TextStringObject(lst)]
        elif isinstance(lst, bytes):
            lst = [ByteStringObject(lst)]
        else:  # for numbers,...
            lst = [lst]
        return lst

    def __add__(self, lst: Any) -> "ArrayObject":
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        Returns:
            ArrayObject with all elements

        """
        temp = ArrayObject(self)
        temp.extend(self._to_lst(lst))
        return temp

    def __iadd__(self, lst: Any) -> Self:
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        """
        self.extend(self._to_lst(lst))
        return self

    def __isub__(self, lst: Any) -> Self:
        """
        Allow to remove items.

        For every value of ``lst`` (normalised by ``_to_lst``) the first equal
        element is removed; values that are not present are ignored.
        """
        for item in self._to_lst(lst):
            try:
                self.remove(item)
            except ValueError:
                # item not in the array: nothing to remove
                pass
        return self

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the array as ``[ item item ... ]`` to ``stream``.

        Args:
            stream: output stream.
            encryption_key: deprecated; any value other than None only
                triggers ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"[")
        for data in self:
            stream.write(b" ")
            data.write_to_stream(stream)
        stream.write(b" ]")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "ArrayObject":
        """
        Parse an array starting at the current position of ``stream``.

        Args:
            stream: input positioned on the opening "[".
            pdf: reader forwarded to ``read_object``.
            forced_encoding: forwarded to ``read_object``.

        Returns:
            The parsed array. Reaching the end of the stream before "]"
            ends the parsing and returns the elements read so far.

        Raises:
            PdfReadError: if the first byte is not "[".

        """
        arr = ArrayObject()
        tmp = stream.read(1)
        if tmp != b"[":
            raise PdfReadError("Could not read array")
        while True:
            # skip leading whitespace
            tok = stream.read(1)
            while tok.isspace():
                tok = stream.read(1)
            if tok == b"":
                # end of data reached without a closing bracket
                break
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            stream.seek(-1, 1)
            # check for array ending
            peek_ahead = stream.read(1)
            if peek_ahead == b"]":
                break
            stream.seek(-1, 1)
            # read and append object
            arr.append(read_object(stream, pdf, forced_encoding))
        return arr
262
263
class DictionaryObject(dict[Any, Any], PdfObject):
    """
    PDF dictionary: a Python ``dict`` that is also a ``PdfObject``.

    ``__setitem__`` and ``setdefault`` only accept ``PdfObject`` keys and
    values. ``__getitem__`` returns ``get_object()`` of the stored value, while
    ``raw_get`` returns the stored value itself.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "DictionaryObject":
        """
        Copy this dictionary into ``pdf_dest`` entry by entry.

        Keys are always replicated; values are replicated when they expose a
        ``replicate`` method and stored unchanged otherwise.

        Args:
            pdf_dest: writer the copy is registered with.

        Returns:
            The dictionary obtained from ``_reference_clone``, filled with
            the replicated entries.

        """
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "DictionaryObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: writer receiving the clone.
            force_duplicate: when False and this dictionary already belongs
                to ``pdf_dest``, the dictionary itself is returned.
            ignore_fields: keys that must not be copied; ``None`` is treated
                as an empty list. See ``_clone`` for the handling of integer
                entries.

        Returns:
            ``self`` or the cloned dictionary.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: the reference may be missing or None, in which case
            # a regular clone is performed.
            pass

        visited: set[tuple[int, int]] = set()  # (idnum, generation)
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
        )
        if ignore_fields is None:
            ignore_fields = []
        # Only fill the object when _reference_clone returned an empty one.
        if len(d__.keys()) == 0:
            d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: "DictionaryObject",
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],  # (idnum, generation)
    ) -> None:
        """
        Update the object from src.

        Args:
            src: dictionary whose entries are copied into ``self``.
            pdf_dest: writer receiving the cloned objects.
            force_duplicate: forwarded to every nested clone call.
            ignore_fields: keys that are not copied. An integer entry is a
                counter for the entry following it: it is decremented on
                each call and, once it is <= 0, the counter and the
                following entry are both dropped from the list.
            visited: ``(idnum, generation)`` pairs of the clones already
                created while following "/Next", "/Prev", "/N" and "/V"
                chains; used to stop on loops.

        """
        # first we remove for the ignore_fields
        # that are for a limited number of levels
        x = 0
        assert ignore_fields is not None
        ignore_fields = list(ignore_fields)
        while x < len(ignore_fields):
            if isinstance(ignore_fields[x], int):
                if cast(int, ignore_fields[x]) <= 0:
                    # drop the exhausted counter and the entry that follows it
                    del ignore_fields[x]
                    del ignore_fields[x]
                    continue
                ignore_fields[x] -= 1  # type:ignore
            x += 1
        # First check if this is a chain list, we need to loop to prevent recur
        if any(
            field not in ignore_fields
            and field in src
            and isinstance(src.raw_get(field), IndirectObject)
            and isinstance(src[field], DictionaryObject)
            and (
                src.get("/Type", None) is None
                or cast(DictionaryObject, src[field]).get("/Type", None) is None
                or src.get("/Type", None)
                == cast(DictionaryObject, src[field]).get("/Type", None)
            )
            for field in ["/Next", "/Prev", "/N", "/V"]
        ):
            ignore_fields = list(ignore_fields)
            for lst in (("/Next", "/Prev"), ("/N", "/V")):
                for k in lst:
                    objs = []
                    if (
                        k in src
                        and k not in self
                        and isinstance(src.raw_get(k), IndirectObject)
                        and isinstance(src[k], DictionaryObject)
                        # IF need to go further the idea is to check
                        # that the types are the same:
                        and (
                            src.get("/Type", None) is None
                            or cast(DictionaryObject, src[k]).get("/Type", None) is None
                            or src.get("/Type", None)
                            == cast(DictionaryObject, src[k]).get("/Type", None)
                        )
                    ):
                        # Walk the chain iteratively: create and link all the
                        # (still empty) clones first, fill them afterwards.
                        cur_obj: Optional[DictionaryObject] = cast(
                            "DictionaryObject", src[k]
                        )
                        prev_obj: Optional[DictionaryObject] = self
                        while cur_obj is not None:
                            clon = cast(
                                "DictionaryObject",
                                cur_obj._reference_clone(
                                    cur_obj.__class__(), pdf_dest, force_duplicate
                                ),
                            )
                            # check to see if we've previously processed our item
                            if clon.indirect_reference is not None:
                                idnum = clon.indirect_reference.idnum
                                generation = clon.indirect_reference.generation
                                if (idnum, generation) in visited:
                                    cur_obj = None
                                    break
                                visited.add((idnum, generation))
                            objs.append((cur_obj, clon))
                            assert prev_obj is not None
                            prev_obj[NameObject(k)] = clon.indirect_reference
                            prev_obj = clon
                            try:
                                if cur_obj == src:
                                    cur_obj = None
                                else:
                                    cur_obj = cast("DictionaryObject", cur_obj[k])
                            except Exception:
                                # no further element reachable through k
                                cur_obj = None
                        for s, c in objs:
                            c._clone(
                                s, pdf_dest, force_duplicate, ignore_fields, visited
                            )

        for k, v in src.items():
            if k not in ignore_fields:
                if isinstance(v, StreamObject):
                    if not hasattr(v, "indirect_reference"):
                        v.indirect_reference = None
                    vv = v.clone(pdf_dest, force_duplicate, ignore_fields)
                    assert vv.indirect_reference is not None
                    # streams are stored through their indirect reference
                    self[k.clone(pdf_dest)] = vv.indirect_reference
                elif k not in self:
                    self[NameObject(k)] = (
                        v.clone(pdf_dest, force_duplicate, ignore_fields)
                        if hasattr(v, "clone")
                        else v
                    )

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash(
            (self.__class__, tuple(((k, v.hash_bin()) for k, v in self.items())))
        )

    def raw_get(self, key: Any) -> Any:
        """Return the stored value for ``key`` without calling ``get_object()`` on it."""
        return dict.__getitem__(self, key)

    def get_inherited(self, key: str, default: Any = None) -> Any:
        """
        Returns the value of a key or from the parent if not found.
        If not found returns default.

        Args:
            key: string identifying the field to return

            default: default value to return

        Returns:
            Current key or inherited one, otherwise default value.

        """
        if key in self:
            return self[key]
        if "/Parent" not in self:
            return default
        # Not defined here: delegate the lookup to the "/Parent" dictionary.
        return cast("DictionaryObject", self["/Parent"].get_object()).get_inherited(
            key, default
        )

    def __setitem__(self, key: Any, value: Any) -> Any:
        """
        Store ``value`` under ``key``.

        Raises:
            ValueError: if ``key`` or ``value`` is not a ``PdfObject``.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.__setitem__(self, key, value)

    def setdefault(self, key: Any, value: Optional[Any] = None) -> Any:
        """
        Same as ``dict.setdefault`` with the type checks of ``__setitem__``.

        Raises:
            ValueError: if ``key`` or ``value`` is not a ``PdfObject``; this
                includes the default ``value=None``.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.setdefault(self, key, value)

    def __getitem__(self, key: Any) -> PdfObject:
        """Return ``get_object()`` of the value stored under ``key``."""
        return dict.__getitem__(self, key).get_object()

    @property
    def xmp_metadata(self) -> Optional[XmpInformationProtocol]:
        """
        Retrieve XMP (Extensible Metadata Platform) data relevant to this
        object, if available.

        See Table 347 — Additional entries in a metadata stream dictionary.

        Returns:
            Returns a :class:`~pypdf.xmp.XmpInformation` instance
            that can be used to access XMP metadata from the document. Can also
            return None if no metadata was found on the document root.

        """
        from ..xmp import XmpInformation  # noqa: PLC0415

        metadata = self.get("/Metadata", None)
        if is_null_or_none(metadata):
            return None
        assert metadata is not None, "mypy"
        metadata = metadata.get_object()
        return XmpInformation(metadata)

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the dictionary as ``<< key value ... >>`` to ``stream``.

        Keys longer than two characters whose second and last characters are
        "%" (such as "/%is_open%") are not written.

        Args:
            stream: output stream.
            encryption_key: deprecated; any value other than None triggers
                ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        for key, value in self.items():
            if len(key) > 2 and key[1] == "%" and key[-1] == "%":
                continue
            key.write_to_stream(stream, encryption_key)
            stream.write(b" ")
            value.write_to_stream(stream)
            stream.write(b"\n")
        stream.write(b">>")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "DictionaryObject":
        """
        Parse a dictionary, and the stream following it if any.

        Args:
            stream: input positioned on the opening "<<".
            pdf: reader used to resolve indirect objects; its ``strict``
                flag turns most recoverable problems into exceptions.
            forced_encoding: forwarded to ``read_object`` when reading
                the values.

        Returns:
            A ``DictionaryObject``, or the object built by
            ``StreamObject.initialize_from_dictionary`` when the dictionary
            is followed by the ``stream`` keyword. When reading a key/value
            pair fails and ``pdf`` is not strict, the entries read so far are
            returned.

        Raises:
            PdfReadError: missing "<<", and (strict reader only) invalid
                entries or duplicated keys; also when "endstream" cannot
                be located.
            PdfStreamError: truncated input, missing end of line after
                ``stream``, or (strict reader only) missing stream length.

        """

        def get_next_obj_pos(
            p: int, p1: int, rem_gens: list[int], pdf: PdfReaderProtocol
        ) -> int:
            # Smallest xref offset x with p < x <= p1 over the given
            # generations; p1 when there is none.
            out = p1
            for gen in rem_gens:
                loc = pdf.xref[gen]
                try:
                    values = [x for x in loc.values() if p < x <= p1]
                    if values:
                        out = min(out, *values)
                except ValueError:
                    pass
            return out

        def read_unsized_from_stream(
            stream: StreamType, pdf: PdfReaderProtocol
        ) -> bytes:
            # Read stream data whose length is unusable: take everything up
            # to the next object offset and cut at the "endstream" marker.
            # we are just pointing at beginning of the stream
            eon = get_next_obj_pos(stream.tell(), 2**32, list(pdf.xref), pdf) - 1
            curr = stream.tell()
            rw = stream.read(eon - stream.tell())
            p = rw.find(b"endstream")
            if p < 0:
                raise PdfReadError(
                    f"Unable to find 'endstream' marker for obj starting at {curr}."
                )
            stream.seek(curr + p + 9)
            # Drop the byte preceding the marker. The bound is clamped so that
            # a marker found at offset 0 gives empty data instead of rw[:-1].
            return rw[: max(p - 1, 0)]

        tmp = stream.read(2)
        if tmp != b"<<":
            raise PdfReadError(
                f"Dictionary read error at byte {hex(stream.tell())}: "
                "stream must begin with '<<'"
            )
        data: dict[Any, Any] = {}
        while True:
            tok = read_non_whitespace(stream)
            if tok == b"\x00":
                continue
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            if not tok:
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)

            if tok == b">":
                # consume the second ">" of the closing ">>"
                stream.read(1)
                break
            stream.seek(-1, 1)
            try:
                try:
                    key = read_object(stream, pdf)
                    if isinstance(key, NullObject):
                        break
                    if not isinstance(key, NameObject):
                        raise PdfReadError(
                            f"Expecting a NameObject for key but found {key!r}"
                        )
                except PdfReadError as exc:
                    if pdf is not None and pdf.strict:
                        raise
                    logger_warning(exc.__repr__(), __name__)
                    continue
                tok = read_non_whitespace(stream)
                stream.seek(-1, 1)
                value = read_object(stream, pdf, forced_encoding)
            except Exception as exc:
                if pdf is not None and pdf.strict:
                    # keep the original exception as explicit cause
                    raise PdfReadError(exc.__repr__()) from exc
                logger_warning(exc.__repr__(), __name__)
                retval = DictionaryObject()
                retval.update(data)
                return retval  # return partial data

            # A key is (re)assigned while it is absent or its current value is
            # falsy; otherwise the first definition is kept.
            if not data.get(key):
                data[key] = value
            else:
                # multiple definitions of key not permitted
                msg = (
                    f"Multiple definitions in dictionary at byte "
                    f"{hex(stream.tell())} for key {key}"
                )
                if pdf is not None and pdf.strict:
                    raise PdfReadError(msg)
                logger_warning(msg, __name__)

        pos = stream.tell()
        s = read_non_whitespace(stream)
        if s == b"s" and stream.read(5) == b"tream":
            eol = stream.read(1)
            # Occasional PDF file output has spaces after 'stream' keyword but before EOL.
            # patch provided by Danial Sandler
            while eol == b" ":
                eol = stream.read(1)
            if eol not in (b"\n", b"\r"):
                raise PdfStreamError("Stream data must be followed by a newline")
            if eol == b"\r" and stream.read(1) != b"\n":
                # lone "\r": the byte just read already belongs to the data
                stream.seek(-1, 1)
            # this is a stream object, not a dictionary
            if SA.LENGTH not in data:
                if pdf is not None and pdf.strict:
                    raise PdfStreamError("Stream length not defined")
                logger_warning(
                    f"Stream length not defined @pos={stream.tell()}", __name__
                )
                data[NameObject(SA.LENGTH)] = NumberObject(-1)
            length = data[SA.LENGTH]
            if isinstance(length, IndirectObject):
                # resolving the length may move the stream: restore the position
                t = stream.tell()
                assert pdf is not None, "mypy"
                length = pdf.get_object(length)
                stream.seek(t, 0)
            if length is None:  # if the PDF is damaged
                length = -1
            pstart = stream.tell()
            if length > 0:
                data["__streamdata__"] = stream.read(length)
            else:
                data["__streamdata__"] = read_until_regex(
                    stream, re.compile(b"endstream")
                )
            e = read_non_whitespace(stream)
            ndstream = stream.read(8)
            if (e + ndstream) != b"endstream":
                # the odd PDF file has a length that is too long, so
                # we need to read backwards to find the "endstream" ending.
                # ReportLab (unknown version) generates files with this bug,
                # and Python users into PDF files tend to be our audience.
                # we need to do this to correct the streamdata and chop off
                # an extra character.
                pos = stream.tell()
                stream.seek(-10, 1)
                end = stream.read(9)
                if end == b"endstream":
                    # we found it by looking back one character further.
                    data["__streamdata__"] = data["__streamdata__"][:-1]
                elif pdf is not None and not pdf.strict:
                    stream.seek(pstart, 0)
                    data["__streamdata__"] = read_unsized_from_stream(stream, pdf)
                    pos = stream.tell()
                else:
                    stream.seek(pos, 0)
                    raise PdfReadError(
                        "Unable to find 'endstream' marker after stream at byte "
                        f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')."
                    )
        else:
            # plain dictionary: go back to just after the closing ">>"
            stream.seek(pos, 0)
        if "__streamdata__" in data:
            return StreamObject.initialize_from_dictionary(data)
        retval = DictionaryObject()
        retval.update(data)
        return retval
674
675
class TreeObject(DictionaryObject):
    """
    Dictionary acting as the parent of a doubly linked list of children.

    The parent holds "/First", "/Last" and "/Count"; every child holds
    "/Parent" and, where applicable, "/Prev" and "/Next". Iterating a
    ``TreeObject`` yields its children instead of its keys.
    """

    def __init__(self, dct: Optional[DictionaryObject] = None) -> None:
        """
        Create the tree node.

        Args:
            dct: optional dictionary whose entries are copied with ``update``.

        """
        DictionaryObject.__init__(self)
        if dct:
            self.update(dct)

    def has_children(self) -> bool:
        """Return True when the "/First" key is present."""
        return "/First" in self

    def __iter__(self) -> Any:
        """Iterate over the children (see ``children``)."""
        return self.children()

    def children(self) -> Iterable[Any]:
        """
        Yield the children, starting at "/First" and following "/Next".

        The iteration stops after the child equal to "/Last" or at the first
        child without a usable "/Next" entry. The links are read lazily,
        after each child has been handed to the caller.
        """
        if not self.has_children():
            return

        child_ref = self[NameObject("/First")]
        child = child_ref.get_object()
        while True:
            yield child
            if child == self[NameObject("/Last")]:
                return
            child_ref = child.get(NameObject("/Next"))  # type: ignore
            if is_null_or_none(child_ref):
                return
            child = child_ref.get_object()

    def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None:
        """Append ``child`` at the end of the children list."""
        self.insert_child(child, None, pdf)

    def inc_parent_counter_default(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to "/Count" of ``parent`` and of its ancestors.

        The propagation through "/Parent" stops at the first node that is
        null/None or has no "/Count" entry. Counts never go below 0.

        Args:
            parent: node (or reference to it) where the update starts.
            n: value added to each counter.

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        if "/Count" in parent:
            parent[NameObject("/Count")] = NumberObject(
                max(0, cast(int, parent[NameObject("/Count")]) + n)
            )
            self.inc_parent_counter_default(parent.get("/Parent", None), n)

    def inc_parent_counter_outline(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to the magnitude of "/Count" of ``parent`` and its ancestors.

        The count is stored positive when "/%is_open%" equals True (the
        default when absent) and negative otherwise; the propagation through
        "/Parent" stops after the first node that is not open.

        Args:
            parent: node (or reference to it) where the update starts.
            n: value added to the absolute value of each counter.

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        # BooleanObject requires comparison with == not is
        opn = parent.get("/%is_open%", True) == True  # noqa: E712
        c = cast(int, parent.get("/Count", 0))
        if c < 0:
            c = abs(c)
        parent[NameObject("/Count")] = NumberObject((c + n) * (1 if opn else -1))
        if not opn:
            return
        self.inc_parent_counter_outline(parent.get("/Parent", None), n)

    def insert_child(
        self,
        child: Any,
        before: Any,
        pdf: PdfWriterProtocol,
        inc_parent_counter: Optional[Callable[..., Any]] = None,
    ) -> IndirectObject:
        """
        Insert ``child`` in the children list.

        Args:
            child: object to insert; it must provide ``get_object()`` and
                ``indirect_reference``.
            before: indirect reference of the node in front of which the
                child is inserted. When no visited node matches, the
                child is appended at the end.
            pdf: not used by this method.
            inc_parent_counter: callable ``(parent, n)`` updating the
                counters; defaults to ``inc_parent_counter_default``.

        Returns:
            The indirect reference of the inserted child.

        """
        if inc_parent_counter is None:
            inc_parent_counter = self.inc_parent_counter_default
        child_obj = child.get_object()
        child = child.indirect_reference  # get_reference(child_obj)

        prev: Optional[DictionaryObject]
        if "/First" not in self:  # no child yet
            self[NameObject("/First")] = child
            self[NameObject("/Count")] = NumberObject(0)
            self[NameObject("/Last")] = child
            child_obj[NameObject("/Parent")] = self.indirect_reference
            inc_parent_counter(self, child_obj.get("/Count", 1))
            if "/Next" in child_obj:
                del child_obj["/Next"]
            if "/Prev" in child_obj:
                del child_obj["/Prev"]
            return child
        # NOTE(review): the search starts at "/Last" and only follows "/Next",
        # so ``before`` can only be matched on nodes reachable that way -
        # confirm this is the intended behaviour.
        prev = cast("DictionaryObject", self["/Last"])

        while prev.indirect_reference != before:
            if "/Next" in prev:
                prev = cast("TreeObject", prev["/Next"])
            else:  # append at the end
                prev[NameObject("/Next")] = cast("TreeObject", child)
                child_obj[NameObject("/Prev")] = prev.indirect_reference
                child_obj[NameObject("/Parent")] = self.indirect_reference
                if "/Next" in child_obj:
                    del child_obj["/Next"]
                self[NameObject("/Last")] = child
                inc_parent_counter(self, child_obj.get("/Count", 1))
                return child
        try:  # insert as first or in the middle
            assert isinstance(prev["/Prev"], DictionaryObject)
            prev["/Prev"][NameObject("/Next")] = child
            child_obj[NameObject("/Prev")] = prev["/Prev"]
        except Exception:  # it means we are inserting in first position
            # Guarded removal: a child without "/Next" must not raise KeyError
            # from within this fallback ("/Next" is set right below anyway).
            if "/Next" in child_obj:
                del child_obj["/Next"]
        child_obj[NameObject("/Next")] = prev
        prev[NameObject("/Prev")] = child
        child_obj[NameObject("/Parent")] = self.indirect_reference
        inc_parent_counter(self, child_obj.get("/Count", 1))
        return child

    def _remove_node_from_tree(
        self, prev: Any, prev_ref: Any, cur: Any, last: Any
    ) -> None:
        """
        Adjust the pointers of the linked list and tree node count.

        Args:
            prev: node preceding ``cur``, or None when ``cur`` is the first one.
            prev_ref: reference stored to point at ``prev``.
            cur: node being removed.
            last: node stored under "/Last"; only used to assert that a node
                without "/Next" is the last one.

        """
        next_ref = cur.get(NameObject("/Next"), None)
        if prev is None:
            if next_ref:
                # Removing first tree node
                next_obj = next_ref.get_object()
                del next_obj[NameObject("/Prev")]
                self[NameObject("/First")] = next_ref
                self[NameObject("/Count")] = NumberObject(
                    self[NameObject("/Count")] - 1  # type: ignore
                )

            else:
                # Removing only tree node
                self[NameObject("/Count")] = NumberObject(0)
                del self[NameObject("/First")]
                if NameObject("/Last") in self:
                    del self[NameObject("/Last")]
        else:
            if next_ref:
                # Removing middle tree node
                next_obj = next_ref.get_object()
                next_obj[NameObject("/Prev")] = prev_ref
                prev[NameObject("/Next")] = next_ref
            else:
                # Removing last tree node
                assert cur == last
                del prev[NameObject("/Next")]
                self[NameObject("/Last")] = prev_ref
            self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1)  # type: ignore

    def remove_child(self, child: Any) -> None:
        """
        Remove ``child`` from the children list and reset its tree links.

        Args:
            child: child object or reference to it.

        Raises:
            ValueError: if the child has no "/Parent", if its "/Parent" is
                not this object, or if it is not found in the list.

        """
        child_obj = child.get_object()
        child = child_obj.indirect_reference

        if NameObject("/Parent") not in child_obj:
            raise ValueError("Removed child does not appear to be a tree item")
        if child_obj[NameObject("/Parent")] != self:
            raise ValueError("Removed child is not a member of this tree")

        found = False
        prev_ref = None
        prev = None
        cur_ref: Optional[Any] = self[NameObject("/First")]
        cur: Optional[dict[str, Any]] = cur_ref.get_object()  # type: ignore
        last_ref = self[NameObject("/Last")]
        last = last_ref.get_object()
        while cur is not None:
            if cur == child_obj:
                self._remove_node_from_tree(prev, prev_ref, cur, last)
                found = True
                break

            # Go to the next node
            prev_ref = cur_ref
            prev = cur
            if NameObject("/Next") in cur:
                cur_ref = cur[NameObject("/Next")]
                cur = cur_ref.get_object()
            else:
                cur_ref = None
                cur = None

        if not found:
            raise ValueError("Removal couldn't find item in tree")

        _reset_node_tree_relationship(child_obj)

    def remove_from_tree(self) -> None:
        """Remove the object from the tree it is in."""
        if NameObject("/Parent") not in self:
            raise ValueError("Removed child does not appear to be a tree item")
        cast("TreeObject", self["/Parent"]).remove_child(self)

    def empty_tree(self) -> None:
        """
        Detach all the children and drop "/Count", "/First" and "/Last".

        The children are collected before any of them is modified:
        ``children`` reads "/Next" lazily, so resetting a child during the
        iteration would end it after the first child.
        """
        for child in list(self.children()):
            child_obj = child.get_object()
            _reset_node_tree_relationship(child_obj)

        if NameObject("/Count") in self:
            del self[NameObject("/Count")]
        if NameObject("/First") in self:
            del self[NameObject("/First")]
        if NameObject("/Last") in self:
            del self[NameObject("/Last")]
883
884
def _reset_node_tree_relationship(child_obj: Any) -> None:
    """
    Strip the tree links from a node that has just left its tree.

    "/Parent" is always deleted (it has to be present); "/Next" and "/Prev"
    are deleted only when the node has them.

    Args:
        child_obj: node whose tree-related entries are removed.

    """
    del child_obj[NameObject("/Parent")]
    for sibling_link in ("/Next", "/Prev"):
        link_key = NameObject(sibling_link)
        if link_key in child_obj:
            del child_obj[link_key]
900
901
class StreamObject(DictionaryObject):
    """
    Dictionary with attached stream data.

    The raw bytes are kept in ``_data``; ``decoded_self`` optionally holds a
    decoded counterpart of the stream.
    """

    def __init__(self) -> None:
        """Create a stream with empty data and no decoded counterpart."""
        self._data: bytes = b""
        self.decoded_self: Optional[DecodedStreamObject] = None

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "StreamObject":
        """
        Copy this stream (data, decoded counterpart and entries) into ``pdf_dest``.

        Args:
            pdf_dest: writer the copy is registered with.

        Returns:
            The stream obtained from ``_reference_clone``.

        """
        d__ = cast(
            "StreamObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        d__._data = self._data
        try:
            decoded_self = self.decoded_self
            # The replicated decoded object belongs to the replica; the source
            # object (self) is left untouched.
            if decoded_self is None:
                d__.decoded_self = None
            else:
                d__.decoded_self = cast(
                    "DecodedStreamObject", decoded_self.replicate(pdf_dest)
                )
        except Exception:
            # best effort: the replica is still usable without decoded_self
            pass
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def _clone(
        self,
        src: DictionaryObject,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],
    ) -> None:
        """
        Update the object from src.

        Copies ``_data`` and a clone of ``decoded_self`` from ``src``, then
        lets ``DictionaryObject._clone`` copy the dictionary entries.

        Args:
            src: stream the content is taken from.
            pdf_dest: writer receiving the cloned objects.
            force_duplicate: forwarded to the nested clone calls.
            ignore_fields: forwarded to the nested clone calls.
            visited: forwarded to ``DictionaryObject._clone``.

        """
        self._data = cast("StreamObject", src)._data
        try:
            decoded_self = cast("StreamObject", src).decoded_self
            if decoded_self is None:
                self.decoded_self = None
            else:
                self.decoded_self = cast(
                    "DecodedStreamObject",
                    decoded_self.clone(pdf_dest, force_duplicate, ignore_fields),
                )
        except Exception:
            # best effort: the clone is still usable without decoded_self
            pass
        super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        # Use _data to prevent errors on non-decoded streams.
        return hash((super().hash_bin(), self._data))

    def get_data(self) -> bytes:
        """Return the stored bytes (``_data``)."""
        return self._data

    def set_data(self, data: bytes) -> None:
        """Replace the stored bytes (``_data``) with ``data``."""
        self._data = data

    def hash_value_data(self) -> bytes:
        """Return the parent's ``hash_value_data()`` followed by ``get_data()``."""
        data = super().hash_value_data()
        data += self.get_data()
        return data

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the dictionary followed by ``stream``, the data and ``endstream``.

        The length entry is computed from ``_data``, present only while the
        dictionary part is written, and removed afterwards.

        Args:
            stream: output stream.
            encryption_key: deprecated; any value other than None only
                triggers ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        self[NameObject(SA.LENGTH)] = NumberObject(len(self._data))
        try:
            DictionaryObject.write_to_stream(self, stream)
        finally:
            # the temporary entry must not survive a failing write
            del self[SA.LENGTH]
        stream.write(b"\nstream\n")
        stream.write(self._data)
        stream.write(b"\nendstream")

    @staticmethod
    def initialize_from_dictionary(
        data: dict[str, Any]
    ) -> Union["EncodedStreamObject", "DecodedStreamObject"]:
        """
        Build a stream object from a parsed dictionary.

        Args:
            data: parsed entries including "__streamdata__" (the bytes).
                The mapping is modified: "__streamdata__" and the length
                entry are removed from it.

        Returns:
            An ``EncodedStreamObject`` when a filter entry is present, a
            ``DecodedStreamObject`` otherwise.

        """
        retval: Union[EncodedStreamObject, DecodedStreamObject]
        if SA.FILTER in data:
            retval = EncodedStreamObject()
        else:
            retval = DecodedStreamObject()
        retval._data = data["__streamdata__"]
        del data["__streamdata__"]
        if SA.LENGTH in data:
            del data[SA.LENGTH]
        retval.update(data)
        return retval

    def flate_encode(self, level: int = -1) -> "EncodedStreamObject":
        """
        Return a copy of this stream with ``_data`` encoded by FlateDecode.

        The FlateDecode filter is put in front of the existing filter(s) and
        a ``NullObject`` in front of the existing decode parameters.

        Args:
            level: forwarded to ``FlateDecode.encode``.

        Returns:
            A new ``EncodedStreamObject``; this object is not modified.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        if SA.FILTER in self:
            f = self[SA.FILTER]
            if isinstance(f, ArrayObject):
                f = ArrayObject([NameObject(FT.FLATE_DECODE), *f])
                try:
                    params = ArrayObject(
                        [NullObject(), *self.get(SA.DECODE_PARMS, ArrayObject())]
                    )
                except TypeError:
                    # case of error where the * operator is not working (not an array
                    params = ArrayObject(
                        [NullObject(), self.get(SA.DECODE_PARMS, ArrayObject())]
                    )
            else:
                f = ArrayObject([NameObject(FT.FLATE_DECODE), f])
                params = ArrayObject(
                    [NullObject(), self.get(SA.DECODE_PARMS, NullObject())]
                )
        else:
            f = NameObject(FT.FLATE_DECODE)
            params = None
        retval = EncodedStreamObject()
        retval.update(self)
        retval[NameObject(SA.FILTER)] = f
        if params is not None:
            retval[NameObject(SA.DECODE_PARMS)] = params
        retval._data = FlateDecode.encode(self._data, level)
        return retval

    def decode_as_image(self) -> Any:
        """
        Try to decode the stream object as an image

        Returns:
            a PIL image if proper decoding has been found
        Raises:
            Exception: (any)during decoding to to invalid object or
                errors during decoding will be reported
                It is recommended to catch exceptions to prevent
                stops in your program.

        """
        from ..filters import _xobj_to_image  # noqa: PLC0415

        if self.get("/Subtype", "") != "/Image":
            # only a warning: the decoding is attempted anyway
            try:
                msg = f"{self.indirect_reference} does not seem to be an Image"  # pragma: no cover
            except AttributeError:
                msg = f"{self.__repr__()} object does not seem to be an Image"  # pragma: no cover
            logger_warning(msg, __name__)
        extension, byte_stream, img = _xobj_to_image(self)
        if extension is None:
            return None  # pragma: no cover
        return img
1073
1074
class DecodedStreamObject(StreamObject):
    """Stream object that adds no behavior of its own to ``StreamObject``."""
1077
1078
class EncodedStreamObject(StreamObject):
    """
    Stream object whose stored data is filter-encoded.

    The decoded form is built on first access and cached in
    ``decoded_self``.
    """

    def __init__(self) -> None:
        # Cache for the decoded counterpart; filled by get_data().
        self.decoded_self: Optional[DecodedStreamObject] = None

    # This overrides the parent method
    def get_data(self) -> bytes:
        """
        Return the decoded data of this stream.

        On the first call a :class:`DecodedStreamObject` is built from
        ``decode_stream_data(self)``; it receives every dictionary entry of
        this stream except the length, filter and decode-parameter entries,
        and is cached for later calls.
        """
        from ..filters import decode_stream_data  # noqa: PLC0415

        cached = self.decoded_self
        if cached is not None:
            # Cached version of decoded object
            return cached.get_data()

        # Create decoded object
        plain = DecodedStreamObject()
        plain.set_data(decode_stream_data(self))
        encoding_keys = (SA.LENGTH, SA.FILTER, SA.DECODE_PARMS)
        for key, value in self.items():
            if key in encoding_keys:
                continue
            plain[key] = value
        self.decoded_self = plain
        return plain.get_data()

    # This overrides the parent method:
    def set_data(self, data: bytes) -> None:
        """
        Replace the content of this stream with ``data``.

        Only streams whose filter is exactly FlateDecode are supported: the
        cached decoded object receives ``data`` as is and this object stores
        the Flate-encoded form.

        Raises:
            PdfReadError: If the filter is anything other than FlateDecode.
            TypeError: If ``data`` is not ``bytes``.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        flate_only = self.get(SA.FILTER, "") in (FT.FLATE_DECODE, [FT.FLATE_DECODE])
        if not flate_only:
            raise PdfReadError(
                "Streams encoded with a filter different from FlateDecode are not supported"
            )
        if not isinstance(data, bytes):
            raise TypeError("Data must be bytes")
        if self.decoded_self is None:
            self.get_data()  # to create self.decoded_self
        assert self.decoded_self is not None, "mypy"
        self.decoded_self.set_data(data)
        super().set_data(FlateDecode.encode(data))
1116
1117
class ContentStream(DecodedStreamObject):
    """
    In order to be fast, this data structure can contain either:

    * raw data in ._data
    * parsed stream operations in ._operations.

    At any time, ContentStream object can either have both of those fields
    filled, or one field filled and the other one empty (``b""`` for ._data,
    ``[]`` for ._operations).

    These fields are "rebuilt" lazily, when accessed:

    * when .get_data() is called, if ._data is empty, it is rebuilt from ._operations.
    * when .operations is called, if ._operations is empty, it is rebuilt from ._data.

    Conversely, these fields can be invalidated:

    * when .set_data() is called, ._operations is emptied.
    * when .operations is set, ._data is emptied.
    """

    def __init__(
        self,
        stream: Any,
        pdf: Any,
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> None:
        """
        Build a content stream from a stream, an array of streams, or nothing.

        Args:
            stream: ``None`` for an empty content stream. Otherwise an object
                whose ``get_object()`` yields either a stream or an
                ``ArrayObject`` of streams that are concatenated.
            pdf: Stored as ``self.pdf`` and handed to ``read_object`` while
                reading inline image dictionaries.
            forced_encoding: Stored and handed to ``read_object`` while
                parsing operands.

        """
        self.pdf = pdf
        self._operations: list[tuple[Any, bytes]] = []

        # stream may be a StreamObject or an ArrayObject containing
        # StreamObjects to be concatenated together.
        if stream is None:
            super().set_data(b"")
        else:
            stream = stream.get_object()
            if isinstance(stream, ArrayObject):
                data = b""
                for s in stream:
                    s_resolved = s.get_object()
                    if isinstance(s_resolved, NullObject):
                        continue
                    if not isinstance(s_resolved, StreamObject):
                        # No need to emit an exception here for now - the PDF structure
                        # seems to already be broken beforehand in these cases.
                        logger_warning(
                            f"Expected StreamObject, got {type(s_resolved).__name__} instead. Data might be wrong.",
                            __name__
                        )
                    else:
                        data += s_resolved.get_data()
                        # Separate the parts with a newline. Indexing bytes
                        # yields an int, so the last byte must not be compared
                        # with b"\n" directly; endswith() also covers the
                        # empty case.
                        if not data.endswith(b"\n"):
                            data += b"\n"
                super().set_data(bytes(data))
            else:
                stream_data = stream.get_data()
                assert stream_data is not None
                super().set_data(stream_data)
        self.forced_encoding = forced_encoding

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ContentStream":
        """
        Create a replica of this content stream registered in ``pdf_dest``.

        The raw data, the dictionary entries (replicated when they offer a
        ``replicate`` method), the parsed operations and the forced encoding
        are all carried over, so a stream that currently only holds parsed
        operations does not replicate as an empty stream.

        Args:
            pdf_dest: Destination the replica is created for.

        Returns:
            The replicated ContentStream.

        """
        d__ = cast(
            "ContentStream",
            self._reference_clone(self.__class__(None, None), pdf_dest, False),
        )
        d__._data = self._data
        try:
            # Best effort: `decoded_self` may be missing on this object.
            # The replicated cache belongs to the replica; the source object
            # is left untouched.
            decoded_self = self.decoded_self
            if decoded_self is None:
                d__.decoded_self = None
            else:
                d__.decoded_self = cast(
                    "DecodedStreamObject", decoded_self.replicate(pdf_dest)
                )
        except Exception:
            pass
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        # ContentStream specific state; without it the parsed operations
        # would be lost whenever ._data is currently empty.
        d__.pdf = pdf_dest
        d__._operations = list(self._operations)
        d__.forced_encoding = self.forced_encoding
        return d__

    def clone(
        self,
        pdf_dest: Any,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ContentStream":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: Destination the clone is created for.
            force_duplicate: When false and this object already belongs to
                ``pdf_dest``, the object itself is returned.
            ignore_fields: Forwarded to ``_clone``; ``None`` is treated as an
                empty list.

        Returns:
            The cloned ContentStream

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # No usable indirect reference: continue with a real clone.
            pass

        visited: set[tuple[int, int]] = set()
        d__ = cast(
            "ContentStream",
            self._reference_clone(
                self.__class__(None, None), pdf_dest, force_duplicate
            ),
        )
        if ignore_fields is None:
            ignore_fields = []
        d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: DictionaryObject,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],
    ) -> None:
        """
        Update the object from src.

        Only the raw data, the list of operations (shallow copy) and the
        forced encoding are taken from ``src``.

        Args:
            src: Source object; treated as a ContentStream.
            pdf_dest: Stored as ``self.pdf``.
            force_duplicate: Not used by this implementation.
            ignore_fields: Not used by this implementation.
            visited: Not used by this implementation.

        """
        src_cs = cast("ContentStream", src)
        super().set_data(src_cs._data)
        self.pdf = pdf_dest
        self._operations = list(src_cs._operations)
        self.forced_encoding = src_cs.forced_encoding
        # no need to call DictionaryObjection or anything
        # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

    def _parse_content_stream(self, stream: StreamType) -> None:
        """
        Parse ``stream`` from its beginning and append to ``self._operations``.

        Operands are collected until an operator is met; each operator is
        stored as ``(operands, operator)``. An inline image is stored as
        ``(image_dict, b"INLINE IMAGE")``.
        """
        # 7.8.2 Content Streams
        stream.seek(0, 0)
        operands: list[Union[int, str, PdfObject]] = []
        while True:
            peek = read_non_whitespace(stream)
            if peek in (b"", 0):
                # Nothing left to read.
                break
            stream.seek(-1, 1)
            if peek.isalpha() or peek in (b"'", b'"'):
                operator = read_until_regex(stream, NameObject.delimiter_pattern)
                if operator == b"BI":
                    # begin inline image - a completely different parsing
                    # mechanism is required, of course... thanks buddy...
                    assert operands == []
                    ii = self._read_inline_image(stream)
                    self._operations.append((ii, b"INLINE IMAGE"))
                else:
                    self._operations.append((operands, operator))
                    operands = []
            elif peek == b"%":
                # If we encounter a comment in the content stream, we have to
                # handle it here. Typically, read_object will handle
                # encountering a comment -- but read_object assumes that
                # following the comment must be the object we're trying to
                # read. In this case, it could be an operator instead.
                while peek not in (b"\r", b"\n", b""):
                    peek = stream.read(1)
            else:
                operands.append(read_object(stream, None, self.forced_encoding))

    def _read_inline_image(self, stream: StreamType) -> dict[str, Any]:
        """
        Read an inline image; ``stream`` is positioned just after ``BI``.

        Returns:
            A dictionary with the image dictionary under ``"settings"`` and
            the raw image bytes under ``"data"``.

        Raises:
            PdfStreamError: If no ``EI`` marker followed by whitespace is
                found, even after the fallback extraction.

        """
        # begin reading just after the "BI" - begin image
        # first read the dictionary of settings.
        settings = DictionaryObject()
        while True:
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            if tok == b"I":
                # "ID" - begin of image data
                break
            key = read_object(stream, self.pdf)
            # Skip the whitespace between key and value.
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            value = read_object(stream, self.pdf)
            settings[key] = value
        # left at beginning of ID
        tmp = stream.read(3)
        assert tmp[:2] == b"ID"
        filtr = settings.get("/F", settings.get("/Filter", "not set"))
        # Remember where the image data starts for the fallback below.
        savpos = stream.tell()
        if isinstance(filtr, list):
            filtr = filtr[0]  # the first filter is the one used for encoding
        if "AHx" in filtr or "ASCIIHexDecode" in filtr:
            data = extract_inline_AHx(stream)
        elif "A85" in filtr or "ASCII85Decode" in filtr:
            data = extract_inline_A85(stream)
        elif "RL" in filtr or "RunLengthDecode" in filtr:
            data = extract_inline_RL(stream)
        elif "DCT" in filtr or "DCTDecode" in filtr:
            data = extract_inline_DCT(stream)
        elif filtr == "not set":
            # No filter: derive the data length from the image dimensions.
            # `lcs` is the number of bytes per pixel (-1 if unknown).
            cs = settings.get("/CS", "")
            if isinstance(cs, list):
                cs = cs[0]
            if "RGB" in cs:
                lcs = 3
            elif "CMYK" in cs:
                lcs = 4
            else:
                bits = settings.get(
                    "/BPC",
                    8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1,
                )
                if bits > 0:
                    lcs = bits / 8.0
                else:
                    data = extract_inline_default(stream)
                    lcs = -1
            if lcs > 0:
                data = stream.read(
                    ceil(cast(int, settings["/W"]) * lcs) * cast(int, settings["/H"])
                )
            # Move to the `EI` if possible.
            ei = read_non_whitespace(stream)
            stream.seek(-1, 1)
        else:
            data = extract_inline_default(stream)

        ei = stream.read(3)
        stream.seek(-1, 1)
        if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:
            # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above.
            stream.seek(savpos, 0)
            data = extract_inline_default(stream)
            ei = stream.read(3)
            stream.seek(-1, 1)
            if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:  # pragma: no cover
                # Check the same condition again. This should never fail as
                # edge cases are covered by `extract_inline_default` above,
                # but check this to make sure that we are behind the `EI` afterwards.
                raise PdfStreamError(
                    f"Could not extract inline image, even using fallback. Expected 'EI', got {ei!r}"
                )
        return {"settings": settings, "data": data}

    # This overrides the parent method
    def get_data(self) -> bytes:
        """
        Return the raw data, rebuilding it from the operations if it is empty.

        The rebuilt data is stored in ``self._data``.
        """
        if not self._data:
            new_data = BytesIO()
            for operands, operator in self._operations:
                if operator == b"INLINE IMAGE":
                    new_data.write(b"BI")
                    dict_text = BytesIO()
                    operands["settings"].write_to_stream(dict_text)
                    # Drop the first and last two bytes of the serialized
                    # dictionary (its delimiters).
                    new_data.write(dict_text.getvalue()[2:-2])
                    new_data.write(b"ID ")
                    new_data.write(operands["data"])
                    new_data.write(b"EI")
                else:
                    for op in operands:
                        op.write_to_stream(new_data)
                        new_data.write(b" ")
                    new_data.write(operator)
                new_data.write(b"\n")
            self._data = new_data.getvalue()
        return self._data

    # This overrides the parent method
    def set_data(self, data: bytes) -> None:
        """Set the raw data and discard the parsed operations."""
        super().set_data(data)
        self._operations = []

    @property
    def operations(self) -> list[tuple[Any, bytes]]:
        """
        Parsed operations as a list of ``(operands, operator)`` tuples.

        When no operations are available but raw data is, the data is parsed
        and then emptied.
        """
        if not self._operations and self._data:
            self._parse_content_stream(BytesIO(self._data))
            self._data = b""
        return self._operations

    @operations.setter
    def operations(self, operations: list[tuple[Any, bytes]]) -> None:
        self._operations = operations
        self._data = b""

    def isolate_graphics_state(self) -> None:
        """
        Wrap the content in a ``q`` / ``Q`` operator pair.

        The parsed operations are wrapped when present; otherwise the raw
        data is. Nothing happens when both are empty.
        """
        if self._operations:
            self._operations.insert(0, ([], b"q"))
            self._operations.append(([], b"Q"))
        elif self._data:
            self._data = b"q\n" + self._data + b"\nQ\n"

    # This overrides the parent method
    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """Write the object to ``stream``, rebuilding the raw data first if needed."""
        if not self._data and self._operations:
            self.get_data()  # this ensures ._data is rebuilt
        super().write_to_stream(stream, encryption_key)
1429
1430
def read_object(
    stream: StreamType,
    pdf: Optional[PdfReaderProtocol],
    forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
) -> Union[PdfObject, int, str, ContentStream]:
    """
    Read one object from ``stream``, choosing the reader from its first byte.

    Args:
        stream: Stream positioned at the first byte of the object.
        pdf: Handed to the readers that accept it; must not be ``None`` when
            an indirect reference is met.
        forced_encoding: Handed to the string, array and dictionary readers.

    Returns:
        The object that was read.

    Raises:
        PdfReadError: If the first byte does not start any known object.

    """
    tok = stream.read(1)
    stream.seek(-1, 1)  # reset to start
    if tok == b"/":
        return NameObject.read_from_stream(stream, pdf)
    if tok == b"<":
        # hexadecimal string OR dictionary
        peek = stream.read(2)
        stream.seek(-2, 1)  # reset to start
        if peek == b"<<":
            return DictionaryObject.read_from_stream(stream, pdf, forced_encoding)
        return read_hex_string_from_stream(stream, forced_encoding)
    if tok == b"[":
        return ArrayObject.read_from_stream(stream, pdf, forced_encoding)
    if tok in (b"t", b"f"):
        return BooleanObject.read_from_stream(stream)
    if tok == b"(":
        return read_string_from_stream(stream, forced_encoding)
    if tok == b"e" and stream.read(6) == b"endobj":
        # An object without any content is read as null.
        return NullObject()
    if tok == b"n":
        return NullObject.read_from_stream(stream)
    if tok == b"%":
        # comment
        skip_over_comment(stream)
        tok = read_non_whitespace(stream)
        stream.seek(-1, 1)
        return read_object(stream, pdf, forced_encoding)
    if tok in b"0123456789+-.":
        # number object OR indirect reference
        peek = stream.read(20)
        stream.seek(-len(peek), 1)  # reset to start
        if IndirectPattern.match(peek) is not None:
            assert pdf is not None, "mypy"
            return IndirectObject.read_from_stream(stream, pdf)
        return NumberObject.read_from_stream(stream)
    pos = stream.tell()
    # Collect some context around the failure for the error message. The
    # start is clamped to 0: a relative seek before the beginning of a real
    # file raises OSError, which would hide the PdfReadError below.
    stream.seek(max(0, pos - 20), 0)
    stream_extract = stream.read(80)
    stream.seek(pos)
    read_until_whitespace(stream)
    raise PdfReadError(
        f"Invalid Elementary Object starting with {tok!r} @{pos}: {stream_extract!r}"
    )
1479
1480
class Field(TreeObject):
    """
    A class representing a field dictionary.

    This class is accessed through
    :meth:`get_fields()<pypdf.PdfReader.get_fields>`
    """

    def __init__(self, data: DictionaryObject) -> None:
        """
        Copy the known field attributes from ``data``.

        Only the keys listed by ``FieldDictionaryAttributes`` and
        ``CheckboxRadioButtonAttributes`` are copied; missing ones are
        skipped. A ``/V`` entry that is an encoded stream is replaced by its
        decoded text.
        """
        DictionaryObject.__init__(self)
        field_attributes = (
            FieldDictionaryAttributes.attributes()
            + CheckboxRadioButtonAttributes.attributes()
        )
        self.indirect_reference = data.indirect_reference
        for attr in field_attributes:
            try:
                attr_value = data[attr]
                self[NameObject(attr)] = attr_value
            except KeyError:
                continue

        if not isinstance(self.get("/V"), EncodedStreamObject):
            return
        # The value is stored as a stream: expose it as a text string.
        raw = cast(EncodedStreamObject, self[NameObject("/V")]).get_data()
        if raw is None:
            text = ""
        elif isinstance(raw, bytes):
            text = raw.decode()
        else:
            raise Exception("Should never happen")
        self[NameObject("/V")] = TextStringObject(text)

    # TABLE 8.69 Entries common to all field dictionaries
    @property
    def field_type(self) -> Optional[NameObject]:
        """Type of this field (read-only)."""
        key = FieldDictionaryAttributes.FT
        return self.get(key)

    @property
    def parent(self) -> Optional[DictionaryObject]:
        """Parent of this field (read-only)."""
        key = FieldDictionaryAttributes.Parent
        return self.get(key)

    @property
    def kids(self) -> Optional["ArrayObject"]:
        """Kids of this field (read-only)."""
        key = FieldDictionaryAttributes.Kids
        return self.get(key)

    @property
    def name(self) -> Optional[str]:
        """Name of this field (read-only)."""
        key = FieldDictionaryAttributes.T
        return self.get(key)

    @property
    def alternate_name(self) -> Optional[str]:
        """Alternate name of this field (read-only)."""
        key = FieldDictionaryAttributes.TU
        return self.get(key)

    @property
    def mapping_name(self) -> Optional[str]:
        """
        Mapping name of this field (read-only).

        This name is used by pypdf as a key in the dictionary returned by
        :meth:`get_fields()<pypdf.PdfReader.get_fields>`
        """
        key = FieldDictionaryAttributes.TM
        return self.get(key)

    @property
    def flags(self) -> Optional[int]:
        """
        Field flags (read-only), specifying various characteristics of the
        field (see Table 8.70 of the PDF 1.7 reference).
        """
        key = FieldDictionaryAttributes.Ff
        return self.get(key)

    @property
    def value(self) -> Optional[Any]:
        """
        Value of this field (read-only).

        Format varies based on field type.
        """
        key = FieldDictionaryAttributes.V
        return self.get(key)

    @property
    def default_value(self) -> Optional[Any]:
        """Default value of this field (read-only)."""
        key = FieldDictionaryAttributes.DV
        return self.get(key)

    @property
    def additional_actions(self) -> Optional[DictionaryObject]:
        """
        Additional actions dictionary (read-only).

        This dictionary defines the field's behavior in response to trigger
        events. See Section 8.5.2 of the PDF 1.7 reference.
        """
        key = FieldDictionaryAttributes.AA
        return self.get(key)
1578
1579
class Destination(TreeObject):
    """
    A class representing a destination within a PDF file.

    See section 12.3.2 of the PDF 2.0 reference.

    Args:
        title: Title of this destination.
        page: Reference to the page of this destination. Should
            be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`.
        fit: How the destination is displayed.

    Raises:
        PdfReadError: If destination type is invalid.

    """

    node: Optional[
        DictionaryObject
    ] = None  # node provide access to the original Object

    def __init__(
        self,
        title: str,
        page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject],
        fit: Fit,
    ) -> None:
        self._filtered_children: list[Any] = []  # used in PdfWriter

        typ = fit.fit_type
        # Work on a copy: the padding done for /XYZ below must not modify the
        # Fit object owned by the caller.
        args = list(fit.fit_args)

        DictionaryObject.__init__(self)
        self[NameObject("/Title")] = TextStringObject(title)
        self[NameObject("/Page")] = page
        self[NameObject("/Type")] = typ

        # from table 8.2 of the PDF 1.7 reference.
        if typ == "/XYZ":
            if len(args) < 1:  # left is missing : should never occur
                args.append(NumberObject(0.0))
            if len(args) < 2:  # top is missing
                args.append(NumberObject(0.0))
            if len(args) < 3:  # zoom is missing
                args.append(NumberObject(0.0))
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.TOP)],
                self[NameObject("/Zoom")],
            ) = args
        elif len(args) == 0:
            # No coordinates given: only title, page and type are stored.
            pass
        elif typ == TF.FIT_R:
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.BOTTOM)],
                self[NameObject(TA.RIGHT)],
                self[NameObject(TA.TOP)],
            ) = args
        elif typ in [TF.FIT_H, TF.FIT_BH]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.TOP)],) = args
            except Exception:
                (self[NameObject(TA.TOP)],) = (NullObject(),)
        elif typ in [TF.FIT_V, TF.FIT_BV]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.LEFT)],) = args
            except Exception:
                (self[NameObject(TA.LEFT)],) = (NullObject(),)
        elif typ in [TF.FIT, TF.FIT_B]:
            pass
        else:
            raise PdfReadError(f"Unknown Destination Type: {typ!r}")

    @property
    def dest_array(self) -> "ArrayObject":
        """
        Destination as an array: page, type, then whichever of the left,
        bottom, right, top and zoom entries are present, in that order.
        """
        return ArrayObject(
            [self.raw_get("/Page"), self["/Type"]]
            + [
                self[x]
                for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"]
                if x in self
            ]
        )

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the destination to ``stream`` as a dictionary with a ``/D``
        entry (the destination array) and an ``/S`` entry (``/GoTo``).

        Args:
            stream: Stream to write to.
            encryption_key: Deprecated; passing a value other than ``None``
                triggers ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        key = NameObject("/D")
        key.write_to_stream(stream)
        stream.write(b" ")
        value = self.dest_array
        value.write_to_stream(stream)

        key = NameObject("/S")
        key.write_to_stream(stream)
        stream.write(b" ")
        value_s = NameObject("/GoTo")
        value_s.write_to_stream(stream)

        stream.write(b"\n")
        stream.write(b">>")

    @property
    def title(self) -> Optional[str]:
        """Read-only property accessing the destination title."""
        return self.get("/Title")

    @property
    def page(self) -> Optional[IndirectObject]:
        """Read-only property accessing the IndirectObject of the destination page."""
        return self.get("/Page")

    @property
    def typ(self) -> Optional[str]:
        """Read-only property accessing the destination type."""
        return self.get("/Type")

    @property
    def zoom(self) -> Optional[int]:
        """Read-only property accessing the zoom factor."""
        return self.get("/Zoom", None)

    @property
    def left(self) -> Optional[FloatObject]:
        """Read-only property accessing the left horizontal coordinate."""
        return self.get("/Left", None)

    @property
    def right(self) -> Optional[FloatObject]:
        """Read-only property accessing the right horizontal coordinate."""
        return self.get("/Right", None)

    @property
    def top(self) -> Optional[FloatObject]:
        """Read-only property accessing the top vertical coordinate."""
        return self.get("/Top", None)

    @property
    def bottom(self) -> Optional[FloatObject]:
        """Read-only property accessing the bottom vertical coordinate."""
        return self.get("/Bottom", None)

    @property
    def color(self) -> Optional["ArrayObject"]:
        """
        Read-only property accessing the color in (R, G, B) with values 0.0-1.0.

        Falls back to an array of three zeros when no ``/C`` entry exists.
        """
        return self.get(
            "/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)])
        )

    @property
    def font_format(self) -> Optional[OutlineFontFlag]:
        """
        Read-only property accessing the font type.

        1=italic, 2=bold, 3=both; 0 is returned when no ``/F`` entry exists.
        """
        return self.get("/F", 0)

    @property
    def outline_count(self) -> Optional[int]:
        """
        Read-only property accessing the outline count.

        positive = expanded
        negative = collapsed
        absolute value = number of visible descendants at all levels
        """
        return self.get("/Count", None)