1# Copyright (c) 2006, Mathieu Fenniak
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright notice,
9# this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above copyright notice,
11# this list of conditions and the following disclaimer in the documentation
12# and/or other materials provided with the distribution.
13# * The name of the author may not be used to endorse or promote products
14# derived from this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26# POSSIBILITY OF SUCH DAMAGE.
27
28
29__author__ = "Mathieu Fenniak"
30__author_email__ = "biziqe@mathieu.fenniak.net"
31
32import logging
33import re
34import sys
35from collections.abc import Iterable, Sequence
36from io import BytesIO
37from math import ceil
38from typing import (
39 Any,
40 Callable,
41 Optional,
42 Union,
43 cast,
44)
45
46from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol
47from .._utils import (
48 WHITESPACES,
49 BinaryStreamType,
50 StreamType,
51 deprecation_no_replacement,
52 logger_warning,
53 read_non_whitespace,
54 read_until_regex,
55 read_until_whitespace,
56 skip_over_comment,
57)
58from ..constants import (
59 CheckboxRadioButtonAttributes,
60 FieldDictionaryAttributes,
61 OutlineFontFlag,
62 StreamAttributes,
63)
64from ..constants import FilterTypes as FT
65from ..constants import TypArguments as TA
66from ..constants import TypFitArguments as TF
67from ..errors import STREAM_TRUNCATED_PREMATURELY, LimitReachedError, PdfReadError, PdfStreamError
68from ._base import (
69 BooleanObject,
70 ByteStringObject,
71 FloatObject,
72 IndirectObject,
73 NameObject,
74 NullObject,
75 NumberObject,
76 PdfObject,
77 TextStringObject,
78 is_null_or_none,
79)
80from ._fit import Fit
81from ._image_inline import (
82 extract_inline__ascii85_decode,
83 extract_inline__ascii_hex_decode,
84 extract_inline__dct_decode,
85 extract_inline__run_length_decode,
86 extract_inline_default,
87)
88from ._utils import read_hex_string_from_stream, read_string_from_stream
89
90if sys.version_info >= (3, 11):
91 from typing import Self
92else:
93 from typing_extensions import Self
94
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Bytes pattern: an optional sign, two whitespace-separated runs of digits
# (both captured), whitespace, then "R" followed by one byte that is not an
# ASCII letter.
# NOTE(review): presumably used to recognise indirect references such as
# b"12 0 R" — confirm against the callers outside this view.
IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]")
98
99
class ArrayObject(list[Any], PdfObject):
    """
    PDF array: a ``list`` subclass that is also a :class:`PdfObject`.

    Serializing and hashing call ``write_to_stream`` / ``hash_bin`` on every
    item, so items are expected to provide these methods.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ArrayObject":
        """
        Build a copy of this array registered with ``pdf_dest``.

        Args:
            pdf_dest: destination passed through to ``_reference_clone`` and
                to each item's ``replicate``.

        Returns:
            The new array. Items exposing ``replicate`` are replicated, any
            other item is appended unchanged.

        """
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, False),
        )
        for data in self:
            if hasattr(data, "replicate"):
                arr.append(data.replicate(pdf_dest))
            else:
                arr.append(data)
        return arr

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ArrayObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: destination of the clone.
            force_duplicate: when False and this array already belongs to
                ``pdf_dest``, the array itself is returned.
            ignore_fields: forwarded unchanged to the ``clone`` of each item.

        Returns:
            ``self`` when it already lives in ``pdf_dest`` (and no duplicate
            is forced), otherwise the cloned array.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: e.g. no indirect reference available; fall through
            # and build a clone.
            pass
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, force_duplicate=True),
        )
        for data in self:
            if isinstance(data, StreamObject):
                # Streams are stored in the clone through their indirect reference.
                dup = data._reference_clone(
                    data.clone(pdf_dest, force_duplicate, ignore_fields),
                    pdf_dest,
                    force_duplicate,
                )
                arr.append(dup.indirect_reference)
            elif isinstance(data, IndirectObject) and isinstance(resolved := data.get_object(), StreamObject):
                # Reference pointing at a stream: clone the resolved stream
                # (always duplicated) and store the reference of the clone.
                dup = data._reference_clone(
                    resolved.clone(pdf_dest, force_duplicate=True, ignore_fields=ignore_fields),
                    pdf_dest,
                    force_duplicate,
                )
                arr.append(dup.indirect_reference)
            elif hasattr(data, "clone"):
                arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields))
            else:
                arr.append(data)
        return arr

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash((self.__class__, tuple(x.hash_bin() for x in self)))

    def items(self) -> Iterable[Any]:
        """Emulate DictionaryObject.items for a list (index, object)."""
        return enumerate(self)

    def _to_lst(self, lst: Any) -> list[Any]:
        """
        Normalize an operand of ``+``, ``+=`` or ``-=`` into a plain list.

        Args:
            lst: list/tuple/set (converted to a list), a PdfObject (wrapped),
                a str (wrapped as NameObject when it starts with "/",
                otherwise as TextStringObject), bytes (wrapped as
                ByteStringObject) or any other value (wrapped as is).

        Returns:
            The list of elements to add or remove.

        """
        result: list[Any]
        if isinstance(lst, (list, tuple, set)):
            result = list(lst)
        elif isinstance(lst, PdfObject):
            result = [lst]
        elif isinstance(lst, str):
            # startswith() instead of lst[0]: an empty string must not raise
            # IndexError, it simply becomes an empty TextStringObject.
            if lst.startswith("/"):
                result = [NameObject(lst)]
            else:
                result = [TextStringObject(lst)]
        elif isinstance(lst, bytes):
            result = [ByteStringObject(lst)]
        else:  # for numbers,...
            result = [lst]
        return result

    def __add__(self, lst: Any) -> "ArrayObject":
        """
        Allow extension by adding a list or a single element.

        Args:
            lst: lists, tuples and sets extend the array;
                other types (numbers,...) are appended.
                If str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/").
                If bytes is passed it will be converted into ByteStringObject.

        Returns:
            ArrayObject with all elements

        """
        temp = ArrayObject(self)
        temp.extend(self._to_lst(lst))
        return temp

    def __iadd__(self, lst: Any) -> Self:
        """
        Allow in-place extension by adding a list or a single element.

        Args:
            lst: lists, tuples and sets extend the array;
                other types (numbers,...) are appended.
                If str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/").
                If bytes is passed it will be converted into ByteStringObject.

        """
        self.extend(self._to_lst(lst))
        return self

    def __isub__(self, lst: Any) -> Self:
        """
        Allow to remove items.

        For each element derived from ``lst`` the first equal item is removed;
        elements that are not present are ignored.
        """
        for x in self._to_lst(lst):
            try:
                index = self.index(x)
                del self[index]
            except ValueError:
                pass
        return self

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the array as ``[ item item ... ]`` to ``stream``.

        Args:
            stream: binary output stream.
            encryption_key: deprecated; a non-None value only triggers the
                deprecation handling.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"[")
        for data in self:
            stream.write(b" ")
            data.write_to_stream(stream)
        stream.write(b" ]")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "ArrayObject":
        """
        Parse an array starting at the current position of ``stream``.

        Args:
            stream: input positioned on the opening ``[``.
            pdf: reader forwarded to ``read_object``.
            forced_encoding: forwarded to ``read_object``.

        Returns:
            The parsed array. Reaching the end of the stream before ``]``
            returns the items read so far.

        Raises:
            PdfReadError: the first byte is not ``[``.

        """
        arr = ArrayObject()
        tmp = stream.read(1)
        if tmp != b"[":
            raise PdfReadError("Could not read array")
        while True:
            # skip leading whitespace
            tok = stream.read(1)
            while tok.isspace():
                tok = stream.read(1)
            if tok == b"":
                # end of stream: stop with what has been read
                break
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            stream.seek(-1, 1)
            # check for array ending
            peek_ahead = stream.read(1)
            if peek_ahead == b"]":
                break
            stream.seek(-1, 1)
            # read and append object
            arr.append(read_object(stream, pdf, forced_encoding))
        return arr
272
273
class DictionaryObject(dict[Any, Any], PdfObject):
    """
    PDF dictionary: a ``dict`` subclass that is also a :class:`PdfObject`.

    ``__getitem__`` returns ``get_object()`` of the stored value, while
    ``raw_get`` returns the stored value itself. Keys and values set through
    ``__setitem__``/``setdefault`` must be PdfObject instances.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "DictionaryObject":
        """
        Build a copy of this dictionary registered with ``pdf_dest``.

        Keys are replicated; values are replicated when they expose
        ``replicate`` and stored unchanged otherwise.
        """
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "DictionaryObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: destination of the clone.
            force_duplicate: when False and this object already belongs to
                ``pdf_dest``, the object itself is returned.
            ignore_fields: keys not to copy; None is treated as empty. An int
                entry is a depth counter handled by ``_clone``.

        Returns:
            ``self`` or the (possibly pre-existing) clone.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: e.g. no indirect reference available.
            pass

        visited: set[tuple[int, int]] = set()  # (idnum, generation)
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
        )
        if ignore_fields is None:
            ignore_fields = []
        # Only populate the clone when it is still empty.
        if len(d__.keys()) == 0:
            d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: "DictionaryObject",
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],  # (idnum, generation)
    ) -> None:
        """
        Update the object from src.

        Args:
            src: dictionary whose entries are copied into ``self``.
            pdf_dest: destination forwarded to every nested clone.
            force_duplicate: forwarded to every nested clone.
            ignore_fields: keys that are not copied. An int entry is a depth
                counter: it is decremented at each level and, once it is
                <= 0, it is dropped together with the entry following it.
            visited: (idnum, generation) pairs of clones already created
                while following linked chains; updated in place.

        """
        # First we remove the ignore_fields
        # that are for a limited number of levels
        assert ignore_fields is not None
        ignore_fields = list(ignore_fields)
        x = 0
        while x < len(ignore_fields):
            if isinstance(ignore_fields[x], int):
                if cast(int, ignore_fields[x]) <= 0:
                    # Drop the counter and the field following it. The slice
                    # form does not raise when the counter is the last entry.
                    del ignore_fields[x : x + 2]
                    continue
                ignore_fields[x] -= 1  # type:ignore
            x += 1
        # Check if this is a chain list, we need to loop to prevent recur
        if any(
            field not in ignore_fields
            and field in src
            and isinstance(src.raw_get(field), IndirectObject)
            and isinstance(src[field], DictionaryObject)
            and (
                src.get("/Type", None) is None
                or cast(DictionaryObject, src[field]).get("/Type", None) is None
                or src.get("/Type", None)
                == cast(DictionaryObject, src[field]).get("/Type", None)
            )
            for field in ["/Next", "/Prev", "/N", "/V"]
        ):
            ignore_fields = list(ignore_fields)
            for lst in (("/Next", "/Prev"), ("/N", "/V")):
                for k in lst:
                    objs = []
                    if (
                        k in src
                        and k not in self
                        and isinstance(src.raw_get(k), IndirectObject)
                        and isinstance(src[k], DictionaryObject)
                        # If need to go further the idea is to check
                        # that the types are the same
                        and (
                            src.get("/Type", None) is None
                            or cast(DictionaryObject, src[k]).get("/Type", None) is None
                            or src.get("/Type", None)
                            == cast(DictionaryObject, src[k]).get("/Type", None)
                        )
                    ):
                        cur_obj: Optional[DictionaryObject] = cast(
                            "DictionaryObject", src[k]
                        )
                        prev_obj: Optional[DictionaryObject] = self
                        # Walk the chain iteratively: create the (empty)
                        # clones and link them first, fill them afterwards.
                        while cur_obj is not None:
                            clon = cast(
                                "DictionaryObject",
                                cur_obj._reference_clone(
                                    cur_obj.__class__(), pdf_dest, force_duplicate
                                ),
                            )
                            # Check to see if we've previously processed our item
                            if clon.indirect_reference is not None:
                                idnum = clon.indirect_reference.idnum
                                generation = clon.indirect_reference.generation
                                if (idnum, generation) in visited:
                                    cur_obj = None
                                    break
                                visited.add((idnum, generation))
                            objs.append((cur_obj, clon))
                            assert prev_obj is not None
                            prev_obj[NameObject(k)] = clon.indirect_reference
                            prev_obj = clon
                            try:
                                if cur_obj == src:
                                    cur_obj = None
                                else:
                                    cur_obj = cast("DictionaryObject", cur_obj[k])
                            except Exception:
                                # no further link (or unreadable): end of chain
                                cur_obj = None
                        for s, c in objs:
                            c._clone(
                                s, pdf_dest, force_duplicate, ignore_fields, visited
                            )

        for k, v in src.items():
            if k not in ignore_fields:
                if isinstance(v, StreamObject):
                    if not hasattr(v, "indirect_reference"):
                        v.indirect_reference = None
                    vv = v.clone(pdf_dest, force_duplicate, ignore_fields)
                    assert vv.indirect_reference is not None
                    # Streams are stored through their indirect reference.
                    self[k.clone(pdf_dest)] = vv.indirect_reference
                elif k not in self:
                    self[NameObject(k)] = (
                        v.clone(pdf_dest, force_duplicate, ignore_fields)
                        if hasattr(v, "clone")
                        else v
                    )

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash(
            (self.__class__, tuple(((k, v.hash_bin()) for k, v in self.items())))
        )

    def raw_get(self, key: Any) -> Any:
        """Return the stored value for ``key`` without calling ``get_object()``."""
        return dict.__getitem__(self, key)

    def get_inherited(self, key: str, default: Any = None) -> Any:
        """
        Returns the value of a key or from the parent if not found.
        If not found returns default.

        Args:
            key: string identifying the field to return

            default: default value to return

        Returns:
            Current key or inherited one, otherwise default value.

        """
        if key in self:
            return self[key]
        if "/Parent" not in self:
            return default
        # Not defined here: delegate the lookup to the parent dictionary.
        return cast("DictionaryObject", self["/Parent"].get_object()).get_inherited(
            key, default
        )

    def __setitem__(self, key: Any, value: Any) -> Any:
        """
        Store ``value`` under ``key``.

        Raises:
            ValueError: ``key`` or ``value`` is not a PdfObject.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.__setitem__(self, key, value)

    def setdefault(self, key: Any, value: Optional[Any] = None) -> Any:
        """
        Same as ``dict.setdefault`` with validation of the arguments.

        Raises:
            ValueError: ``key`` or ``value`` is not a PdfObject (this
                includes the default ``value=None``).

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.setdefault(self, key, value)

    def __getitem__(self, key: Any) -> PdfObject:
        """Return ``get_object()`` of the value stored under ``key``."""
        return cast(PdfObject, dict.__getitem__(self, key).get_object())

    @property
    def xmp_metadata(self) -> Optional[XmpInformationProtocol]:
        """
        Retrieve XMP (Extensible Metadata Platform) data relevant to this
        object, if available.

        See Table 347 — Additional entries in a metadata stream dictionary.

        Returns:
            Returns a :class:`~pypdf.xmp.XmpInformation` instance
            that can be used to access XMP metadata from the document. Can also
            return None if no metadata was found on the document root.

        """
        from ..xmp import XmpInformation  # noqa: PLC0415

        metadata = self.get("/Metadata", None)
        if is_null_or_none(metadata):
            return None
        assert metadata is not None, "mypy"
        metadata = metadata.get_object()
        return XmpInformation(metadata)

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the dictionary as ``<< key value ... >>`` to ``stream``.

        Keys longer than two characters whose second and last characters are
        ``%`` (e.g. "/%is_open%") are not written.

        Args:
            stream: binary output stream.
            encryption_key: deprecated; a non-None value only triggers the
                deprecation handling.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        for key, value in self.items():
            if len(key) > 2 and key[1] == "%" and key[-1] == "%":
                continue
            key.write_to_stream(stream, encryption_key)
            stream.write(b" ")
            value.write_to_stream(stream)
            stream.write(b"\n")
        stream.write(b">>")

    @classmethod
    def _get_next_object_position(
        cls, position_before: int, position_end: int, generations: list[int], pdf: PdfReaderProtocol
    ) -> int:
        """
        Find the smallest xref position in ``(position_before, position_end]``.

        Args:
            position_before: exclusive lower bound.
            position_end: inclusive upper bound, also the fallback result.
            generations: keys of ``pdf.xref`` to inspect.
            pdf: reader providing the ``xref`` mapping.

        Returns:
            The smallest matching position, or ``position_end`` if none.

        """
        out = position_end
        for generation in generations:
            location = pdf.xref[generation]
            values = [x for x in location.values() if position_before < x <= position_end]
            if values:
                out = min(out, *values)
        return out

    @classmethod
    def _read_unsized_from_stream(
        cls, stream: BinaryStreamType, pdf: PdfReaderProtocol
    ) -> bytes:
        """
        Read stream data whose length is unknown or unreliable.

        Data is read up to the next object position known from ``pdf.xref``
        and cut before the first ``endstream`` marker; the stream is left
        positioned right after that marker.

        Raises:
            PdfReadError: no ``endstream`` marker in the data read.

        """
        object_position = cls._get_next_object_position(
            position_before=stream.tell(), position_end=2 ** 32, generations=list(pdf.xref), pdf=pdf
        ) - 1
        current_position = stream.tell()
        # Read until the next object position.
        read_value = stream.read(object_position - stream.tell())
        endstream_position = read_value.find(b"endstream")
        if endstream_position < 0:
            raise PdfReadError(
                f"Unable to find 'endstream' marker for obj starting at {current_position}."
            )
        # 9 = len(b"endstream")
        stream.seek(current_position + endstream_position + 9)
        # The byte immediately before the marker is dropped as well.
        return read_value[: endstream_position - 1]

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "DictionaryObject":
        """
        Parse a dictionary, or a stream object, at the current position.

        Args:
            stream: input positioned on the opening ``<<``.
            pdf: reader used to resolve references and providing ``strict``;
                with None, errors are handled as in non-strict mode.
            forced_encoding: forwarded to ``read_object`` when reading values.

        Returns:
            A DictionaryObject, or the result of
            ``StreamObject.initialize_from_dictionary`` when the dictionary is
            followed by the ``stream`` keyword. In non-strict mode an
            unexpected error while reading an entry returns the entries read
            so far.

        Raises:
            PdfReadError: malformed input (always for the missing ``<<``,
                recursion/limit errors and a missing ``endstream``; for other
                problems only in strict mode).
            PdfStreamError: truncated input, missing newline after ``stream``
                or, in strict mode, missing stream length.
            LimitReachedError: declared stream length above the allowed maximum.

        """
        tmp = stream.read(2)
        if tmp != b"<<":
            raise PdfReadError(
                f"Dictionary read error at byte {hex(stream.tell())}: "
                "stream must begin with '<<'"
            )
        data: dict[Any, Any] = {}
        while True:
            tok = read_non_whitespace(stream)
            if tok == b"\x00":
                continue
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            if not tok:
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)

            if tok == b">":
                # consume the second '>' of the closing '>>'
                stream.read(1)
                break
            stream.seek(-1, 1)
            try:
                try:
                    key = read_object(stream, pdf)
                    if isinstance(key, NullObject):
                        break
                    if not isinstance(key, NameObject):
                        raise PdfReadError(
                            f"Expecting a NameObject for key but found {key!r}"
                        )
                except PdfReadError as exc:
                    if pdf is not None and pdf.strict:
                        raise
                    logger_warning(exc.__repr__(), __name__)
                    continue
                tok = read_non_whitespace(stream)
                stream.seek(-1, 1)
                value = read_object(stream, pdf, forced_encoding)
            except (RecursionError, LimitReachedError) as exc:
                raise PdfReadError(exc.__repr__()) from exc
            except Exception as exc:
                if pdf is not None and pdf.strict:
                    raise PdfReadError(exc.__repr__()) from exc
                logger_warning(exc.__repr__(), __name__)
                retval = DictionaryObject()
                retval.update(data)
                return retval  # return partial data

            if not data.get(key):
                data[key] = value
            else:
                # multiple definitions of key not permitted
                msg = (
                    f"Multiple definitions in dictionary at byte "
                    f"{hex(stream.tell())} for key {key}"
                )
                if pdf is not None and pdf.strict:
                    raise PdfReadError(msg)
                logger_warning(msg, __name__)

        pos = stream.tell()
        s = read_non_whitespace(stream)
        if s == b"s" and stream.read(5) == b"tream":
            eol = stream.read(1)
            # Occasional PDF file output has spaces after 'stream' keyword but before EOL.
            # patch provided by Danial Sandler
            while eol == b" ":
                eol = stream.read(1)
            if eol not in (b"\n", b"\r"):
                raise PdfStreamError("Stream data must be followed by a newline")
            if eol == b"\r" and stream.read(1) != b"\n":
                # lone "\r": the byte just read belongs to the data
                stream.seek(-1, 1)
            # this is a stream object, not a dictionary
            if StreamAttributes.LENGTH not in data:
                if pdf is not None and pdf.strict:
                    raise PdfStreamError("Stream length not defined")
                logger_warning(
                    f"Stream length not defined @pos={stream.tell()}", __name__
                )
                data[NameObject(StreamAttributes.LENGTH)] = NumberObject(-1)
            length = data[StreamAttributes.LENGTH]
            if isinstance(length, IndirectObject):
                # Resolving the reference may move the stream: restore the position.
                t = stream.tell()
                assert pdf is not None, "mypy"
                length = pdf.get_object(length)
                stream.seek(t, 0)
            if length is None:  # if the PDF is damaged
                length = -1
            pstart = stream.tell()
            if length >= 0:
                from ..filters import MAX_DECLARED_STREAM_LENGTH  # noqa: PLC0415
                if length > MAX_DECLARED_STREAM_LENGTH:
                    raise LimitReachedError(f"Declared stream length of {length} exceeds maximum allowed length.")

                data["__streamdata__"] = stream.read(length)
            else:
                data["__streamdata__"] = read_until_regex(
                    stream, re.compile(b"endstream")
                )
            e = read_non_whitespace(stream)
            ndstream = stream.read(8)
            if (e + ndstream) != b"endstream":
                # the odd PDF file has a length that is too long, so
                # we need to read backwards to find the "endstream" ending.
                # ReportLab (unknown version) generates files with this bug,
                # and Python users into PDF files tend to be our audience.
                # we need to do this to correct the streamdata and chop off
                # an extra character.
                pos = stream.tell()
                stream.seek(-10, 1)
                end = stream.read(9)
                if end == b"endstream":
                    # we found it by looking back one character further.
                    data["__streamdata__"] = data["__streamdata__"][:-1]
                elif pdf is not None and not pdf.strict:
                    # Ignore the declared length and search for the marker.
                    stream.seek(pstart, 0)
                    data["__streamdata__"] = DictionaryObject._read_unsized_from_stream(stream, pdf)
                    pos = stream.tell()
                else:
                    stream.seek(pos, 0)
                    raise PdfReadError(
                        "Unable to find 'endstream' marker after stream at byte "
                        f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')."
                    )
        else:
            # plain dictionary: undo the look-ahead
            stream.seek(pos, 0)
        if "__streamdata__" in data:
            return StreamObject.initialize_from_dictionary(data)
        retval = DictionaryObject()
        retval.update(data)
        return retval
