1# Copyright (c) 2006, Mathieu Fenniak
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright notice,
9# this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above copyright notice,
11# this list of conditions and the following disclaimer in the documentation
12# and/or other materials provided with the distribution.
13# * The name of the author may not be used to endorse or promote products
14# derived from this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26# POSSIBILITY OF SUCH DAMAGE.
27
28
29__author__ = "Mathieu Fenniak"
30__author_email__ = "biziqe@mathieu.fenniak.net"
31
32import logging
33import re
34import sys
35from collections.abc import Iterable, Sequence
36from io import BytesIO
37from math import ceil
38from typing import (
39 Any,
40 Callable,
41 Optional,
42 Union,
43 cast,
44)
45
46from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol
47from .._utils import (
48 WHITESPACES,
49 BinaryStreamType,
50 StreamType,
51 deprecation_no_replacement,
52 logger_warning,
53 read_non_whitespace,
54 read_until_regex,
55 read_until_whitespace,
56 skip_over_comment,
57)
58from ..constants import (
59 CheckboxRadioButtonAttributes,
60 FieldDictionaryAttributes,
61 OutlineFontFlag,
62 StreamAttributes,
63)
64from ..constants import FilterTypes as FT
65from ..constants import TypArguments as TA
66from ..constants import TypFitArguments as TF
67from ..errors import STREAM_TRUNCATED_PREMATURELY, LimitReachedError, PdfReadError, PdfStreamError
68from ._base import (
69 BooleanObject,
70 ByteStringObject,
71 FloatObject,
72 IndirectObject,
73 NameObject,
74 NullObject,
75 NumberObject,
76 PdfObject,
77 TextStringObject,
78 is_null_or_none,
79)
80from ._fit import Fit
81from ._image_inline import (
82 extract_inline__ascii85_decode,
83 extract_inline__ascii_hex_decode,
84 extract_inline__dct_decode,
85 extract_inline__run_length_decode,
86 extract_inline_default,
87)
88from ._utils import read_hex_string_from_stream, read_string_from_stream
89
90if sys.version_info >= (3, 11):
91 from typing import Self
92else:
93 from typing_extensions import Self
94
# Module-level logger, named after this module through ``__name__``.
logger = logging.getLogger(__name__)

# Bytes pattern: optional sign, two digit runs separated by whitespace (both
# captured), whitespace, the letter ``R`` and one byte that is not an ASCII letter.
IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]")
98
99
100class ArrayObject(list[Any], PdfObject):
101 def replicate(
102 self,
103 pdf_dest: PdfWriterProtocol,
104 ) -> "ArrayObject":
105 arr = cast(
106 "ArrayObject",
107 self._reference_clone(ArrayObject(), pdf_dest, False),
108 )
109 for data in self:
110 if hasattr(data, "replicate"):
111 arr.append(data.replicate(pdf_dest))
112 else:
113 arr.append(data)
114 return arr
115
116 def clone(
117 self,
118 pdf_dest: PdfWriterProtocol,
119 force_duplicate: bool = False,
120 ignore_fields: Optional[Sequence[Union[str, int]]] = (),
121 ) -> "ArrayObject":
122 """Clone object into pdf_dest."""
123 try:
124 if self.indirect_reference.pdf == pdf_dest and not force_duplicate: # type: ignore[union-attr]
125 return self
126 except Exception:
127 pass
128 arr = cast(
129 "ArrayObject",
130 self._reference_clone(ArrayObject(), pdf_dest, force_duplicate=True),
131 )
132 for data in self:
133 if isinstance(data, StreamObject):
134 dup = data._reference_clone(
135 data.clone(pdf_dest, force_duplicate, ignore_fields),
136 pdf_dest,
137 force_duplicate,
138 )
139 arr.append(dup.indirect_reference)
140 elif isinstance(data, IndirectObject) and isinstance(resolved := data.get_object(), StreamObject):
141 dup = data._reference_clone(
142 resolved.clone(pdf_dest, force_duplicate=True, ignore_fields=ignore_fields),
143 pdf_dest,
144 force_duplicate,
145 )
146 arr.append(dup.indirect_reference)
147 elif hasattr(data, "clone"):
148 arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields))
149 else:
150 arr.append(data)
151 return arr
152
153 def hash_bin(self) -> int:
154 """
155 Used to detect modified object.
156
157 Returns:
158 Hash considering type and value.
159
160 """
161 return hash((self.__class__, tuple(x.hash_bin() for x in self)))
162
163 def items(self) -> Iterable[Any]:
164 """Emulate DictionaryObject.items for a list (index, object)."""
165 return enumerate(self)
166
167 def _to_lst(self, lst: Any) -> list[Any]:
168 # Convert to list, internal
169 result: list[Any]
170 if isinstance(lst, (list, tuple, set)):
171 result = list(lst)
172 elif isinstance(lst, PdfObject):
173 result = [lst]
174 elif isinstance(lst, str):
175 if lst[0] == "/":
176 result = [NameObject(lst)]
177 else:
178 result = [TextStringObject(lst)]
179 elif isinstance(lst, bytes):
180 result = [ByteStringObject(lst)]
181 else: # for numbers,...
182 result = [lst]
183 return result
184
185 def __add__(self, lst: Any) -> "ArrayObject":
186 """
187 Allow extension by adding list or add one element only
188
189 Args:
190 lst: any list, tuples are extended the list.
191 other types(numbers,...) will be appended.
192 if str is passed it will be converted into TextStringObject
193 or NameObject (if starting with "/")
194 if bytes is passed it will be converted into ByteStringObject
195
196 Returns:
197 ArrayObject with all elements
198
199 """
200 temp = ArrayObject(self)
201 temp.extend(self._to_lst(lst))
202 return temp
203
204 def __iadd__(self, lst: Any) -> Self:
205 """
206 Allow extension by adding list or add one element only
207
208 Args:
209 lst: any list, tuples are extended the list.
210 other types(numbers,...) will be appended.
211 if str is passed it will be converted into TextStringObject
212 or NameObject (if starting with "/")
213 if bytes is passed it will be converted into ByteStringObject
214
215 """
216 self.extend(self._to_lst(lst))
217 return self
218
219 def __isub__(self, lst: Any) -> Self:
220 """Allow to remove items"""
221 for x in self._to_lst(lst):
222 try:
223 index = self.index(x)
224 del self[index]
225 except ValueError:
226 pass
227 return self
228
229 def write_to_stream(
230 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
231 ) -> None:
232 if encryption_key is not None: # deprecated
233 deprecation_no_replacement(
234 "the encryption_key parameter of write_to_stream", "5.0.0"
235 )
236 stream.write(b"[")
237 for data in self:
238 stream.write(b" ")
239 data.write_to_stream(stream)
240 stream.write(b" ]")
241
242 @staticmethod
243 def read_from_stream(
244 stream: StreamType,
245 pdf: Optional[PdfReaderProtocol],
246 forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
247 ) -> "ArrayObject":
248 arr = ArrayObject()
249 tmp = stream.read(1)
250 if tmp != b"[":
251 raise PdfReadError("Could not read array")
252 while True:
253 # skip leading whitespace
254 tok = stream.read(1)
255 while tok.isspace():
256 tok = stream.read(1)
257 if tok == b"":
258 break
259 if tok == b"%":
260 stream.seek(-1, 1)
261 skip_over_comment(stream)
262 continue
263 stream.seek(-1, 1)
264 # check for array ending
265 peek_ahead = stream.read(1)
266 if peek_ahead == b"]":
267 break
268 stream.seek(-1, 1)
269 # read and append object
270 arr.append(read_object(stream, pdf, forced_encoding))
271 return arr
272
273
274class DictionaryObject(dict[Any, Any], PdfObject):
275 def replicate(
276 self,
277 pdf_dest: PdfWriterProtocol,
278 ) -> "DictionaryObject":
279 d__ = cast(
280 "DictionaryObject",
281 self._reference_clone(self.__class__(), pdf_dest, False),
282 )
283 for k, v in self.items():
284 d__[k.replicate(pdf_dest)] = (
285 v.replicate(pdf_dest) if hasattr(v, "replicate") else v
286 )
287 return d__
288
289 def clone(
290 self,
291 pdf_dest: PdfWriterProtocol,
292 force_duplicate: bool = False,
293 ignore_fields: Optional[Sequence[Union[str, int]]] = (),
294 ) -> "DictionaryObject":
295 """Clone object into pdf_dest."""
296 try:
297 if self.indirect_reference.pdf == pdf_dest and not force_duplicate: # type: ignore[union-attr]
298 return self
299 except Exception:
300 pass
301
302 visited: set[tuple[int, int]] = set() # (idnum, generation)
303 d__ = cast(
304 "DictionaryObject",
305 self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
306 )
307 if ignore_fields is None:
308 ignore_fields = []
309 if len(d__.keys()) == 0:
310 d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
311 return d__
312
    def _clone(
        self,
        src: "DictionaryObject",
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],  # (idnum, generation)
    ) -> None:
        """
        Update the object from src.

        Chains reached through ``/Next``/``/Prev`` or ``/N``/``/V`` are walked
        with a loop first; afterwards every remaining entry of ``src`` is
        cloned into ``self``.

        Args:
            src: Dictionary whose entries are copied into ``self``.
            pdf_dest: Writer handed to every nested ``clone`` and
                ``_reference_clone`` call.
            force_duplicate: Forwarded to the nested clone calls.
            ignore_fields: Keys that are not copied. Integer entries act as
                level counters (see the first loop). Must not be None.
            visited: ``(idnum, generation)`` pairs of the clones already
                created while walking chains; shared with recursive calls.

        """
        # First we remove the ignore_fields
        # that are for a limited number of levels
        assert ignore_fields is not None
        # Work on a copy so the caller's sequence is left untouched.
        ignore_fields = list(ignore_fields)
        x = 0
        while x < len(ignore_fields):
            if isinstance(ignore_fields[x], int):
                if cast(int, ignore_fields[x]) <= 0:
                    # Counter exhausted: drop the int and the entry after it.
                    # NOTE(review): presumably every int is followed by the
                    # field it limits; if the int is the final entry the
                    # second ``del`` raises IndexError - confirm with callers.
                    del ignore_fields[x]
                    del ignore_fields[x]
                    continue
                ignore_fields[x] -= 1  # type:ignore
            x += 1
        # Check if this is a chain list, we need to loop to prevent recursion
        if any(
            field not in ignore_fields
            and field in src
            and isinstance(src.raw_get(field), IndirectObject)
            and isinstance(src[field], DictionaryObject)
            and (
                src.get("/Type", None) is None
                or cast(DictionaryObject, src[field]).get("/Type", None) is None
                or src.get("/Type", None)
                == cast(DictionaryObject, src[field]).get("/Type", None)
            )
            for field in ["/Next", "/Prev", "/N", "/V"]
        ):
            ignore_fields = list(ignore_fields)
            for lst in (("/Next", "/Prev"), ("/N", "/V")):
                for k in lst:
                    # (source, clone) pairs collected while walking the chain.
                    objs = []
                    if (
                        k in src
                        and k not in self
                        and isinstance(src.raw_get(k), IndirectObject)
                        and isinstance(src[k], DictionaryObject)
                        # If need to go further the idea is to check
                        # that the types are the same
                        and (
                            src.get("/Type", None) is None
                            or cast(DictionaryObject, src[k]).get("/Type", None) is None
                            or src.get("/Type", None)
                            == cast(DictionaryObject, src[k]).get("/Type", None)
                        )
                    ):
                        cur_obj: Optional[DictionaryObject] = cast(
                            "DictionaryObject", src[k]
                        )
                        prev_obj: Optional[DictionaryObject] = self
                        while cur_obj is not None:
                            # Create the (still empty) clone of the current link.
                            clon = cast(
                                "DictionaryObject",
                                cur_obj._reference_clone(
                                    cur_obj.__class__(), pdf_dest, force_duplicate
                                ),
                            )
                            # Check to see if we've previously processed our item
                            if clon.indirect_reference is not None:
                                idnum = clon.indirect_reference.idnum
                                generation = clon.indirect_reference.generation
                                if (idnum, generation) in visited:
                                    cur_obj = None
                                    break
                                visited.add((idnum, generation))
                            objs.append((cur_obj, clon))
                            assert prev_obj is not None
                            # Link the previous clone to this one under the same key.
                            prev_obj[NameObject(k)] = clon.indirect_reference
                            prev_obj = clon
                            try:
                                if cur_obj == src:
                                    # Back at the start: the chain is circular.
                                    cur_obj = None
                                else:
                                    cur_obj = cast("DictionaryObject", cur_obj[k])
                            except Exception:
                                # No further link (or it cannot be read): stop.
                                cur_obj = None
                        # Fill the clones only after the whole chain is linked.
                        for s, c in objs:
                            c._clone(
                                s, pdf_dest, force_duplicate, ignore_fields, visited
                            )

        for k, v in src.items():
            if k not in ignore_fields:
                if isinstance(v, StreamObject):
                    if not hasattr(v, "indirect_reference"):
                        v.indirect_reference = None
                    vv = v.clone(pdf_dest, force_duplicate, ignore_fields)
                    assert vv.indirect_reference is not None
                    # Streams are always stored through their reference.
                    self[k.clone(pdf_dest)] = vv.indirect_reference
                elif k not in self:
                    # Keys already set (e.g. by the chain walk) are kept.
                    self[NameObject(k)] = (
                        v.clone(pdf_dest, force_duplicate, ignore_fields)
                        if hasattr(v, "clone")
                        else v
                    )
