1# Copyright (c) 2006, Mathieu Fenniak
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright notice,
9# this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above copyright notice,
11# this list of conditions and the following disclaimer in the documentation
12# and/or other materials provided with the distribution.
13# * The name of the author may not be used to endorse or promote products
14# derived from this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26# POSSIBILITY OF SUCH DAMAGE.
27
28
29__author__ = "Mathieu Fenniak"
30__author_email__ = "biziqe@mathieu.fenniak.net"
31
32import logging
33import re
34import sys
35from collections.abc import Iterable, Sequence
36from io import BytesIO
37from math import ceil
38from typing import (
39 Any,
40 Callable,
41 Optional,
42 Union,
43 cast,
44)
45
46from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol
47from .._utils import (
48 WHITESPACES,
49 StreamType,
50 deprecation_no_replacement,
51 logger_warning,
52 read_non_whitespace,
53 read_until_regex,
54 read_until_whitespace,
55 skip_over_comment,
56)
57from ..constants import (
58 CheckboxRadioButtonAttributes,
59 FieldDictionaryAttributes,
60 OutlineFontFlag,
61)
62from ..constants import FilterTypes as FT
63from ..constants import StreamAttributes as SA
64from ..constants import TypArguments as TA
65from ..constants import TypFitArguments as TF
66from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfReadError, PdfStreamError
67from ._base import (
68 BooleanObject,
69 ByteStringObject,
70 FloatObject,
71 IndirectObject,
72 NameObject,
73 NullObject,
74 NumberObject,
75 PdfObject,
76 TextStringObject,
77 is_null_or_none,
78)
79from ._fit import Fit
80from ._image_inline import (
81 extract_inline_A85,
82 extract_inline_AHx,
83 extract_inline_DCT,
84 extract_inline_default,
85 extract_inline_RL,
86)
87from ._utils import read_hex_string_from_stream, read_string_from_stream
88
89if sys.version_info >= (3, 11):
90 from typing import Self
91else:
92 from typing_extensions import Self
93
94logger = logging.getLogger(__name__)
95
96IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]")
97
98
99class ArrayObject(list[Any], PdfObject):
100 def replicate(
101 self,
102 pdf_dest: PdfWriterProtocol,
103 ) -> "ArrayObject":
104 arr = cast(
105 "ArrayObject",
106 self._reference_clone(ArrayObject(), pdf_dest, False),
107 )
108 for data in self:
109 if hasattr(data, "replicate"):
110 arr.append(data.replicate(pdf_dest))
111 else:
112 arr.append(data)
113 return arr
114
115 def clone(
116 self,
117 pdf_dest: PdfWriterProtocol,
118 force_duplicate: bool = False,
119 ignore_fields: Optional[Sequence[Union[str, int]]] = (),
120 ) -> "ArrayObject":
121 """Clone object into pdf_dest."""
122 try:
123 if self.indirect_reference.pdf == pdf_dest and not force_duplicate: # type: ignore
124 return self
125 except Exception:
126 pass
127 arr = cast(
128 "ArrayObject",
129 self._reference_clone(ArrayObject(), pdf_dest, force_duplicate),
130 )
131 for data in self:
132 if isinstance(data, StreamObject):
133 dup = data._reference_clone(
134 data.clone(pdf_dest, force_duplicate, ignore_fields),
135 pdf_dest,
136 force_duplicate,
137 )
138 arr.append(dup.indirect_reference)
139 elif hasattr(data, "clone"):
140 arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields))
141 else:
142 arr.append(data)
143 return arr
144
145 def hash_bin(self) -> int:
146 """
147 Used to detect modified object.
148
149 Returns:
150 Hash considering type and value.
151
152 """
153 return hash((self.__class__, tuple(x.hash_bin() for x in self)))
154
155 def items(self) -> Iterable[Any]:
156 """Emulate DictionaryObject.items for a list (index, object)."""
157 return enumerate(self)
158
159 def _to_lst(self, lst: Any) -> list[Any]:
160 # Convert to list, internal
161 if isinstance(lst, (list, tuple, set)):
162 pass
163 elif isinstance(lst, PdfObject):
164 lst = [lst]
165 elif isinstance(lst, str):
166 if lst[0] == "/":
167 lst = [NameObject(lst)]
168 else:
169 lst = [TextStringObject(lst)]
170 elif isinstance(lst, bytes):
171 lst = [ByteStringObject(lst)]
172 else: # for numbers,...
173 lst = [lst]
174 return lst
175
176 def __add__(self, lst: Any) -> "ArrayObject":
177 """
178 Allow extension by adding list or add one element only
179
180 Args:
181 lst: any list, tuples are extended the list.
182 other types(numbers,...) will be appended.
183 if str is passed it will be converted into TextStringObject
184 or NameObject (if starting with "/")
185 if bytes is passed it will be converted into ByteStringObject
186
187 Returns:
188 ArrayObject with all elements
189
190 """
191 temp = ArrayObject(self)
192 temp.extend(self._to_lst(lst))
193 return temp
194
195 def __iadd__(self, lst: Any) -> Self:
196 """
197 Allow extension by adding list or add one element only
198
199 Args:
200 lst: any list, tuples are extended the list.
201 other types(numbers,...) will be appended.
202 if str is passed it will be converted into TextStringObject
203 or NameObject (if starting with "/")
204 if bytes is passed it will be converted into ByteStringObject
205
206 """
207 self.extend(self._to_lst(lst))
208 return self
209
210 def __isub__(self, lst: Any) -> Self:
211 """Allow to remove items"""
212 for x in self._to_lst(lst):
213 try:
214 index = self.index(x)
215 del self[index]
216 except ValueError:
217 pass
218 return self
219
220 def write_to_stream(
221 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
222 ) -> None:
223 if encryption_key is not None: # deprecated
224 deprecation_no_replacement(
225 "the encryption_key parameter of write_to_stream", "5.0.0"
226 )
227 stream.write(b"[")
228 for data in self:
229 stream.write(b" ")
230 data.write_to_stream(stream)
231 stream.write(b" ]")
232
233 @staticmethod
234 def read_from_stream(
235 stream: StreamType,
236 pdf: Optional[PdfReaderProtocol],
237 forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
238 ) -> "ArrayObject":
239 arr = ArrayObject()
240 tmp = stream.read(1)
241 if tmp != b"[":
242 raise PdfReadError("Could not read array")
243 while True:
244 # skip leading whitespace
245 tok = stream.read(1)
246 while tok.isspace():
247 tok = stream.read(1)
248 if tok == b"":
249 break
250 if tok == b"%":
251 stream.seek(-1, 1)
252 skip_over_comment(stream)
253 continue
254 stream.seek(-1, 1)
255 # check for array ending
256 peek_ahead = stream.read(1)
257 if peek_ahead == b"]":
258 break
259 stream.seek(-1, 1)
260 # read and append object
261 arr.append(read_object(stream, pdf, forced_encoding))
262 return arr
263
264
265class DictionaryObject(dict[Any, Any], PdfObject):
266 def replicate(
267 self,
268 pdf_dest: PdfWriterProtocol,
269 ) -> "DictionaryObject":
270 d__ = cast(
271 "DictionaryObject",
272 self._reference_clone(self.__class__(), pdf_dest, False),
273 )
274 for k, v in self.items():
275 d__[k.replicate(pdf_dest)] = (
276 v.replicate(pdf_dest) if hasattr(v, "replicate") else v
277 )
278 return d__
279
280 def clone(
281 self,
282 pdf_dest: PdfWriterProtocol,
283 force_duplicate: bool = False,
284 ignore_fields: Optional[Sequence[Union[str, int]]] = (),
285 ) -> "DictionaryObject":
286 """Clone object into pdf_dest."""
287 try:
288 if self.indirect_reference.pdf == pdf_dest and not force_duplicate: # type: ignore
289 return self
290 except Exception:
291 pass
292
293 visited: set[tuple[int, int]] = set() # (idnum, generation)
294 d__ = cast(
295 "DictionaryObject",
296 self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
297 )
298 if ignore_fields is None:
299 ignore_fields = []
300 if len(d__.keys()) == 0:
301 d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
302 return d__
303
304 def _clone(
305 self,
306 src: "DictionaryObject",
307 pdf_dest: PdfWriterProtocol,
308 force_duplicate: bool,
309 ignore_fields: Optional[Sequence[Union[str, int]]],
310 visited: set[tuple[int, int]], # (idnum, generation)
311 ) -> None:
312 """
313 Update the object from src.
314
315 Args:
316 src: "DictionaryObject":
317 pdf_dest:
318 force_duplicate:
319 ignore_fields:
320
321 """
322 # First we remove the ignore_fields
323 # that are for a limited number of levels
324 assert ignore_fields is not None
325 ignore_fields = list(ignore_fields)
326 x = 0
327 while x < len(ignore_fields):
328 if isinstance(ignore_fields[x], int):
329 if cast(int, ignore_fields[x]) <= 0:
330 del ignore_fields[x]
331 del ignore_fields[x]
332 continue
333 ignore_fields[x] -= 1 # type:ignore
334 x += 1
335 # Check if this is a chain list, we need to loop to prevent recur
336 if any(
337 field not in ignore_fields
338 and field in src
339 and isinstance(src.raw_get(field), IndirectObject)
340 and isinstance(src[field], DictionaryObject)
341 and (
342 src.get("/Type", None) is None
343 or cast(DictionaryObject, src[field]).get("/Type", None) is None
344 or src.get("/Type", None)
345 == cast(DictionaryObject, src[field]).get("/Type", None)
346 )
347 for field in ["/Next", "/Prev", "/N", "/V"]
348 ):
349 ignore_fields = list(ignore_fields)
350 for lst in (("/Next", "/Prev"), ("/N", "/V")):
351 for k in lst:
352 objs = []
353 if (
354 k in src
355 and k not in self
356 and isinstance(src.raw_get(k), IndirectObject)
357 and isinstance(src[k], DictionaryObject)
358 # If need to go further the idea is to check
359 # that the types are the same
360 and (
361 src.get("/Type", None) is None
362 or cast(DictionaryObject, src[k]).get("/Type", None) is None
363 or src.get("/Type", None)
364 == cast(DictionaryObject, src[k]).get("/Type", None)
365 )
366 ):
367 cur_obj: Optional[DictionaryObject] = cast(
368 "DictionaryObject", src[k]
369 )
370 prev_obj: Optional[DictionaryObject] = self
371 while cur_obj is not None:
372 clon = cast(
373 "DictionaryObject",
374 cur_obj._reference_clone(
375 cur_obj.__class__(), pdf_dest, force_duplicate
376 ),
377 )
378 # Check to see if we've previously processed our item
379 if clon.indirect_reference is not None:
380 idnum = clon.indirect_reference.idnum
381 generation = clon.indirect_reference.generation
382 if (idnum, generation) in visited:
383 cur_obj = None
384 break
385 visited.add((idnum, generation))
386 objs.append((cur_obj, clon))
387 assert prev_obj is not None
388 prev_obj[NameObject(k)] = clon.indirect_reference
389 prev_obj = clon
390 try:
391 if cur_obj == src:
392 cur_obj = None
393 else:
394 cur_obj = cast("DictionaryObject", cur_obj[k])
395 except Exception:
396 cur_obj = None
397 for s, c in objs:
398 c._clone(
399 s, pdf_dest, force_duplicate, ignore_fields, visited
400 )
401
402 for k, v in src.items():
403 if k not in ignore_fields:
404 if isinstance(v, StreamObject):
405 if not hasattr(v, "indirect_reference"):
406 v.indirect_reference = None
407 vv = v.clone(pdf_dest, force_duplicate, ignore_fields)
408 assert vv.indirect_reference is not None
409 self[k.clone(pdf_dest)] = vv.indirect_reference
410 elif k not in self:
411 self[NameObject(k)] = (
412 v.clone(pdf_dest, force_duplicate, ignore_fields)
413 if hasattr(v, "clone")
414 else v
415 )
416
417 def hash_bin(self) -> int:
418 """
419 Used to detect modified object.
420
421 Returns:
422 Hash considering type and value.
423
424 """
425 return hash(
426 (self.__class__, tuple(((k, v.hash_bin()) for k, v in self.items())))
427 )
428
429 def raw_get(self, key: Any) -> Any:
430 return dict.__getitem__(self, key)
431
432 def get_inherited(self, key: str, default: Any = None) -> Any:
433 """
434 Returns the value of a key or from the parent if not found.
435 If not found returns default.
436
437 Args:
438 key: string identifying the field to return
439
440 default: default value to return
441
442 Returns:
443 Current key or inherited one, otherwise default value.
444
445 """
446 if key in self:
447 return self[key]
448 try:
449 if "/Parent" not in self:
450 return default
451 raise KeyError("Not present")
452 except KeyError:
453 return cast("DictionaryObject", self["/Parent"].get_object()).get_inherited(
454 key, default
455 )
456
457 def __setitem__(self, key: Any, value: Any) -> Any:
458 if not isinstance(key, PdfObject):
459 raise ValueError("Key must be a PdfObject")
460 if not isinstance(value, PdfObject):
461 raise ValueError("Value must be a PdfObject")
462 return dict.__setitem__(self, key, value)
463
464 def setdefault(self, key: Any, value: Optional[Any] = None) -> Any:
465 if not isinstance(key, PdfObject):
466 raise ValueError("Key must be a PdfObject")
467 if not isinstance(value, PdfObject):
468 raise ValueError("Value must be a PdfObject")
469 return dict.setdefault(self, key, value)
470
471 def __getitem__(self, key: Any) -> PdfObject:
472 return dict.__getitem__(self, key).get_object()
473
474 @property
475 def xmp_metadata(self) -> Optional[XmpInformationProtocol]:
476 """
477 Retrieve XMP (Extensible Metadata Platform) data relevant to this
478 object, if available.
479
480 See Table 347 — Additional entries in a metadata stream dictionary.
481
482 Returns:
483 Returns a :class:`~pypdf.xmp.XmpInformation` instance
484 that can be used to access XMP metadata from the document. Can also
485 return None if no metadata was found on the document root.
486
487 """
488 from ..xmp import XmpInformation # noqa: PLC0415
489
490 metadata = self.get("/Metadata", None)
491 if is_null_or_none(metadata):
492 return None
493 assert metadata is not None, "mypy"
494 metadata = metadata.get_object()
495 return XmpInformation(metadata)
496
497 def write_to_stream(
498 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
499 ) -> None:
500 if encryption_key is not None: # deprecated
501 deprecation_no_replacement(
502 "the encryption_key parameter of write_to_stream", "5.0.0"
503 )
504 stream.write(b"<<\n")
505 for key, value in self.items():
506 if len(key) > 2 and key[1] == "%" and key[-1] == "%":
507 continue
508 key.write_to_stream(stream, encryption_key)
509 stream.write(b" ")
510 value.write_to_stream(stream)
511 stream.write(b"\n")
512 stream.write(b">>")
513
514 @staticmethod
515 def read_from_stream(
516 stream: StreamType,
517 pdf: Optional[PdfReaderProtocol],
518 forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
519 ) -> "DictionaryObject":
520 def get_next_obj_pos(
521 p: int, p1: int, rem_gens: list[int], pdf: PdfReaderProtocol
522 ) -> int:
523 out = p1
524 for gen in rem_gens:
525 loc = pdf.xref[gen]
526 try:
527 values = [x for x in loc.values() if p < x <= p1]
528 if values:
529 out = min(out, *values)
530 except ValueError:
531 pass
532 return out
533
534 def read_unsized_from_stream(
535 stream: StreamType, pdf: PdfReaderProtocol
536 ) -> bytes:
537 # we are just pointing at beginning of the stream
538 eon = get_next_obj_pos(stream.tell(), 2**32, list(pdf.xref), pdf) - 1
539 curr = stream.tell()
540 rw = stream.read(eon - stream.tell())
541 p = rw.find(b"endstream")
542 if p < 0:
543 raise PdfReadError(
544 f"Unable to find 'endstream' marker for obj starting at {curr}."
545 )
546 stream.seek(curr + p + 9)
547 return rw[: p - 1]
548
549 tmp = stream.read(2)
550 if tmp != b"<<":
551 raise PdfReadError(
552 f"Dictionary read error at byte {hex(stream.tell())}: "
553 "stream must begin with '<<'"
554 )
555 data: dict[Any, Any] = {}
556 while True:
557 tok = read_non_whitespace(stream)
558 if tok == b"\x00":
559 continue
560 if tok == b"%":
561 stream.seek(-1, 1)
562 skip_over_comment(stream)
563 continue
564 if not tok:
565 raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)
566
567 if tok == b">":
568 stream.read(1)
569 break
570 stream.seek(-1, 1)
571 try:
572 try:
573 key = read_object(stream, pdf)
574 if isinstance(key, NullObject):
575 break
576 if not isinstance(key, NameObject):
577 raise PdfReadError(
578 f"Expecting a NameObject for key but found {key!r}"
579 )
580 except PdfReadError as exc:
581 if pdf is not None and pdf.strict:
582 raise
583 logger_warning(exc.__repr__(), __name__)
584 continue
585 tok = read_non_whitespace(stream)
586 stream.seek(-1, 1)
587 value = read_object(stream, pdf, forced_encoding)
588 except Exception as exc:
589 if pdf is not None and pdf.strict:
590 raise PdfReadError(exc.__repr__())
591 logger_warning(exc.__repr__(), __name__)
592 retval = DictionaryObject()
593 retval.update(data)
594 return retval # return partial data
595
596 if not data.get(key):
597 data[key] = value
598 else:
599 # multiple definitions of key not permitted
600 msg = (
601 f"Multiple definitions in dictionary at byte "
602 f"{hex(stream.tell())} for key {key}"
603 )
604 if pdf is not None and pdf.strict:
605 raise PdfReadError(msg)
606 logger_warning(msg, __name__)
607
608 pos = stream.tell()
609 s = read_non_whitespace(stream)
610 if s == b"s" and stream.read(5) == b"tream":
611 eol = stream.read(1)
612 # Occasional PDF file output has spaces after 'stream' keyword but before EOL.
613 # patch provided by Danial Sandler
614 while eol == b" ":
615 eol = stream.read(1)
616 if eol not in (b"\n", b"\r"):
617 raise PdfStreamError("Stream data must be followed by a newline")
618 if eol == b"\r" and stream.read(1) != b"\n":
619 stream.seek(-1, 1)
620 # this is a stream object, not a dictionary
621 if SA.LENGTH not in data:
622 if pdf is not None and pdf.strict:
623 raise PdfStreamError("Stream length not defined")
624 logger_warning(
625 f"Stream length not defined @pos={stream.tell()}", __name__
626 )
627 data[NameObject(SA.LENGTH)] = NumberObject(-1)
628 length = data[SA.LENGTH]
629 if isinstance(length, IndirectObject):
630 t = stream.tell()
631 assert pdf is not None, "mypy"
632 length = pdf.get_object(length)
633 stream.seek(t, 0)
634 if length is None: # if the PDF is damaged
635 length = -1
636 pstart = stream.tell()
637 if length >= 0:
638 data["__streamdata__"] = stream.read(length)
639 else:
640 data["__streamdata__"] = read_until_regex(
641 stream, re.compile(b"endstream")
642 )
643 e = read_non_whitespace(stream)
644 ndstream = stream.read(8)
645 if (e + ndstream) != b"endstream":
646 # the odd PDF file has a length that is too long, so
647 # we need to read backwards to find the "endstream" ending.
648 # ReportLab (unknown version) generates files with this bug,
649 # and Python users into PDF files tend to be our audience.
650 # we need to do this to correct the streamdata and chop off
651 # an extra character.
652 pos = stream.tell()
653 stream.seek(-10, 1)
654 end = stream.read(9)
655 if end == b"endstream":
656 # we found it by looking back one character further.
657 data["__streamdata__"] = data["__streamdata__"][:-1]
658 elif pdf is not None and not pdf.strict:
659 stream.seek(pstart, 0)
660 data["__streamdata__"] = read_unsized_from_stream(stream, pdf)
661 pos = stream.tell()
662 else:
663 stream.seek(pos, 0)
664 raise PdfReadError(
665 "Unable to find 'endstream' marker after stream at byte "
666 f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')."
667 )
668 else:
669 stream.seek(pos, 0)
670 if "__streamdata__" in data:
671 return StreamObject.initialize_from_dictionary(data)
672 retval = DictionaryObject()
673 retval.update(data)
674 return retval
675
676
677class TreeObject(DictionaryObject):
678 def __init__(self, dct: Optional[DictionaryObject] = None) -> None:
679 DictionaryObject.__init__(self)
680 if dct:
681 self.update(dct)
682
683 def has_children(self) -> bool:
684 return "/First" in self
685
686 def __iter__(self) -> Any:
687 return self.children()
688
689 def children(self) -> Iterable[Any]:
690 if not self.has_children():
691 return
692
693 child_ref = self[NameObject("/First")]
694 child = child_ref.get_object()
695 while True:
696 yield child
697 if child == self[NameObject("/Last")]:
698 return
699 child_ref = child.get(NameObject("/Next")) # type: ignore
700 if is_null_or_none(child_ref):
701 return
702 child = child_ref.get_object()
703
704 def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None:
705 self.insert_child(child, None, pdf)
706
707 def inc_parent_counter_default(
708 self, parent: Union[None, IndirectObject, "TreeObject"], n: int
709 ) -> None:
710 if is_null_or_none(parent):
711 return
712 assert parent is not None, "mypy"
713 parent = cast("TreeObject", parent.get_object())
714 if "/Count" in parent:
715 parent[NameObject("/Count")] = NumberObject(
716 max(0, cast(int, parent[NameObject("/Count")]) + n)
717 )
718 self.inc_parent_counter_default(parent.get("/Parent", None), n)
719
720 def inc_parent_counter_outline(
721 self, parent: Union[None, IndirectObject, "TreeObject"], n: int
722 ) -> None:
723 if is_null_or_none(parent):
724 return
725 assert parent is not None, "mypy"
726 parent = cast("TreeObject", parent.get_object())
727 # BooleanObject requires comparison with == not is
728 opn = parent.get("/%is_open%", True) == True # noqa: E712
729 c = cast(int, parent.get("/Count", 0))
730 if c < 0:
731 c = abs(c)
732 parent[NameObject("/Count")] = NumberObject((c + n) * (1 if opn else -1))
733 if not opn:
734 return
735 self.inc_parent_counter_outline(parent.get("/Parent", None), n)
736
737 def insert_child(
738 self,
739 child: Any,
740 before: Any,
741 pdf: PdfWriterProtocol,
742 inc_parent_counter: Optional[Callable[..., Any]] = None,
743 ) -> IndirectObject:
744 if inc_parent_counter is None:
745 inc_parent_counter = self.inc_parent_counter_default
746 child_obj = child.get_object()
747 child = child.indirect_reference # get_reference(child_obj)
748
749 prev: Optional[DictionaryObject]
750 if "/First" not in self: # no child yet
751 self[NameObject("/First")] = child
752 self[NameObject("/Count")] = NumberObject(0)
753 self[NameObject("/Last")] = child
754 child_obj[NameObject("/Parent")] = self.indirect_reference
755 inc_parent_counter(self, child_obj.get("/Count", 1))
756 if "/Next" in child_obj:
757 del child_obj["/Next"]
758 if "/Prev" in child_obj:
759 del child_obj["/Prev"]
760 return child
761 prev = cast("DictionaryObject", self["/Last"])
762
763 while prev.indirect_reference != before:
764 if "/Next" in prev:
765 prev = cast("TreeObject", prev["/Next"])
766 else: # append at the end
767 prev[NameObject("/Next")] = cast("TreeObject", child)
768 child_obj[NameObject("/Prev")] = prev.indirect_reference
769 child_obj[NameObject("/Parent")] = self.indirect_reference
770 if "/Next" in child_obj:
771 del child_obj["/Next"]
772 self[NameObject("/Last")] = child
773 inc_parent_counter(self, child_obj.get("/Count", 1))
774 return child
775 try: # insert as first or in the middle
776 assert isinstance(prev["/Prev"], DictionaryObject)
777 prev["/Prev"][NameObject("/Next")] = child
778 child_obj[NameObject("/Prev")] = prev["/Prev"]
779 except Exception: # it means we are inserting in first position
780 del child_obj["/Next"]
781 child_obj[NameObject("/Next")] = prev
782 prev[NameObject("/Prev")] = child
783 child_obj[NameObject("/Parent")] = self.indirect_reference
784 inc_parent_counter(self, child_obj.get("/Count", 1))
785 return child
786
787 def _remove_node_from_tree(
788 self, prev: Any, prev_ref: Any, cur: Any, last: Any
789 ) -> None:
790 """
791 Adjust the pointers of the linked list and tree node count.
792
793 Args:
794 prev:
795 prev_ref:
796 cur:
797 last:
798
799 """
800 next_ref = cur.get(NameObject("/Next"), None)
801 if prev is None:
802 if next_ref:
803 # Removing first tree node
804 next_obj = next_ref.get_object()
805 del next_obj[NameObject("/Prev")]
806 self[NameObject("/First")] = next_ref
807 self[NameObject("/Count")] = NumberObject(
808 self[NameObject("/Count")] - 1 # type: ignore
809 )
810
811 else:
812 # Removing only tree node
813 self[NameObject("/Count")] = NumberObject(0)
814 del self[NameObject("/First")]
815 if NameObject("/Last") in self:
816 del self[NameObject("/Last")]
817 else:
818 if next_ref:
819 # Removing middle tree node
820 next_obj = next_ref.get_object()
821 next_obj[NameObject("/Prev")] = prev_ref
822 prev[NameObject("/Next")] = next_ref
823 else:
824 # Removing last tree node
825 assert cur == last
826 del prev[NameObject("/Next")]
827 self[NameObject("/Last")] = prev_ref
828 self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1) # type: ignore
829
830 def remove_child(self, child: Any) -> None:
831 child_obj = child.get_object()
832 child = child_obj.indirect_reference
833
834 if NameObject("/Parent") not in child_obj:
835 raise ValueError("Removed child does not appear to be a tree item")
836 if child_obj[NameObject("/Parent")] != self:
837 raise ValueError("Removed child is not a member of this tree")
838
839 found = False
840 prev_ref = None
841 prev = None
842 cur_ref: Optional[Any] = self[NameObject("/First")]
843 cur: Optional[dict[str, Any]] = cur_ref.get_object() # type: ignore
844 last_ref = self[NameObject("/Last")]
845 last = last_ref.get_object()
846 while cur is not None:
847 if cur == child_obj:
848 self._remove_node_from_tree(prev, prev_ref, cur, last)
849 found = True
850 break
851
852 # Go to the next node
853 prev_ref = cur_ref
854 prev = cur
855 if NameObject("/Next") in cur:
856 cur_ref = cur[NameObject("/Next")]
857 cur = cur_ref.get_object()
858 else:
859 cur_ref = None
860 cur = None
861
862 if not found:
863 raise ValueError("Removal couldn't find item in tree")
864
865 _reset_node_tree_relationship(child_obj)
866
867 def remove_from_tree(self) -> None:
868 """Remove the object from the tree it is in."""
869 if NameObject("/Parent") not in self:
870 raise ValueError("Removed child does not appear to be a tree item")
871 cast("TreeObject", self["/Parent"]).remove_child(self)
872
873 def empty_tree(self) -> None:
874 for child in self:
875 child_obj = child.get_object()
876 _reset_node_tree_relationship(child_obj)
877
878 if NameObject("/Count") in self:
879 del self[NameObject("/Count")]
880 if NameObject("/First") in self:
881 del self[NameObject("/First")]
882 if NameObject("/Last") in self:
883 del self[NameObject("/Last")]
884
885
886def _reset_node_tree_relationship(child_obj: Any) -> None:
887 """
888 Call this after a node has been removed from a tree.
889
890 This resets the nodes attributes in respect to that tree.
891
892 Args:
893 child_obj:
894
895 """
896 del child_obj[NameObject("/Parent")]
897 if NameObject("/Next") in child_obj:
898 del child_obj[NameObject("/Next")]
899 if NameObject("/Prev") in child_obj:
900 del child_obj[NameObject("/Prev")]
901
902
903class StreamObject(DictionaryObject):
904 def __init__(self) -> None:
905 self._data: bytes = b""
906 self.decoded_self: Optional[DecodedStreamObject] = None
907
908 def replicate(
909 self,
910 pdf_dest: PdfWriterProtocol,
911 ) -> "StreamObject":
912 d__ = cast(
913 "StreamObject",
914 self._reference_clone(self.__class__(), pdf_dest, False),
915 )
916 d__._data = self._data
917 try:
918 decoded_self = self.decoded_self
919 if decoded_self is None:
920 self.decoded_self = None
921 else:
922 self.decoded_self = cast(
923 "DecodedStreamObject", decoded_self.replicate(pdf_dest)
924 )
925 except Exception:
926 pass
927 for k, v in self.items():
928 d__[k.replicate(pdf_dest)] = (
929 v.replicate(pdf_dest) if hasattr(v, "replicate") else v
930 )
931 return d__
932
933 def _clone(
934 self,
935 src: DictionaryObject,
936 pdf_dest: PdfWriterProtocol,
937 force_duplicate: bool,
938 ignore_fields: Optional[Sequence[Union[str, int]]],
939 visited: set[tuple[int, int]],
940 ) -> None:
941 """
942 Update the object from src.
943
944 Args:
945 src:
946 pdf_dest:
947 force_duplicate:
948 ignore_fields:
949
950 """
951 self._data = cast("StreamObject", src)._data
952 try:
953 decoded_self = cast("StreamObject", src).decoded_self
954 if decoded_self is None:
955 self.decoded_self = None
956 else:
957 self.decoded_self = cast(
958 "DecodedStreamObject",
959 decoded_self.clone(pdf_dest, force_duplicate, ignore_fields),
960 )
961 except Exception:
962 pass
963 super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)
964
965 def hash_bin(self) -> int:
966 """
967 Used to detect modified object.
968
969 Returns:
970 Hash considering type and value.
971
972 """
973 # Use _data to prevent errors on non-decoded streams.
974 return hash((super().hash_bin(), self._data))
975
976 def get_data(self) -> bytes:
977 return self._data
978
979 def set_data(self, data: bytes) -> None:
980 self._data = data
981
982 def hash_value_data(self) -> bytes:
983 data = super().hash_value_data()
984 data += self.get_data()
985 return data
986
987 def write_to_stream(
988 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
989 ) -> None:
990 if encryption_key is not None: # deprecated
991 deprecation_no_replacement(
992 "the encryption_key parameter of write_to_stream", "5.0.0"
993 )
994 self[NameObject(SA.LENGTH)] = NumberObject(len(self._data))
995 DictionaryObject.write_to_stream(self, stream)
996 del self[SA.LENGTH]
997 stream.write(b"\nstream\n")
998 stream.write(self._data)
999 stream.write(b"\nendstream")
1000
1001 @staticmethod
1002 def initialize_from_dictionary(
1003 data: dict[str, Any]
1004 ) -> Union["EncodedStreamObject", "DecodedStreamObject"]:
1005 retval: Union[EncodedStreamObject, DecodedStreamObject]
1006 if SA.FILTER in data:
1007 retval = EncodedStreamObject()
1008 else:
1009 retval = DecodedStreamObject()
1010 retval._data = data["__streamdata__"]
1011 del data["__streamdata__"]
1012 if SA.LENGTH in data:
1013 del data[SA.LENGTH]
1014 retval.update(data)
1015 return retval
1016
1017 def flate_encode(self, level: int = -1) -> "EncodedStreamObject":
1018 from ..filters import FlateDecode # noqa: PLC0415
1019
1020 if SA.FILTER in self:
1021 f = self[SA.FILTER]
1022 if isinstance(f, ArrayObject):
1023 f = ArrayObject([NameObject(FT.FLATE_DECODE), *f])
1024 try:
1025 params = ArrayObject(
1026 [NullObject(), *self.get(SA.DECODE_PARMS, ArrayObject())]
1027 )
1028 except TypeError:
1029 # case of error where the * operator is not working (not an array
1030 params = ArrayObject(
1031 [NullObject(), self.get(SA.DECODE_PARMS, ArrayObject())]
1032 )
1033 else:
1034 f = ArrayObject([NameObject(FT.FLATE_DECODE), f])
1035 params = ArrayObject(
1036 [NullObject(), self.get(SA.DECODE_PARMS, NullObject())]
1037 )
1038 else:
1039 f = NameObject(FT.FLATE_DECODE)
1040 params = None
1041 retval = EncodedStreamObject()
1042 retval.update(self)
1043 retval[NameObject(SA.FILTER)] = f
1044 if params is not None:
1045 retval[NameObject(SA.DECODE_PARMS)] = params
1046 retval._data = FlateDecode.encode(self._data, level)
1047 return retval
1048
1049 def decode_as_image(self, pillow_parameters: Union[dict[str, Any], None] = None) -> Any:
1050 """
1051 Try to decode the stream object as an image
1052
1053 Args:
1054 pillow_parameters: parameters provided to Pillow Image.save() method,
1055 cf. <https://pillow.readthedocs.io/en/stable/reference/Image.html#PIL.Image.Image.save>
1056
1057 Returns:
1058 a PIL image if proper decoding has been found
1059 Raises:
1060 Exception: Errors during decoding will be reported.
1061 It is recommended to catch exceptions to prevent
1062 stops in your program.
1063
1064 """
1065 from ..filters import _xobj_to_image # noqa: PLC0415
1066
1067 if self.get("/Subtype", "") != "/Image":
1068 try:
1069 msg = f"{self.indirect_reference} does not seem to be an Image" # pragma: no cover
1070 except AttributeError:
1071 msg = f"{self.__repr__()} object does not seem to be an Image" # pragma: no cover
1072 logger_warning(msg, __name__)
1073 extension, _, img = _xobj_to_image(self, pillow_parameters)
1074 if extension is None:
1075 return None # pragma: no cover
1076 return img
1077
1078
1079class DecodedStreamObject(StreamObject):
1080 pass
1081
1082
1083class EncodedStreamObject(StreamObject):
1084 def __init__(self) -> None:
1085 self.decoded_self: Optional[DecodedStreamObject] = None
1086
1087 # This overrides the parent method
1088 def get_data(self) -> bytes:
1089 from ..filters import decode_stream_data # noqa: PLC0415
1090
1091 if self.decoded_self is not None:
1092 # Cached version of decoded object
1093 return self.decoded_self.get_data()
1094
1095 # Create decoded object
1096 decoded = DecodedStreamObject()
1097 decoded.set_data(decode_stream_data(self))
1098 for key, value in self.items():
1099 if key not in (SA.LENGTH, SA.FILTER, SA.DECODE_PARMS):
1100 decoded[key] = value
1101 self.decoded_self = decoded
1102 return decoded.get_data()
1103
1104 # This overrides the parent method:
1105 def set_data(self, data: bytes) -> None:
1106 from ..filters import FlateDecode # noqa: PLC0415
1107
1108 if self.get(SA.FILTER, "") in (FT.FLATE_DECODE, [FT.FLATE_DECODE]):
1109 if not isinstance(data, bytes):
1110 raise TypeError("Data must be bytes")
1111 if self.decoded_self is None:
1112 self.get_data() # to create self.decoded_self
1113 assert self.decoded_self is not None, "mypy"
1114 self.decoded_self.set_data(data)
1115 super().set_data(FlateDecode.encode(data))
1116 else:
1117 raise PdfReadError(
1118 "Streams encoded with a filter different from FlateDecode are not supported"
1119 )
1120
1121
1122class ContentStream(DecodedStreamObject):
1123 """
1124 In order to be fast, this data structure can contain either:
1125
1126 * raw data in ._data
1127 * parsed stream operations in ._operations.
1128
1129 At any time, ContentStream object can either have both of those fields defined,
1130 or one field defined and the other set to None.
1131
1132 These fields are "rebuilt" lazily, when accessed:
1133
1134 * when .get_data() is called, if ._data is None, it is rebuilt from ._operations.
1135 * when .operations is called, if ._operations is None, it is rebuilt from ._data.
1136
1137 Conversely, these fields can be invalidated:
1138
1139 * when .set_data() is called, ._operations is set to None.
1140 * when .operations is set, ._data is set to None.
1141 """
1142
1143 def __init__(
1144 self,
1145 stream: Any,
1146 pdf: Any,
1147 forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
1148 ) -> None:
1149 self.pdf = pdf
1150 self._operations: list[tuple[Any, bytes]] = []
1151
1152 # stream may be a StreamObject or an ArrayObject containing
1153 # StreamObjects to be concatenated together.
1154 if stream is None:
1155 super().set_data(b"")
1156 else:
1157 stream = stream.get_object()
1158 if isinstance(stream, ArrayObject):
1159 data = b""
1160 for s in stream:
1161 s_resolved = s.get_object()
1162 if isinstance(s_resolved, NullObject):
1163 continue
1164 if not isinstance(s_resolved, StreamObject):
1165 # No need to emit an exception here for now - the PDF structure
1166 # seems to already be broken beforehand in these cases.
1167 logger_warning(
1168 f"Expected StreamObject, got {type(s_resolved).__name__} instead. Data might be wrong.",
1169 __name__
1170 )
1171 else:
1172 data += s_resolved.get_data()
1173 if len(data) == 0 or data[-1] != b"\n":
1174 data += b"\n"
1175 super().set_data(bytes(data))
1176 else:
1177 stream_data = stream.get_data()
1178 assert stream_data is not None
1179 super().set_data(stream_data)
1180 self.forced_encoding = forced_encoding
1181
1182 def replicate(
1183 self,
1184 pdf_dest: PdfWriterProtocol,
1185 ) -> "ContentStream":
1186 d__ = cast(
1187 "ContentStream",
1188 self._reference_clone(self.__class__(None, None), pdf_dest, False),
1189 )
1190 d__._data = self._data
1191 try:
1192 decoded_self = self.decoded_self
1193 if decoded_self is None:
1194 self.decoded_self = None
1195 else:
1196 self.decoded_self = cast(
1197 "DecodedStreamObject", decoded_self.replicate(pdf_dest)
1198 )
1199 except Exception:
1200 pass
1201 for k, v in self.items():
1202 d__[k.replicate(pdf_dest)] = (
1203 v.replicate(pdf_dest) if hasattr(v, "replicate") else v
1204 )
1205 return d__
1206 d__.set_data(self._data)
1207 d__.pdf = pdf_dest
1208 d__._operations = list(self._operations)
1209 d__.forced_encoding = self.forced_encoding
1210 return d__
1211
1212 def clone(
1213 self,
1214 pdf_dest: Any,
1215 force_duplicate: bool = False,
1216 ignore_fields: Optional[Sequence[Union[str, int]]] = (),
1217 ) -> "ContentStream":
1218 """
1219 Clone object into pdf_dest.
1220
1221 Args:
1222 pdf_dest:
1223 force_duplicate:
1224 ignore_fields:
1225
1226 Returns:
1227 The cloned ContentStream
1228
1229 """
1230 try:
1231 if self.indirect_reference.pdf == pdf_dest and not force_duplicate: # type: ignore
1232 return self
1233 except Exception:
1234 pass
1235
1236 visited: set[tuple[int, int]] = set()
1237 d__ = cast(
1238 "ContentStream",
1239 self._reference_clone(
1240 self.__class__(None, None), pdf_dest, force_duplicate
1241 ),
1242 )
1243 if ignore_fields is None:
1244 ignore_fields = []
1245 d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
1246 return d__
1247
1248 def _clone(
1249 self,
1250 src: DictionaryObject,
1251 pdf_dest: PdfWriterProtocol,
1252 force_duplicate: bool,
1253 ignore_fields: Optional[Sequence[Union[str, int]]],
1254 visited: set[tuple[int, int]],
1255 ) -> None:
1256 """
1257 Update the object from src.
1258
1259 Args:
1260 src:
1261 pdf_dest:
1262 force_duplicate:
1263 ignore_fields:
1264
1265 """
1266 src_cs = cast("ContentStream", src)
1267 super().set_data(src_cs._data)
1268 self.pdf = pdf_dest
1269 self._operations = list(src_cs._operations)
1270 self.forced_encoding = src_cs.forced_encoding
1271 # no need to call DictionaryObjection or anything
1272 # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)
1273
1274 def _parse_content_stream(self, stream: StreamType) -> None:
1275 # 7.8.2 Content Streams
1276 stream.seek(0, 0)
1277 operands: list[Union[int, str, PdfObject]] = []
1278 while True:
1279 peek = read_non_whitespace(stream)
1280 if peek in (b"", 0):
1281 break
1282 stream.seek(-1, 1)
1283 if peek.isalpha() or peek in (b"'", b'"'):
1284 operator = read_until_regex(stream, NameObject.delimiter_pattern)
1285 if operator == b"BI":
1286 # begin inline image - a completely different parsing
1287 # mechanism is required, of course... thanks buddy...
1288 assert operands == []
1289 ii = self._read_inline_image(stream)
1290 self._operations.append((ii, b"INLINE IMAGE"))
1291 else:
1292 self._operations.append((operands, operator))
1293 operands = []
1294 elif peek == b"%":
1295 # If we encounter a comment in the content stream, we have to
1296 # handle it here. Typically, read_object will handle
1297 # encountering a comment -- but read_object assumes that
1298 # following the comment must be the object we're trying to
1299 # read. In this case, it could be an operator instead.
1300 while peek not in (b"\r", b"\n", b""):
1301 peek = stream.read(1)
1302 else:
1303 operands.append(read_object(stream, None, self.forced_encoding))
1304
1305 def _read_inline_image(self, stream: StreamType) -> dict[str, Any]:
1306 # begin reading just after the "BI" - begin image
1307 # first read the dictionary of settings.
1308 settings = DictionaryObject()
1309 while True:
1310 tok = read_non_whitespace(stream)
1311 stream.seek(-1, 1)
1312 if tok == b"I":
1313 # "ID" - begin of image data
1314 break
1315 key = read_object(stream, self.pdf)
1316 tok = read_non_whitespace(stream)
1317 stream.seek(-1, 1)
1318 value = read_object(stream, self.pdf)
1319 settings[key] = value
1320 # left at beginning of ID
1321 tmp = stream.read(3)
1322 assert tmp[:2] == b"ID"
1323 filtr = settings.get("/F", settings.get("/Filter", "not set"))
1324 savpos = stream.tell()
1325 if isinstance(filtr, list):
1326 filtr = filtr[0] # used forencoding
1327 if "AHx" in filtr or "ASCIIHexDecode" in filtr:
1328 data = extract_inline_AHx(stream)
1329 elif "A85" in filtr or "ASCII85Decode" in filtr:
1330 data = extract_inline_A85(stream)
1331 elif "RL" in filtr or "RunLengthDecode" in filtr:
1332 data = extract_inline_RL(stream)
1333 elif "DCT" in filtr or "DCTDecode" in filtr:
1334 data = extract_inline_DCT(stream)
1335 elif filtr == "not set":
1336 cs = settings.get("/CS", "")
1337 if isinstance(cs, list):
1338 cs = cs[0]
1339 if "RGB" in cs:
1340 lcs = 3
1341 elif "CMYK" in cs:
1342 lcs = 4
1343 else:
1344 bits = settings.get(
1345 "/BPC",
1346 8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1,
1347 )
1348 if bits > 0:
1349 lcs = bits / 8.0
1350 else:
1351 data = extract_inline_default(stream)
1352 lcs = -1
1353 if lcs > 0:
1354 data = stream.read(
1355 ceil(cast(int, settings["/W"]) * lcs) * cast(int, settings["/H"])
1356 )
1357 # Move to the `EI` if possible.
1358 ei = read_non_whitespace(stream)
1359 stream.seek(-1, 1)
1360 else:
1361 data = extract_inline_default(stream)
1362
1363 ei = stream.read(3)
1364 stream.seek(-1, 1)
1365 if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:
1366 # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above.
1367 stream.seek(savpos, 0)
1368 data = extract_inline_default(stream)
1369 ei = stream.read(3)
1370 stream.seek(-1, 1)
1371 if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES: # pragma: no cover
1372 # Check the same condition again. This should never fail as
1373 # edge cases are covered by `extract_inline_default` above,
1374 # but check this ot make sure that we are behind the `EI` afterwards.
1375 raise PdfStreamError(
1376 f"Could not extract inline image, even using fallback. Expected 'EI', got {ei!r}"
1377 )
1378 return {"settings": settings, "data": data}
1379
1380 # This overrides the parent method
1381 def get_data(self) -> bytes:
1382 if not self._data:
1383 new_data = BytesIO()
1384 for operands, operator in self._operations:
1385 if operator == b"INLINE IMAGE":
1386 new_data.write(b"BI")
1387 dict_text = BytesIO()
1388 operands["settings"].write_to_stream(dict_text)
1389 new_data.write(dict_text.getvalue()[2:-2])
1390 new_data.write(b"ID ")
1391 new_data.write(operands["data"])
1392 new_data.write(b"EI")
1393 else:
1394 for op in operands:
1395 op.write_to_stream(new_data)
1396 new_data.write(b" ")
1397 new_data.write(operator)
1398 new_data.write(b"\n")
1399 self._data = new_data.getvalue()
1400 return self._data
1401
1402 # This overrides the parent method
1403 def set_data(self, data: bytes) -> None:
1404 super().set_data(data)
1405 self._operations = []
1406
1407 @property
1408 def operations(self) -> list[tuple[Any, bytes]]:
1409 if not self._operations and self._data:
1410 self._parse_content_stream(BytesIO(self._data))
1411 self._data = b""
1412 return self._operations
1413
1414 @operations.setter
1415 def operations(self, operations: list[tuple[Any, bytes]]) -> None:
1416 self._operations = operations
1417 self._data = b""
1418
1419 def isolate_graphics_state(self) -> None:
1420 if self._operations:
1421 self._operations.insert(0, ([], b"q"))
1422 self._operations.append(([], b"Q"))
1423 elif self._data:
1424 self._data = b"q\n" + self._data + b"\nQ\n"
1425
1426 # This overrides the parent method
1427 def write_to_stream(
1428 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
1429 ) -> None:
1430 if not self._data and self._operations:
1431 self.get_data() # this ensures ._data is rebuilt
1432 super().write_to_stream(stream, encryption_key)
1433
1434
1435def read_object(
1436 stream: StreamType,
1437 pdf: Optional[PdfReaderProtocol],
1438 forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
1439) -> Union[PdfObject, int, str, ContentStream]:
1440 tok = stream.read(1)
1441 stream.seek(-1, 1) # reset to start
1442 if tok == b"/":
1443 return NameObject.read_from_stream(stream, pdf)
1444 if tok == b"<":
1445 # hexadecimal string OR dictionary
1446 peek = stream.read(2)
1447 stream.seek(-2, 1) # reset to start
1448 if peek == b"<<":
1449 return DictionaryObject.read_from_stream(stream, pdf, forced_encoding)
1450 return read_hex_string_from_stream(stream, forced_encoding)
1451 if tok == b"[":
1452 return ArrayObject.read_from_stream(stream, pdf, forced_encoding)
1453 if tok in (b"t", b"f"):
1454 return BooleanObject.read_from_stream(stream)
1455 if tok == b"(":
1456 return read_string_from_stream(stream, forced_encoding)
1457 if tok == b"e" and stream.read(6) == b"endobj":
1458 return NullObject()
1459 if tok == b"n":
1460 return NullObject.read_from_stream(stream)
1461 if tok == b"%":
1462 # comment
1463 skip_over_comment(stream)
1464 tok = read_non_whitespace(stream)
1465 stream.seek(-1, 1)
1466 return read_object(stream, pdf, forced_encoding)
1467 if tok in b"0123456789+-.":
1468 # number object OR indirect reference
1469 peek = stream.read(20)
1470 stream.seek(-len(peek), 1) # reset to start
1471 if IndirectPattern.match(peek) is not None:
1472 assert pdf is not None, "mypy"
1473 return IndirectObject.read_from_stream(stream, pdf)
1474 return NumberObject.read_from_stream(stream)
1475 pos = stream.tell()
1476 stream.seek(-20, 1)
1477 stream_extract = stream.read(80)
1478 stream.seek(pos)
1479 read_until_whitespace(stream)
1480 raise PdfReadError(
1481 f"Invalid Elementary Object starting with {tok!r} @{pos}: {stream_extract!r}"
1482 )
1483
1484
1485class Field(TreeObject):
1486 """
1487 A class representing a field dictionary.
1488
1489 This class is accessed through
1490 :meth:`get_fields()<pypdf.PdfReader.get_fields>`
1491 """
1492
1493 def __init__(self, data: DictionaryObject) -> None:
1494 DictionaryObject.__init__(self)
1495 field_attributes = (
1496 FieldDictionaryAttributes.attributes()
1497 + CheckboxRadioButtonAttributes.attributes()
1498 )
1499 self.indirect_reference = data.indirect_reference
1500 for attr in field_attributes:
1501 try:
1502 self[NameObject(attr)] = data[attr]
1503 except KeyError:
1504 pass
1505 if isinstance(self.get("/V"), EncodedStreamObject):
1506 d = cast(EncodedStreamObject, self[NameObject("/V")]).get_data()
1507 if isinstance(d, bytes):
1508 d_str = d.decode()
1509 elif d is None:
1510 d_str = ""
1511 else:
1512 raise Exception("Should never happen")
1513 self[NameObject("/V")] = TextStringObject(d_str)
1514
1515 # TABLE 8.69 Entries common to all field dictionaries
1516 @property
1517 def field_type(self) -> Optional[NameObject]:
1518 """Read-only property accessing the type of this field."""
1519 return self.get(FieldDictionaryAttributes.FT)
1520
1521 @property
1522 def parent(self) -> Optional[DictionaryObject]:
1523 """Read-only property accessing the parent of this field."""
1524 return self.get(FieldDictionaryAttributes.Parent)
1525
1526 @property
1527 def kids(self) -> Optional["ArrayObject"]:
1528 """Read-only property accessing the kids of this field."""
1529 return self.get(FieldDictionaryAttributes.Kids)
1530
1531 @property
1532 def name(self) -> Optional[str]:
1533 """Read-only property accessing the name of this field."""
1534 return self.get(FieldDictionaryAttributes.T)
1535
1536 @property
1537 def alternate_name(self) -> Optional[str]:
1538 """Read-only property accessing the alternate name of this field."""
1539 return self.get(FieldDictionaryAttributes.TU)
1540
1541 @property
1542 def mapping_name(self) -> Optional[str]:
1543 """
1544 Read-only property accessing the mapping name of this field.
1545
1546 This name is used by pypdf as a key in the dictionary returned by
1547 :meth:`get_fields()<pypdf.PdfReader.get_fields>`
1548 """
1549 return self.get(FieldDictionaryAttributes.TM)
1550
1551 @property
1552 def flags(self) -> Optional[int]:
1553 """
1554 Read-only property accessing the field flags, specifying various
1555 characteristics of the field (see Table 8.70 of the PDF 1.7 reference).
1556 """
1557 return self.get(FieldDictionaryAttributes.Ff)
1558
1559 @property
1560 def value(self) -> Optional[Any]:
1561 """
1562 Read-only property accessing the value of this field.
1563
1564 Format varies based on field type.
1565 """
1566 return self.get(FieldDictionaryAttributes.V)
1567
1568 @property
1569 def default_value(self) -> Optional[Any]:
1570 """Read-only property accessing the default value of this field."""
1571 return self.get(FieldDictionaryAttributes.DV)
1572
1573 @property
1574 def additional_actions(self) -> Optional[DictionaryObject]:
1575 """
1576 Read-only property accessing the additional actions dictionary.
1577
1578 This dictionary defines the field's behavior in response to trigger
1579 events. See Section 8.5.2 of the PDF 1.7 reference.
1580 """
1581 return self.get(FieldDictionaryAttributes.AA)
1582
1583
1584class Destination(TreeObject):
1585 """
1586 A class representing a destination within a PDF file.
1587
1588 See section 12.3.2 of the PDF 2.0 reference.
1589
1590 Args:
1591 title: Title of this destination.
1592 page: Reference to the page of this destination. Should
1593 be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`.
1594 fit: How the destination is displayed.
1595
1596 Raises:
1597 PdfReadError: If destination type is invalid.
1598
1599 """
1600
1601 node: Optional[
1602 DictionaryObject
1603 ] = None # node provide access to the original Object
1604
1605 def __init__(
1606 self,
1607 title: Union[str, bytes],
1608 page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject],
1609 fit: Fit,
1610 ) -> None:
1611 self._filtered_children: list[Any] = [] # used in PdfWriter
1612
1613 typ = fit.fit_type
1614 args = fit.fit_args
1615
1616 DictionaryObject.__init__(self)
1617 self[NameObject("/Title")] = TextStringObject(title)
1618 self[NameObject("/Page")] = page
1619 self[NameObject("/Type")] = typ
1620
1621 # from table 8.2 of the PDF 1.7 reference.
1622 if typ == "/XYZ":
1623 if len(args) < 1: # left is missing : should never occur
1624 args.append(NumberObject(0.0))
1625 if len(args) < 2: # top is missing
1626 args.append(NumberObject(0.0))
1627 if len(args) < 3: # zoom is missing
1628 args.append(NumberObject(0.0))
1629 (
1630 self[NameObject(TA.LEFT)],
1631 self[NameObject(TA.TOP)],
1632 self[NameObject("/Zoom")],
1633 ) = args
1634 elif len(args) == 0:
1635 pass
1636 elif typ == TF.FIT_R:
1637 (
1638 self[NameObject(TA.LEFT)],
1639 self[NameObject(TA.BOTTOM)],
1640 self[NameObject(TA.RIGHT)],
1641 self[NameObject(TA.TOP)],
1642 ) = args
1643 elif typ in [TF.FIT_H, TF.FIT_BH]:
1644 try: # Prefer to be more robust not only to null parameters
1645 (self[NameObject(TA.TOP)],) = args
1646 except Exception:
1647 (self[NameObject(TA.TOP)],) = (NullObject(),)
1648 elif typ in [TF.FIT_V, TF.FIT_BV]:
1649 try: # Prefer to be more robust not only to null parameters
1650 (self[NameObject(TA.LEFT)],) = args
1651 except Exception:
1652 (self[NameObject(TA.LEFT)],) = (NullObject(),)
1653 elif typ in [TF.FIT, TF.FIT_B]:
1654 pass
1655 else:
1656 raise PdfReadError(f"Unknown Destination Type: {typ!r}")
1657
1658 @property
1659 def dest_array(self) -> "ArrayObject":
1660 return ArrayObject(
1661 [self.raw_get("/Page"), self["/Type"]]
1662 + [
1663 self[x]
1664 for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"]
1665 if x in self
1666 ]
1667 )
1668
1669 def write_to_stream(
1670 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
1671 ) -> None:
1672 if encryption_key is not None: # deprecated
1673 deprecation_no_replacement(
1674 "the encryption_key parameter of write_to_stream", "5.0.0"
1675 )
1676 stream.write(b"<<\n")
1677 key = NameObject("/D")
1678 key.write_to_stream(stream)
1679 stream.write(b" ")
1680 value = self.dest_array
1681 value.write_to_stream(stream)
1682
1683 key = NameObject("/S")
1684 key.write_to_stream(stream)
1685 stream.write(b" ")
1686 value_s = NameObject("/GoTo")
1687 value_s.write_to_stream(stream)
1688
1689 stream.write(b"\n")
1690 stream.write(b">>")
1691
1692 @property
1693 def title(self) -> Optional[str]:
1694 """Read-only property accessing the destination title."""
1695 return self.get("/Title")
1696
1697 @property
1698 def page(self) -> Optional[IndirectObject]:
1699 """Read-only property accessing the IndirectObject of the destination page."""
1700 return self.get("/Page")
1701
1702 @property
1703 def typ(self) -> Optional[str]:
1704 """Read-only property accessing the destination type."""
1705 return self.get("/Type")
1706
1707 @property
1708 def zoom(self) -> Optional[int]:
1709 """Read-only property accessing the zoom factor."""
1710 return self.get("/Zoom", None)
1711
1712 @property
1713 def left(self) -> Optional[FloatObject]:
1714 """Read-only property accessing the left horizontal coordinate."""
1715 return self.get("/Left", None)
1716
1717 @property
1718 def right(self) -> Optional[FloatObject]:
1719 """Read-only property accessing the right horizontal coordinate."""
1720 return self.get("/Right", None)
1721
1722 @property
1723 def top(self) -> Optional[FloatObject]:
1724 """Read-only property accessing the top vertical coordinate."""
1725 return self.get("/Top", None)
1726
1727 @property
1728 def bottom(self) -> Optional[FloatObject]:
1729 """Read-only property accessing the bottom vertical coordinate."""
1730 return self.get("/Bottom", None)
1731
1732 @property
1733 def color(self) -> Optional["ArrayObject"]:
1734 """Read-only property accessing the color in (R, G, B) with values 0.0-1.0."""
1735 return self.get(
1736 "/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)])
1737 )
1738
1739 @property
1740 def font_format(self) -> Optional[OutlineFontFlag]:
1741 """
1742 Read-only property accessing the font type.
1743
1744 1=italic, 2=bold, 3=both
1745 """
1746 return self.get("/F", 0)
1747
1748 @property
1749 def outline_count(self) -> Optional[int]:
1750 """
1751 Read-only property accessing the outline count.
1752
1753 positive = expanded
1754 negative = collapsed
1755 absolute value = number of visible descendants at all levels
1756 """
1757 return self.get("/Count", None)