1# Copyright (c) 2006, Mathieu Fenniak
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright notice,
9# this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above copyright notice,
11# this list of conditions and the following disclaimer in the documentation
12# and/or other materials provided with the distribution.
13# * The name of the author may not be used to endorse or promote products
14# derived from this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26# POSSIBILITY OF SUCH DAMAGE.
27
28
29__author__ = "Mathieu Fenniak"
30__author_email__ = "biziqe@mathieu.fenniak.net"
31
32import logging
33import re
34import sys
35from collections.abc import Iterable, Sequence
36from io import BytesIO
37from math import ceil
38from typing import (
39 Any,
40 Callable,
41 Optional,
42 Union,
43 cast,
44)
45
46from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol
47from .._utils import (
48 WHITESPACES,
49 StreamType,
50 deprecation_no_replacement,
51 logger_warning,
52 read_non_whitespace,
53 read_until_regex,
54 read_until_whitespace,
55 skip_over_comment,
56)
57from ..constants import (
58 CheckboxRadioButtonAttributes,
59 FieldDictionaryAttributes,
60 OutlineFontFlag,
61)
62from ..constants import FilterTypes as FT
63from ..constants import StreamAttributes as SA
64from ..constants import TypArguments as TA
65from ..constants import TypFitArguments as TF
66from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfReadError, PdfStreamError
67from ._base import (
68 BooleanObject,
69 ByteStringObject,
70 FloatObject,
71 IndirectObject,
72 NameObject,
73 NullObject,
74 NumberObject,
75 PdfObject,
76 TextStringObject,
77 is_null_or_none,
78)
79from ._fit import Fit
80from ._image_inline import (
81 extract_inline__ascii85_decode,
82 extract_inline__ascii_hex_decode,
83 extract_inline__dct_decode,
84 extract_inline__run_length_decode,
85 extract_inline_default,
86)
87from ._utils import read_hex_string_from_stream, read_string_from_stream
88
89if sys.version_info >= (3, 11):
90 from typing import Self
91else:
92 from typing_extensions import Self
93
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Matches the bytes of an indirect reference such as b"12 0 R" followed by
# one byte that is not an ASCII letter. An optional sign may precede the first
# integer; groups 1 and 2 capture the two integers (without the sign).
IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]")
97
98
class ArrayObject(list[Any], PdfObject):
    """
    PDF array object: a ``list`` whose items are PDF objects.

    On top of the regular list behaviour it provides copying into a
    destination writer, hashing for change detection, ``+`` / ``+=`` / ``-=``
    convenience operators and reading/writing of the ``[ ... ]`` syntax.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ArrayObject":
        """
        Build a copy of this array for ``pdf_dest``.

        Items exposing a ``replicate`` method are replicated recursively;
        every other item is appended unchanged.

        Args:
            pdf_dest: Destination the copy is created for.

        Returns:
            The array obtained from ``_reference_clone``, filled with the
            replicated items.

        """
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, False),
        )
        for data in self:
            if hasattr(data, "replicate"):
                arr.append(data.replicate(pdf_dest))
            else:
                arr.append(data)
        return arr

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ArrayObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: Destination the clone is created for.
            force_duplicate: When False and this array already belongs to
                ``pdf_dest``, the array itself is returned.
            ignore_fields: Forwarded unchanged to the ``clone`` call of
                every item.

        Returns:
            ``self`` in the short-circuit case described above, otherwise the
            array obtained from ``_reference_clone`` filled with cloned items.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: e.g. no indirect reference is attached, so the
            # ownership test cannot be evaluated. Fall through and clone.
            pass
        # Honour the caller's force_duplicate flag, exactly as it is done for
        # the stream items below and in DictionaryObject.clone.
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, force_duplicate),
        )
        for data in self:
            if isinstance(data, StreamObject):
                # Streams are stored through their indirect reference.
                dup = data._reference_clone(
                    data.clone(pdf_dest, force_duplicate, ignore_fields),
                    pdf_dest,
                    force_duplicate,
                )
                arr.append(dup.indirect_reference)
            elif hasattr(data, "clone"):
                arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields))
            else:
                arr.append(data)
        return arr

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash((self.__class__, tuple(x.hash_bin() for x in self)))

    def items(self) -> Iterable[Any]:
        """Emulate DictionaryObject.items for a list (index, object)."""
        return enumerate(self)

    def _to_lst(self, lst: Any) -> list[Any]:
        """
        Normalize the operand of ``+``, ``+=`` and ``-=`` into an iterable.

        Args:
            lst: ``list``/``tuple``/``set`` values are returned unchanged.
                A ``PdfObject`` is wrapped as a single item. A ``str`` becomes
                a ``NameObject`` when it starts with "/" and a
                ``TextStringObject`` otherwise (including the empty string).
                ``bytes`` become a ``ByteStringObject``. Anything else
                (numbers, ...) is wrapped as a single item.

        Returns:
            The collection of items to add or remove.

        """
        if isinstance(lst, (list, tuple, set)):
            pass
        elif isinstance(lst, PdfObject):
            lst = [lst]
        elif isinstance(lst, str):
            # startswith() instead of lst[0]: an empty string must not raise
            # IndexError, it is simply not a name.
            if lst.startswith("/"):
                lst = [NameObject(lst)]
            else:
                lst = [TextStringObject(lst)]
        elif isinstance(lst, bytes):
            lst = [ByteStringObject(lst)]
        else:  # for numbers,...
            lst = [lst]
        return lst

    def __add__(self, lst: Any) -> "ArrayObject":
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        Returns:
            ArrayObject with all elements

        """
        temp = ArrayObject(self)
        temp.extend(self._to_lst(lst))
        return temp

    def __iadd__(self, lst: Any) -> Self:
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        """
        self.extend(self._to_lst(lst))
        return self

    def __isub__(self, lst: Any) -> Self:
        """
        Allow to remove items.

        For each item of the normalized operand the first equal element is
        removed; items that are not present are ignored.
        """
        for x in self._to_lst(lst):
            try:
                self.remove(x)
            except ValueError:
                # Not in the array: nothing to remove.
                pass
        return self

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the array as ``[ item item ... ]`` to ``stream``.

        Args:
            stream: Binary stream to write to.
            encryption_key: Deprecated; any value other than None is reported
                through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"[")
        for data in self:
            stream.write(b" ")
            data.write_to_stream(stream)
        stream.write(b" ]")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "ArrayObject":
        """
        Parse an array starting at the current position of ``stream``.

        Args:
            stream: Binary stream positioned on the opening ``[``.
            pdf: Reader forwarded to ``read_object`` for every item.
            forced_encoding: Forwarded to ``read_object`` for every item.

        Returns:
            The parsed array. Reading stops at the closing ``]`` or at the
            end of the stream.

        Raises:
            PdfReadError: The first byte is not ``[``.

        """
        arr = ArrayObject()
        tmp = stream.read(1)
        if tmp != b"[":
            raise PdfReadError("Could not read array")
        while True:
            # skip leading whitespace
            tok = stream.read(1)
            while tok.isspace():
                tok = stream.read(1)
            if tok == b"":
                # End of data reached without a closing bracket.
                break
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            stream.seek(-1, 1)
            # check for array ending
            peek_ahead = stream.read(1)
            if peek_ahead == b"]":
                break
            stream.seek(-1, 1)
            # read and append object
            arr.append(read_object(stream, pdf, forced_encoding))
        return arr
263
264
class DictionaryObject(dict[Any, Any], PdfObject):
    """
    PDF dictionary object: a ``dict`` whose keys and values are PDF objects.

    Item access through ``[]`` resolves the stored value with
    ``get_object()``; ``raw_get`` returns the stored value untouched.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "DictionaryObject":
        """
        Build a copy of this dictionary for ``pdf_dest``.

        Keys are always replicated; values are replicated when they expose a
        ``replicate`` method and copied by reference otherwise.

        Args:
            pdf_dest: Destination the copy is created for.

        Returns:
            The dictionary obtained from ``_reference_clone``, filled with
            the replicated entries.

        """
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "DictionaryObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: Destination the clone is created for.
            force_duplicate: When False and this dictionary already belongs
                to ``pdf_dest``, the dictionary itself is returned.
            ignore_fields: Keys that must not be copied; ``None`` is treated
                as an empty list. See ``_clone`` for the handling of integer
                entries.

        Returns:
            ``self`` in the short-circuit case described above, otherwise the
            dictionary obtained from ``_reference_clone``. It is only filled
            from ``self`` when it comes back empty.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: e.g. no indirect reference is attached, so the
            # ownership test cannot be evaluated. Fall through and clone.
            pass

        visited: set[tuple[int, int]] = set()  # (idnum, generation)
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
        )
        if ignore_fields is None:
            ignore_fields = []
        if len(d__.keys()) == 0:
            d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: "DictionaryObject",
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],  # (idnum, generation)
    ) -> None:
        """
        Update the object from src.

        Args:
            src: Dictionary whose entries are copied into ``self``.
            pdf_dest: Destination the cloned values are created for.
            force_duplicate: Forwarded to every nested clone operation.
            ignore_fields: Keys that are not copied. An integer entry is a
                counter attached to the entry that follows it: while it is
                positive it is decremented by one per call, and once it is
                zero or negative both the counter and the following entry
                are dropped from the list.
            visited: ``(idnum, generation)`` pairs of the clones already
                created while walking linked chains; shared across the
                recursion to stop on cycles.

        """
        # First we remove the ignore_fields
        # that are for a limited number of levels
        assert ignore_fields is not None
        ignore_fields = list(ignore_fields)
        x = 0
        while x < len(ignore_fields):
            if isinstance(ignore_fields[x], int):
                if cast(int, ignore_fields[x]) <= 0:
                    # Drop the exhausted counter and the entry it guards.
                    del ignore_fields[x]
                    del ignore_fields[x]
                    continue
                ignore_fields[x] -= 1  # type:ignore
            x += 1
        # Check if this is a chain list, we need to loop to prevent recur
        if any(
            field not in ignore_fields
            and field in src
            and isinstance(src.raw_get(field), IndirectObject)
            and isinstance(src[field], DictionaryObject)
            and (
                src.get("/Type", None) is None
                or cast(DictionaryObject, src[field]).get("/Type", None) is None
                or src.get("/Type", None)
                == cast(DictionaryObject, src[field]).get("/Type", None)
            )
            for field in ["/Next", "/Prev", "/N", "/V"]
        ):
            for lst in (("/Next", "/Prev"), ("/N", "/V")):
                for k in lst:
                    objs = []
                    if (
                        k in src
                        and k not in self
                        and isinstance(src.raw_get(k), IndirectObject)
                        and isinstance(src[k], DictionaryObject)
                        # If need to go further the idea is to check
                        # that the types are the same
                        and (
                            src.get("/Type", None) is None
                            or cast(DictionaryObject, src[k]).get("/Type", None) is None
                            or src.get("/Type", None)
                            == cast(DictionaryObject, src[k]).get("/Type", None)
                        )
                    ):
                        cur_obj: Optional[DictionaryObject] = cast(
                            "DictionaryObject", src[k]
                        )
                        prev_obj: Optional[DictionaryObject] = self
                        # Walk the chain iteratively: create the (still empty)
                        # clone of every link first and wire the links.
                        while cur_obj is not None:
                            clon = cast(
                                "DictionaryObject",
                                cur_obj._reference_clone(
                                    cur_obj.__class__(), pdf_dest, force_duplicate
                                ),
                            )
                            # Check to see if we've previously processed our item
                            if clon.indirect_reference is not None:
                                idnum = clon.indirect_reference.idnum
                                generation = clon.indirect_reference.generation
                                if (idnum, generation) in visited:
                                    cur_obj = None
                                    break
                                visited.add((idnum, generation))
                            objs.append((cur_obj, clon))
                            assert prev_obj is not None
                            prev_obj[NameObject(k)] = clon.indirect_reference
                            prev_obj = clon
                            try:
                                if cur_obj == src:
                                    cur_obj = None
                                else:
                                    cur_obj = cast("DictionaryObject", cur_obj[k])
                            except Exception:
                                # No further link (or it cannot be resolved):
                                # the chain ends here.
                                cur_obj = None
                        # Only now fill every clone of the chain from its source.
                        for s, c in objs:
                            c._clone(
                                s, pdf_dest, force_duplicate, ignore_fields, visited
                            )

        for k, v in src.items():
            if k not in ignore_fields:
                if isinstance(v, StreamObject):
                    if not hasattr(v, "indirect_reference"):
                        v.indirect_reference = None
                    vv = v.clone(pdf_dest, force_duplicate, ignore_fields)
                    assert vv.indirect_reference is not None
                    # Streams are stored through their indirect reference.
                    self[k.clone(pdf_dest)] = vv.indirect_reference
                elif k not in self:
                    self[NameObject(k)] = (
                        v.clone(pdf_dest, force_duplicate, ignore_fields)
                        if hasattr(v, "clone")
                        else v
                    )

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash(
            (self.__class__, tuple(((k, v.hash_bin()) for k, v in self.items())))
        )

    def raw_get(self, key: Any) -> Any:
        """Return the stored value for ``key`` without calling ``get_object()``."""
        return dict.__getitem__(self, key)

    def get_inherited(self, key: str, default: Any = None) -> Any:
        """
        Returns the value of a key or from the parent if not found.
        If not found returns default.

        Args:
            key: string identifying the field to return

            default: default value to return

        Returns:
            Current key or inherited one, otherwise default value.

        """
        if key in self:
            return self[key]
        if "/Parent" not in self:
            return default
        # Not defined here: delegate to the parent dictionary.
        return cast("DictionaryObject", self["/Parent"].get_object()).get_inherited(
            key, default
        )

    def __setitem__(self, key: Any, value: Any) -> Any:
        """
        Store ``value`` under ``key``.

        Raises:
            ValueError: ``key`` or ``value`` is not a ``PdfObject``.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.__setitem__(self, key, value)

    def setdefault(self, key: Any, value: Optional[Any] = None) -> Any:
        """
        Same as ``dict.setdefault`` with the type checks of ``__setitem__``.

        Raises:
            ValueError: ``key`` or ``value`` is not a ``PdfObject`` (this
                includes the default ``value=None``).

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.setdefault(self, key, value)

    def __getitem__(self, key: Any) -> PdfObject:
        """Return the value stored under ``key`` after calling ``get_object()`` on it."""
        return dict.__getitem__(self, key).get_object()

    @property
    def xmp_metadata(self) -> Optional[XmpInformationProtocol]:
        """
        Retrieve XMP (Extensible Metadata Platform) data relevant to this
        object, if available.

        See Table 347 — Additional entries in a metadata stream dictionary.

        Returns:
            Returns a :class:`~pypdf.xmp.XmpInformation` instance
            that can be used to access XMP metadata from the document. Can also
            return None if no metadata was found on the document root.

        """
        from ..xmp import XmpInformation  # noqa: PLC0415

        metadata = self.get("/Metadata", None)
        if is_null_or_none(metadata):
            return None
        assert metadata is not None, "mypy"
        metadata = metadata.get_object()
        return XmpInformation(metadata)

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the dictionary as ``<< key value ... >>`` to ``stream``.

        Entries whose key is longer than two characters and has "%" as both
        its second and its last character are skipped.

        Args:
            stream: Binary stream to write to.
            encryption_key: Deprecated; any value other than None is reported
                through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        for key, value in self.items():
            if len(key) > 2 and key[1] == "%" and key[-1] == "%":
                continue
            key.write_to_stream(stream, encryption_key)
            stream.write(b" ")
            value.write_to_stream(stream)
            stream.write(b"\n")
        stream.write(b">>")

    @classmethod
    def _get_next_object_position(
        cls, position_before: int, position_end: int, generations: list[int], pdf: PdfReaderProtocol
    ) -> int:
        """
        Find the smallest xref position inside ``(position_before, position_end]``.

        Args:
            position_before: Exclusive lower bound.
            position_end: Inclusive upper bound, also the fallback result.
            generations: Keys of ``pdf.xref`` to inspect.
            pdf: Reader whose ``xref`` table is searched.

        Returns:
            The smallest matching position, or ``position_end`` when no
            position lies in the range.

        """
        out = position_end
        for generation in generations:
            location = pdf.xref[generation]
            values = [x for x in location.values() if position_before < x <= position_end]
            if values:
                out = min(out, *values)
        return out

    @classmethod
    def _read_unsized_from_stream(
        cls, stream: StreamType, pdf: PdfReaderProtocol
    ) -> bytes:
        """
        Read stream data whose length is unusable, up to ``endstream``.

        The data is read from the current position up to one byte before the
        next object position found in ``pdf.xref`` and searched for the
        ``endstream`` marker. On success ``stream`` is left right after the
        marker.

        Args:
            stream: Binary stream positioned at the start of the data.
            pdf: Reader providing the xref table.

        Returns:
            The bytes preceding the marker, without the one byte (expected to
            be the end-of-line) directly in front of it.

        Raises:
            PdfReadError: No ``endstream`` marker was found.

        """
        object_position = cls._get_next_object_position(
            position_before=stream.tell(), position_end=2 ** 32, generations=list(pdf.xref), pdf=pdf
        ) - 1
        current_position = stream.tell()
        # Read until the next object position.
        read_value = stream.read(object_position - stream.tell())
        endstream_position = read_value.find(b"endstream")
        if endstream_position < 0:
            raise PdfReadError(
                f"Unable to find 'endstream' marker for obj starting at {current_position}."
            )
        # 9 = len(b"endstream")
        stream.seek(current_position + endstream_position + 9)
        # Clamp at 0: with the marker at offset 0 the bound would be -1 and
        # the slice would return almost the whole buffer instead of no data.
        return read_value[: max(endstream_position - 1, 0)]

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "DictionaryObject":
        """
        Parse a dictionary, and the stream data following it if any.

        Args:
            stream: Binary stream positioned on the opening ``<<``.
            pdf: Reader used to resolve objects; its ``strict`` flag decides
                whether recoverable problems raise or are only logged.
            forced_encoding: Forwarded to ``read_object`` for the values.

        Returns:
            A ``DictionaryObject``, or the result of
            ``StreamObject.initialize_from_dictionary`` when the dictionary
            is followed by the ``stream`` keyword. When reading an entry
            fails in non-strict mode, the entries read so far are returned.

        Raises:
            PdfReadError: Missing ``<<``, or (strict mode) invalid key,
                failing entry, duplicate key; also when ``endstream`` cannot
                be located.
            PdfStreamError: Data ends prematurely, ``stream`` is not followed
                by a newline, or (strict mode) the length is missing.

        """
        tmp = stream.read(2)
        if tmp != b"<<":
            raise PdfReadError(
                f"Dictionary read error at byte {hex(stream.tell())}: "
                "stream must begin with '<<'"
            )
        data: dict[Any, Any] = {}
        while True:
            tok = read_non_whitespace(stream)
            if tok == b"\x00":
                continue
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            if not tok:
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)

            if tok == b">":
                # Consume the second ">" of the closing ">>".
                stream.read(1)
                break
            stream.seek(-1, 1)
            try:
                try:
                    key = read_object(stream, pdf)
                    if isinstance(key, NullObject):
                        break
                    if not isinstance(key, NameObject):
                        raise PdfReadError(
                            f"Expecting a NameObject for key but found {key!r}"
                        )
                except PdfReadError as exc:
                    if pdf is not None and pdf.strict:
                        raise
                    logger_warning(exc.__repr__(), __name__)
                    continue
                # Skip the whitespace in front of the value, then step back
                # onto its first byte.
                tok = read_non_whitespace(stream)
                stream.seek(-1, 1)
                value = read_object(stream, pdf, forced_encoding)
            except Exception as exc:
                if pdf is not None and pdf.strict:
                    raise PdfReadError(exc.__repr__())
                logger_warning(exc.__repr__(), __name__)
                retval = DictionaryObject()
                retval.update(data)
                return retval  # return partial data

            if not data.get(key):
                data[key] = value
            else:
                # multiple definitions of key not permitted
                msg = (
                    f"Multiple definitions in dictionary at byte "
                    f"{hex(stream.tell())} for key {key}"
                )
                if pdf is not None and pdf.strict:
                    raise PdfReadError(msg)
                logger_warning(msg, __name__)

        pos = stream.tell()
        s = read_non_whitespace(stream)
        if s == b"s" and stream.read(5) == b"tream":
            eol = stream.read(1)
            # Occasional PDF file output has spaces after 'stream' keyword but before EOL.
            # patch provided by Danial Sandler
            while eol == b" ":
                eol = stream.read(1)
            if eol not in (b"\n", b"\r"):
                raise PdfStreamError("Stream data must be followed by a newline")
            if eol == b"\r" and stream.read(1) != b"\n":
                # Lone "\r": the byte just read already belongs to the data.
                stream.seek(-1, 1)
            # this is a stream object, not a dictionary
            if SA.LENGTH not in data:
                if pdf is not None and pdf.strict:
                    raise PdfStreamError("Stream length not defined")
                logger_warning(
                    f"Stream length not defined @pos={stream.tell()}", __name__
                )
                data[NameObject(SA.LENGTH)] = NumberObject(-1)
            length = data[SA.LENGTH]
            if isinstance(length, IndirectObject):
                # Resolving the reference moves the stream: restore afterwards.
                t = stream.tell()
                assert pdf is not None, "mypy"
                length = pdf.get_object(length)
                stream.seek(t, 0)
            if length is None:  # if the PDF is damaged
                length = -1
            pstart = stream.tell()
            if length >= 0:
                data["__streamdata__"] = stream.read(length)
            else:
                data["__streamdata__"] = read_until_regex(
                    stream, re.compile(b"endstream")
                )
            e = read_non_whitespace(stream)
            ndstream = stream.read(8)
            if (e + ndstream) != b"endstream":
                # the odd PDF file has a length that is too long, so
                # we need to read backwards to find the "endstream" ending.
                # ReportLab (unknown version) generates files with this bug,
                # and Python users into PDF files tend to be our audience.
                # we need to do this to correct the streamdata and chop off
                # an extra character.
                pos = stream.tell()
                stream.seek(-10, 1)
                end = stream.read(9)
                if end == b"endstream":
                    # we found it by looking back one character further.
                    data["__streamdata__"] = data["__streamdata__"][:-1]
                elif pdf is not None and not pdf.strict:
                    # Ignore the declared length and search for the marker.
                    stream.seek(pstart, 0)
                    data["__streamdata__"] = DictionaryObject._read_unsized_from_stream(stream, pdf)
                    pos = stream.tell()
                else:
                    stream.seek(pos, 0)
                    raise PdfReadError(
                        "Unable to find 'endstream' marker after stream at byte "
                        f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')."
                    )
        else:
            # Plain dictionary: undo the look-ahead.
            stream.seek(pos, 0)
        if "__streamdata__" in data:
            return StreamObject.initialize_from_dictionary(data)
        retval = DictionaryObject()
        retval.update(data)
        return retval