425
426 def hash_bin(self) -> int:
427 """
428 Used to detect modified object.
429
430 Returns:
431 Hash considering type and value.
432
433 """
434 return hash(
435 (self.__class__, tuple(((k, v.hash_bin()) for k, v in self.items())))
436 )
437
438 def raw_get(self, key: Any) -> Any:
439 return dict.__getitem__(self, key)
440
441 def get_inherited(self, key: str, default: Any = None) -> Any:
442 """
443 Returns the value of a key or from the parent if not found.
444 If not found returns default.
445
446 Args:
447 key: string identifying the field to return
448
449 default: default value to return
450
451 Returns:
452 Current key or inherited one, otherwise default value.
453
454 """
455 current = self
456 visited: set[int] = set()
457
458 while True:
459 # Detect cyclic parent references
460 obj_id = id(current)
461 if obj_id in visited:
462 raise LimitReachedError(f"Detected cycle in /Parent hierarchy when retrieving value for key {key!r}.")
463 visited.add(obj_id)
464
465 if key in current:
466 return current[key]
467
468 if "/Parent" not in current:
469 return default
470
471 # Walk upward
472 current = cast(
473 "DictionaryObject",
474 current["/Parent"].get_object(),
475 )
476
477 def __setitem__(self, key: Any, value: Any) -> Any:
478 if not isinstance(key, PdfObject):
479 raise ValueError("Key must be a PdfObject")
480 if not isinstance(value, PdfObject):
481 raise ValueError("Value must be a PdfObject")
482 return dict.__setitem__(self, key, value)
483
484 def setdefault(self, key: Any, value: Optional[Any] = None) -> Any:
485 if not isinstance(key, PdfObject):
486 raise ValueError("Key must be a PdfObject")
487 if not isinstance(value, PdfObject):
488 raise ValueError("Value must be a PdfObject")
489 return dict.setdefault(self, key, value)
490
491 def __getitem__(self, key: Any) -> PdfObject:
492 return cast(PdfObject, dict.__getitem__(self, key).get_object())
493
494 @property
495 def xmp_metadata(self) -> Optional[XmpInformationProtocol]:
496 """
497 Retrieve XMP (Extensible Metadata Platform) data relevant to this
498 object, if available.
499
500 See Table 347 — Additional entries in a metadata stream dictionary.
501
502 Returns:
503 Returns a :class:`~pypdf.xmp.XmpInformation` instance
504 that can be used to access XMP metadata from the document. Can also
505 return None if no metadata was found on the document root.
506
507 """
508 from ..xmp import XmpInformation # noqa: PLC0415
509
510 metadata = self.get("/Metadata", None)
511 if is_null_or_none(metadata):
512 return None
513 assert metadata is not None, "mypy"
514 metadata = metadata.get_object()
515 return XmpInformation(metadata)
516
517 def write_to_stream(
518 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
519 ) -> None:
520 if encryption_key is not None: # deprecated
521 deprecation_no_replacement(
522 "the encryption_key parameter of write_to_stream", "5.0.0"
523 )
524 stream.write(b"<<\n")
525 for key, value in self.items():
526 if len(key) > 2 and key[1] == "%" and key[-1] == "%":
527 continue
528 key.write_to_stream(stream, encryption_key)
529 stream.write(b" ")
530 value.write_to_stream(stream)
531 stream.write(b"\n")
532 stream.write(b">>")
533
534 @classmethod
535 def _get_next_object_position(
536 cls, position_before: int, position_end: int, generations: list[int], pdf: PdfReaderProtocol
537 ) -> int:
538 out = position_end
539 for generation in generations:
540 location = pdf.xref[generation]
541 values = [x for x in location.values() if position_before < x <= position_end]
542 if values:
543 out = min(out, *values)
544 return out
545
546 @classmethod
547 def _read_unsized_from_stream(
548 cls, stream: BinaryStreamType, pdf: PdfReaderProtocol
549 ) -> bytes:
550 object_position = cls._get_next_object_position(
551 position_before=stream.tell(), position_end=2 ** 32, generations=list(pdf.xref), pdf=pdf
552 ) - 1
553 current_position = stream.tell()
554 # Read until the next object position.
555 read_value = stream.read(object_position - stream.tell())
556 endstream_position = read_value.find(b"endstream")
557 if endstream_position < 0:
558 raise PdfReadError(
559 f"Unable to find 'endstream' marker for obj starting at {current_position}."
560 )
561 # 9 = len(b"endstream")
562 stream.seek(current_position + endstream_position + 9)
563 return read_value[: endstream_position - 1]
564
    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "DictionaryObject":
        """
        Read a dictionary, and the stream data following it if any.

        Args:
            stream: Source positioned on the opening ``<<``.
            pdf: Reader consulted for ``strict`` and used to resolve an
                indirect stream length; may be None.
            forced_encoding: Forwarded to ``read_object`` when reading values.

        Returns:
            A DictionaryObject, or the result of
            ``StreamObject.initialize_from_dictionary`` when the dictionary is
            followed by the ``stream`` keyword. When reading an entry fails
            and ``pdf`` is None or not strict, the entries read so far are
            returned.

        Raises:
            PdfReadError: The data does not start with ``<<``; in strict mode
                also for invalid or duplicate keys and failed reads; whenever
                a RecursionError or LimitReachedError occurs while reading an
                entry; when ``endstream`` cannot be located.
            PdfStreamError: The data ends inside the dictionary, ``stream`` is
                not followed by a newline, or (strict mode) the stream length
                is missing.
            LimitReachedError: The declared stream length exceeds
                ``MAX_DECLARED_STREAM_LENGTH``.

        """
        tmp = stream.read(2)
        if tmp != b"<<":
            raise PdfReadError(
                f"Dictionary read error at byte {hex(stream.tell())}: "
                "stream must begin with '<<'"
            )
        # Entries are collected in a plain dict first.
        data: dict[Any, Any] = {}
        while True:
            tok = read_non_whitespace(stream)
            if tok == b"\x00":
                # NUL bytes between entries are ignored.
                continue
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            if not tok:
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)

            if tok == b">":
                # Consume the second ">" of the closing ">>".
                stream.read(1)
                break
            stream.seek(-1, 1)
            try:
                try:
                    key = read_object(stream, pdf)
                    if isinstance(key, NullObject):
                        # A null where a key is expected ends the dictionary.
                        break
                    if not isinstance(key, NameObject):
                        raise PdfReadError(
                            f"Expecting a NameObject for key but found {key!r}"
                        )
                except PdfReadError as exc:
                    if pdf is not None and pdf.strict:
                        raise
                    # Lenient mode: warn and try the next token as a key.
                    logger_warning("%(exception)r", source=__name__, exception=exc)
                    continue
                tok = read_non_whitespace(stream)
                stream.seek(-1, 1)
                value = read_object(stream, pdf, forced_encoding)
            except (RecursionError, LimitReachedError) as exc:
                # Always fatal, whatever the strictness.
                raise PdfReadError(exc.__repr__())
            except Exception as exc:
                if pdf is not None and pdf.strict:
                    raise PdfReadError(exc.__repr__())
                logger_warning("%(exception)r", source=__name__, exception=exc)
                retval = DictionaryObject()
                retval.update(data)
                return retval  # return partial data

            # NOTE(review): ``data.get(key)`` is also falsy for a stored falsy
            # value, in which case a later definition replaces it silently -
            # confirm this is intended.
            if not data.get(key):
                data[key] = value
            else:
                # multiple definitions of key not permitted
                msg = (
                    "Multiple definitions in dictionary at byte "
                    "%(position)s for key %(key)s"
                )
                values = {"position": hex(stream.tell()), "key": key}
                if pdf is not None and pdf.strict:
                    raise PdfReadError(msg % values)
                logger_warning(msg, source=__name__, **values)

        # Remember where the dictionary ended, to rewind if no stream follows.
        pos = stream.tell()
        s = read_non_whitespace(stream)
        if s == b"s" and stream.read(5) == b"tream":
            eol = stream.read(1)
            # Occasional PDF file output has spaces after 'stream' keyword but before EOL.
            # patch provided by Danial Sandler
            while eol == b" ":
                eol = stream.read(1)
            if eol not in (b"\n", b"\r"):
                raise PdfStreamError("Stream data must be followed by a newline")
            # A lone "\r" is accepted: un-read the byte that was not "\n".
            if eol == b"\r" and stream.read(1) != b"\n":
                stream.seek(-1, 1)
            # this is a stream object, not a dictionary
            if StreamAttributes.LENGTH not in data:
                if pdf is not None and pdf.strict:
                    raise PdfStreamError("Stream length not defined")
                logger_warning(
                    "Stream length not defined @pos=%(position)d",
                    source=__name__,
                    position=stream.tell(),
                )
                # -1 selects the "search for endstream" branch below.
                data[NameObject(StreamAttributes.LENGTH)] = NumberObject(-1)
            length = data[StreamAttributes.LENGTH]
            if isinstance(length, IndirectObject):
                # Resolving the reference may move the stream; restore it.
                t = stream.tell()
                assert pdf is not None, "mypy"
                length = pdf.get_object(length)
                stream.seek(t, 0)
            if length is None:  # if the PDF is damaged
                length = -1
            # Start of the stream data, needed by the recovery path below.
            pstart = stream.tell()
            if length >= 0:
                from ..filters import MAX_DECLARED_STREAM_LENGTH  # noqa: PLC0415
                if length > MAX_DECLARED_STREAM_LENGTH:
                    raise LimitReachedError(f"Declared stream length of {length} exceeds maximum allowed length.")

                data["__streamdata__"] = stream.read(length)
            else:
                data["__streamdata__"] = read_until_regex(
                    stream, re.compile(b"endstream")
                )
            e = read_non_whitespace(stream)
            ndstream = stream.read(8)
            if (e + ndstream) != b"endstream":
                # the odd PDF file has a length that is too long, so
                # we need to read backwards to find the "endstream" ending.
                # ReportLab (unknown version) generates files with this bug,
                # and Python users into PDF files tend to be our audience.
                # we need to do this to correct the streamdata and chop off
                # an extra character.
                pos = stream.tell()
                stream.seek(-10, 1)
                end = stream.read(9)
                if end == b"endstream":
                    # we found it by looking back one character further.
                    data["__streamdata__"] = data["__streamdata__"][:-1]
                elif pdf is not None and not pdf.strict:
                    # Lenient mode: ignore the length and search for the marker.
                    stream.seek(pstart, 0)
                    data["__streamdata__"] = DictionaryObject._read_unsized_from_stream(stream, pdf)
                    pos = stream.tell()
                else:
                    stream.seek(pos, 0)
                    raise PdfReadError(
                        "Unable to find 'endstream' marker after stream at byte "
                        f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')."
                    )
        else:
            # Plain dictionary: rewind to just after the closing ">>".
            stream.seek(pos, 0)
        if "__streamdata__" in data:
            return StreamObject.initialize_from_dictionary(data)
        retval = DictionaryObject()
        retval.update(data)
        return retval