692
693
class TreeObject(DictionaryObject):
    """
    Dictionary acting as the parent of a linked list of children.

    The children are reached through "/First" and "/Last" and chained with
    "/Next" and "/Prev"; every child points back with "/Parent" and the
    number of children is tracked in "/Count".
    """

    def __init__(self, dct: Optional[DictionaryObject] = None) -> None:
        """
        Create the tree node.

        Args:
            dct: optional dictionary whose entries are copied into the node.

        """
        DictionaryObject.__init__(self)
        if dct:
            self.update(dct)

    def has_children(self) -> bool:
        """Return True when the node has a "/First" entry."""
        return "/First" in self

    def __iter__(self) -> Any:
        """Iterate over the children (not over the dictionary keys)."""
        return self.children()

    def children(self) -> Iterable[Any]:
        """
        Yield the children from "/First", following "/Next".

        Iteration stops after the "/Last" child, when "/Next" is missing or
        null, or (with a warning) when a child is met a second time.
        """
        if not self.has_children():
            return

        child_ref = self[NameObject("/First")]
        last = self[NameObject("/Last")]
        child = child_ref.get_object()
        # id() of the children already yielded, to detect cycles
        visited: set[int] = set()
        while True:
            child_id = id(child)
            if child_id in visited:
                logger_warning(f"Detected cycle in outline structure for {child}", __name__)
                return
            visited.add(child_id)

            yield child

            if child == last:
                return
            child_ref = child.get(NameObject("/Next"))  # type: ignore
            if is_null_or_none(child_ref):
                return
            child = child_ref.get_object()

    def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None:
        """Append ``child`` after the current last child."""
        self.insert_child(child, None, pdf)

    def inc_parent_counter_default(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to "/Count" of ``parent`` and of all its ancestors.

        Nodes without "/Count" are left unchanged; the stored count never
        goes below 0.

        Args:
            parent: node to start from; None/null stops the recursion.
            n: increment (may be negative).

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        if "/Count" in parent:
            parent[NameObject("/Count")] = NumberObject(
                max(0, cast(int, parent[NameObject("/Count")]) + n)
            )
            self.inc_parent_counter_default(parent.get("/Parent", None), n)

    def inc_parent_counter_outline(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to the absolute value of "/Count" of ``parent``.

        The count is stored as a negative number when the "/%is_open%" entry
        of the node is false; in that case the ancestors are not updated.

        Args:
            parent: node to start from; None/null stops the recursion.
            n: increment.

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        # BooleanObject requires comparison with == not is
        opn = parent.get("/%is_open%", True) == True  # noqa: E712
        c = cast(int, parent.get("/Count", 0))
        if c < 0:
            c = abs(c)
        parent[NameObject("/Count")] = NumberObject((c + n) * (1 if opn else -1))
        if not opn:
            return
        self.inc_parent_counter_outline(parent.get("/Parent", None), n)

    def insert_child(
        self,
        child: Any,
        before: Any,
        pdf: PdfWriterProtocol,
        inc_parent_counter: Optional[Callable[..., Any]] = None,
    ) -> IndirectObject:
        """
        Insert ``child`` into the list of children.

        Args:
            child: object to insert; it must have an indirect reference.
            before: indirect reference of the child in front of which the
                new child is inserted. The search starts at "/Last" and
                follows "/Next"; if ``before`` is not found (e.g. None) the
                child is appended at the end.
            pdf: not used by this method.
            inc_parent_counter: callable ``(node, n)`` updating the counters;
                defaults to ``inc_parent_counter_default``.

        Returns:
            The indirect reference of the inserted child.

        """
        if inc_parent_counter is None:
            inc_parent_counter = self.inc_parent_counter_default
        child_obj = child.get_object()
        assert child.indirect_reference is not None, "mypy"
        child_reference: IndirectObject = child.indirect_reference

        prev: Optional[DictionaryObject]
        if "/First" not in self:  # no child yet
            self[NameObject("/First")] = child_reference
            self[NameObject("/Count")] = NumberObject(0)
            self[NameObject("/Last")] = child_reference
            child_obj[NameObject("/Parent")] = self.indirect_reference
            inc_parent_counter(self, child_obj.get("/Count", 1))
            if "/Next" in child_obj:
                del child_obj["/Next"]
            if "/Prev" in child_obj:
                del child_obj["/Prev"]
            return child_reference
        prev = cast("DictionaryObject", self["/Last"])

        while prev.indirect_reference != before:
            if "/Next" in prev:
                prev = cast("TreeObject", prev["/Next"])
            else:  # append at the end
                prev[NameObject("/Next")] = cast("TreeObject", child_reference)
                child_obj[NameObject("/Prev")] = prev.indirect_reference
                child_obj[NameObject("/Parent")] = self.indirect_reference
                if "/Next" in child_obj:
                    del child_obj["/Next"]
                self[NameObject("/Last")] = child_reference
                inc_parent_counter(self, child_obj.get("/Count", 1))
                return child_reference
        try:  # insert as first or in the middle
            assert isinstance(prev["/Prev"], DictionaryObject)
            prev["/Prev"][NameObject("/Next")] = child_reference
            child_obj[NameObject("/Prev")] = prev["/Prev"]
        except Exception:  # it means we are inserting in first position
            # Guarded: a child without "/Next" must not raise KeyError from
            # inside this fallback.
            if "/Next" in child_obj:
                del child_obj["/Next"]
        child_obj[NameObject("/Next")] = prev
        prev[NameObject("/Prev")] = child_reference
        child_obj[NameObject("/Parent")] = self.indirect_reference
        inc_parent_counter(self, child_obj.get("/Count", 1))
        return child_reference

    def _remove_node_from_tree(
        self, prev: Any, prev_ref: Any, cur: Any, last: Any
    ) -> None:
        """
        Adjust the pointers of the linked list and tree node count.

        Args:
            prev: node preceding ``cur``, or None when ``cur`` is the first.
            prev_ref: reference to ``prev``.
            cur: node being removed.
            last: last node of the list (used for a consistency assertion).

        """
        next_ref = cur.get(NameObject("/Next"), None)
        if prev is None:
            if next_ref:
                # Removing first tree node
                next_obj = next_ref.get_object()
                del next_obj[NameObject("/Prev")]
                self[NameObject("/First")] = next_ref
                self[NameObject("/Count")] = NumberObject(
                    self[NameObject("/Count")] - 1  # type: ignore
                )

            else:
                # Removing only tree node
                self[NameObject("/Count")] = NumberObject(0)
                del self[NameObject("/First")]
                if NameObject("/Last") in self:
                    del self[NameObject("/Last")]
        else:
            if next_ref:
                # Removing middle tree node
                next_obj = next_ref.get_object()
                next_obj[NameObject("/Prev")] = prev_ref
                prev[NameObject("/Next")] = next_ref
            else:
                # Removing last tree node
                assert cur == last
                del prev[NameObject("/Next")]
                self[NameObject("/Last")] = prev_ref
            self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1)  # type: ignore

    def remove_child(self, child: Any) -> None:
        """
        Remove ``child`` from the children of this node.

        Args:
            child: the child, or a reference to it.

        Raises:
            ValueError: the child has no "/Parent", its parent is not this
                node, or it is not found in the list of children.

        """
        child_obj = child.get_object()
        child = child_obj.indirect_reference

        if NameObject("/Parent") not in child_obj:
            raise ValueError("Removed child does not appear to be a tree item")
        if child_obj[NameObject("/Parent")] != self:
            raise ValueError("Removed child is not a member of this tree")

        found = False
        prev_ref = None
        prev = None
        cur_ref: Optional[Any] = self[NameObject("/First")]
        cur: Optional[dict[str, Any]] = cur_ref.get_object()  # type: ignore
        last_ref = self[NameObject("/Last")]
        last = last_ref.get_object()
        while cur is not None:
            if cur == child_obj:
                self._remove_node_from_tree(prev, prev_ref, cur, last)
                found = True
                break

            # Go to the next node
            prev_ref = cur_ref
            prev = cur
            if NameObject("/Next") in cur:
                cur_ref = cur[NameObject("/Next")]
                cur = cur_ref.get_object()
            else:
                cur_ref = None
                cur = None

        if not found:
            raise ValueError("Removal couldn't find item in tree")

        _reset_node_tree_relationship(child_obj)

    def remove_from_tree(self) -> None:
        """Remove the object from the tree it is in."""
        if NameObject("/Parent") not in self:
            raise ValueError("Removed child does not appear to be a tree item")
        cast("TreeObject", self["/Parent"]).remove_child(self)

    def empty_tree(self) -> None:
        """Detach every child and delete "/Count", "/First" and "/Last"."""
        for child in self:
            child_obj = child.get_object()
            _reset_node_tree_relationship(child_obj)

        if NameObject("/Count") in self:
            del self[NameObject("/Count")]
        if NameObject("/First") in self:
            del self[NameObject("/First")]
        if NameObject("/Last") in self:
            del self[NameObject("/Last")]
911
912
def _reset_node_tree_relationship(child_obj: Any) -> None:
    """
    Detach a node from the tree it has just been removed from.

    "/Parent" is deleted unconditionally (a missing entry raises KeyError);
    "/Next" and "/Prev" are deleted only when present.

    Args:
        child_obj: the removed node, modified in place.

    """
    del child_obj[NameObject("/Parent")]
    for sibling_link in ("/Next", "/Prev"):
        link_key = NameObject(sibling_link)
        if link_key in child_obj:
            del child_obj[link_key]
928
929
930class StreamObject(DictionaryObject):
931 def __init__(self) -> None:
932 self._data: bytes = b""
933 self.decoded_self: Optional[DecodedStreamObject] = None
934
935 def replicate(
936 self,
937 pdf_dest: PdfWriterProtocol,
938 ) -> "StreamObject":
939 d__ = cast(
940 "StreamObject",
941 self._reference_clone(self.__class__(), pdf_dest, False),
942 )
943 d__._data = self._data
944 try:
945 decoded_self = self.decoded_self
946 if decoded_self is None:
947 self.decoded_self = None
948 else:
949 self.decoded_self = cast(
950 "DecodedStreamObject", decoded_self.replicate(pdf_dest)
951 )
952 except Exception:
953 pass
954 for k, v in self.items():
955 d__[k.replicate(pdf_dest)] = (
956 v.replicate(pdf_dest) if hasattr(v, "replicate") else v
957 )
958 return d__
959
960 def _clone(
961 self,
962 src: DictionaryObject,
963 pdf_dest: PdfWriterProtocol,
964 force_duplicate: bool,
965 ignore_fields: Optional[Sequence[Union[str, int]]],
966 visited: set[tuple[int, int]],
967 ) -> None:
968 """
969 Update the object from src.
970
971 Args:
972 src:
973 pdf_dest:
974 force_duplicate:
975 ignore_fields:
976
977 """
978 self._data = cast("StreamObject", src)._data
979 try:
980 decoded_self = cast("StreamObject", src).decoded_self
981 if decoded_self is None:
982 self.decoded_self = None
983 else:
984 self.decoded_self = cast(
985 "DecodedStreamObject",
986 decoded_self.clone(pdf_dest, force_duplicate, ignore_fields),
987 )
988 except Exception:
989 pass
990 super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)
991
992 def hash_bin(self) -> int:
993 """
994 Used to detect modified object.
995
996 Returns:
997 Hash considering type and value.
998
999 """
1000 # Use _data to prevent errors on non-decoded streams.
1001 return hash((super().hash_bin(), self._data))
1002
1003 def get_data(self) -> bytes:
1004 return self._data
1005
1006 def set_data(self, data: bytes) -> None:
1007 self._data = data
1008
1009 def hash_value_data(self) -> bytes:
1010 data = super().hash_value_data()
1011 data += self.get_data()
1012 return data
1013
1014 def write_to_stream(
1015 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
1016 ) -> None:
1017 if encryption_key is not None: # deprecated
1018 deprecation_no_replacement(
1019 "the encryption_key parameter of write_to_stream", "5.0.0"
1020 )
1021 self[NameObject(StreamAttributes.LENGTH)] = NumberObject(len(self._data))
1022 DictionaryObject.write_to_stream(self, stream)
1023 del self[StreamAttributes.LENGTH]
1024 stream.write(b"\nstream\n")
1025 stream.write(self._data)
1026 stream.write(b"\nendstream")
1027
1028 @staticmethod
1029 def initialize_from_dictionary(
1030 data: dict[str, Any]
1031 ) -> Union["EncodedStreamObject", "DecodedStreamObject"]:
1032 retval: Union[EncodedStreamObject, DecodedStreamObject]
1033 if StreamAttributes.FILTER in data:
1034 retval = EncodedStreamObject()
1035 else:
1036 retval = DecodedStreamObject()
1037 retval._data = data["__streamdata__"]
1038 del data["__streamdata__"]
1039 if StreamAttributes.LENGTH in data:
1040 del data[StreamAttributes.LENGTH]
1041 retval.update(data)
1042 return retval
1043
1044 def flate_encode(self, level: int = -1) -> "EncodedStreamObject":
1045 from ..filters import FlateDecode # noqa: PLC0415
1046
1047 if StreamAttributes.FILTER in self:
1048 f = self[StreamAttributes.FILTER]
1049 if isinstance(f, ArrayObject):
1050 f = ArrayObject([NameObject(FT.FLATE_DECODE), *f])
1051 try:
1052 params = ArrayObject(
1053 [NullObject(), *self.get(StreamAttributes.DECODE_PARMS, ArrayObject())]
1054 )
1055 except TypeError:
1056 # case of error where the * operator is not working (not an array
1057 params = ArrayObject(
1058 [NullObject(), self.get(StreamAttributes.DECODE_PARMS, ArrayObject())]
1059 )
1060 else:
1061 f = ArrayObject([NameObject(FT.FLATE_DECODE), f])
1062 params = ArrayObject(
1063 [NullObject(), self.get(StreamAttributes.DECODE_PARMS, NullObject())]
1064 )
1065 else:
1066 f = NameObject(FT.FLATE_DECODE)
1067 params = None
1068 retval = EncodedStreamObject()
1069 retval.update(self)
1070 retval[NameObject(StreamAttributes.FILTER)] = f
1071 if params is not None:
1072 retval[NameObject(StreamAttributes.DECODE_PARMS)] = params
1073 retval._data = FlateDecode.encode(self._data, level)
1074 return retval
1075
1076 def decode_as_image(self, pillow_parameters: Union[dict[str, Any], None] = None) -> Any:
1077 """
1078 Try to decode the stream object as an image
1079
1080 Args:
1081 pillow_parameters: parameters provided to Pillow Image.save() method,
1082 cf. <https://pillow.readthedocs.io/en/stable/reference/Image.html#PIL.Image.Image.save>
1083
1084 Returns:
1085 a PIL image if proper decoding has been found
1086 Raises:
1087 Exception: Errors during decoding will be reported.
1088 It is recommended to catch exceptions to prevent
1089 stops in your program.
1090
1091 """
1092 from ._image_xobject import _xobj_to_image # noqa: PLC0415
1093
1094 if self.get("/Subtype", "") != "/Image":
1095 try:
1096 msg = f"{self.indirect_reference} does not seem to be an Image" # pragma: no cover
1097 except AttributeError:
1098 msg = f"{self.__repr__()} object does not seem to be an Image" # pragma: no cover
1099 logger_warning(msg, __name__)
1100 extension, _, img = _xobj_to_image(self, pillow_parameters)
1101 if extension is None:
1102 return None # pragma: no cover
1103 return img
1104
1105
class DecodedStreamObject(StreamObject):
    """A StreamObject subclass that adds no behavior of its own."""