677
678
class TreeObject(DictionaryObject):
    """
    Dictionary acting as the parent of a doubly linked list of child nodes.

    The parent stores "/First", "/Last" and "/Count"; every child stores
    "/Parent" and, where applicable, "/Next" and "/Prev". Iterating over the
    object yields its children, not its keys.
    """

    def __init__(self, dct: Optional[DictionaryObject] = None) -> None:
        """
        Create the tree node, optionally pre-filled from ``dct``.

        Args:
            dct: Entries copied into the new object when given and non-empty.

        """
        DictionaryObject.__init__(self)
        if dct:
            self.update(dct)

    def has_children(self) -> bool:
        """Return True when the object has a "/First" entry."""
        return "/First" in self

    def __iter__(self) -> Any:
        """Iterate over the children (see ``children``) instead of the keys."""
        return self.children()

    def children(self) -> Iterable[Any]:
        """
        Yield the children from "/First", following the "/Next" links.

        Iteration stops after the child equal to "/Last", when a child has
        no usable "/Next" entry, or, with a logged warning, when a child
        object is met a second time.
        """
        if not self.has_children():
            return

        child_ref = self[NameObject("/First")]
        last = self[NameObject("/Last")]
        child = child_ref.get_object()
        # id() of the child objects already yielded, to detect cycles.
        visited: set[int] = set()
        while True:
            child_id = id(child)
            if child_id in visited:
                logger_warning(f"Detected cycle in outline structure for {child}", __name__)
                return
            visited.add(child_id)

            yield child

            if child == last:
                return
            child_ref = child.get(NameObject("/Next"))  # type: ignore
            if is_null_or_none(child_ref):
                return
            child = child_ref.get_object()

    def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None:
        """Insert ``child`` through ``insert_child`` with ``before=None``."""
        self.insert_child(child, None, pdf)

    def inc_parent_counter_default(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to the "/Count" of ``parent`` and of its ancestors.

        The new count never drops below 0. The walk up the "/Parent" chain
        stops at the first node that is null/None or has no "/Count" entry.

        Args:
            parent: Node (or reference to it) to start from.
            n: Amount added to every visited counter.

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        if "/Count" in parent:
            parent[NameObject("/Count")] = NumberObject(
                max(0, cast(int, parent[NameObject("/Count")]) + n)
            )
            self.inc_parent_counter_default(parent.get("/Parent", None), n)

    def inc_parent_counter_outline(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to the magnitude of "/Count" for ``parent`` and ancestors.

        The stored count is positive when the node's "/%is_open%" entry
        (default True) equals True and negative otherwise. The walk up the
        "/Parent" chain stops after the first node that is not open.

        Args:
            parent: Node (or reference to it) to start from.
            n: Amount added to the absolute value of every visited counter.

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        # BooleanObject requires comparison with == not is
        opn = parent.get("/%is_open%", True) == True  # noqa: E712
        c = cast(int, parent.get("/Count", 0))
        if c < 0:
            c = abs(c)
        parent[NameObject("/Count")] = NumberObject((c + n) * (1 if opn else -1))
        if not opn:
            return
        self.inc_parent_counter_outline(parent.get("/Parent", None), n)

    def insert_child(
        self,
        child: Any,
        before: Any,
        pdf: PdfWriterProtocol,
        inc_parent_counter: Optional[Callable[..., Any]] = None,
    ) -> IndirectObject:
        """
        Link ``child`` into the list of children.

        Args:
            child: Node to insert; must provide ``get_object()`` and
                ``indirect_reference``.
            before: Indirect reference of the node in front of which the
                child goes. The search starts at "/Last" and follows "/Next";
                when no node matches, the child is appended at the end.
            pdf: Not used by this method.
            inc_parent_counter: Callable invoked as
                ``inc_parent_counter(self, count)`` to update the counters,
                where ``count`` is the child's "/Count" (default 1).
                Defaults to ``inc_parent_counter_default``.

        Returns:
            The indirect reference of the inserted child.

        """
        if inc_parent_counter is None:
            inc_parent_counter = self.inc_parent_counter_default
        child_obj = child.get_object()
        child = child.indirect_reference  # get_reference(child_obj)

        prev: Optional[DictionaryObject]
        if "/First" not in self:  # no child yet
            self[NameObject("/First")] = child
            self[NameObject("/Count")] = NumberObject(0)
            self[NameObject("/Last")] = child
            child_obj[NameObject("/Parent")] = self.indirect_reference
            inc_parent_counter(self, child_obj.get("/Count", 1))
            # An only child has no siblings: drop stale links.
            if "/Next" in child_obj:
                del child_obj["/Next"]
            if "/Prev" in child_obj:
                del child_obj["/Prev"]
            return child
        prev = cast("DictionaryObject", self["/Last"])

        while prev.indirect_reference != before:
            if "/Next" in prev:
                prev = cast("TreeObject", prev["/Next"])
            else:  # append at the end
                prev[NameObject("/Next")] = cast("TreeObject", child)
                child_obj[NameObject("/Prev")] = prev.indirect_reference
                child_obj[NameObject("/Parent")] = self.indirect_reference
                if "/Next" in child_obj:
                    del child_obj["/Next"]
                self[NameObject("/Last")] = child
                inc_parent_counter(self, child_obj.get("/Count", 1))
                return child
        try:  # insert as first or in the middle
            assert isinstance(prev["/Prev"], DictionaryObject)
            prev["/Prev"][NameObject("/Next")] = child
            child_obj[NameObject("/Prev")] = prev["/Prev"]
        except Exception:  # it means we are inserting in first position
            del child_obj["/Next"]
        child_obj[NameObject("/Next")] = prev
        prev[NameObject("/Prev")] = child
        child_obj[NameObject("/Parent")] = self.indirect_reference
        inc_parent_counter(self, child_obj.get("/Count", 1))
        return child

    def _remove_node_from_tree(
        self, prev: Any, prev_ref: Any, cur: Any, last: Any
    ) -> None:
        """
        Adjust the pointers of the linked list and tree node count.

        Args:
            prev: Node preceding ``cur``, or None when ``cur`` is the first
                child.
            prev_ref: Reference to ``prev``, stored in the relinked entries.
            cur: Node being removed.
            last: Current last child; ``cur`` must equal it when ``cur`` has
                a predecessor but no successor.

        """
        next_ref = cur.get(NameObject("/Next"), None)
        if prev is None:
            if next_ref:
                # Removing first tree node
                next_obj = next_ref.get_object()
                del next_obj[NameObject("/Prev")]
                self[NameObject("/First")] = next_ref
                self[NameObject("/Count")] = NumberObject(
                    self[NameObject("/Count")] - 1  # type: ignore
                )

            else:
                # Removing only tree node
                self[NameObject("/Count")] = NumberObject(0)
                del self[NameObject("/First")]
                if NameObject("/Last") in self:
                    del self[NameObject("/Last")]
        else:
            if next_ref:
                # Removing middle tree node
                next_obj = next_ref.get_object()
                next_obj[NameObject("/Prev")] = prev_ref
                prev[NameObject("/Next")] = next_ref
            else:
                # Removing last tree node
                assert cur == last
                del prev[NameObject("/Next")]
                self[NameObject("/Last")] = prev_ref
            self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1)  # type: ignore

    def remove_child(self, child: Any) -> None:
        """
        Unlink ``child`` from this tree and clear its tree-related entries.

        Args:
            child: Child node, or a reference resolving to it.

        Raises:
            ValueError: The child has no "/Parent" entry, its parent is not
                this object, or it is not reachable from "/First".

        """
        child_obj = child.get_object()
        child = child_obj.indirect_reference

        if NameObject("/Parent") not in child_obj:
            raise ValueError("Removed child does not appear to be a tree item")
        if child_obj[NameObject("/Parent")] != self:
            raise ValueError("Removed child is not a member of this tree")

        found = False
        prev_ref = None
        prev = None
        cur_ref: Optional[Any] = self[NameObject("/First")]
        cur: Optional[dict[str, Any]] = cur_ref.get_object()  # type: ignore
        last_ref = self[NameObject("/Last")]
        last = last_ref.get_object()
        while cur is not None:
            if cur == child_obj:
                self._remove_node_from_tree(prev, prev_ref, cur, last)
                found = True
                break

            # Go to the next node
            prev_ref = cur_ref
            prev = cur
            if NameObject("/Next") in cur:
                cur_ref = cur[NameObject("/Next")]
                cur = cur_ref.get_object()
            else:
                cur_ref = None
                cur = None

        if not found:
            raise ValueError("Removal couldn't find item in tree")

        _reset_node_tree_relationship(child_obj)

    def remove_from_tree(self) -> None:
        """
        Remove the object from the tree it is in.

        Raises:
            ValueError: The object has no "/Parent" entry.

        """
        if NameObject("/Parent") not in self:
            raise ValueError("Removed child does not appear to be a tree item")
        cast("TreeObject", self["/Parent"]).remove_child(self)

    def empty_tree(self) -> None:
        """
        Detach every child and drop "/Count", "/First" and "/Last".

        Each child loses its "/Parent", "/Next" and "/Prev" entries.
        """
        # Snapshot the children first: resetting a node deletes its "/Next"
        # entry, which the lazy children() generator needs in order to reach
        # the following sibling. Iterating lazily would reset the first
        # child only.
        for child in list(self.children()):
            child_obj = child.get_object()
            _reset_node_tree_relationship(child_obj)

        if NameObject("/Count") in self:
            del self[NameObject("/Count")]
        if NameObject("/First") in self:
            del self[NameObject("/First")]
        if NameObject("/Last") in self:
            del self[NameObject("/Last")]
895
896
def _reset_node_tree_relationship(child_obj: Any) -> None:
    """
    Call this after a node has been removed from a tree.

    The "/Parent" entry is deleted unconditionally (a missing entry raises
    ``KeyError``); the "/Next" and "/Prev" entries are deleted only when
    they are present.

    Args:
        child_obj: Node dictionary to clean up; modified in place.

    """
    del child_obj[NameObject("/Parent")]
    # Sibling links are optional, e.g. for the first or the last node.
    for link_name in ("/Next", "/Prev"):
        link_key = NameObject(link_name)
        if link_key in child_obj:
            del child_obj[link_key]
912
913
class StreamObject(DictionaryObject):
    """
    PDF stream: a dictionary plus a payload of raw bytes.

    The payload is kept in ``_data``; ``decoded_self`` optionally holds a
    decoded counterpart of the stream.
    """

    def __init__(self) -> None:
        """Create an empty stream without payload and without decoded cache."""
        self._data: bytes = b""
        self.decoded_self: Optional[DecodedStreamObject] = None

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "StreamObject":
        """
        Build a copy of this stream for ``pdf_dest``.

        The payload is shared, the decoded cache (if any) is replicated
        onto the copy, and the dictionary entries are replicated like in
        ``DictionaryObject.replicate``.

        Args:
            pdf_dest: Destination the copy is created for.

        Returns:
            The stream obtained from ``_reference_clone``.

        """
        d__ = cast(
            "StreamObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        d__._data = self._data
        try:
            decoded_self = self.decoded_self
            # The replicated cache belongs to the copy; the source stream
            # must keep its own cache untouched (mirrors _clone).
            if decoded_self is None:
                d__.decoded_self = None
            else:
                d__.decoded_self = cast(
                    "DecodedStreamObject", decoded_self.replicate(pdf_dest)
                )
        except Exception:
            # Best effort: the decoded cache is optional, the copy remains
            # usable without it.
            pass
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def _clone(
        self,
        src: DictionaryObject,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],
    ) -> None:
        """
        Update the object from src.

        Copies the payload and (best effort) a clone of the decoded cache of
        ``src``, then lets ``DictionaryObject._clone`` copy the entries.

        Args:
            src: Stream the data is taken from.
            pdf_dest: Destination the cloned values are created for.
            force_duplicate: Forwarded to the nested clone operations.
            ignore_fields: Forwarded to the nested clone operations.
            visited: Forwarded to ``DictionaryObject._clone``.

        """
        self._data = cast("StreamObject", src)._data
        try:
            decoded_self = cast("StreamObject", src).decoded_self
            if decoded_self is None:
                self.decoded_self = None
            else:
                self.decoded_self = cast(
                    "DecodedStreamObject",
                    decoded_self.clone(pdf_dest, force_duplicate, ignore_fields),
                )
        except Exception:
            # Best effort: the decoded cache is optional.
            pass
        super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        # Use _data to prevent errors on non-decoded streams.
        return hash((super().hash_bin(), self._data))

    def get_data(self) -> bytes:
        """Return the payload bytes stored in ``_data``."""
        return self._data

    def set_data(self, data: bytes) -> None:
        """Replace the payload bytes stored in ``_data``."""
        self._data = data

    def hash_value_data(self) -> bytes:
        """Return the parent's ``hash_value_data()`` followed by ``get_data()``."""
        data = super().hash_value_data()
        data += self.get_data()
        return data

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the dictionary followed by the ``stream ... endstream`` payload.

        The length entry is set from ``len(self._data)`` for the duration of
        the dictionary output and removed again afterwards.

        Args:
            stream: Binary stream to write to.
            encryption_key: Deprecated; any value other than None is reported
                through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        self[NameObject(SA.LENGTH)] = NumberObject(len(self._data))
        try:
            DictionaryObject.write_to_stream(self, stream)
        finally:
            # Never leave the temporary length entry behind, even when
            # writing the dictionary fails.
            del self[SA.LENGTH]
        stream.write(b"\nstream\n")
        stream.write(self._data)
        stream.write(b"\nendstream")

    @staticmethod
    def initialize_from_dictionary(
        data: dict[str, Any]
    ) -> Union["EncodedStreamObject", "DecodedStreamObject"]:
        """
        Build a stream object from parsed dictionary data.

        Args:
            data: Parsed entries including the payload under
                "__streamdata__". The payload entry and the length entry are
                removed from ``data`` (it is modified in place).

        Returns:
            An ``EncodedStreamObject`` when ``data`` has a filter entry,
            otherwise a ``DecodedStreamObject``.

        """
        retval: Union[EncodedStreamObject, DecodedStreamObject]
        if SA.FILTER in data:
            retval = EncodedStreamObject()
        else:
            retval = DecodedStreamObject()
        retval._data = data["__streamdata__"]
        del data["__streamdata__"]
        if SA.LENGTH in data:
            del data[SA.LENGTH]
        retval.update(data)
        return retval

    def flate_encode(self, level: int = -1) -> "EncodedStreamObject":
        """
        Return a copy of the stream with its payload Flate-encoded.

        The Flate filter is put in front of any filter already present, and
        a null entry is put in front of the matching decode parameters.

        Args:
            level: Passed to ``FlateDecode.encode``.

        Returns:
            A new ``EncodedStreamObject``; ``self`` is not modified.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        if SA.FILTER in self:
            f = self[SA.FILTER]
            if isinstance(f, ArrayObject):
                f = ArrayObject([NameObject(FT.FLATE_DECODE), *f])
                try:
                    params = ArrayObject(
                        [NullObject(), *self.get(SA.DECODE_PARMS, ArrayObject())]
                    )
                except TypeError:
                    # case of error where the * operator is not working (not an array
                    params = ArrayObject(
                        [NullObject(), self.get(SA.DECODE_PARMS, ArrayObject())]
                    )
            else:
                f = ArrayObject([NameObject(FT.FLATE_DECODE), f])
                params = ArrayObject(
                    [NullObject(), self.get(SA.DECODE_PARMS, NullObject())]
                )
        else:
            f = NameObject(FT.FLATE_DECODE)
            params = None
        retval = EncodedStreamObject()
        retval.update(self)
        retval[NameObject(SA.FILTER)] = f
        if params is not None:
            retval[NameObject(SA.DECODE_PARMS)] = params
        retval._data = FlateDecode.encode(self._data, level)
        return retval

    def decode_as_image(self, pillow_parameters: Union[dict[str, Any], None] = None) -> Any:
        """
        Try to decode the stream object as an image

        Args:
            pillow_parameters: parameters provided to Pillow Image.save() method,
                cf. <https://pillow.readthedocs.io/en/stable/reference/Image.html#PIL.Image.Image.save>

        Returns:
            a PIL image if proper decoding has been found
        Raises:
            Exception: Errors during decoding will be reported.
                It is recommended to catch exceptions to prevent
                stops in your program.

        """
        from ._image_xobject import _xobj_to_image  # noqa: PLC0415

        if self.get("/Subtype", "") != "/Image":
            # Only warn: decoding is attempted regardless.
            try:
                msg = f"{self.indirect_reference} does not seem to be an Image"  # pragma: no cover
            except AttributeError:
                msg = f"{self.__repr__()} object does not seem to be an Image"  # pragma: no cover
            logger_warning(msg, __name__)
        extension, _, img = _xobj_to_image(self, pillow_parameters)
        if extension is None:
            return None  # pragma: no cover
        return img
1088
1089
class DecodedStreamObject(StreamObject):
    """Stream object that adds nothing to StreamObject: a distinct type for streams without filter handling."""
1092
1093
class EncodedStreamObject(StreamObject):
    """
    Stream object whose raw data is encoded by one or several filters.

    The decoded counterpart is built on first access and cached in
    ``decoded_self``.
    """

    def __init__(self) -> None:
        # Cache of the decoded version of this stream, filled by get_data().
        self.decoded_self: Optional[DecodedStreamObject] = None

    # This overrides the parent method
    def get_data(self) -> bytes:
        """
        Return the decoded data of the stream.

        The first call decodes the stream, copies every entry except the
        length, filter and decode parameter entries into a new
        DecodedStreamObject and caches that object; later calls reuse it.
        """
        from ..filters import decode_stream_data  # noqa: PLC0415

        cached = self.decoded_self
        if cached is None:
            cached = DecodedStreamObject()
            cached.set_data(decode_stream_data(self))
            encoding_keys = (SA.LENGTH, SA.FILTER, SA.DECODE_PARMS)
            for key, value in self.items():
                if key not in encoding_keys:
                    cached[key] = value
            self.decoded_self = cached
        return cached.get_data()

    # This overrides the parent method:
    def set_data(self, data: bytes) -> None:
        """
        Replace the content of the stream with ``data`` (not yet encoded).

        Args:
            data: The new decoded content.

        Raises:
            PdfReadError: If the stream filter is anything else than a
                single FlateDecode.
            TypeError: If ``data`` is not a bytes object.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        if self.get(SA.FILTER, "") not in (FT.FLATE_DECODE, [FT.FLATE_DECODE]):
            raise PdfReadError(
                "Streams encoded with a filter different from FlateDecode are not supported"
            )
        if not isinstance(data, bytes):
            raise TypeError("Data must be bytes")
        if self.decoded_self is None:
            self.get_data()  # to create self.decoded_self
        assert self.decoded_self is not None, "mypy"
        # Keep the decoded cache and the encoded payload in sync.
        self.decoded_self.set_data(data)
        super().set_data(FlateDecode.encode(data))
1131
1132
class ContentStream(DecodedStreamObject):
    """
    In order to be fast, this data structure can contain either:

    * raw data in ._data
    * parsed stream operations in ._operations.

    At any time, a ContentStream object can either have both of those fields
    populated, or one field populated and the other one empty (``b""`` for
    ._data, ``[]`` for ._operations).

    These fields are "rebuilt" lazily, when accessed:

    * when .get_data() is called, if ._data is empty, it is rebuilt from ._operations.
    * when .operations is read, if ._operations is empty, it is rebuilt from ._data
      (and ._data is emptied afterwards).

    Conversely, these fields can be invalidated:

    * when .set_data() is called, ._operations is emptied.
    * when .operations is set, ._data is emptied.
    """

    def __init__(
        self,
        stream: Any,
        pdf: Any,
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> None:
        """
        Build a content stream from a stream object or an array of them.

        Args:
            stream: ``None`` (empty content), an object resolving to a
                stream, or an object resolving to an ArrayObject of streams
                to be concatenated together.
            pdf: The document the stream belongs to; handed over to
                ``read_object`` when reading inline image settings.
            forced_encoding: Encoding handed over to ``read_object`` when
                parsing the operands.

        """
        self.pdf = pdf
        self._operations: list[tuple[Any, bytes]] = []
        # Set unconditionally so that every instance, including the ones
        # built from ``None``, carries the attribute read by
        # `_parse_content_stream`, `_clone` and `replicate`.
        self.forced_encoding = forced_encoding

        # stream may be a StreamObject or an ArrayObject containing
        # StreamObjects to be concatenated together.
        if stream is None:
            super().set_data(b"")
            return

        stream = stream.get_object()
        if isinstance(stream, ArrayObject):
            data = bytearray()
            for s in stream:
                s_resolved = s.get_object()
                if isinstance(s_resolved, NullObject):
                    continue
                if not isinstance(s_resolved, StreamObject):
                    # No need to emit an exception here for now - the PDF structure
                    # seems to already be broken beforehand in these cases.
                    logger_warning(
                        f"Expected StreamObject, got {type(s_resolved).__name__} instead. Data might be wrong.",
                        __name__
                    )
                else:
                    data += s_resolved.get_data()
                # Separate the parts with a newline. `endswith` is required
                # here: indexing bytes yields an int, which never compares
                # equal to b"\n".
                if not data.endswith(b"\n"):
                    data += b"\n"
            super().set_data(bytes(data))
        else:
            stream_data = stream.get_data()
            assert stream_data is not None
            super().set_data(stream_data)

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ContentStream":
        """
        Replicate the content stream into ``pdf_dest``.

        Args:
            pdf_dest: The writer receiving the replica.

        Returns:
            The replica, holding the raw data, the parsed operations, the
            forced encoding and replicated dictionary entries of this object.

        """
        d__ = cast(
            "ContentStream",
            self._reference_clone(self.__class__(None, None), pdf_dest, False),
        )
        try:
            decoded_self = self.decoded_self
            if decoded_self is not None:
                # NOTE(review): the replica of the decoded cache is stored on
                # `self`, not on the new object - kept as is, to be confirmed.
                self.decoded_self = cast(
                    "DecodedStreamObject", decoded_self.replicate(pdf_dest)
                )
        except Exception:
            # Best effort only: a missing or non-replicable decoded cache
            # must not prevent the replication of the stream itself.
            pass
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        # Copy the content stream state. Without the operations, a stream
        # whose raw data has been emptied by an access to `.operations`
        # would be replicated as an empty stream.
        d__._data = self._data
        d__.pdf = pdf_dest
        d__._operations = list(self._operations)
        d__.forced_encoding = self.forced_encoding
        return d__

    def clone(
        self,
        pdf_dest: Any,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ContentStream":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: The document receiving the clone.
            force_duplicate: Clone the object even if it already belongs to
                ``pdf_dest``.
            ignore_fields: Handed over to ``_clone``; ``None`` is treated
                as an empty list.

        Returns:
            The cloned ContentStream, or the object itself when it already
            belongs to ``pdf_dest`` and ``force_duplicate`` is not set.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # No usable indirect reference: go on with a regular clone.
            pass

        visited: set[tuple[int, int]] = set()
        d__ = cast(
            "ContentStream",
            self._reference_clone(
                self.__class__(None, None), pdf_dest, force_duplicate
            ),
        )
        if ignore_fields is None:
            ignore_fields = []
        d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: DictionaryObject,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],
    ) -> None:
        """
        Update the object from src.

        Only the content stream state (raw data, operations, forced
        encoding) is copied; the dictionary entries are not processed.

        Args:
            src: The ContentStream to copy from.
            pdf_dest: The document the object is cloned into.
            force_duplicate: Not used by this implementation.
            ignore_fields: Not used by this implementation.
            visited: Not used by this implementation.

        """
        src_cs = cast("ContentStream", src)
        super().set_data(src_cs._data)
        self.pdf = pdf_dest
        self._operations = list(src_cs._operations)
        self.forced_encoding = src_cs.forced_encoding
        # no need to call DictionaryObject or anything
        # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

    def _parse_content_stream(self, stream: StreamType) -> None:
        """
        Parse ``stream`` and append the operations found to ``_operations``.

        Each operation is stored as an ``(operands, operator)`` tuple;
        inline images are stored as ``(image_dict, b"INLINE IMAGE")``.

        Args:
            stream: The raw content stream; it is read from its beginning.

        """
        # 7.8.2 Content Streams
        stream.seek(0, 0)
        operands: list[Union[int, str, PdfObject]] = []
        while True:
            peek = read_non_whitespace(stream)
            if peek in (b"", 0):
                break
            stream.seek(-1, 1)
            if peek.isalpha() or peek in (b"'", b'"'):
                operator = read_until_regex(stream, NameObject.delimiter_pattern)
                if operator == b"BI":
                    # begin inline image - a completely different parsing
                    # mechanism is required, of course... thanks buddy...
                    assert operands == []
                    ii = self._read_inline_image(stream)
                    self._operations.append((ii, b"INLINE IMAGE"))
                else:
                    self._operations.append((operands, operator))
                    operands = []
            elif peek == b"%":
                # If we encounter a comment in the content stream, we have to
                # handle it here. Typically, read_object will handle
                # encountering a comment -- but read_object assumes that
                # following the comment must be the object we're trying to
                # read. In this case, it could be an operator instead.
                while peek not in (b"\r", b"\n", b""):
                    peek = stream.read(1)
            else:
                operands.append(read_object(stream, None, self.forced_encoding))

    def _read_inline_image(self, stream: StreamType) -> dict[str, Any]:
        """
        Read an inline image; ``stream`` is positioned just after ``BI``.

        Args:
            stream: The content stream being parsed.

        Returns:
            A dictionary with the image dictionary under ``"settings"`` and
            the raw image bytes under ``"data"``.

        Raises:
            PdfStreamError: If no ``EI`` marker can be located, even with
                the fallback extraction.

        """
        # begin reading just after the "BI" - begin image
        # first read the dictionary of settings.
        settings = DictionaryObject()
        while True:
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            if tok == b"I":
                # "ID" - begin of image data
                break
            key = read_object(stream, self.pdf)
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            value = read_object(stream, self.pdf)
            settings[key] = value
        # left at beginning of ID
        tmp = stream.read(3)
        assert tmp[:2] == b"ID"
        filtr = settings.get("/F", settings.get("/Filter", "not set"))
        # Start of the image data, used by the fallback below.
        savpos = stream.tell()
        if isinstance(filtr, list):
            filtr = filtr[0]  # used for encoding
        if "AHx" in filtr or "ASCIIHexDecode" in filtr:
            data = extract_inline__ascii_hex_decode(stream)
        elif "A85" in filtr or "ASCII85Decode" in filtr:
            data = extract_inline__ascii85_decode(stream)
        elif "RL" in filtr or "RunLengthDecode" in filtr:
            data = extract_inline__run_length_decode(stream)
        elif "DCT" in filtr or "DCTDecode" in filtr:
            data = extract_inline__dct_decode(stream)
        elif filtr == "not set":
            # Unfiltered data: derive the number of bytes per pixel (lcs)
            # from the colour space or the bits per component.
            cs = settings.get("/CS", "")
            if isinstance(cs, list):
                cs = cs[0]
            if "RGB" in cs:
                lcs = 3
            elif "CMYK" in cs:
                lcs = 4
            else:
                bits = settings.get(
                    "/BPC",
                    8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1,
                )
                if bits > 0:
                    lcs = bits / 8.0
                else:
                    data = extract_inline_default(stream)
                    lcs = -1
            if lcs > 0:
                data = stream.read(
                    ceil(cast(int, settings["/W"]) * lcs) * cast(int, settings["/H"])
                )
            # Move to the `EI` if possible.
            ei = read_non_whitespace(stream)
            stream.seek(-1, 1)
        else:
            data = extract_inline_default(stream)

        ei = stream.read(3)
        stream.seek(-1, 1)
        if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:
            # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above.
            stream.seek(savpos, 0)
            data = extract_inline_default(stream)
            ei = stream.read(3)
            stream.seek(-1, 1)
            if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:  # pragma: no cover
                # Check the same condition again. This should never fail as
                # edge cases are covered by `extract_inline_default` above,
                # but check this to make sure that we are behind the `EI` afterwards.
                raise PdfStreamError(
                    f"Could not extract inline image, even using fallback. Expected 'EI', got {ei!r}"
                )
        return {"settings": settings, "data": data}

    # This overrides the parent method
    def get_data(self) -> bytes:
        """
        Return the raw data, rebuilding it from the operations if it is empty.

        Returns:
            The content stream as bytes, one operation per line.

        """
        if not self._data:
            new_data = BytesIO()
            for operands, operator in self._operations:
                if operator == b"INLINE IMAGE":
                    new_data.write(b"BI")
                    dict_text = BytesIO()
                    operands["settings"].write_to_stream(dict_text)
                    # Drop the first and last two bytes of the serialized
                    # dictionary (presumably its `<<` / `>>` delimiters).
                    new_data.write(dict_text.getvalue()[2:-2])
                    new_data.write(b"ID ")
                    new_data.write(operands["data"])
                    new_data.write(b"EI")
                else:
                    for op in operands:
                        op.write_to_stream(new_data)
                        new_data.write(b" ")
                    new_data.write(operator)
                new_data.write(b"\n")
            self._data = new_data.getvalue()
        return self._data

    # This overrides the parent method
    def set_data(self, data: bytes) -> None:
        """Set the raw data and drop the parsed operations."""
        super().set_data(data)
        self._operations = []

    @property
    def operations(self) -> list[tuple[Any, bytes]]:
        """
        The parsed operations as ``(operands, operator)`` tuples.

        When no operation is available but raw data is, the raw data is
        parsed and then emptied. Setting the property empties the raw data.
        """
        if not self._operations and self._data:
            self._parse_content_stream(BytesIO(self._data))
            self._data = b""
        return self._operations

    @operations.setter
    def operations(self, operations: list[tuple[Any, bytes]]) -> None:
        self._operations = operations
        self._data = b""

    def isolate_graphics_state(self) -> None:
        """
        Surround the content with the ``q`` / ``Q`` operators.

        The parsed operations are wrapped when available, the raw data
        otherwise; an empty content stream is left untouched.
        """
        if self._operations:
            self._operations.insert(0, ([], b"q"))
            self._operations.append(([], b"Q"))
        elif self._data:
            self._data = b"q\n" + self._data + b"\nQ\n"

    # This overrides the parent method
    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the object to ``stream``, rebuilding the raw data first if needed.

        Args:
            stream: The stream to write to.
            encryption_key: Handed over unchanged to the parent method.

        """
        if not self._data and self._operations:
            self.get_data()  # this ensures ._data is rebuilt
        super().write_to_stream(stream, encryption_key)
1444
1445
def read_object(
    stream: StreamType,
    pdf: Optional[PdfReaderProtocol],
    forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
) -> Union[PdfObject, int, str, ContentStream]:
    """
    Read one object from ``stream``, dispatching on its first byte.

    Args:
        stream: The stream to read from, positioned on the first byte of
            the object.
        pdf: The reader the object belongs to; it is needed to read
            indirect references.
        forced_encoding: Encoding handed over to the string, array and
            dictionary readers.

    Returns:
        The object read. An ``endobj`` keyword found in place of an object
        yields a NullObject and is left unread in the stream.

    Raises:
        PdfReadError: If the first byte does not start any known object.

    """
    tok = stream.read(1)
    stream.seek(-1, 1)  # reset to start
    if tok == b"/":
        return NameObject.read_from_stream(stream, pdf)
    if tok == b"<":
        # hexadecimal string OR dictionary
        peek = stream.read(2)
        stream.seek(-2, 1)  # reset to start
        if peek == b"<<":
            return DictionaryObject.read_from_stream(stream, pdf, forced_encoding)
        return read_hex_string_from_stream(stream, forced_encoding)
    if tok == b"[":
        return ArrayObject.read_from_stream(stream, pdf, forced_encoding)
    if tok in (b"t", b"f"):
        return BooleanObject.read_from_stream(stream)
    if tok == b"(":
        return read_string_from_stream(stream, forced_encoding)
    if tok == b"e":
        # Peek only, like the other branches: the keyword must stay in the
        # stream, and on a mismatch the error below has to report the
        # position of the offending token.
        keyword = stream.read(6)
        stream.seek(-len(keyword), 1)  # reset to start
        if keyword == b"endobj":
            return NullObject()
    if tok == b"n":
        return NullObject.read_from_stream(stream)
    if tok == b"%":
        # comment
        skip_over_comment(stream)
        tok = read_non_whitespace(stream)
        stream.seek(-1, 1)
        return read_object(stream, pdf, forced_encoding)
    # NOTE: an empty `tok` (nothing left to read) also passes this
    # membership test, as b"" is contained in any bytes object.
    if tok in b"0123456789+-.":
        # number object OR indirect reference
        peek = stream.read(20)
        stream.seek(-len(peek), 1)  # reset to start
        if IndirectPattern.match(peek) is not None:
            assert pdf is not None, "mypy"
            return IndirectObject.read_from_stream(stream, pdf)
        return NumberObject.read_from_stream(stream)
    pos = stream.tell()
    # Extract some context for the error message, without seeking before
    # the beginning of the stream.
    stream.seek(max(0, pos - 20), 0)
    stream_extract = stream.read(80)
    stream.seek(pos)
    read_until_whitespace(stream)
    raise PdfReadError(
        f"Invalid Elementary Object starting with {tok!r} @{pos}: {stream_extract!r}"
    )
1494
1495
class Field(TreeObject):
    """
    A class representing a field dictionary.

    This class is accessed through
    :meth:`get_fields()<pypdf.PdfReader.get_fields>`
    """

    def __init__(self, data: DictionaryObject) -> None:
        """
        Copy the known field attributes of ``data`` into the new object.

        Args:
            data: The field dictionary to read the attributes from.

        """
        DictionaryObject.__init__(self)
        known_names = (
            FieldDictionaryAttributes.attributes()
            + CheckboxRadioButtonAttributes.attributes()
        )
        self.indirect_reference = data.indirect_reference
        for name in known_names:
            # Attributes missing from the source are simply left out.
            try:
                self[NameObject(name)] = data[name]
            except KeyError:
                continue

        if not isinstance(self.get("/V"), EncodedStreamObject):
            return
        # A value stored as an encoded stream is exposed as a text string.
        raw = cast(EncodedStreamObject, self[NameObject("/V")]).get_data()
        if raw is None:
            text = ""
        elif isinstance(raw, bytes):
            text = raw.decode()
        else:
            raise Exception("Should never happen")
        self[NameObject("/V")] = TextStringObject(text)

    # TABLE 8.69 Entries common to all field dictionaries
    @property
    def field_type(self) -> Optional[NameObject]:
        """Type of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.FT)

    @property
    def parent(self) -> Optional[DictionaryObject]:
        """Parent of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Parent)

    @property
    def kids(self) -> Optional["ArrayObject"]:
        """Kids of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Kids)

    @property
    def name(self) -> Optional[str]:
        """Name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.T)

    @property
    def alternate_name(self) -> Optional[str]:
        """Alternate name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.TU)

    @property
    def mapping_name(self) -> Optional[str]:
        """
        Mapping name of this field (read-only).

        This name is used by pypdf as a key in the dictionary returned by
        :meth:`get_fields()<pypdf.PdfReader.get_fields>`
        """
        return self.get(FieldDictionaryAttributes.TM)

    @property
    def flags(self) -> Optional[int]:
        """
        Field flags (read-only), specifying various characteristics of the
        field (see Table 8.70 of the PDF 1.7 reference).
        """
        return self.get(FieldDictionaryAttributes.Ff)

    @property
    def value(self) -> Optional[Any]:
        """
        Value of this field (read-only).

        Format varies based on field type.
        """
        return self.get(FieldDictionaryAttributes.V)

    @property
    def default_value(self) -> Optional[Any]:
        """Default value of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.DV)

    @property
    def additional_actions(self) -> Optional[DictionaryObject]:
        """
        Additional actions dictionary (read-only).

        This dictionary defines the field's behavior in response to trigger
        events. See Section 8.5.2 of the PDF 1.7 reference.
        """
        return self.get(FieldDictionaryAttributes.AA)
1593
1594
class Destination(TreeObject):
    """
    A class representing a destination within a PDF file.

    See section 12.3.2 of the PDF 2.0 reference.

    Args:
        title: Title of this destination.
        page: Reference to the page of this destination. Should
            be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`.
        fit: How the destination is displayed.

    Raises:
        PdfReadError: If destination type is invalid.

    """

    node: Optional[
        DictionaryObject
    ] = None  # node provide access to the original Object

    def __init__(
        self,
        title: Union[str, bytes],
        page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject],
        fit: Fit,
    ) -> None:
        self._filtered_children: list[Any] = []  # used in PdfWriter

        typ = fit.fit_type
        # Work on a copy: the arguments may be padded below, and the Fit
        # object handed over by the caller must not be modified.
        args = list(fit.fit_args)

        DictionaryObject.__init__(self)
        self[NameObject("/Title")] = TextStringObject(title)
        self[NameObject("/Page")] = page
        self[NameObject("/Type")] = typ

        # from table 8.2 of the PDF 1.7 reference.
        if typ == "/XYZ":
            # Missing left / top / zoom values (left should never be
            # missing) are replaced by 0.
            while len(args) < 3:
                args.append(NumberObject(0.0))
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.TOP)],
                self[NameObject("/Zoom")],
            ) = args
        elif len(args) == 0:
            pass
        elif typ == TF.FIT_R:
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.BOTTOM)],
                self[NameObject(TA.RIGHT)],
                self[NameObject(TA.TOP)],
            ) = args
        elif typ in [TF.FIT_H, TF.FIT_BH]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.TOP)],) = args
            except Exception:
                (self[NameObject(TA.TOP)],) = (NullObject(),)
        elif typ in [TF.FIT_V, TF.FIT_BV]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.LEFT)],) = args
            except Exception:
                (self[NameObject(TA.LEFT)],) = (NullObject(),)
        elif typ in [TF.FIT, TF.FIT_B]:
            pass
        else:
            raise PdfReadError(f"Unknown Destination Type: {typ!r}")

    @property
    def dest_array(self) -> "ArrayObject":
        """
        The destination as an array: the raw page entry, the fit type and
        the coordinates present, in left, bottom, right, top, zoom order.
        """
        return ArrayObject(
            [self.raw_get("/Page"), self["/Type"]]
            + [
                self[x]
                for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"]
                if x in self
            ]
        )

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the destination to ``stream`` as a dictionary holding a
        ``/D`` entry (the destination array) and a ``/S /GoTo`` entry.

        Args:
            stream: The stream to write to.
            encryption_key: Deprecated; a deprecation is reported when it
                is provided.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        key = NameObject("/D")
        key.write_to_stream(stream)
        stream.write(b" ")
        value = self.dest_array
        value.write_to_stream(stream)

        key = NameObject("/S")
        key.write_to_stream(stream)
        stream.write(b" ")
        value_s = NameObject("/GoTo")
        value_s.write_to_stream(stream)

        stream.write(b"\n")
        stream.write(b">>")

    @property
    def title(self) -> Optional[str]:
        """Read-only property accessing the destination title."""
        return self.get("/Title")

    @property
    def page(self) -> Optional[IndirectObject]:
        """Read-only property accessing the IndirectObject of the destination page."""
        return self.get("/Page")

    @property
    def typ(self) -> Optional[str]:
        """Read-only property accessing the destination type."""
        return self.get("/Type")

    @property
    def zoom(self) -> Optional[int]:
        """Read-only property accessing the zoom factor."""
        return self.get("/Zoom", None)

    @property
    def left(self) -> Optional[FloatObject]:
        """Read-only property accessing the left horizontal coordinate."""
        return self.get("/Left", None)

    @property
    def right(self) -> Optional[FloatObject]:
        """Read-only property accessing the right horizontal coordinate."""
        return self.get("/Right", None)

    @property
    def top(self) -> Optional[FloatObject]:
        """Read-only property accessing the top vertical coordinate."""
        return self.get("/Top", None)

    @property
    def bottom(self) -> Optional[FloatObject]:
        """Read-only property accessing the bottom vertical coordinate."""
        return self.get("/Bottom", None)

    @property
    def color(self) -> Optional["ArrayObject"]:
        """
        Read-only property accessing the color in (R, G, B) with values 0.0-1.0.

        Defaults to (0, 0, 0) when there is no ``/C`` entry.
        """
        return self.get(
            "/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)])
        )

    @property
    def font_format(self) -> Optional[OutlineFontFlag]:
        """
        Read-only property accessing the font type.

        1=italic, 2=bold, 3=both; defaults to 0 when there is no ``/F`` entry.
        """
        return self.get("/F", 0)

    @property
    def outline_count(self) -> Optional[int]:
        """
        Read-only property accessing the outline count.

        positive = expanded
        negative = collapsed
        absolute value = number of visible descendants at all levels
        """
        return self.get("/Count", None)