706
707
708class TreeObject(DictionaryObject):
709 def __init__(self, dct: Optional[DictionaryObject] = None) -> None:
710 DictionaryObject.__init__(self)
711 if dct:
712 self.update(dct)
713
714 def has_children(self) -> bool:
715 return "/First" in self
716
717 def __iter__(self) -> Any:
718 return self.children()
719
720 def children(self) -> Iterable[Any]:
721 if not self.has_children():
722 return
723
724 child_ref = self[NameObject("/First")]
725 last = self[NameObject("/Last")]
726 child = child_ref.get_object()
727 visited: set[int] = set()
728 while True:
729 child_id = id(child)
730 if child_id in visited:
731 logger_warning("Detected cycle in outline structure for %(child)s", source=__name__, child=child)
732 return
733 visited.add(child_id)
734
735 yield child
736
737 if child == last:
738 return
739 child_ref = child.get(NameObject("/Next")) # type: ignore[union-attr]
740 if is_null_or_none(child_ref):
741 return
742 child = child_ref.get_object()
743
744 def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None:
745 self.insert_child(child, None, pdf)
746
747 def inc_parent_counter_default(
748 self, parent: Union[None, IndirectObject, "TreeObject"], n: int
749 ) -> None:
750 if is_null_or_none(parent):
751 return
752 assert parent is not None, "mypy"
753 parent = cast("TreeObject", parent.get_object())
754 if "/Count" in parent:
755 parent[NameObject("/Count")] = NumberObject(
756 max(0, cast(int, parent[NameObject("/Count")]) + n)
757 )
758 self.inc_parent_counter_default(parent.get("/Parent", None), n)
759
760 def inc_parent_counter_outline(
761 self, parent: Union[None, IndirectObject, "TreeObject"], n: int
762 ) -> None:
763 if is_null_or_none(parent):
764 return
765 assert parent is not None, "mypy"
766 parent = cast("TreeObject", parent.get_object())
767 # BooleanObject requires comparison with == not is
768 opn = parent.get("/%is_open%", True) == True # noqa: E712
769 c = cast(int, parent.get("/Count", 0))
770 if c < 0:
771 c = abs(c)
772 parent[NameObject("/Count")] = NumberObject((c + n) * (1 if opn else -1))
773 if not opn:
774 return
775 self.inc_parent_counter_outline(parent.get("/Parent", None), n)
776
    def insert_child(
        self,
        child: Any,
        before: Any,
        pdf: PdfWriterProtocol,
        inc_parent_counter: Optional[Callable[..., Any]] = None,
    ) -> IndirectObject:
        """
        Link ``child`` into this node's list of children.

        Args:
            child: Object to insert; it must have an ``indirect_reference``.
            before: Reference of the child in front of which ``child`` is
                placed; when the walk below never meets it, ``child`` is
                appended at the end.
            pdf: Not used by this method.
            inc_parent_counter: Callable invoked as ``(self, count)`` to
                update the counters; defaults to
                ``inc_parent_counter_default``.

        Returns:
            The indirect reference of ``child``.

        """
        if inc_parent_counter is None:
            inc_parent_counter = self.inc_parent_counter_default
        child_obj = child.get_object()
        assert child.indirect_reference is not None, "mypy"
        child_reference: IndirectObject = child.indirect_reference

        prev: Optional[DictionaryObject]
        if "/First" not in self:  # no child yet
            self[NameObject("/First")] = child_reference
            self[NameObject("/Count")] = NumberObject(0)
            self[NameObject("/Last")] = child_reference
            child_obj[NameObject("/Parent")] = self.indirect_reference
            # The child counts for its own /Count, or for 1 if it has none.
            inc_parent_counter(self, child_obj.get("/Count", 1))
            # An only child has no siblings: drop stale links.
            if "/Next" in child_obj:
                del child_obj["/Next"]
            if "/Prev" in child_obj:
                del child_obj["/Prev"]
            return child_reference
        # NOTE(review): the search for ``before`` starts at /Last, so it
        # presumably only matches when ``before`` is the last child's
        # reference - confirm this is intended.
        prev = cast("DictionaryObject", self["/Last"])

        while prev.indirect_reference != before:
            if "/Next" in prev:
                prev = cast("TreeObject", prev["/Next"])
            else:  # append at the end
                prev[NameObject("/Next")] = cast("TreeObject", child_reference)
                child_obj[NameObject("/Prev")] = prev.indirect_reference
                child_obj[NameObject("/Parent")] = self.indirect_reference
                if "/Next" in child_obj:
                    del child_obj["/Next"]
                self[NameObject("/Last")] = child_reference
                inc_parent_counter(self, child_obj.get("/Count", 1))
                return child_reference
        # Here ``prev`` is the node whose reference equals ``before``.
        try:  # insert as first or in the middle
            assert isinstance(prev["/Prev"], DictionaryObject)
            prev["/Prev"][NameObject("/Next")] = child_reference
            child_obj[NameObject("/Prev")] = prev["/Prev"]
        except Exception:  # it means we are inserting in first position
            child_obj.pop("/Next", None)
        # NOTE(review): /First is not reassigned on this path, even when the
        # child ends up in first position - confirm callers handle that.
        child_obj[NameObject("/Next")] = prev
        prev[NameObject("/Prev")] = child_reference
        child_obj[NameObject("/Parent")] = self.indirect_reference
        inc_parent_counter(self, child_obj.get("/Count", 1))
        return child_reference