1108
1109
class EncodedStreamObject(StreamObject):
    """
    Stream object whose raw data still has to pass ``decode_stream_data``.

    The decoded counterpart is created on the first ``get_data()`` call and
    kept in ``decoded_self`` for later calls.
    """

    def __init__(self) -> None:
        self.decoded_self: Optional[DecodedStreamObject] = None

    # This overrides the parent method
    def get_data(self) -> bytes:
        """
        Return the decoded data, decoding the stream on the first call.

        Returns:
            The data of the (cached) decoded stream object.

        """
        from ..filters import decode_stream_data  # noqa: PLC0415

        decoded = self.decoded_self
        if decoded is None:
            # Nothing cached yet: build the decoded object now.
            decoded = DecodedStreamObject()
            decoded.set_data(decode_stream_data(self))
            encoding_keys = (
                StreamAttributes.LENGTH,
                StreamAttributes.FILTER,
                StreamAttributes.DECODE_PARMS,
            )
            for key, value in self.items():
                if key in encoding_keys:
                    continue
                decoded[key] = value
            self.decoded_self = decoded
        return decoded.get_data()

    # This overrides the parent method:
    def set_data(self, data: bytes) -> None:
        """
        Replace the content of a FlateDecode stream.

        Both the cached decoded object and the encoded raw data are updated.

        Args:
            data: The new, decoded content.

        Raises:
            PdfReadError: ``/Filter`` is neither ``/FlateDecode`` nor a list
                containing only ``/FlateDecode``.
            TypeError: ``data`` is not of type ``bytes``.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        current_filter = self.get(StreamAttributes.FILTER, "")
        if current_filter not in (FT.FLATE_DECODE, [FT.FLATE_DECODE]):
            raise PdfReadError(
                "Streams encoded with a filter different from FlateDecode are not supported"
            )
        if not isinstance(data, bytes):
            raise TypeError("Data must be bytes")
        if self.decoded_self is None:
            self.get_data()  # to create self.decoded_self
        assert self.decoded_self is not None, "mypy"
        self.decoded_self.set_data(data)
        super().set_data(FlateDecode.encode(data))
1147
1148
# Maximum number of elements accepted in an array of content streams;
# ContentStream.__init__ raises LimitReachedError for longer arrays.
CONTENT_STREAM_ARRAY_MAX_LENGTH = 10_000
1150
1151
1152class ContentStream(DecodedStreamObject):
1153 """
    In order to be fast, this data structure can contain either:

    * raw data in ._data
    * parsed stream operations in ._operations.

    At any time, a ContentStream object can either have both of those fields
    populated, or one field populated and the other one empty (``b""`` for
    ._data, ``[]`` for ._operations).

    These fields are "rebuilt" lazily, when accessed:

    * when .get_data() is called, if ._data is empty, it is rebuilt from ._operations.
    * when .operations is read, if ._operations is empty, it is rebuilt from ._data
      (and ._data is emptied afterwards).

    Conversely, these fields can be invalidated:

    * when .set_data() is called, ._operations is reset to an empty list.
    * when .operations is set, ._data is reset to empty bytes.