827
828 def _remove_node_from_tree(
829 self, prev: Any, prev_ref: Any, cur: Any, last: Any
830 ) -> None:
831 """
832 Adjust the pointers of the linked list and tree node count.
833
834 Args:
835 prev:
836 prev_ref:
837 cur:
838 last:
839
840 """
841 next_ref = cur.get(NameObject("/Next"), None)
842 if prev is None:
843 if next_ref:
844 # Removing first tree node
845 next_obj = next_ref.get_object()
846 del next_obj[NameObject("/Prev")]
847 self[NameObject("/First")] = next_ref
848 self[NameObject("/Count")] = NumberObject(
849 self[NameObject("/Count")] - 1 # type: ignore[operator]
850 )
851
852 else:
853 # Removing only tree node
854 self[NameObject("/Count")] = NumberObject(0)
855 del self[NameObject("/First")]
856 if NameObject("/Last") in self:
857 del self[NameObject("/Last")]
858 else:
859 if next_ref:
860 # Removing middle tree node
861 next_obj = next_ref.get_object()
862 next_obj[NameObject("/Prev")] = prev_ref
863 prev[NameObject("/Next")] = next_ref
864 else:
865 # Removing last tree node
866 assert cur == last
867 del prev[NameObject("/Next")]
868 self[NameObject("/Last")] = prev_ref
869 self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1) # type: ignore[operator]
870
871 def remove_child(self, child: Any) -> None:
872 child_obj = child.get_object()
873 child = child_obj.indirect_reference
874
875 if NameObject("/Parent") not in child_obj:
876 raise ValueError("Removed child does not appear to be a tree item")
877 if child_obj[NameObject("/Parent")] != self:
878 raise ValueError("Removed child is not a member of this tree")
879
880 found = False
881 prev_ref = None
882 prev = None
883 cur_ref: Optional[Any] = self[NameObject("/First")]
884 cur: Optional[dict[str, Any]] = cur_ref.get_object() # type: ignore[union-attr]
885 last_ref = self[NameObject("/Last")]
886 last = last_ref.get_object()
887 while cur is not None:
888 if cur == child_obj:
889 self._remove_node_from_tree(prev, prev_ref, cur, last)
890 found = True
891 break
892
893 # Go to the next node
894 prev_ref = cur_ref
895 prev = cur
896 if NameObject("/Next") in cur:
897 cur_ref = cur[NameObject("/Next")]
898 cur = cur_ref.get_object()
899 else:
900 cur_ref = None
901 cur = None
902
903 if not found:
904 raise ValueError("Removal couldn't find item in tree")
905
906 _reset_node_tree_relationship(child_obj)
907
908 def remove_from_tree(self) -> None:
909 """Remove the object from the tree it is in."""
910 if NameObject("/Parent") not in self:
911 raise ValueError("Removed child does not appear to be a tree item")
912 cast("TreeObject", self["/Parent"]).remove_child(self)
913
914 def empty_tree(self) -> None:
915 for child in self:
916 child_obj = child.get_object()
917 _reset_node_tree_relationship(child_obj)
918
919 if NameObject("/Count") in self:
920 del self[NameObject("/Count")]
921 if NameObject("/First") in self:
922 del self[NameObject("/First")]
923 if NameObject("/Last") in self:
924 del self[NameObject("/Last")]
925
926
def _reset_node_tree_relationship(child_obj: Any) -> None:
    """
    Call this after a node has been removed from a tree.

    Deletes the ``/Parent`` entry (which must exist) and, when present, the
    ``/Next`` and ``/Prev`` entries of the node.

    Args:
        child_obj: The node that was removed.

    """
    del child_obj[NameObject("/Parent")]
    for sibling_link in ("/Next", "/Prev"):
        key = NameObject(sibling_link)
        if key in child_obj:
            del child_obj[key]
942
943
944class StreamObject(DictionaryObject):
945 def __init__(self) -> None:
946 self._data: bytes = b""
947 self.decoded_self: Optional[DecodedStreamObject] = None
948
949 def replicate(
950 self,
951 pdf_dest: PdfWriterProtocol,
952 ) -> "StreamObject":
953 d__ = cast(
954 "StreamObject",
955 self._reference_clone(self.__class__(), pdf_dest, False),
956 )
957 d__._data = self._data
958 try:
959 decoded_self = self.decoded_self
960 if decoded_self is None:
961 self.decoded_self = None
962 else:
963 self.decoded_self = cast(
964 "DecodedStreamObject", decoded_self.replicate(pdf_dest)
965 )
966 except Exception:
967 pass
968 for k, v in self.items():
969 d__[k.replicate(pdf_dest)] = (
970 v.replicate(pdf_dest) if hasattr(v, "replicate") else v
971 )
972 return d__
973
974 def _clone(
975 self,
976 src: DictionaryObject,
977 pdf_dest: PdfWriterProtocol,
978 force_duplicate: bool,
979 ignore_fields: Optional[Sequence[Union[str, int]]],
980 visited: set[tuple[int, int]],
981 ) -> None:
982 """
983 Update the object from src.
984
985 Args:
986 src:
987 pdf_dest:
988 force_duplicate:
989 ignore_fields:
990
991 """
992 self._data = cast("StreamObject", src)._data
993 try:
994 decoded_self = cast("StreamObject", src).decoded_self
995 if decoded_self is None:
996 self.decoded_self = None
997 else:
998 self.decoded_self = cast(
999 "DecodedStreamObject",
1000 decoded_self.clone(pdf_dest, force_duplicate, ignore_fields),
1001 )
1002 except Exception:
1003 pass
1004 super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)
1005
1006 def hash_bin(self) -> int:
1007 """
1008 Used to detect modified object.
1009
1010 Returns:
1011 Hash considering type and value.
1012
1013 """
1014 # Use _data to prevent errors on non-decoded streams.
1015 return hash((super().hash_bin(), self._data))
1016
1017 def get_data(self) -> bytes:
1018 return self._data
1019
1020 def set_data(self, data: bytes) -> None:
1021 self._data = data
1022
1023 def hash_value_data(self) -> bytes:
1024 data = super().hash_value_data()
1025 data += self.get_data()
1026 return data
1027
1028 def write_to_stream(
1029 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
1030 ) -> None:
1031 if encryption_key is not None: # deprecated
1032 deprecation_no_replacement(
1033 "the encryption_key parameter of write_to_stream", "5.0.0"
1034 )
1035 self[NameObject(StreamAttributes.LENGTH)] = NumberObject(len(self._data))
1036 DictionaryObject.write_to_stream(self, stream)
1037 del self[StreamAttributes.LENGTH]
1038 stream.write(b"\nstream\n")
1039 stream.write(self._data)
1040 stream.write(b"\nendstream")
1041
1042 @staticmethod
1043 def initialize_from_dictionary(
1044 data: dict[str, Any]
1045 ) -> Union["EncodedStreamObject", "DecodedStreamObject"]:
1046 retval: Union[EncodedStreamObject, DecodedStreamObject]
1047 if StreamAttributes.FILTER in data:
1048 retval = EncodedStreamObject()
1049 else:
1050 retval = DecodedStreamObject()
1051 retval._data = data["__streamdata__"]
1052 del data["__streamdata__"]
1053 if StreamAttributes.LENGTH in data:
1054 del data[StreamAttributes.LENGTH]
1055 retval.update(data)
1056 return retval
1057
1058 def flate_encode(self, level: int = -1) -> "EncodedStreamObject":
1059 from ..filters import FlateDecode # noqa: PLC0415
1060
1061 if StreamAttributes.FILTER in self:
1062 f = self[StreamAttributes.FILTER]
1063 if isinstance(f, ArrayObject):
1064 f = ArrayObject([NameObject(FT.FLATE_DECODE), *f])
1065 try:
1066 params = ArrayObject(
1067 [NullObject(), *self.get(StreamAttributes.DECODE_PARMS, ArrayObject())]
1068 )
1069 except TypeError:
1070 # case of error where the * operator is not working (not an array
1071 params = ArrayObject(
1072 [NullObject(), self.get(StreamAttributes.DECODE_PARMS, ArrayObject())]
1073 )
1074 else:
1075 f = ArrayObject([NameObject(FT.FLATE_DECODE), f])
1076 params = ArrayObject(
1077 [NullObject(), self.get(StreamAttributes.DECODE_PARMS, NullObject())]
1078 )
1079 else:
1080 f = NameObject(FT.FLATE_DECODE)
1081 params = None
1082 retval = EncodedStreamObject()
1083 retval.update(self)
1084 retval[NameObject(StreamAttributes.FILTER)] = f
1085 if params is not None:
1086 retval[NameObject(StreamAttributes.DECODE_PARMS)] = params
1087 retval._data = FlateDecode.encode(self._data, level)
1088 return retval
1089
1090 def decode_as_image(self, pillow_parameters: Union[dict[str, Any], None] = None) -> Any:
1091 """
1092 Try to decode the stream object as an image
1093
1094 Args:
1095 pillow_parameters: parameters provided to Pillow Image.save() method,
1096 cf. <https://pillow.readthedocs.io/en/stable/reference/Image.html#PIL.Image.Image.save>
1097
1098 Returns:
1099 a PIL image if proper decoding has been found
1100 Raises:
1101 Exception: Errors during decoding will be reported.
1102 It is recommended to catch exceptions to prevent
1103 stops in your program.
1104
1105 """
1106 from ._image_xobject import _xobj_to_image # noqa: PLC0415
1107
1108 if self.get("/Subtype", "") != "/Image":
1109 try:
1110 logger_warning( # pragma: no cover
1111 "%(indirect_reference)s does not seem to be an Image",
1112 source=__name__,
1113 indirect_reference=self.indirect_reference,
1114 )
1115 except AttributeError:
1116 logger_warning( # pragma: no cover
1117 "%(obj)r object does not seem to be an Image",
1118 source=__name__,
1119 obj=self,
1120 )
1121 extension, _, img = _xobj_to_image(self, pillow_parameters)
1122 if extension is None:
1123 return None # pragma: no cover
1124 return img
1125
1126
class DecodedStreamObject(StreamObject):
    """Stream object that adds nothing to ``StreamObject``; it exists as a distinct type."""