1171 """
1172
1173 def __init__(
1174 self,
1175 stream: Any,
1176 pdf: Any,
1177 forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
1178 ) -> None:
1179 self.pdf = pdf
1180 self._operations: list[tuple[Any, bytes]] = []
1181
1182 # stream may be a StreamObject or an ArrayObject containing
1183 # StreamObjects to be concatenated together.
1184 if stream is None:
1185 super().set_data(b"")
1186 else:
1187 stream = stream.get_object()
1188 if isinstance(stream, ArrayObject):
1189 from pypdf.filters import MAX_ARRAY_BASED_STREAM_OUTPUT_LENGTH # noqa: PLC0415
1190
1191 if (stream_length := len(stream)) > CONTENT_STREAM_ARRAY_MAX_LENGTH:
1192 raise LimitReachedError(
1193 f"Array-based stream has {stream_length} > {CONTENT_STREAM_ARRAY_MAX_LENGTH} elements."
1194 )
1195 data = bytearray()
1196 length = 0
1197 for s in stream:
1198 s_resolved = s.get_object()
1199 if isinstance(s_resolved, NullObject):
1200 continue
1201 if not isinstance(s_resolved, StreamObject):
1202 # No need to emit an exception here for now - the PDF structure
1203 # seems to already be broken beforehand in these cases.
1204 logger_warning(
1205 f"Expected StreamObject, got {type(s_resolved).__name__} instead. Data might be wrong.",
1206 __name__
1207 )
1208 else:
1209 new_data = s_resolved.get_data()
1210 length += len(new_data)
1211 if length > MAX_ARRAY_BASED_STREAM_OUTPUT_LENGTH:
1212 raise LimitReachedError(
1213 f"Array-based stream has at least {length} > "
1214 f"{MAX_ARRAY_BASED_STREAM_OUTPUT_LENGTH} output bytes."
1215 )
1216 data += new_data
1217 if len(data) == 0 or data[-1:] != b"\n":
1218 # There should be no direct need to check for a change of one byte.
1219 length += 1
1220 data += b"\n"
1221 super().set_data(bytes(data))
1222 else:
1223 stream_data = stream.get_data()
1224 assert stream_data is not None
1225 super().set_data(stream_data)
1226 self.forced_encoding = forced_encoding
1227
1228 def replicate(
1229 self,
1230 pdf_dest: PdfWriterProtocol,
1231 ) -> "ContentStream":
1232 d__ = cast(
1233 "ContentStream",
1234 self._reference_clone(self.__class__(None, None), pdf_dest, False),
1235 )
1236 d__._data = self._data
1237 try:
1238 decoded_self = self.decoded_self
1239 if decoded_self is None:
1240 self.decoded_self = None
1241 else:
1242 self.decoded_self = cast(
1243 "DecodedStreamObject", decoded_self.replicate(pdf_dest)
1244 )
1245 except Exception:
1246 pass
1247 for k, v in self.items():
1248 d__[k.replicate(pdf_dest)] = (
1249 v.replicate(pdf_dest) if hasattr(v, "replicate") else v
1250 )
1251 return d__
1252 d__.set_data(self._data)
1253 d__.pdf = pdf_dest
1254 d__._operations = list(self._operations)
1255 d__.forced_encoding = self.forced_encoding
1256 return d__
1257
1258 def clone(
1259 self,
1260 pdf_dest: Any,
1261 force_duplicate: bool = False,
1262 ignore_fields: Optional[Sequence[Union[str, int]]] = (),
1263 ) -> "ContentStream":
1264 """
1265 Clone object into pdf_dest.
1266
1267 Args:
1268 pdf_dest:
1269 force_duplicate:
1270 ignore_fields:
1271
1272 Returns:
1273 The cloned ContentStream
1274
1275 """
1276 try:
1277 if self.indirect_reference.pdf == pdf_dest and not force_duplicate: # type: ignore
1278 return self
1279 except Exception:
1280 pass
1281
1282 visited: set[tuple[int, int]] = set()
1283 d__ = cast(
1284 "ContentStream",
1285 self._reference_clone(
1286 self.__class__(None, None), pdf_dest, force_duplicate
1287 ),
1288 )
1289 if ignore_fields is None:
1290 ignore_fields = []
1291 d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
1292 return d__
1293
1294 def _clone(
1295 self,
1296 src: DictionaryObject,
1297 pdf_dest: PdfWriterProtocol,
1298 force_duplicate: bool,
1299 ignore_fields: Optional[Sequence[Union[str, int]]],
1300 visited: set[tuple[int, int]],
1301 ) -> None:
1302 """
1303 Update the object from src.
1304
1305 Args:
1306 src:
1307 pdf_dest:
1308 force_duplicate:
1309 ignore_fields:
1310
1311 """
1312 src_cs = cast("ContentStream", src)
1313 super().set_data(src_cs._data)
1314 self.pdf = pdf_dest
1315 self._operations = list(src_cs._operations)
1316 self.forced_encoding = src_cs.forced_encoding
1317 # no need to call DictionaryObjection or anything
1318 # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)
1319
1320 def _parse_content_stream(self, stream: StreamType) -> None:
1321 # 7.8.2 Content Streams
1322 stream.seek(0, 0)
1323 operands: list[Union[int, str, PdfObject]] = []
1324 while True:
1325 peek = read_non_whitespace(stream)
1326 if peek in (b"", 0):
1327 break
1328 stream.seek(-1, 1)
1329 if peek.isalpha() or peek in (b"'", b'"'):
1330 operator = read_until_regex(stream, NameObject.delimiter_pattern)
1331 if operator == b"BI":
1332 # begin inline image - a completely different parsing
1333 # mechanism is required, of course... thanks buddy...
1334 assert operands == []
1335 ii = self._read_inline_image(stream)
1336 self._operations.append((ii, b"INLINE IMAGE"))
1337 else:
1338 self._operations.append((operands, operator))
1339 operands = []
1340 elif peek == b"%":
1341 # If we encounter a comment in the content stream, we have to
1342 # handle it here. Typically, read_object will handle
1343 # encountering a comment -- but read_object assumes that
1344 # following the comment must be the object we're trying to
1345 # read. In this case, it could be an operator instead.
1346 while peek not in (b"\r", b"\n", b""):
1347 peek = stream.read(1)
1348 else:
1349 operands.append(read_object(stream, None, self.forced_encoding))
1350
    def _read_inline_image(self, stream: StreamType) -> dict[str, Any]:
        """
        Read an inline image, starting just after the ``BI`` operator.

        First the key/value settings are read up to ``ID``. The image data is
        then extracted with the helper matching the ``/F`` (or ``/Filter``)
        entry. Without a filter, the amount of data to read is computed from
        ``/CS``, ``/BPC``, ``/W`` and ``/H`` where possible. If no ``EI``
        followed by whitespace is found behind the data, extraction is redone
        from the saved position with ``extract_inline_default``.

        Args:
            stream: Stream positioned just after ``BI``.

        Returns:
            Dictionary with the entries ``"settings"`` (a DictionaryObject)
            and ``"data"`` (the extracted image data).

        Raises:
            PdfStreamError: No valid ``EI`` could be found, even using the
                fallback.

        """
        # begin reading just after the "BI" - begin image
        # first read the dictionary of settings.
        settings = DictionaryObject()
        while True:
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            if tok == b"I":
                # "ID" - begin of image data
                break
            key = read_object(stream, self.pdf)
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            value = read_object(stream, self.pdf)
            settings[key] = value
        # left at beginning of ID
        tmp = stream.read(3)
        assert tmp[:2] == b"ID"
        filtr = settings.get("/F", settings.get("/Filter", "not set"))
        # Remember where the image data starts, for the fallback below.
        savpos = stream.tell()
        if isinstance(filtr, list):
            filtr = filtr[0]  # used for encoding
        if "AHx" in filtr or "ASCIIHexDecode" in filtr:
            data = extract_inline__ascii_hex_decode(stream)
        elif "A85" in filtr or "ASCII85Decode" in filtr:
            data = extract_inline__ascii85_decode(stream)
        elif "RL" in filtr or "RunLengthDecode" in filtr:
            data = extract_inline__run_length_decode(stream)
        elif "DCT" in filtr or "DCTDecode" in filtr:
            data = extract_inline__dct_decode(stream)
        elif filtr == "not set":
            cs = settings.get("/CS", "")
            if isinstance(cs, list):
                cs = cs[0]
            # lcs: bytes per pixel used for the size computation below;
            # a value <= 0 means the size cannot be computed.
            if "RGB" in cs:
                lcs = 3
            elif "CMYK" in cs:
                lcs = 4
            else:
                bits = settings.get(
                    "/BPC",
                    8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1,
                )
                if bits > 0:
                    lcs = bits / 8.0
                else:
                    data = extract_inline_default(stream)
                    lcs = -1
            if lcs > 0:
                # Each row takes ceil(width * lcs) bytes.
                data = stream.read(
                    ceil(cast(int, settings["/W"]) * lcs) * cast(int, settings["/H"])
                )
            # Move to the `EI` if possible.
            ei = read_non_whitespace(stream)
            stream.seek(-1, 1)
        else:
            data = extract_inline_default(stream)

        ei = stream.read(3)
        stream.seek(-1, 1)
        if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:
            # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above.
            stream.seek(savpos, 0)
            data = extract_inline_default(stream)
            ei = stream.read(3)
            stream.seek(-1, 1)
            if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:  # pragma: no cover
                # Check the same condition again. This should never fail as
                # edge cases are covered by `extract_inline_default` above,
                # but check this to make sure that we are behind the `EI` afterwards.
                raise PdfStreamError(
                    f"Could not extract inline image, even using fallback. Expected 'EI', got {ei!r}"
                )
        return {"settings": settings, "data": data}
1425
1426 # This overrides the parent method
1427 def get_data(self) -> bytes:
1428 if not self._data:
1429 new_data = BytesIO()
1430 for operands, operator in self._operations:
1431 if operator == b"INLINE IMAGE":
1432 new_data.write(b"BI")
1433 dict_text = BytesIO()
1434 operands["settings"].write_to_stream(dict_text)
1435 new_data.write(dict_text.getvalue()[2:-2])
1436 new_data.write(b"ID ")
1437 new_data.write(operands["data"])
1438 new_data.write(b"EI")
1439 else:
1440 for op in operands:
1441 op.write_to_stream(new_data)
1442 new_data.write(b" ")
1443 new_data.write(operator)
1444 new_data.write(b"\n")
1445 self._data = new_data.getvalue()
1446 return self._data
1447
1448 # This overrides the parent method
1449 def set_data(self, data: bytes) -> None:
1450 super().set_data(data)
1451 self._operations = []
1452
1453 @property
1454 def operations(self) -> list[tuple[Any, bytes]]:
1455 if not self._operations and self._data:
1456 self._parse_content_stream(BytesIO(self._data))
1457 self._data = b""
1458 return self._operations
1459
1460 @operations.setter
1461 def operations(self, operations: list[tuple[Any, bytes]]) -> None:
1462 self._operations = operations
1463 self._data = b""
1464
1465 def isolate_graphics_state(self) -> None:
1466 if self._operations:
1467 self._operations.insert(0, ([], b"q"))
1468 self._operations.append(([], b"Q"))
1469 elif self._data:
1470 self._data = b"q\n" + self._data + b"\nQ\n"
1471
1472 # This overrides the parent method
1473 def write_to_stream(
1474 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
1475 ) -> None:
1476 if not self._data and self._operations:
1477 self.get_data() # this ensures ._data is rebuilt
1478 super().write_to_stream(stream, encryption_key)
1479
1480
def read_object(
    stream: StreamType,
    pdf: Optional[PdfReaderProtocol],
    forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
) -> Union[PdfObject, int, str, ContentStream]:
    """
    Read the object starting at the current position of ``stream``.

    The first byte decides which reader is used: name, hexadecimal string or
    dictionary, array, boolean, literal string, null, number or indirect
    reference. A comment is skipped and the object after it is read. An
    ``endobj`` keyword yields a NullObject.

    Args:
        stream: Seekable stream positioned at the first byte of the object.
        pdf: Handed over to the readers requiring it; must not be ``None``
            when an indirect reference is met.
        forced_encoding: Handed over to the string, array and dictionary
            readers.

    Returns:
        The object which has been read.

    Raises:
        PdfReadError: The first byte does not start any known object.

    """
    tok = stream.read(1)
    stream.seek(-1, 1)  # reset to start
    if tok == b"/":
        return NameObject.read_from_stream(stream, pdf)
    if tok == b"<":
        # hexadecimal string OR dictionary
        opening = stream.read(2)
        stream.seek(-2, 1)  # reset to start
        if opening != b"<<":
            return read_hex_string_from_stream(stream, forced_encoding)
        return DictionaryObject.read_from_stream(stream, pdf, forced_encoding)
    if tok == b"[":
        return ArrayObject.read_from_stream(stream, pdf, forced_encoding)
    if tok in (b"t", b"f"):
        return BooleanObject.read_from_stream(stream)
    if tok == b"(":
        return read_string_from_stream(stream, forced_encoding)
    if tok == b"e" and stream.read(6) == b"endobj":
        return NullObject()
    if tok == b"n":
        return NullObject.read_from_stream(stream)
    if tok == b"%":
        # comment: skip it and read whatever follows
        skip_over_comment(stream)
        read_non_whitespace(stream)
        stream.seek(-1, 1)
        return read_object(stream, pdf, forced_encoding)
    if tok in b"0123456789+-.":
        # number object OR indirect reference
        lookahead = stream.read(20)
        stream.seek(-len(lookahead), 1)  # reset to start
        if IndirectPattern.match(lookahead) is None:
            return NumberObject.read_from_stream(stream)
        assert pdf is not None, "mypy"
        return IndirectObject.read_from_stream(stream, pdf)

    # Nothing matched: collect some surrounding bytes for the error message.
    pos = stream.tell()
    stream.seek(-20, 1)
    stream_extract = stream.read(80)
    stream.seek(pos)
    read_until_whitespace(stream)
    raise PdfReadError(
        f"Invalid Elementary Object starting with {tok!r} @{pos}: {stream_extract!r}"
    )
1529
1530
class Field(TreeObject):
    """
    A class representing a field dictionary.

    This class is accessed through
    :meth:`get_fields()<pypdf.PdfReader.get_fields>`
    """

    def __init__(self, data: DictionaryObject) -> None:
        """
        Copy the known field attributes from ``data``.

        A ``/V`` entry which is an EncodedStreamObject is replaced by a
        TextStringObject holding its decoded data.

        Args:
            data: The field dictionary to read the attributes from.

        """
        DictionaryObject.__init__(self)
        self.indirect_reference = data.indirect_reference
        known_attributes = (
            FieldDictionaryAttributes.attributes()
            + CheckboxRadioButtonAttributes.attributes()
        )
        for attr in known_attributes:
            try:
                entry = data[attr]
            except KeyError:
                continue
            self[NameObject(attr)] = entry

        if isinstance(self.get("/V"), EncodedStreamObject):
            raw = cast(EncodedStreamObject, self[NameObject("/V")]).get_data()
            if raw is None:
                text = ""
            elif isinstance(raw, bytes):
                text = raw.decode()
            else:
                raise Exception("Should never happen")
            self[NameObject("/V")] = TextStringObject(text)

    # TABLE 8.69 Entries common to all field dictionaries
    @property
    def field_type(self) -> Optional[NameObject]:
        """Type of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.FT)

    @property
    def parent(self) -> Optional[DictionaryObject]:
        """Parent of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Parent)

    @property
    def kids(self) -> Optional["ArrayObject"]:
        """Kids of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Kids)

    @property
    def name(self) -> Optional[str]:
        """Name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.T)

    @property
    def alternate_name(self) -> Optional[str]:
        """Alternate name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.TU)

    @property
    def mapping_name(self) -> Optional[str]:
        """
        Mapping name of this field (read-only).

        This name is used by pypdf as a key in the dictionary returned by
        :meth:`get_fields()<pypdf.PdfReader.get_fields>`
        """
        return self.get(FieldDictionaryAttributes.TM)

    @property
    def flags(self) -> Optional[int]:
        """
        Field flags (read-only), specifying various characteristics of the
        field (see Table 8.70 of the PDF 1.7 reference).
        """
        return self.get(FieldDictionaryAttributes.Ff)

    @property
    def value(self) -> Optional[Any]:
        """
        Value of this field (read-only).

        Format varies based on field type.
        """
        return self.get(FieldDictionaryAttributes.V)

    @property
    def default_value(self) -> Optional[Any]:
        """Default value of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.DV)

    @property
    def additional_actions(self) -> Optional[DictionaryObject]:
        """
        Additional actions dictionary (read-only).

        This dictionary defines the field's behavior in response to trigger
        events. See Section 8.5.2 of the PDF 1.7 reference.
        """
        return self.get(FieldDictionaryAttributes.AA)
1628
1629
1630class Destination(TreeObject):
1631 """
1632 A class representing a destination within a PDF file.
1633
1634 See section 12.3.2 of the PDF 2.0 reference.
1635
1636 Args:
1637 title: Title of this destination.
1638 page: Reference to the page of this destination. Should
1639 be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`.
1640 fit: How the destination is displayed.
1641
1642 Raises:
1643 PdfReadError: If destination type is invalid.
1644
1645 """
1646
1647 node: Optional[
1648 DictionaryObject
1649 ] = None # node provide access to the original Object
1650
1651 def __init__(
1652 self,
1653 title: Union[str, bytes],
1654 page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject],
1655 fit: Fit,
1656 ) -> None:
1657 self._filtered_children: list[Any] = [] # used in PdfWriter
1658
1659 typ = fit.fit_type
1660 args = fit.fit_args
1661
1662 DictionaryObject.__init__(self)
1663 self[NameObject("/Title")] = TextStringObject(title)
1664 self[NameObject("/Page")] = page
1665 self[NameObject("/Type")] = typ
1666
1667 # from table 8.2 of the PDF 1.7 reference.
1668 if typ == "/XYZ":
1669 if len(args) < 1: # left is missing : should never occur
1670 args.append(NumberObject(0.0))
1671 if len(args) < 2: # top is missing
1672 args.append(NumberObject(0.0))
1673 if len(args) < 3: # zoom is missing
1674 args.append(NumberObject(0.0))
1675 (
1676 self[NameObject(TA.LEFT)],
1677 self[NameObject(TA.TOP)],
1678 self[NameObject("/Zoom")],
1679 ) = args
1680 elif len(args) == 0:
1681 pass
1682 elif typ == TF.FIT_R:
1683 (
1684 self[NameObject(TA.LEFT)],
1685 self[NameObject(TA.BOTTOM)],
1686 self[NameObject(TA.RIGHT)],
1687 self[NameObject(TA.TOP)],
1688 ) = args
1689 elif typ in [TF.FIT_H, TF.FIT_BH]:
1690 try: # Prefer to be more robust not only to null parameters
1691 (self[NameObject(TA.TOP)],) = args
1692 except Exception:
1693 (self[NameObject(TA.TOP)],) = (NullObject(),)
1694 elif typ in [TF.FIT_V, TF.FIT_BV]:
1695 try: # Prefer to be more robust not only to null parameters
1696 (self[NameObject(TA.LEFT)],) = args
1697 except Exception:
1698 (self[NameObject(TA.LEFT)],) = (NullObject(),)
1699 elif typ in [TF.FIT, TF.FIT_B]:
1700 pass
1701 else:
1702 raise PdfReadError(f"Unknown Destination Type: {typ!r}")
1703
1704 @property
1705 def dest_array(self) -> "ArrayObject":
1706 return ArrayObject(
1707 [self.raw_get("/Page"), self["/Type"]]
1708 + [
1709 self[x]
1710 for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"]
1711 if x in self
1712 ]
1713 )
1714
1715 def write_to_stream(
1716 self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
1717 ) -> None:
1718 if encryption_key is not None: # deprecated
1719 deprecation_no_replacement(
1720 "the encryption_key parameter of write_to_stream", "5.0.0"
1721 )
1722 stream.write(b"<<\n")
1723 key = NameObject("/D")
1724 key.write_to_stream(stream)
1725 stream.write(b" ")
1726 value = self.dest_array
1727 value.write_to_stream(stream)
1728
1729 key = NameObject("/S")
1730 key.write_to_stream(stream)
1731 stream.write(b" ")
1732 value_s = NameObject("/GoTo")
1733 value_s.write_to_stream(stream)
1734
1735 stream.write(b"\n")
1736 stream.write(b">>")
1737
1738 @property
1739 def title(self) -> Optional[str]:
1740 """Read-only property accessing the destination title."""
1741 return self.get("/Title")
1742
1743 @property
1744 def page(self) -> Optional[IndirectObject]:
1745 """Read-only property accessing the IndirectObject of the destination page."""
1746 return self.get("/Page")
1747
1748 @property
1749 def typ(self) -> Optional[str]:
1750 """Read-only property accessing the destination type."""
1751 return self.get("/Type")
1752
1753 @property
1754 def zoom(self) -> Optional[int]:
1755 """Read-only property accessing the zoom factor."""
1756 return self.get("/Zoom", None)
1757
1758 @property
1759 def left(self) -> Optional[FloatObject]:
1760 """Read-only property accessing the left horizontal coordinate."""
1761 return self.get("/Left", None)
1762
1763 @property
1764 def right(self) -> Optional[FloatObject]:
1765 """Read-only property accessing the right horizontal coordinate."""
1766 return self.get("/Right", None)
1767
1768 @property
1769 def top(self) -> Optional[FloatObject]:
1770 """Read-only property accessing the top vertical coordinate."""
1771 return self.get("/Top", None)
1772
1773 @property
1774 def bottom(self) -> Optional[FloatObject]:
1775 """Read-only property accessing the bottom vertical coordinate."""
1776 return self.get("/Bottom", None)
1777
1778 @property
1779 def color(self) -> Optional["ArrayObject"]:
1780 """Read-only property accessing the color in (R, G, B) with values 0.0-1.0."""
1781 return cast(
1782 "ArrayObject",
1783 self.get("/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)])),
1784 )
1785
1786 @property
1787 def font_format(self) -> Optional[OutlineFontFlag]:
1788 """
1789 Read-only property accessing the font type.
1790
1791 1=italic, 2=bold, 3=both
1792 """
1793 return OutlineFontFlag(self.get("/F", 0))
1794
1795 @property
1796 def outline_count(self) -> Optional[int]:
1797 """
1798 Read-only property accessing the outline count.
1799
1800 positive = expanded
1801 negative = collapsed
1802 absolute value = number of visible descendants at all levels
1803 """
1804 return self.get("/Count", None)