1129
1130
class EncodedStreamObject(StreamObject):
    """
    Stream object whose raw ``_data`` must go through ``decode_stream_data``.

    The decoded counterpart is built on first access and cached in
    ``decoded_self``.
    """

    def __init__(self) -> None:
        # Cache of the decoded counterpart; filled by get_data().
        self.decoded_self: Optional[DecodedStreamObject] = None

    # This overrides the parent method
    def get_data(self) -> bytes:
        """Return the decoded payload, decoding and caching it on first use."""
        from ..filters import decode_stream_data  # noqa: PLC0415

        cached = self.decoded_self
        if cached is None:
            # Build the decoded twin: decoded bytes plus every entry except
            # the length, filter and decode-parameter ones.
            cached = DecodedStreamObject()
            cached.set_data(decode_stream_data(self))
            skipped_keys = (
                StreamAttributes.LENGTH,
                StreamAttributes.FILTER,
                StreamAttributes.DECODE_PARMS,
            )
            for key, value in self.items():
                if key not in skipped_keys:
                    cached[key] = value
            self.decoded_self = cached
        return cached.get_data()

    # This overrides the parent method:
    def set_data(self, data: bytes) -> None:
        """
        Replace the payload with ``data``, storing it flate-compressed.

        Args:
            data: The new decoded payload.

        Raises:
            PdfReadError: If the filter is anything but a lone FlateDecode.
            TypeError: If ``data`` is not ``bytes``.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        active_filter = self.get(StreamAttributes.FILTER, "")
        if active_filter not in (FT.FLATE_DECODE, [FT.FLATE_DECODE]):
            raise PdfReadError(
                "Streams encoded with a filter different from FlateDecode are not supported"
            )
        if not isinstance(data, bytes):
            raise TypeError("Data must be bytes")
        if self.decoded_self is None:
            self.get_data()  # to create self.decoded_self
        assert self.decoded_self is not None, "mypy"
        # Keep the cached decoded twin and the encoded payload in sync.
        self.decoded_self.set_data(data)
        super().set_data(FlateDecode.encode(data))
1168
1169
# Upper bound on the number of elements accepted when a content stream is
# given as an array of streams; ContentStream raises LimitReachedError above it.
CONTENT_STREAM_ARRAY_MAX_LENGTH = 10_000
1171
1172
1173class ContentStream(DecodedStreamObject):
1174 """
1175 In order to be fast, this data structure can contain either:
1176
1177 * raw data in ._data
1178 * parsed stream operations in ._operations.
1179
    At any time, a ContentStream object can either have both of those fields populated,
    or one field populated and the other one empty (``b""`` or ``[]``).

    These fields are "rebuilt" lazily, when accessed:

    * when .get_data() is called, if ._data is empty, it is rebuilt from ._operations.
    * when .operations is read, if ._operations is empty, it is rebuilt from ._data
      (and ._data is then emptied).

    Conversely, these fields can be invalidated:

    * when .set_data() is called, ._operations is reset to an empty list.
    * when .operations is set, ._data is reset to empty bytes.
1192 """
1193
1194 def __init__(
1195 self,
1196 stream: Any,
1197 pdf: Any,
1198 forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
1199 ) -> None:
1200 self.pdf = pdf
1201 self._operations: list[tuple[Any, bytes]] = []
1202
1203 # stream may be a StreamObject or an ArrayObject containing
1204 # StreamObjects to be concatenated together.
1205 if stream is None:
1206 super().set_data(b"")
1207 else:
1208 stream = stream.get_object()
1209 if isinstance(stream, ArrayObject):
1210 from pypdf.filters import MAX_ARRAY_BASED_STREAM_OUTPUT_LENGTH # noqa: PLC0415
1211
1212 if (stream_length := len(stream)) > CONTENT_STREAM_ARRAY_MAX_LENGTH:
1213 raise LimitReachedError(
1214 f"Array-based stream has {stream_length} > {CONTENT_STREAM_ARRAY_MAX_LENGTH} elements."
1215 )
1216 data = bytearray()
1217 length = 0
1218 for s in stream:
1219 s_resolved = s.get_object()
1220 if isinstance(s_resolved, NullObject):
1221 continue
1222 if not isinstance(s_resolved, StreamObject):
1223 # No need to emit an exception here for now - the PDF structure
1224 # seems to already be broken beforehand in these cases.
1225 logger_warning(
1226 "Expected StreamObject, got %(type_name)s instead. Data might be wrong.",
1227 source=__name__,
1228 type_name=type(s_resolved).__name__,
1229 )
1230 else:
1231 new_data = s_resolved.get_data()
1232 length += len(new_data)
1233 if length > MAX_ARRAY_BASED_STREAM_OUTPUT_LENGTH:
1234 raise LimitReachedError(
1235 f"Array-based stream has at least {length} > "
1236 f"{MAX_ARRAY_BASED_STREAM_OUTPUT_LENGTH} output bytes."
1237 )
1238 data += new_data
1239 if len(data) == 0 or data[-1:] != b"\n":
1240 # There should be no direct need to check for a change of one byte.
1241 length += 1
1242 data += b"\n"
1243 super().set_data(bytes(data))
1244 else:
1245 stream_data = stream.get_data()
1246 assert stream_data is not None
1247 super().set_data(stream_data)
1248 self.forced_encoding = forced_encoding
1249
1250 def replicate(
1251 self,
1252 pdf_dest: PdfWriterProtocol,
1253 ) -> "ContentStream":
1254 d__ = cast(
1255 "ContentStream",
1256 self._reference_clone(self.__class__(None, None), pdf_dest, False),
1257 )
1258 d__._data = self._data
1259 try:
1260 decoded_self = self.decoded_self
1261 if decoded_self is None:
1262 self.decoded_self = None
1263 else:
1264 self.decoded_self = cast(
1265 "DecodedStreamObject", decoded_self.replicate(pdf_dest)
1266 )
1267 except Exception:
1268 pass
1269 for k, v in self.items():
1270 d__[k.replicate(pdf_dest)] = (
1271 v.replicate(pdf_dest) if hasattr(v, "replicate") else v
1272 )
1273 return d__
1274 d__.set_data(self._data)
1275 d__.pdf = pdf_dest
1276 d__._operations = list(self._operations)
1277 d__.forced_encoding = self.forced_encoding
1278 return d__
1279
1280 def clone(
1281 self,
1282 pdf_dest: Any,
1283 force_duplicate: bool = False,
1284 ignore_fields: Optional[Sequence[Union[str, int]]] = (),
1285 ) -> "ContentStream":
1286 """
1287 Clone object into pdf_dest.
1288
1289 Args:
1290 pdf_dest:
1291 force_duplicate:
1292 ignore_fields:
1293
1294 Returns:
1295 The cloned ContentStream
1296
1297 """
1298 try:
1299 if self.indirect_reference.pdf == pdf_dest and not force_duplicate: # type: ignore[union-attr]
1300 return self
1301 except Exception:
1302 pass
1303
1304 visited: set[tuple[int, int]] = set()
1305 d__ = cast(
1306 "ContentStream",
1307 self._reference_clone(
1308 self.__class__(None, None), pdf_dest, force_duplicate
1309 ),
1310 )
1311 if ignore_fields is None:
1312 ignore_fields = []
1313 d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
1314 return d__
1315
1316 def _clone(
1317 self,
1318 src: DictionaryObject,
1319 pdf_dest: PdfWriterProtocol,
1320 force_duplicate: bool,
1321 ignore_fields: Optional[Sequence[Union[str, int]]],
1322 visited: set[tuple[int, int]],
1323 ) -> None:
1324 """
1325 Update the object from src.
1326
1327 Args:
1328 src:
1329 pdf_dest:
1330 force_duplicate:
1331 ignore_fields:
1332
1333 """
1334 src_cs = cast("ContentStream", src)
1335 super().set_data(src_cs._data)
1336 self.pdf = pdf_dest
1337 self._operations = list(src_cs._operations)
1338 self.forced_encoding = src_cs.forced_encoding
1339 # no need to call DictionaryObjection or anything
1340 # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)
1341
    def _parse_content_stream(self, stream: StreamType) -> None:
        """
        Parse ``stream`` from its start and append the result to ``self._operations``.

        Each appended entry is a ``(operands, operator)`` tuple. Inline images
        are stored as ``(dict, b"INLINE IMAGE")`` where the dict is the result
        of ``_read_inline_image``.

        Args:
            stream: Seekable stream holding the content stream bytes.

        """
        # 7.8.2 Content Streams
        stream.seek(0, 0)
        # Operands collected since the last operator.
        operands: list[Union[int, str, PdfObject]] = []
        while True:
            peek = read_non_whitespace(stream)
            if peek in (b"", 0):
                # Nothing left to read.
                break
            # Step back so the token is read again in full below.
            stream.seek(-1, 1)
            if peek.isalpha() or peek in (b"'", b'"'):
                # A letter or a quote character starts an operator.
                operator = read_until_regex(stream, NameObject.delimiter_pattern)
                if operator == b"BI":
                    # begin inline image - a completely different parsing
                    # mechanism is required, of course... thanks buddy...
                    assert operands == []
                    ii = self._read_inline_image(stream)
                    self._operations.append((ii, b"INLINE IMAGE"))
                else:
                    self._operations.append((operands, operator))
                    # Start a fresh list; the previous one now belongs to
                    # the stored operation.
                    operands = []
            elif peek == b"%":
                # If we encounter a comment in the content stream, we have to
                # handle it here. Typically, read_object will handle
                # encountering a comment -- but read_object assumes that
                # following the comment must be the object we're trying to
                # read. In this case, it could be an operator instead.
                while peek not in (b"\r", b"\n", b""):
                    peek = stream.read(1)
            else:
                # Anything else is an operand of the upcoming operator.
                operands.append(read_object(stream, None, self.forced_encoding))
1372
    def _read_inline_image(self, stream: StreamType) -> dict[str, Any]:
        """
        Read one inline image; the stream is positioned just after ``BI``.

        Args:
            stream: Seekable stream positioned after the ``BI`` operator.

        Returns:
            A dict with the image dictionary under ``"settings"`` and the
            extracted bytes under ``"data"``.

        Raises:
            PdfStreamError: If no ``EI`` marker followed by whitespace is
                found, even after the fallback extraction.

        """
        # begin reading just after the "BI" - begin image
        # first read the dictionary of settings.
        settings = DictionaryObject()
        while True:
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            if tok == b"I":
                # "ID" - begin of image data
                break
            key = read_object(stream, self.pdf)
            # Skip the whitespace between key and value.
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            value = read_object(stream, self.pdf)
            settings[key] = value
        # left at beginning of ID
        tmp = stream.read(3)
        assert tmp[:2] == b"ID"
        # The abbreviated key "/F" is looked up first, then "/Filter".
        filtr = settings.get("/F", settings.get("/Filter", "not set"))
        # Remember where the data starts for the fallback further below.
        savpos = stream.tell()
        if isinstance(filtr, list):
            filtr = filtr[0]  # used for encoding
        if "AHx" in filtr or "ASCIIHexDecode" in filtr:
            data = extract_inline__ascii_hex_decode(stream)
        elif "A85" in filtr or "ASCII85Decode" in filtr:
            data = extract_inline__ascii85_decode(stream)
        elif "RL" in filtr or "RunLengthDecode" in filtr:
            data = extract_inline__run_length_decode(stream)
        elif "DCT" in filtr or "DCTDecode" in filtr:
            data = extract_inline__dct_decode(stream)
        elif filtr == "not set":
            # No filter: derive the data size from the image settings.
            cs = settings.get("/CS", "")
            if isinstance(cs, list):
                cs = cs[0]
            # NOTE(review): lcs looks like the number of bytes per pixel
            # (3 for RGB, 4 for CMYK, bits / 8 otherwise) - confirm.
            if "RGB" in cs:
                lcs = 3
            elif "CMYK" in cs:
                lcs = 4
            else:
                bits = settings.get(
                    "/BPC",
                    8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1,
                )
                if bits > 0:
                    lcs = bits / 8.0
                else:
                    # Size unknown: use the default extraction instead.
                    data = extract_inline_default(stream)
                    lcs = -1
            if lcs > 0:
                # ceil(width * lcs) is rounded per row, then multiplied by the height.
                data = stream.read(
                    ceil(cast(int, settings["/W"]) * lcs) * cast(int, settings["/H"])
                )
            # Move to the `EI` if possible.
            ei = read_non_whitespace(stream)
            stream.seek(-1, 1)
        else:
            data = extract_inline_default(stream)

        # Expect "EI" plus one whitespace byte; step back one byte afterwards.
        ei = stream.read(3)
        stream.seek(-1, 1)
        if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:
            # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above.
            stream.seek(savpos, 0)
            data = extract_inline_default(stream)
            ei = stream.read(3)
            stream.seek(-1, 1)
            if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:  # pragma: no cover
                # Check the same condition again. This should never fail as
                # edge cases are covered by `extract_inline_default` above,
                # but check this to make sure that we are behind the `EI` afterwards.
                raise PdfStreamError(
                    f"Could not extract inline image, even using fallback. Expected 'EI', got {ei!r}"
                )
        return {"settings": settings, "data": data}
1447
1448 # This overrides the parent method
1449 def get_data(self) -> bytes:
1450 if not self._data:
1451 new_data = BytesIO()
1452 for operands, operator in self._operations:
1453 if operator == b"INLINE IMAGE":
1454 new_data.write(b"BI")
1455 dict_text = BytesIO()
1456 operands["settings"].write_to_stream(dict_text)
1457 new_data.write(dict_text.getvalue()[2:-2])
1458 new_data.write(b"ID ")
1459 new_data.write(operands["data"])
1460 new_data.write(b"EI")
1461 else:
1462 for op in operands:
1463 op.write_to_stream(new_data)
1464 new_data.write(b" ")
1465 new_data.write(operator)
1466 new_data.write(b"\n")
1467 self._data = new_data.getvalue()
1468 return self._data
1469
1470 # This overrides the parent method
1471 def set_data(self, data: bytes) -> None:
1472 super().set_data(data)
1473 self._operations = []
1474
1475 @property
1476 def operations(self) -> list[tuple[Any, bytes]]:
1477 if not self._operations and self._data:
1478 self._parse_content_stream(BytesIO(self._data))
1479 self._data = b""
1480 return self._operations
1481
1482 @operations.setter
1483 def operations(self, operations: list[tuple[Any, bytes]]) -> None:
1484 self._operations = operations
1485 self._data = b""
1486
1487 def isolate_graphics_state(self) -> None:
1488 if self._operations:
1489 self._operations.insert(0, ([], b"q"))
1490 self._operations.append(([], b"Q"))
1491 elif self._data:
1492 self._data = b"q\n" + self._data + b"\nQ\n"
1493
1494 # This overrides the parent method
1495 def write_to_stream(
1496 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
1497 ) -> None:
1498 if not self._data and self._operations:
1499 self.get_data() # this ensures ._data is rebuilt
1500 super().write_to_stream(stream, encryption_key)
1501
1502
def read_object(
    stream: StreamType,
    pdf: Optional[PdfReaderProtocol],
    forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
) -> Union[PdfObject, int, str, ContentStream]:
    """
    Read one object from ``stream``, dispatching on its first byte.

    Args:
        stream: Seekable stream positioned at the start of the object.
        pdf: Passed to the readers that accept it; must not be ``None``
            when an indirect reference is encountered.
        forced_encoding: Passed to the string, array and dictionary readers.

    Returns:
        The object that was read.

    Raises:
        PdfReadError: If the first byte does not start any known object.

    """
    tok = stream.read(1)
    stream.seek(-1, 1)  # reset to start
    if tok == b"/":
        return NameObject.read_from_stream(stream, pdf)
    if tok == b"<":
        # hexadecimal string OR dictionary
        peek = stream.read(2)
        stream.seek(-2, 1)  # reset to start
        if peek == b"<<":
            return DictionaryObject.read_from_stream(stream, pdf, forced_encoding)
        return read_hex_string_from_stream(stream, forced_encoding)
    if tok == b"[":
        return ArrayObject.read_from_stream(stream, pdf, forced_encoding)
    if tok in (b"t", b"f"):
        return BooleanObject.read_from_stream(stream)
    if tok == b"(":
        return read_string_from_stream(stream, forced_encoding)
    if tok == b"e" and stream.read(6) == b"endobj":
        # An object end marker where a value was expected is read as null.
        return NullObject()
    if tok == b"n":
        return NullObject.read_from_stream(stream)
    if tok == b"%":
        # comment
        skip_over_comment(stream)
        tok = read_non_whitespace(stream)
        stream.seek(-1, 1)
        return read_object(stream, pdf, forced_encoding)
    if tok in b"0123456789+-.":
        # number object OR indirect reference
        peek = stream.read(20)
        stream.seek(-len(peek), 1)  # reset to start
        if IndirectPattern.match(peek) is not None:
            assert pdf is not None, "mypy"
            return IndirectObject.read_from_stream(stream, pdf)
        return NumberObject.read_from_stream(stream)
    pos = stream.tell()
    # Collect some context for the error message. The position is clamped
    # at 0 and sought absolutely: a relative ``seek(-20, 1)`` can itself
    # raise on streams that reject seeking before the start, which would
    # hide the PdfReadError below.
    stream.seek(max(pos - 20, 0), 0)
    stream_extract = stream.read(80)
    stream.seek(pos)
    read_until_whitespace(stream)
    raise PdfReadError(
        f"Invalid Elementary Object starting with {tok!r} @{pos}: {stream_extract!r}"
    )
1551
1552
class Field(TreeObject):
    """
    A class representing a field dictionary.

    This class is accessed through
    :meth:`get_fields()<pypdf.PdfReader.get_fields>`
    """

    def __init__(self, data: DictionaryObject) -> None:
        """
        Copy the known field attributes found in ``data`` into this object.

        A ``/V`` entry holding an encoded stream is replaced by its decoded
        content as a text string.

        Args:
            data: Dictionary to take the attributes and the indirect
                reference from.

        """
        DictionaryObject.__init__(self)
        self.indirect_reference = data.indirect_reference
        known_attributes = (
            FieldDictionaryAttributes.attributes()
            + CheckboxRadioButtonAttributes.attributes()
        )
        for attribute in known_attributes:
            try:
                self[NameObject(attribute)] = data[attribute]
            except KeyError:
                # Attribute not present in the source dictionary.
                pass

        if not isinstance(self.get("/V"), EncodedStreamObject):
            return
        raw_value = cast(EncodedStreamObject, self[NameObject("/V")]).get_data()
        if isinstance(raw_value, bytes):
            text = raw_value.decode()
        elif raw_value is None:
            text = ""
        else:
            raise Exception("Should never happen")
        self[NameObject("/V")] = TextStringObject(text)

    # TABLE 8.69 Entries common to all field dictionaries
    @property
    def field_type(self) -> Optional[NameObject]:
        """Type of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.FT)

    @property
    def parent(self) -> Optional[DictionaryObject]:
        """Parent of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Parent)

    @property
    def kids(self) -> Optional["ArrayObject"]:
        """Kids of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Kids)

    @property
    def name(self) -> Optional[str]:
        """Name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.T)

    @property
    def alternate_name(self) -> Optional[str]:
        """Alternate name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.TU)

    @property
    def mapping_name(self) -> Optional[str]:
        """
        Mapping name of this field (read-only).

        This name is used by pypdf as a key in the dictionary returned by
        :meth:`get_fields()<pypdf.PdfReader.get_fields>`
        """
        return self.get(FieldDictionaryAttributes.TM)

    @property
    def flags(self) -> Optional[int]:
        """
        Field flags (read-only), specifying various characteristics of the
        field (see Table 8.70 of the PDF 1.7 reference).
        """
        return self.get(FieldDictionaryAttributes.Ff)

    @property
    def value(self) -> Optional[Any]:
        """
        Value of this field (read-only).

        Format varies based on field type.
        """
        return self.get(FieldDictionaryAttributes.V)

    @property
    def default_value(self) -> Optional[Any]:
        """Default value of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.DV)

    @property
    def additional_actions(self) -> Optional[DictionaryObject]:
        """
        Additional actions dictionary (read-only).

        This dictionary defines the field's behavior in response to trigger
        events. See Section 8.5.2 of the PDF 1.7 reference.
        """
        return self.get(FieldDictionaryAttributes.AA)
1650
1651
class Destination(TreeObject):
    """
    A class representing a destination within a PDF file.

    See section 12.3.2 of the PDF 2.0 reference.

    Args:
        title: Title of this destination.
        page: Reference to the page of this destination. Should
            be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`.
        fit: How the destination is displayed.

    Raises:
        PdfReadError: If destination type is invalid.

    """

    node: Optional[
        DictionaryObject
    ] = None  # node provide access to the original Object

    def __init__(
        self,
        title: Union[str, bytes],
        page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject],
        fit: Fit,
    ) -> None:
        """
        Store title, page and fit type, then the coordinates for that fit type.

        Args:
            title: Title of this destination.
            page: Stored as the ``/Page`` entry.
            fit: Provides the fit type and its arguments; it is not modified.

        Raises:
            PdfReadError: If arguments are given for an unknown fit type.

        """
        self._filtered_children: list[Any] = []  # used in PdfWriter

        typ = fit.fit_type
        # Work on a copy: the /XYZ padding below must not append to the
        # argument list owned by the caller's ``fit`` object.
        args = list(fit.fit_args)

        DictionaryObject.__init__(self)
        self[NameObject("/Title")] = TextStringObject(title)
        self[NameObject("/Page")] = page
        self[NameObject("/Type")] = typ

        # from table 8.2 of the PDF 1.7 reference.
        if typ == "/XYZ":
            if len(args) < 1:  # left is missing : should never occur
                args.append(NumberObject(0.0))
            if len(args) < 2:  # top is missing
                args.append(NumberObject(0.0))
            if len(args) < 3:  # zoom is missing
                args.append(NumberObject(0.0))
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.TOP)],
                self[NameObject("/Zoom")],
            ) = args
        elif len(args) == 0:
            # No coordinates supplied: nothing more to store, whatever the type.
            pass
        elif typ == TF.FIT_R:
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.BOTTOM)],
                self[NameObject(TA.RIGHT)],
                self[NameObject(TA.TOP)],
            ) = args
        elif typ in [TF.FIT_H, TF.FIT_BH]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.TOP)],) = args
            except Exception:
                (self[NameObject(TA.TOP)],) = (NullObject(),)
        elif typ in [TF.FIT_V, TF.FIT_BV]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.LEFT)],) = args
            except Exception:
                (self[NameObject(TA.LEFT)],) = (NullObject(),)
        elif typ in [TF.FIT, TF.FIT_B]:
            pass
        else:
            raise PdfReadError(f"Unknown Destination Type: {typ!r}")

    @property
    def dest_array(self) -> "ArrayObject":
        """
        Destination as an array: raw page, type, then the coordinates that
        are present, in the order left, bottom, right, top, zoom.
        """
        return ArrayObject(
            [self.raw_get("/Page"), self["/Type"]]
            + [
                self[x]
                for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"]
                if x in self
            ]
        )

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write a dictionary with ``/D`` set to ``dest_array`` and ``/S`` set to ``/GoTo``.

        Args:
            stream: Output to write to.
            encryption_key: Deprecated; passing a value triggers
                ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        key = NameObject("/D")
        key.write_to_stream(stream)
        stream.write(b" ")
        value = self.dest_array
        value.write_to_stream(stream)

        key = NameObject("/S")
        key.write_to_stream(stream)
        stream.write(b" ")
        value_s = NameObject("/GoTo")
        value_s.write_to_stream(stream)

        stream.write(b"\n")
        stream.write(b">>")

    @property
    def title(self) -> Optional[str]:
        """Read-only property accessing the destination title."""
        return self.get("/Title")

    @property
    def page(self) -> Optional[IndirectObject]:
        """Read-only property accessing the IndirectObject of the destination page."""
        return self.get("/Page")

    @property
    def typ(self) -> Optional[str]:
        """Read-only property accessing the destination type."""
        return self.get("/Type")

    @property
    def zoom(self) -> Optional[int]:
        """Read-only property accessing the zoom factor."""
        return self.get("/Zoom", None)

    @property
    def left(self) -> Optional[FloatObject]:
        """Read-only property accessing the left horizontal coordinate."""
        return self.get("/Left", None)

    @property
    def right(self) -> Optional[FloatObject]:
        """Read-only property accessing the right horizontal coordinate."""
        return self.get("/Right", None)

    @property
    def top(self) -> Optional[FloatObject]:
        """Read-only property accessing the top vertical coordinate."""
        return self.get("/Top", None)

    @property
    def bottom(self) -> Optional[FloatObject]:
        """Read-only property accessing the bottom vertical coordinate."""
        return self.get("/Bottom", None)

    @property
    def color(self) -> Optional["ArrayObject"]:
        """
        Read-only property accessing the color in (R, G, B) with values 0.0-1.0.

        Falls back to three zeros when no ``/C`` entry is present.
        """
        return cast(
            "ArrayObject",
            self.get("/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)])),
        )

    @property
    def font_format(self) -> Optional[OutlineFontFlag]:
        """
        Read-only property accessing the font type.

        1=italic, 2=bold, 3=both
        """
        return OutlineFontFlag(self.get("/F", 0))

    @property
    def outline_count(self) -> Optional[int]:
        """
        Read-only property accessing the outline count.

        positive = expanded
        negative = collapsed
        absolute value = number of visible descendants at all levels
        """
        return self.get("/Count", None)