1# Copyright (c) 2006, Mathieu Fenniak
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright notice,
9# this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above copyright notice,
11# this list of conditions and the following disclaimer in the documentation
12# and/or other materials provided with the distribution.
13# * The name of the author may not be used to endorse or promote products
14# derived from this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26# POSSIBILITY OF SUCH DAMAGE.
27
28
# Module authorship metadata.
__author__ = "Mathieu Fenniak"
__author_email__ = "biziqe@mathieu.fenniak.net"
31
32import logging
33import re
34import sys
35from collections.abc import Iterable, Sequence
36from io import BytesIO
37from math import ceil
38from typing import (
39 Any,
40 Callable,
41 Optional,
42 Union,
43 cast,
44)
45
46from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol
47from .._utils import (
48 WHITESPACES,
49 StreamType,
50 deprecation_no_replacement,
51 logger_warning,
52 read_non_whitespace,
53 read_until_regex,
54 read_until_whitespace,
55 skip_over_comment,
56)
57from ..constants import (
58 CheckboxRadioButtonAttributes,
59 FieldDictionaryAttributes,
60 OutlineFontFlag,
61)
62from ..constants import FilterTypes as FT
63from ..constants import StreamAttributes as SA
64from ..constants import TypArguments as TA
65from ..constants import TypFitArguments as TF
66from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfReadError, PdfStreamError
67from ._base import (
68 BooleanObject,
69 ByteStringObject,
70 FloatObject,
71 IndirectObject,
72 NameObject,
73 NullObject,
74 NumberObject,
75 PdfObject,
76 TextStringObject,
77 is_null_or_none,
78)
79from ._fit import Fit
80from ._image_inline import (
81 extract_inline__ascii85_decode,
82 extract_inline__ascii_hex_decode,
83 extract_inline__dct_decode,
84 extract_inline__run_length_decode,
85 extract_inline_default,
86)
87from ._utils import read_hex_string_from_stream, read_string_from_stream
88
# ``Self`` is imported from ``typing`` on Python 3.11 and later, and from
# ``typing_extensions`` on older interpreters.
if sys.version_info >= (3, 11):
    from typing import Self
else:
    from typing_extensions import Self

# Module-level logger named after this module.
logger = logging.getLogger(__name__)

# Bytes pattern: an optional sign, two whitespace-separated runs of digits
# (captured as groups 1 and 2), whitespace, then "R" followed by one byte that
# is not an ASCII letter.
IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]")
97
98
class ArrayObject(list[Any], PdfObject):
    """
    PDF array object backed by a Python ``list``.

    Serialisation and hashing call ``write_to_stream`` / ``hash_bin`` on every
    item, so items are expected to be PDF objects.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ArrayObject":
        """
        Build a copy of this array for ``pdf_dest``.

        Items exposing a ``replicate`` attribute are replicated into
        ``pdf_dest``; all other items are appended unchanged.

        Args:
            pdf_dest: Writer receiving the copy.

        Returns:
            The array returned by ``_reference_clone``, filled with the items.

        """
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, False),
        )
        for data in self:
            if hasattr(data, "replicate"):
                arr.append(data.replicate(pdf_dest))
            else:
                arr.append(data)
        return arr

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ArrayObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: Writer receiving the clone.
            force_duplicate: When false and this array already belongs to
                ``pdf_dest``, the array itself is returned.
            ignore_fields: Forwarded unchanged to the ``clone`` call of every
                item.

        Returns:
            ``self`` or the cloned array.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: the ownership check cannot be evaluated (for
            # instance no usable indirect_reference), so clone normally.
            pass
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, force_duplicate),
        )
        for data in self:
            if isinstance(data, StreamObject):
                # Streams are stored in the clone through their indirect
                # reference rather than as direct objects.
                dup = data._reference_clone(
                    data.clone(pdf_dest, force_duplicate, ignore_fields),
                    pdf_dest,
                    force_duplicate,
                )
                arr.append(dup.indirect_reference)
            elif hasattr(data, "clone"):
                arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields))
            else:
                arr.append(data)
        return arr

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash((self.__class__, tuple(x.hash_bin() for x in self)))

    def items(self) -> Iterable[Any]:
        """Emulate DictionaryObject.items for a list (index, object)."""
        return enumerate(self)

    def _to_lst(self, lst: Any) -> list[Any]:
        """
        Normalise the operand of ``+``, ``+=`` and ``-=``.

        Lists, tuples and sets are returned unchanged. A PdfObject is wrapped
        in a one-element list. A plain ``str`` becomes a NameObject when it
        starts with "/" and a TextStringObject otherwise; ``bytes`` become a
        ByteStringObject. Any other value is wrapped as is.

        Args:
            lst: Operand to normalise.

        Returns:
            The collection of items to add or remove.

        """
        if isinstance(lst, (list, tuple, set)):
            return lst
        if isinstance(lst, PdfObject):
            return [lst]
        if isinstance(lst, str):
            # startswith() also copes with the empty string, where indexing
            # the first character would raise IndexError.
            if lst.startswith("/"):
                return [NameObject(lst)]
            return [TextStringObject(lst)]
        if isinstance(lst, bytes):
            return [ByteStringObject(lst)]
        # for numbers,...
        return [lst]

    def __add__(self, lst: Any) -> "ArrayObject":
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        Returns:
            ArrayObject with all elements

        """
        temp = ArrayObject(self)
        temp.extend(self._to_lst(lst))
        return temp

    def __iadd__(self, lst: Any) -> Self:
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        """
        self.extend(self._to_lst(lst))
        return self

    def __isub__(self, lst: Any) -> Self:
        """
        Allow to remove items.

        For every item of the normalised operand, the first equal element is
        removed; items that are not present are ignored.
        """
        for x in self._to_lst(lst):
            try:
                self.remove(x)
            except ValueError:
                # Not in the array: nothing to remove.
                pass
        return self

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the array as ``[ item item ... ]``.

        Args:
            stream: Destination stream.
            encryption_key: Deprecated; any value other than None is reported
                through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"[")
        for data in self:
            stream.write(b" ")
            data.write_to_stream(stream)
        stream.write(b" ]")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "ArrayObject":
        """
        Parse an array starting at the current stream position.

        Args:
            stream: Stream positioned on the opening "[".
            pdf: Reader forwarded to ``read_object``.
            forced_encoding: Forwarded to ``read_object``.

        Returns:
            The parsed array. If the stream ends before "]" is found, the
            items read so far are returned.

        Raises:
            PdfReadError: If the first byte is not "[".

        """
        arr = ArrayObject()
        tmp = stream.read(1)
        if tmp != b"[":
            raise PdfReadError("Could not read array")
        while True:
            # skip leading whitespace
            tok = stream.read(1)
            while tok.isspace():
                tok = stream.read(1)
            if tok == b"":
                # End of data.
                break
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            # check for array ending
            if tok == b"]":
                break
            # Give the byte back so that read_object sees the whole item.
            stream.seek(-1, 1)
            # read and append object
            arr.append(read_object(stream, pdf, forced_encoding))
        return arr
263
264
class DictionaryObject(dict[Any, Any], PdfObject):
    """
    PDF dictionary object backed by a Python ``dict``.

    Item assignment only accepts PdfObject keys and values. Item access
    (``obj[key]``) returns ``get_object()`` of the stored value; ``raw_get``
    returns the stored value itself.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "DictionaryObject":
        """
        Build a copy of this dictionary for ``pdf_dest``.

        Keys are replicated; values are replicated when they expose a
        ``replicate`` attribute and stored unchanged otherwise.

        Args:
            pdf_dest: Writer receiving the copy.

        Returns:
            The object returned by ``_reference_clone``, filled with the
            entries.

        """
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "DictionaryObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: Writer receiving the clone.
            force_duplicate: When false and this object already belongs to
                ``pdf_dest``, the object itself is returned.
            ignore_fields: Keys that must not be copied; None is treated as an
                empty list. See ``_clone`` for the handling of int entries.

        Returns:
            ``self`` or the cloned object.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: the ownership check cannot be evaluated (for
            # instance no usable indirect_reference), so clone normally.
            pass

        visited: set[tuple[int, int]] = set()  # (idnum, generation)
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
        )
        if ignore_fields is None:
            ignore_fields = []
        # Only fill the object when _reference_clone returned an empty one.
        if len(d__.keys()) == 0:
            d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: "DictionaryObject",
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],  # (idnum, generation)
    ) -> None:
        """
        Update the object from src.

        Args:
            src: Dictionary whose entries are copied into ``self``.
            pdf_dest: Writer receiving the cloned objects.
            force_duplicate: Forwarded to every nested clone call.
            ignore_fields: Keys to skip. An int entry is a counter: it is
                decremented on each call, and once it is zero or less both
                the counter and the entry following it are dropped.
            visited: (idnum, generation) pairs of clones already handled while
                walking chained entries; used to stop on loops.

        """
        # First we remove the ignore_fields
        # that are for a limited number of levels
        assert ignore_fields is not None
        ignore_fields = list(ignore_fields)
        x = 0
        while x < len(ignore_fields):
            if isinstance(ignore_fields[x], int):
                if cast(int, ignore_fields[x]) <= 0:
                    # Drops the counter, then the entry that followed it.
                    del ignore_fields[x]
                    del ignore_fields[x]
                    continue
                ignore_fields[x] -= 1  # type:ignore
            x += 1
        # Check if this is a chain list, we need to loop to prevent recur
        if any(
            field not in ignore_fields
            and field in src
            and isinstance(src.raw_get(field), IndirectObject)
            and isinstance(src[field], DictionaryObject)
            and (
                src.get("/Type", None) is None
                or cast(DictionaryObject, src[field]).get("/Type", None) is None
                or src.get("/Type", None)
                == cast(DictionaryObject, src[field]).get("/Type", None)
            )
            for field in ["/Next", "/Prev", "/N", "/V"]
        ):
            ignore_fields = list(ignore_fields)
            for lst in (("/Next", "/Prev"), ("/N", "/V")):
                for k in lst:
                    objs = []
                    if (
                        k in src
                        and k not in self
                        and isinstance(src.raw_get(k), IndirectObject)
                        and isinstance(src[k], DictionaryObject)
                        # If need to go further the idea is to check
                        # that the types are the same
                        and (
                            src.get("/Type", None) is None
                            or cast(DictionaryObject, src[k]).get("/Type", None) is None
                            or src.get("/Type", None)
                            == cast(DictionaryObject, src[k]).get("/Type", None)
                        )
                    ):
                        cur_obj: Optional[DictionaryObject] = cast(
                            "DictionaryObject", src[k]
                        )
                        prev_obj: Optional[DictionaryObject] = self
                        # Walk the chain iteratively: create an empty clone for
                        # each link and connect it to the previous clone.
                        while cur_obj is not None:
                            clon = cast(
                                "DictionaryObject",
                                cur_obj._reference_clone(
                                    cur_obj.__class__(), pdf_dest, force_duplicate
                                ),
                            )
                            # Check to see if we've previously processed our item
                            if clon.indirect_reference is not None:
                                idnum = clon.indirect_reference.idnum
                                generation = clon.indirect_reference.generation
                                if (idnum, generation) in visited:
                                    cur_obj = None
                                    break
                                visited.add((idnum, generation))
                            objs.append((cur_obj, clon))
                            assert prev_obj is not None
                            prev_obj[NameObject(k)] = clon.indirect_reference
                            prev_obj = clon
                            try:
                                if cur_obj == src:
                                    cur_obj = None
                                else:
                                    cur_obj = cast("DictionaryObject", cur_obj[k])
                            except Exception:
                                # No further link (or it cannot be read): the
                                # chain ends here.
                                cur_obj = None
                        # Fill the clones only once the whole chain is linked.
                        for s, c in objs:
                            c._clone(
                                s, pdf_dest, force_duplicate, ignore_fields, visited
                            )

        for k, v in src.items():
            if k not in ignore_fields:
                if isinstance(v, StreamObject):
                    if not hasattr(v, "indirect_reference"):
                        v.indirect_reference = None
                    vv = v.clone(pdf_dest, force_duplicate, ignore_fields)
                    assert vv.indirect_reference is not None
                    self[k.clone(pdf_dest)] = vv.indirect_reference
                elif k not in self:
                    self[NameObject(k)] = (
                        v.clone(pdf_dest, force_duplicate, ignore_fields)
                        if hasattr(v, "clone")
                        else v
                    )

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash(
            (self.__class__, tuple(((k, v.hash_bin()) for k, v in self.items())))
        )

    def raw_get(self, key: Any) -> Any:
        """Return the stored value for ``key`` without calling ``get_object()`` on it."""
        return dict.__getitem__(self, key)

    def get_inherited(self, key: str, default: Any = None) -> Any:
        """
        Returns the value of a key or from the parent if not found.
        If not found returns default.

        Args:
            key: string identifying the field to return

            default: default value to return

        Returns:
            Current key or inherited one, otherwise default value.

        """
        if key in self:
            return self[key]
        if "/Parent" not in self:
            return default
        # Not defined here: ask the parent, which recurses up the chain.
        return cast("DictionaryObject", self["/Parent"].get_object()).get_inherited(
            key, default
        )

    def __setitem__(self, key: Any, value: Any) -> Any:
        """
        Store ``value`` under ``key``.

        Raises:
            ValueError: If the key or the value is not a PdfObject.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.__setitem__(self, key, value)

    def setdefault(self, key: Any, value: Optional[Any] = None) -> Any:
        """
        Same as ``dict.setdefault``, restricted to PdfObject keys and values.

        Raises:
            ValueError: If the key or the value is not a PdfObject. This
                includes the default ``value=None``.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.setdefault(self, key, value)

    def __getitem__(self, key: Any) -> PdfObject:
        """Return ``get_object()`` of the value stored under ``key``."""
        return dict.__getitem__(self, key).get_object()

    @property
    def xmp_metadata(self) -> Optional[XmpInformationProtocol]:
        """
        Retrieve XMP (Extensible Metadata Platform) data relevant to this
        object, if available.

        See Table 347 — Additional entries in a metadata stream dictionary.

        Returns:
            Returns a :class:`~pypdf.xmp.XmpInformation` instance
            that can be used to access XMP metadata from the document. Can also
            return None if no metadata was found on the document root.

        """
        from ..xmp import XmpInformation  # noqa: PLC0415

        metadata = self.get("/Metadata", None)
        if is_null_or_none(metadata):
            return None
        assert metadata is not None, "mypy"
        metadata = metadata.get_object()
        return XmpInformation(metadata)

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the dictionary as ``<<``, one ``key value`` line per entry, ``>>``.

        Keys longer than two characters whose second and last characters are
        "%" are not written.

        Args:
            stream: Destination stream.
            encryption_key: Deprecated; any value other than None is reported
                through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        for key, value in self.items():
            if len(key) > 2 and key[1] == "%" and key[-1] == "%":
                continue
            key.write_to_stream(stream, encryption_key)
            stream.write(b" ")
            value.write_to_stream(stream)
            stream.write(b"\n")
        stream.write(b">>")

    @classmethod
    def _get_next_object_position(
        cls, position_before: int, position_end: int, generations: list[int], pdf: PdfReaderProtocol
    ) -> int:
        """
        Find the smallest xref position after ``position_before``.

        Args:
            position_before: Exclusive lower bound.
            position_end: Inclusive upper bound, also the fallback result.
            generations: Keys of ``pdf.xref`` to inspect.
            pdf: Reader providing the ``xref`` table.

        Returns:
            The smallest value ``x`` found in the inspected xref entries with
            ``position_before < x <= position_end``, or ``position_end`` if
            there is none.

        """
        out = position_end
        for generation in generations:
            location = pdf.xref[generation]
            values = [x for x in location.values() if position_before < x <= position_end]
            if values:
                out = min(out, *values)
        return out

    @classmethod
    def _read_unsized_from_stream(
        cls, stream: StreamType, pdf: PdfReaderProtocol
    ) -> bytes:
        """
        Read stream data whose length is unusable by searching for "endstream".

        The search covers the bytes between the current position and the byte
        before the next xref position (see ``_get_next_object_position``). On
        success the stream is left just after the "endstream" marker.

        Args:
            stream: Stream positioned at the start of the stream data.
            pdf: Reader providing the ``xref`` table.

        Returns:
            The bytes preceding the marker, without the single byte directly
            in front of it (presumably the end-of-line before "endstream").

        Raises:
            PdfReadError: If no "endstream" marker is found.

        """
        object_position = cls._get_next_object_position(
            position_before=stream.tell(), position_end=2 ** 32, generations=list(pdf.xref), pdf=pdf
        ) - 1
        current_position = stream.tell()
        # Read until the next object position.
        read_value = stream.read(object_position - stream.tell())
        endstream_position = read_value.find(b"endstream")
        if endstream_position < 0:
            raise PdfReadError(
                f"Unable to find 'endstream' marker for obj starting at {current_position}."
            )
        # 9 = len(b"endstream")
        stream.seek(current_position + endstream_position + 9)
        # Clamp at 0: with the marker at offset 0 the slice end would be -1,
        # returning nearly everything that was read instead of empty data.
        return read_value[: max(endstream_position - 1, 0)]

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> "DictionaryObject":
        """
        Parse a dictionary, and the stream following it if any.

        Args:
            stream: Stream positioned on the opening "<<".
            pdf: Reader forwarded to ``read_object``; its ``strict`` flag
                decides whether recoverable problems raise or are only logged.
            forced_encoding: Forwarded to ``read_object`` when reading values.

        Returns:
            A DictionaryObject, or the result of
            ``StreamObject.initialize_from_dictionary`` when the dictionary is
            followed by the ``stream`` keyword. When ``pdf`` is None or not
            strict and an entry cannot be read, the entries read so far are
            returned.

        Raises:
            PdfReadError: Missing "<<"; in strict mode also unreadable or
                duplicated entries; missing "endstream" marker when ``pdf`` is
                None or strict.
            PdfStreamError: Data ends inside the dictionary, the ``stream``
                keyword is not followed by a newline, or (strict mode) the
                stream length is not defined.

        """
        tmp = stream.read(2)
        if tmp != b"<<":
            raise PdfReadError(
                f"Dictionary read error at byte {hex(stream.tell())}: "
                "stream must begin with '<<'"
            )
        data: dict[Any, Any] = {}
        while True:
            tok = read_non_whitespace(stream)
            if tok == b"\x00":
                continue
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            if not tok:
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)

            if tok == b">":
                # Consume the second ">" of the closing ">>".
                stream.read(1)
                break
            stream.seek(-1, 1)
            try:
                try:
                    key = read_object(stream, pdf)
                    if isinstance(key, NullObject):
                        break
                    if not isinstance(key, NameObject):
                        raise PdfReadError(
                            f"Expecting a NameObject for key but found {key!r}"
                        )
                except PdfReadError as exc:
                    if pdf is not None and pdf.strict:
                        raise
                    logger_warning(exc.__repr__(), __name__)
                    continue
                # Skip the whitespace between key and value, then give back
                # the first byte of the value.
                tok = read_non_whitespace(stream)
                stream.seek(-1, 1)
                value = read_object(stream, pdf, forced_encoding)
            except Exception as exc:
                if pdf is not None and pdf.strict:
                    raise PdfReadError(exc.__repr__())
                logger_warning(exc.__repr__(), __name__)
                retval = DictionaryObject()
                retval.update(data)
                return retval  # return partial data

            if not data.get(key):
                data[key] = value
            else:
                # multiple definitions of key not permitted
                msg = (
                    f"Multiple definitions in dictionary at byte "
                    f"{hex(stream.tell())} for key {key}"
                )
                if pdf is not None and pdf.strict:
                    raise PdfReadError(msg)
                logger_warning(msg, __name__)

        pos = stream.tell()
        s = read_non_whitespace(stream)
        if s == b"s" and stream.read(5) == b"tream":
            eol = stream.read(1)
            # Occasional PDF file output has spaces after 'stream' keyword but before EOL.
            # patch provided by Danial Sandler
            while eol == b" ":
                eol = stream.read(1)
            if eol not in (b"\n", b"\r"):
                raise PdfStreamError("Stream data must be followed by a newline")
            if eol == b"\r" and stream.read(1) != b"\n":
                # Lone "\r": give back the byte that was read past it.
                stream.seek(-1, 1)
            # this is a stream object, not a dictionary
            if SA.LENGTH not in data:
                if pdf is not None and pdf.strict:
                    raise PdfStreamError("Stream length not defined")
                logger_warning(
                    f"Stream length not defined @pos={stream.tell()}", __name__
                )
                data[NameObject(SA.LENGTH)] = NumberObject(-1)
            length = data[SA.LENGTH]
            if isinstance(length, IndirectObject):
                # Resolve the length, then restore the position afterwards.
                t = stream.tell()
                assert pdf is not None, "mypy"
                length = pdf.get_object(length)
                stream.seek(t, 0)
            if length is None:  # if the PDF is damaged
                length = -1
            pstart = stream.tell()
            if length >= 0:
                data["__streamdata__"] = stream.read(length)
            else:
                data["__streamdata__"] = read_until_regex(
                    stream, re.compile(b"endstream")
                )
            e = read_non_whitespace(stream)
            ndstream = stream.read(8)
            if (e + ndstream) != b"endstream":
                # the odd PDF file has a length that is too long, so
                # we need to read backwards to find the "endstream" ending.
                # ReportLab (unknown version) generates files with this bug,
                # and Python users into PDF files tend to be our audience.
                # we need to do this to correct the streamdata and chop off
                # an extra character.
                pos = stream.tell()
                stream.seek(-10, 1)
                end = stream.read(9)
                if end == b"endstream":
                    # we found it by looking back one character further.
                    data["__streamdata__"] = data["__streamdata__"][:-1]
                elif pdf is not None and not pdf.strict:
                    # Ignore the declared length and search for the marker.
                    stream.seek(pstart, 0)
                    data["__streamdata__"] = DictionaryObject._read_unsized_from_stream(stream, pdf)
                    pos = stream.tell()
                else:
                    stream.seek(pos, 0)
                    raise PdfReadError(
                        "Unable to find 'endstream' marker after stream at byte "
                        f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')."
                    )
        else:
            # Plain dictionary: go back to the position just after ">>".
            stream.seek(pos, 0)
        if "__streamdata__" in data:
            return StreamObject.initialize_from_dictionary(data)
        retval = DictionaryObject()
        retval.update(data)
        return retval
677
678
class TreeObject(DictionaryObject):
    """
    Dictionary acting as the parent of a linked list of child dictionaries.

    The parent refers to its children through "/First" and "/Last"; children
    are chained with "/Next" and "/Prev" and point back with "/Parent".
    "/Count" is maintained on insertion and removal.
    """

    def __init__(self, dct: Optional[DictionaryObject] = None) -> None:
        """
        Create the tree node.

        Args:
            dct: Optional dictionary whose entries are copied into the node.

        """
        DictionaryObject.__init__(self)
        if dct:
            self.update(dct)

    def has_children(self) -> bool:
        """Return True when the node has a "/First" entry."""
        return "/First" in self

    def __iter__(self) -> Any:
        """Iterate over the children (not over the dictionary keys)."""
        return self.children()

    def children(self) -> Iterable[Any]:
        """
        Yield the children, following "/Next" from "/First".

        Iteration stops after the child equal to "/Last", or when a child has
        no (or a null) "/Next" entry.
        """
        if not self.has_children():
            return

        child_ref = self[NameObject("/First")]
        child = child_ref.get_object()
        while True:
            yield child
            if child == self[NameObject("/Last")]:
                return
            child_ref = child.get(NameObject("/Next"))  # type: ignore
            if is_null_or_none(child_ref):
                return
            child = child_ref.get_object()

    def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None:
        """Insert ``child`` with ``before=None`` (see ``insert_child``)."""
        self.insert_child(child, None, pdf)

    def inc_parent_counter_default(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to "/Count" of ``parent`` and of all its ancestors.

        Nodes without "/Count" are left untouched; the stored count never
        goes below 0. The walk stops at a null or missing parent.

        Args:
            parent: First node to update, or a reference to it.
            n: Amount to add; may be negative.

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        if "/Count" in parent:
            parent[NameObject("/Count")] = NumberObject(
                max(0, cast(int, parent[NameObject("/Count")]) + n)
            )
        self.inc_parent_counter_default(parent.get("/Parent", None), n)

    def inc_parent_counter_outline(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to the "/Count" of ``parent``, keeping the open/closed sign.

        The magnitude of the count is increased by ``n``. The result is
        stored positive when "/%is_open%" is true (the default when absent)
        and negative otherwise. Ancestors are only updated while the nodes
        are open.

        Args:
            parent: First node to update, or a reference to it.
            n: Amount to add.

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        # BooleanObject requires comparison with == not is
        opn = parent.get("/%is_open%", True) == True  # noqa: E712
        c = cast(int, parent.get("/Count", 0))
        if c < 0:
            c = abs(c)
        parent[NameObject("/Count")] = NumberObject((c + n) * (1 if opn else -1))
        if not opn:
            return
        self.inc_parent_counter_outline(parent.get("/Parent", None), n)

    def insert_child(
        self,
        child: Any,
        before: Any,
        pdf: PdfWriterProtocol,
        inc_parent_counter: Optional[Callable[..., Any]] = None,
    ) -> IndirectObject:
        """
        Insert ``child`` into the list of children.

        Args:
            child: Object to insert; its ``indirect_reference`` is what gets
                stored in the tree.
            before: Reference of the child in front of which to insert. When
                it is not met during the walk, the child is appended at the
                end.
            pdf: Not used by this method.
            inc_parent_counter: Callable ``(node, n)`` updating the counters;
                defaults to ``inc_parent_counter_default``.

        Returns:
            The indirect reference of the inserted child.

        """
        if inc_parent_counter is None:
            inc_parent_counter = self.inc_parent_counter_default
        child_obj = child.get_object()
        child = child.indirect_reference  # get_reference(child_obj)

        prev: Optional[DictionaryObject]
        if "/First" not in self:  # no child yet
            self[NameObject("/First")] = child
            self[NameObject("/Count")] = NumberObject(0)
            self[NameObject("/Last")] = child
            child_obj[NameObject("/Parent")] = self.indirect_reference
            inc_parent_counter(self, child_obj.get("/Count", 1))
            if "/Next" in child_obj:
                del child_obj["/Next"]
            if "/Prev" in child_obj:
                del child_obj["/Prev"]
            return child
        # NOTE(review): the walk starts at "/Last" and only follows "/Next",
        # so it seems `before` can only be matched when it is the last
        # child's reference - confirm this is intended.
        prev = cast("DictionaryObject", self["/Last"])

        while prev.indirect_reference != before:
            if "/Next" in prev:
                prev = cast("TreeObject", prev["/Next"])
            else:  # append at the end
                prev[NameObject("/Next")] = cast("TreeObject", child)
                child_obj[NameObject("/Prev")] = prev.indirect_reference
                child_obj[NameObject("/Parent")] = self.indirect_reference
                if "/Next" in child_obj:
                    del child_obj["/Next"]
                self[NameObject("/Last")] = child
                inc_parent_counter(self, child_obj.get("/Count", 1))
                return child
        try:  # insert as first or in the middle
            assert isinstance(prev["/Prev"], DictionaryObject)
            prev["/Prev"][NameObject("/Next")] = child
            child_obj[NameObject("/Prev")] = prev["/Prev"]
        except Exception:  # it means we are inserting in first position
            # NOTE(review): raises KeyError when the child has no "/Next";
            # "/First" is not updated on this path - confirm.
            del child_obj["/Next"]
        child_obj[NameObject("/Next")] = prev
        prev[NameObject("/Prev")] = child
        child_obj[NameObject("/Parent")] = self.indirect_reference
        inc_parent_counter(self, child_obj.get("/Count", 1))
        return child

    def _remove_node_from_tree(
        self, prev: Any, prev_ref: Any, cur: Any, last: Any
    ) -> None:
        """
        Adjust the pointers of the linked list and tree node count.

        Args:
            prev: Node preceding ``cur``, or None when ``cur`` is the first
                child.
            prev_ref: Reference to ``prev``.
            cur: Node being removed.
            last: Last child of the tree; must equal ``cur`` when ``cur`` has
                a predecessor but no "/Next".

        """
        next_ref = cur.get(NameObject("/Next"), None)
        if prev is None:
            if next_ref:
                # Removing first tree node
                next_obj = next_ref.get_object()
                del next_obj[NameObject("/Prev")]
                self[NameObject("/First")] = next_ref
                self[NameObject("/Count")] = NumberObject(
                    self[NameObject("/Count")] - 1  # type: ignore
                )

            else:
                # Removing only tree node
                self[NameObject("/Count")] = NumberObject(0)
                del self[NameObject("/First")]
                if NameObject("/Last") in self:
                    del self[NameObject("/Last")]
        else:
            if next_ref:
                # Removing middle tree node
                next_obj = next_ref.get_object()
                next_obj[NameObject("/Prev")] = prev_ref
                prev[NameObject("/Next")] = next_ref
            else:
                # Removing last tree node
                assert cur == last
                del prev[NameObject("/Next")]
                self[NameObject("/Last")] = prev_ref
            self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1)  # type: ignore

    def remove_child(self, child: Any) -> None:
        """
        Remove ``child`` from this tree and clear its tree-related entries.

        Args:
            child: Child object, or a reference to it.

        Raises:
            ValueError: If the child has no "/Parent", if its parent is not
                this tree, or if it is not met while walking the children.

        """
        child_obj = child.get_object()
        child = child_obj.indirect_reference

        if NameObject("/Parent") not in child_obj:
            raise ValueError("Removed child does not appear to be a tree item")
        if child_obj[NameObject("/Parent")] != self:
            raise ValueError("Removed child is not a member of this tree")

        found = False
        prev_ref = None
        prev = None
        cur_ref: Optional[Any] = self[NameObject("/First")]
        cur: Optional[dict[str, Any]] = cur_ref.get_object()  # type: ignore
        last_ref = self[NameObject("/Last")]
        last = last_ref.get_object()
        while cur is not None:
            if cur == child_obj:
                self._remove_node_from_tree(prev, prev_ref, cur, last)
                found = True
                break

            # Go to the next node
            prev_ref = cur_ref
            prev = cur
            if NameObject("/Next") in cur:
                cur_ref = cur[NameObject("/Next")]
                cur = cur_ref.get_object()
            else:
                cur_ref = None
                cur = None

        if not found:
            raise ValueError("Removal couldn't find item in tree")

        _reset_node_tree_relationship(child_obj)

    def remove_from_tree(self) -> None:
        """Remove the object from the tree it is in."""
        if NameObject("/Parent") not in self:
            raise ValueError("Removed child does not appear to be a tree item")
        cast("TreeObject", self["/Parent"]).remove_child(self)

    def empty_tree(self) -> None:
        """Detach every child, then drop "/Count", "/First" and "/Last"."""
        # Iterating this object yields its children lazily by following
        # "/Next". Resetting a child deletes its "/Next", which would end the
        # traversal after the first child, so take a snapshot first.
        for child in list(self):
            child_obj = child.get_object()
            _reset_node_tree_relationship(child_obj)

        if NameObject("/Count") in self:
            del self[NameObject("/Count")]
        if NameObject("/First") in self:
            del self[NameObject("/First")]
        if NameObject("/Last") in self:
            del self[NameObject("/Last")]
886
887
def _reset_node_tree_relationship(child_obj: Any) -> None:
    """
    Clear the tree links of a node that has just been removed from a tree.

    "/Parent" is always deleted (so it must be present); "/Next" and "/Prev"
    are deleted only when present.

    Args:
        child_obj: The removed node.

    """
    del child_obj[NameObject("/Parent")]
    for sibling_link in ("/Next", "/Prev"):
        link_name = NameObject(sibling_link)
        if link_name in child_obj:
            del child_obj[link_name]
903
904
class StreamObject(DictionaryObject):
    """
    PDF stream: a dictionary plus a payload of bytes.

    The payload is kept in ``_data``. ``decoded_self`` holds an optional
    DecodedStreamObject associated with this stream.
    """

    def __init__(self) -> None:
        """Create an empty stream with no associated decoded object."""
        self._data: bytes = b""
        self.decoded_self: Optional[DecodedStreamObject] = None

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "StreamObject":
        """
        Build a copy of this stream for ``pdf_dest``.

        The payload is shared, ``decoded_self`` is replicated into the copy
        when set, and the dictionary entries are replicated like in
        ``DictionaryObject.replicate``.

        Args:
            pdf_dest: Writer receiving the copy.

        Returns:
            The object returned by ``_reference_clone``, filled with the data.

        """
        d__ = cast(
            "StreamObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        d__._data = self._data
        try:
            decoded_self = self.decoded_self
            # The replicated decoded stream belongs to the copy; the source
            # object must keep its own decoded_self untouched.
            if decoded_self is None:
                d__.decoded_self = None
            else:
                d__.decoded_self = cast(
                    "DecodedStreamObject", decoded_self.replicate(pdf_dest)
                )
        except Exception:
            # Best effort: a failure here only leaves the copy without a
            # replicated decoded_self.
            pass
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def _clone(
        self,
        src: DictionaryObject,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],
    ) -> None:
        """
        Update the object from src.

        Copies the payload and clones ``decoded_self`` of ``src``, then lets
        ``DictionaryObject._clone`` copy the dictionary entries.

        Args:
            src: Stream to copy from.
            pdf_dest: Writer receiving the cloned objects.
            force_duplicate: Forwarded to the nested clone calls.
            ignore_fields: Forwarded to the nested clone calls.
            visited: Forwarded to ``DictionaryObject._clone``.

        """
        self._data = cast("StreamObject", src)._data
        try:
            decoded_self = cast("StreamObject", src).decoded_self
            if decoded_self is None:
                self.decoded_self = None
            else:
                self.decoded_self = cast(
                    "DecodedStreamObject",
                    decoded_self.clone(pdf_dest, force_duplicate, ignore_fields),
                )
        except Exception:
            # Best effort: a failure here only leaves this object without a
            # cloned decoded_self.
            pass
        super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        # Use _data to prevent errors on non-decoded streams.
        return hash((super().hash_bin(), self._data))

    def get_data(self) -> bytes:
        """Return the payload stored in ``_data``."""
        return self._data

    def set_data(self, data: bytes) -> None:
        """Replace the payload stored in ``_data``."""
        self._data = data

    def hash_value_data(self) -> bytes:
        """Return the parent's ``hash_value_data()`` followed by ``get_data()``."""
        data = super().hash_value_data()
        data += self.get_data()
        return data

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the dictionary followed by the payload.

        The length entry is set to ``len(self._data)`` while the dictionary
        is written and removed again afterwards. The payload is written
        between the ``stream`` and ``endstream`` keywords.

        Args:
            stream: Destination stream.
            encryption_key: Deprecated; any value other than None is reported
                through ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        self[NameObject(SA.LENGTH)] = NumberObject(len(self._data))
        try:
            DictionaryObject.write_to_stream(self, stream)
        finally:
            # The length entry is only needed in the output: remove it even
            # when writing the dictionary fails.
            del self[SA.LENGTH]
        stream.write(b"\nstream\n")
        stream.write(self._data)
        stream.write(b"\nendstream")

    @staticmethod
    def initialize_from_dictionary(
        data: dict[str, Any]
    ) -> Union["EncodedStreamObject", "DecodedStreamObject"]:
        """
        Build a stream object from a parsed dictionary.

        Args:
            data: Parsed entries, including the payload under
                "__streamdata__". The mapping is modified: the payload and
                the length entry are removed from it.

        Returns:
            An EncodedStreamObject when a filter entry is present, otherwise
            a DecodedStreamObject; it holds the payload and the remaining
            entries.

        """
        retval: Union[EncodedStreamObject, DecodedStreamObject]
        if SA.FILTER in data:
            retval = EncodedStreamObject()
        else:
            retval = DecodedStreamObject()
        retval._data = data["__streamdata__"]
        del data["__streamdata__"]
        if SA.LENGTH in data:
            del data[SA.LENGTH]
        retval.update(data)
        return retval

    def flate_encode(self, level: int = -1) -> "EncodedStreamObject":
        """
        Return a copy of this stream with its payload Flate-encoded.

        The Flate filter is put in front of any filter already present, and
        a NullObject is put in front of the existing decode parameters so
        that both entries stay aligned.

        Args:
            level: Forwarded to ``FlateDecode.encode``.

        Returns:
            A new EncodedStreamObject; this object is not modified.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        if SA.FILTER in self:
            f = self[SA.FILTER]
            if isinstance(f, ArrayObject):
                f = ArrayObject([NameObject(FT.FLATE_DECODE), *f])
                try:
                    params = ArrayObject(
                        [NullObject(), *self.get(SA.DECODE_PARMS, ArrayObject())]
                    )
                except TypeError:
                    # case of error where the * operator is not working (not an array
                    params = ArrayObject(
                        [NullObject(), self.get(SA.DECODE_PARMS, ArrayObject())]
                    )
            else:
                f = ArrayObject([NameObject(FT.FLATE_DECODE), f])
                params = ArrayObject(
                    [NullObject(), self.get(SA.DECODE_PARMS, NullObject())]
                )
        else:
            f = NameObject(FT.FLATE_DECODE)
            params = None
        retval = EncodedStreamObject()
        retval.update(self)
        retval[NameObject(SA.FILTER)] = f
        if params is not None:
            retval[NameObject(SA.DECODE_PARMS)] = params
        retval._data = FlateDecode.encode(self._data, level)
        return retval

    def decode_as_image(self, pillow_parameters: Union[dict[str, Any], None] = None) -> Any:
        """
        Try to decode the stream object as an image

        Args:
            pillow_parameters: parameters provided to Pillow Image.save() method,
                cf. <https://pillow.readthedocs.io/en/stable/reference/Image.html#PIL.Image.Image.save>

        Returns:
            a PIL image if proper decoding has been found
        Raises:
            Exception: Errors during decoding will be reported.
                It is recommended to catch exceptions to prevent
                stops in your program.

        """
        from .._xobj_image_helpers import _xobj_to_image  # noqa: PLC0415

        if self.get("/Subtype", "") != "/Image":
            # Only warn: decoding is attempted anyway.
            try:
                msg = f"{self.indirect_reference} does not seem to be an Image"  # pragma: no cover
            except AttributeError:
                msg = f"{self.__repr__()} object does not seem to be an Image"  # pragma: no cover
            logger_warning(msg, __name__)
        extension, _, img = _xobj_to_image(self, pillow_parameters)
        if extension is None:
            return None  # pragma: no cover
        return img
1079
1080
class DecodedStreamObject(StreamObject):
    """Stream object that adds nothing to ``StreamObject``; used for unencoded payloads."""
1083
1084
class EncodedStreamObject(StreamObject):
    """
    Stream object whose payload is stored in filter-encoded form.

    The decoded counterpart is created on the first call to ``get_data()``
    and cached in ``decoded_self``.
    """

    def __init__(self) -> None:
        # Cache for the decoded version of this stream; filled lazily.
        self.decoded_self: Optional[DecodedStreamObject] = None

    # This overrides the parent method
    def get_data(self) -> bytes:
        """
        Return the decoded payload.

        On first use, the payload is decoded and stored in a new
        ``DecodedStreamObject`` that also receives every dictionary entry
        except the length, filter and decode parameter entries.
        """
        from ..filters import decode_stream_data  # noqa: PLC0415

        cached = self.decoded_self
        if cached is None:
            cached = DecodedStreamObject()
            cached.set_data(decode_stream_data(self))
            # Entries describing the encoding do not apply to decoded data.
            encoding_keys = (SA.LENGTH, SA.FILTER, SA.DECODE_PARMS)
            for name, entry in self.items():
                if name in encoding_keys:
                    continue
                cached[name] = entry
            self.decoded_self = cached
        return cached.get_data()

    # This overrides the parent method:
    def set_data(self, data: bytes) -> None:
        """
        Replace the payload with ``data`` (given in decoded form).

        Args:
            data: The new, decoded content.

        Raises:
            PdfReadError: If the stream uses anything but a single
                FlateDecode filter.
            TypeError: If ``data`` is not ``bytes``.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        flate_only = (FT.FLATE_DECODE, [FT.FLATE_DECODE])
        if self.get(SA.FILTER, "") not in flate_only:
            raise PdfReadError(
                "Streams encoded with a filter different from FlateDecode are not supported"
            )
        if not isinstance(data, bytes):
            raise TypeError("Data must be bytes")
        if self.decoded_self is None:
            self.get_data()  # to create self.decoded_self
        assert self.decoded_self is not None, "mypy"
        # Keep the cached decoded version and the encoded payload in sync.
        self.decoded_self.set_data(data)
        super().set_data(FlateDecode.encode(data))
1122
1123
class ContentStream(DecodedStreamObject):
    """
    In order to be fast, this data structure can contain either:

    * raw data in ._data
    * parsed stream operations in ._operations.

    At any time, ContentStream object can either have both of those fields defined,
    or one field defined and the other left empty.

    These fields are "rebuilt" lazily, when accessed:

    * when .get_data() is called, if ._data is empty, it is rebuilt from ._operations.
    * when .operations is called, if ._operations is empty, it is rebuilt from ._data.

    Conversely, these fields can be invalidated:

    * when .set_data() is called, ._operations is emptied.
    * when .operations is set, ._data is emptied.
    """

    def __init__(
        self,
        stream: Any,
        pdf: Any,
        forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
    ) -> None:
        """
        Create a content stream from existing stream object(s).

        Args:
            stream: ``None`` for an empty content stream, otherwise an object
                resolving (through ``get_object()``) to a stream object or to
                an array of stream objects whose data gets concatenated.
            pdf: Stored as ``self.pdf`` and handed to ``read_object`` when
                parsing inline image settings.
            forced_encoding: Stored as ``self.forced_encoding`` and handed to
                ``read_object`` when parsing operands.

        """
        self.pdf = pdf
        self._operations: list[tuple[Any, bytes]] = []

        # stream may be a StreamObject or an ArrayObject containing
        # StreamObjects to be concatenated together.
        if stream is None:
            super().set_data(b"")
        else:
            stream = stream.get_object()
            if isinstance(stream, ArrayObject):
                # A bytearray avoids re-copying the accumulated data for
                # every part that gets appended.
                data = bytearray()
                for s in stream:
                    s_resolved = s.get_object()
                    if isinstance(s_resolved, NullObject):
                        continue
                    if not isinstance(s_resolved, StreamObject):
                        # No need to emit an exception here for now - the PDF structure
                        # seems to already be broken beforehand in these cases.
                        logger_warning(
                            f"Expected StreamObject, got {type(s_resolved).__name__} instead. Data might be wrong.",
                            __name__
                        )
                    else:
                        data += s_resolved.get_data()
                        # Separate the parts by a newline unless one is
                        # already there. (Indexing bytes yields an int, so a
                        # comparison of `data[-1]` with b"\n" never matches.)
                        if not data.endswith(b"\n"):
                            data += b"\n"
                super().set_data(bytes(data))
            else:
                stream_data = stream.get_data()
                assert stream_data is not None
                super().set_data(stream_data)
        self.forced_encoding = forced_encoding

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ContentStream":
        """
        Create a copy of this content stream for ``pdf_dest``.

        Args:
            pdf_dest: The writer the copy is created for.

        Returns:
            The copy, carrying the raw data, the parsed operations, the
            forced encoding and the replicated dictionary entries.

        """
        d__ = cast(
            "ContentStream",
            self._reference_clone(self.__class__(None, None), pdf_dest, False),
        )
        d__._data = self._data
        # Best-effort handling of a cached `decoded_self`: any error (e.g. the
        # attribute not being set on this object) is deliberately ignored.
        # NOTE(review): this updates `self`, not the copy - confirm intent.
        try:
            decoded_self = self.decoded_self
            if decoded_self is None:
                self.decoded_self = None
            else:
                self.decoded_self = cast(
                    "DecodedStreamObject", decoded_self.replicate(pdf_dest)
                )
        except Exception:
            pass
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        # These attributes must be copied as well: without the operations, a
        # stream whose raw data has been invalidated would be replicated empty.
        d__.pdf = pdf_dest
        d__._operations = list(self._operations)
        d__.forced_encoding = self.forced_encoding
        return d__

    def clone(
        self,
        pdf_dest: Any,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ContentStream":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: The destination the clone is created for.
            force_duplicate: Create a new object even if this one already
                belongs to ``pdf_dest``.
            ignore_fields: Forwarded to ``_clone``; ``None`` is treated as
                an empty list.

        Returns:
            The cloned ContentStream

        """
        # Objects without a usable indirect reference are always cloned.
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            pass

        visited: set[tuple[int, int]] = set()
        d__ = cast(
            "ContentStream",
            self._reference_clone(
                self.__class__(None, None), pdf_dest, force_duplicate
            ),
        )
        if ignore_fields is None:
            ignore_fields = []
        d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: DictionaryObject,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: set[tuple[int, int]],
    ) -> None:
        """
        Update the object from src.

        Only the raw data, the parsed operations and the forced encoding are
        taken over; ``force_duplicate``, ``ignore_fields`` and ``visited``
        are not used here.

        Args:
            src: The content stream to copy from.
            pdf_dest: Stored as ``self.pdf``.
            force_duplicate: Unused.
            ignore_fields: Unused.
            visited: Unused.

        """
        src_cs = cast("ContentStream", src)
        super().set_data(src_cs._data)
        self.pdf = pdf_dest
        self._operations = list(src_cs._operations)
        self.forced_encoding = src_cs.forced_encoding
        # no need to call DictionaryObject or anything
        # like super(DictionaryObject,self)._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

    def _parse_content_stream(self, stream: StreamType) -> None:
        """
        Parse raw content stream data into ``self._operations``.

        Each operation is appended as ``(operands, operator)``; inline images
        are appended as ``(inline_image_dict, b"INLINE IMAGE")``.

        Args:
            stream: Seekable stream holding the raw content stream data.

        """
        # 7.8.2 Content Streams
        stream.seek(0, 0)
        operands: list[Union[int, str, PdfObject]] = []
        while True:
            peek = read_non_whitespace(stream)
            if peek in (b"", 0):
                break
            stream.seek(-1, 1)
            if peek.isalpha() or peek in (b"'", b'"'):
                operator = read_until_regex(stream, NameObject.delimiter_pattern)
                if operator == b"BI":
                    # begin inline image - a completely different parsing
                    # mechanism is required, of course... thanks buddy...
                    assert operands == []
                    ii = self._read_inline_image(stream)
                    self._operations.append((ii, b"INLINE IMAGE"))
                else:
                    self._operations.append((operands, operator))
                    operands = []
            elif peek == b"%":
                # If we encounter a comment in the content stream, we have to
                # handle it here. Typically, read_object will handle
                # encountering a comment -- but read_object assumes that
                # following the comment must be the object we're trying to
                # read. In this case, it could be an operator instead.
                while peek not in (b"\r", b"\n", b""):
                    peek = stream.read(1)
            else:
                operands.append(read_object(stream, None, self.forced_encoding))

    def _read_inline_image(self, stream: StreamType) -> dict[str, Any]:
        """
        Read an inline image; the stream is positioned just after ``BI``.

        Args:
            stream: Stream holding the content stream data.

        Returns:
            A dictionary with the image dictionary under ``"settings"`` and
            the image bytes under ``"data"``.

        Raises:
            PdfStreamError: If no ``EI`` marker followed by whitespace is
                found after the data, even with the fallback extraction.

        """
        # begin reading just after the "BI" - begin image
        # first read the dictionary of settings.
        settings = DictionaryObject()
        while True:
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            if tok == b"I":
                # "ID" - begin of image data
                break
            key = read_object(stream, self.pdf)
            tok = read_non_whitespace(stream)
            stream.seek(-1, 1)
            value = read_object(stream, self.pdf)
            settings[key] = value
        # left at beginning of ID
        tmp = stream.read(3)
        assert tmp[:2] == b"ID"
        filtr = settings.get("/F", settings.get("/Filter", "not set"))
        # Remember where the data starts to be able to retry from there.
        savpos = stream.tell()
        if isinstance(filtr, list):
            filtr = filtr[0]  # used for encoding
        if "AHx" in filtr or "ASCIIHexDecode" in filtr:
            data = extract_inline__ascii_hex_decode(stream)
        elif "A85" in filtr or "ASCII85Decode" in filtr:
            data = extract_inline__ascii85_decode(stream)
        elif "RL" in filtr or "RunLengthDecode" in filtr:
            data = extract_inline__run_length_decode(stream)
        elif "DCT" in filtr or "DCTDecode" in filtr:
            data = extract_inline__dct_decode(stream)
        elif filtr == "not set":
            # Without a filter, the data length is derived from the image
            # dimensions; `lcs` is the number of bytes per pixel.
            cs = settings.get("/CS", "")
            if isinstance(cs, list):
                cs = cs[0]
            if "RGB" in cs:
                lcs = 3
            elif "CMYK" in cs:
                lcs = 4
            else:
                bits = settings.get(
                    "/BPC",
                    8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1,
                )
                if bits > 0:
                    lcs = bits / 8.0
                else:
                    data = extract_inline_default(stream)
                    lcs = -1
            if lcs > 0:
                data = stream.read(
                    ceil(cast(int, settings["/W"]) * lcs) * cast(int, settings["/H"])
                )
            # Move to the `EI` if possible.
            ei = read_non_whitespace(stream)
            stream.seek(-1, 1)
        else:
            data = extract_inline_default(stream)

        ei = stream.read(3)
        stream.seek(-1, 1)
        if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:
            # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above.
            stream.seek(savpos, 0)
            data = extract_inline_default(stream)
            ei = stream.read(3)
            stream.seek(-1, 1)
            if ei[:2] != b"EI" or ei[2:3] not in WHITESPACES:  # pragma: no cover
                # Check the same condition again. This should never fail as
                # edge cases are covered by `extract_inline_default` above,
                # but check this to make sure that we are behind the `EI` afterwards.
                raise PdfStreamError(
                    f"Could not extract inline image, even using fallback. Expected 'EI', got {ei!r}"
                )
        return {"settings": settings, "data": data}

    # This overrides the parent method
    def get_data(self) -> bytes:
        """
        Return the raw content stream data.

        If no raw data is available, it is rebuilt from the parsed
        operations and stored in ``self._data``.
        """
        if not self._data:
            new_data = BytesIO()
            for operands, operator in self._operations:
                if operator == b"INLINE IMAGE":
                    new_data.write(b"BI")
                    dict_text = BytesIO()
                    operands["settings"].write_to_stream(dict_text)
                    # Strip the first and last two bytes of the serialised
                    # dictionary (presumably its `<<` / `>>` delimiters).
                    new_data.write(dict_text.getvalue()[2:-2])
                    new_data.write(b"ID ")
                    new_data.write(operands["data"])
                    new_data.write(b"EI")
                else:
                    for op in operands:
                        op.write_to_stream(new_data)
                        new_data.write(b" ")
                    new_data.write(operator)
                new_data.write(b"\n")
            self._data = new_data.getvalue()
        return self._data

    # This overrides the parent method
    def set_data(self, data: bytes) -> None:
        """Set the raw data and invalidate the parsed operations."""
        super().set_data(data)
        self._operations = []

    @property
    def operations(self) -> list[tuple[Any, bytes]]:
        """
        The parsed operations as ``(operands, operator)`` tuples.

        If no operations are available, the raw data is parsed and then
        invalidated.
        """
        if not self._operations and self._data:
            self._parse_content_stream(BytesIO(self._data))
            self._data = b""
        return self._operations

    @operations.setter
    def operations(self, operations: list[tuple[Any, bytes]]) -> None:
        self._operations = operations
        self._data = b""

    def isolate_graphics_state(self) -> None:
        """Wrap the content into a ``q`` / ``Q`` (save/restore graphics state) pair."""
        if self._operations:
            self._operations.insert(0, ([], b"q"))
            self._operations.append(([], b"Q"))
            # Raw data rebuilt earlier by get_data() would no longer match
            # the operations; invalidate it so that it gets rebuilt.
            self._data = b""
        elif self._data:
            self._data = b"q\n" + self._data + b"\nQ\n"

    # This overrides the parent method
    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the content stream to ``stream``.

        Args:
            stream: The output stream.
            encryption_key: Passed through to the parent implementation.

        """
        if not self._data and self._operations:
            self.get_data()  # this ensures ._data is rebuilt
        super().write_to_stream(stream, encryption_key)
1435
1436
def read_object(
    stream: StreamType,
    pdf: Optional[PdfReaderProtocol],
    forced_encoding: Union[None, str, list[str], dict[int, str]] = None,
) -> Union[PdfObject, int, str, ContentStream]:
    """
    Read the object starting at the current position of ``stream``.

    The reader to use is chosen from the first byte(s) at the current
    position.

    Args:
        stream: Seekable stream positioned at the first byte of the object.
        pdf: Handed to the individual readers; must not be ``None`` when an
            indirect reference is encountered.
        forced_encoding: Handed to the string, array and dictionary readers.

    Returns:
        The object which has been read.

    Raises:
        PdfReadError: If the first byte does not start any known object.

    """
    tok = stream.read(1)
    stream.seek(-1, 1)  # reset to start
    if tok == b"/":
        return NameObject.read_from_stream(stream, pdf)
    if tok == b"<":
        # hexadecimal string OR dictionary
        peek = stream.read(2)
        stream.seek(-2, 1)  # reset to start
        if peek == b"<<":
            return DictionaryObject.read_from_stream(stream, pdf, forced_encoding)
        return read_hex_string_from_stream(stream, forced_encoding)
    if tok == b"[":
        return ArrayObject.read_from_stream(stream, pdf, forced_encoding)
    if tok in (b"t", b"f"):
        return BooleanObject.read_from_stream(stream)
    if tok == b"(":
        return read_string_from_stream(stream, forced_encoding)
    # An `endobj` keyword where an object is expected is read as null.
    # Note that the bytes read for this check are not put back.
    if tok == b"e" and stream.read(6) == b"endobj":
        return NullObject()
    if tok == b"n":
        return NullObject.read_from_stream(stream)
    if tok == b"%":
        # comment
        skip_over_comment(stream)
        tok = read_non_whitespace(stream)
        stream.seek(-1, 1)
        return read_object(stream, pdf, forced_encoding)
    # NOTE: this is a bytes containment test, which is also true for an
    # empty `tok`.
    if tok in b"0123456789+-.":
        # number object OR indirect reference
        peek = stream.read(20)
        stream.seek(-len(peek), 1)  # reset to start
        if IndirectPattern.match(peek) is not None:
            assert pdf is not None, "mypy"
            return IndirectObject.read_from_stream(stream, pdf)
        return NumberObject.read_from_stream(stream)
    pos = stream.tell()
    # Collect some context around the position for the error message. The
    # start is clamped to the beginning of the stream: seeking 20 bytes back
    # unconditionally raises on its own when less than 20 bytes precede the
    # position, hiding the PdfReadError below.
    stream.seek(max(0, pos - 20), 0)
    stream_extract = stream.read(80)
    stream.seek(pos)
    read_until_whitespace(stream)
    raise PdfReadError(
        f"Invalid Elementary Object starting with {tok!r} @{pos}: {stream_extract!r}"
    )
1485
1486
class Field(TreeObject):
    """
    A class representing a field dictionary.

    This class is accessed through
    :meth:`get_fields()<pypdf.PdfReader.get_fields>`
    """

    def __init__(self, data: DictionaryObject) -> None:
        """
        Copy the known field attributes from ``data``.

        Args:
            data: The field dictionary to read the attributes from. Its
                indirect reference is taken over as well.

        """
        DictionaryObject.__init__(self)
        known_attributes = (
            *FieldDictionaryAttributes.attributes(),
            *CheckboxRadioButtonAttributes.attributes(),
        )
        self.indirect_reference = data.indirect_reference
        for name in known_attributes:
            # Attributes missing from the source dictionary are skipped.
            try:
                self[NameObject(name)] = data[name]
            except KeyError:
                continue

        # A value given as an encoded stream is replaced by its decoded text.
        if isinstance(self.get("/V"), EncodedStreamObject):
            raw_value = cast(EncodedStreamObject, self[NameObject("/V")]).get_data()
            if isinstance(raw_value, bytes):
                text = raw_value.decode()
            elif raw_value is None:
                text = ""
            else:
                raise Exception("Should never happen")
            self[NameObject("/V")] = TextStringObject(text)

    # TABLE 8.69 Entries common to all field dictionaries
    @property
    def field_type(self) -> Optional[NameObject]:
        """Type of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.FT)

    @property
    def parent(self) -> Optional[DictionaryObject]:
        """Parent of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Parent)

    @property
    def kids(self) -> Optional["ArrayObject"]:
        """Kids of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Kids)

    @property
    def name(self) -> Optional[str]:
        """Name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.T)

    @property
    def alternate_name(self) -> Optional[str]:
        """Alternate name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.TU)

    @property
    def mapping_name(self) -> Optional[str]:
        """
        Mapping name of this field (read-only).

        This name is used by pypdf as a key in the dictionary returned by
        :meth:`get_fields()<pypdf.PdfReader.get_fields>`
        """
        return self.get(FieldDictionaryAttributes.TM)

    @property
    def flags(self) -> Optional[int]:
        """
        Field flags (read-only), specifying various characteristics of the
        field (see Table 8.70 of the PDF 1.7 reference).
        """
        return self.get(FieldDictionaryAttributes.Ff)

    @property
    def value(self) -> Optional[Any]:
        """
        Value of this field (read-only).

        Format varies based on field type.
        """
        return self.get(FieldDictionaryAttributes.V)

    @property
    def default_value(self) -> Optional[Any]:
        """Default value of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.DV)

    @property
    def additional_actions(self) -> Optional[DictionaryObject]:
        """
        Additional actions dictionary (read-only).

        This dictionary defines the field's behavior in response to trigger
        events. See Section 8.5.2 of the PDF 1.7 reference.
        """
        return self.get(FieldDictionaryAttributes.AA)
1584
1585
class Destination(TreeObject):
    """
    A class representing a destination within a PDF file.

    See section 12.3.2 of the PDF 2.0 reference.

    Args:
        title: Title of this destination.
        page: Reference to the page of this destination. Should
            be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`.
        fit: How the destination is displayed.

    Raises:
        PdfReadError: If destination type is invalid.

    """

    node: Optional[
        DictionaryObject
    ] = None  # node provide access to the original Object

    def __init__(
        self,
        title: Union[str, bytes],
        page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject],
        fit: Fit,
    ) -> None:
        self._filtered_children: list[Any] = []  # used in PdfWriter

        typ = fit.fit_type
        # Work on a copy: the padding done for /XYZ below must not modify
        # the arguments stored on the `fit` object handed in by the caller.
        args = list(fit.fit_args)

        DictionaryObject.__init__(self)
        self[NameObject("/Title")] = TextStringObject(title)
        self[NameObject("/Page")] = page
        self[NameObject("/Type")] = typ

        # from table 8.2 of the PDF 1.7 reference.
        if typ == "/XYZ":
            if len(args) < 1:  # left is missing : should never occur
                args.append(NumberObject(0.0))
            if len(args) < 2:  # top is missing
                args.append(NumberObject(0.0))
            if len(args) < 3:  # zoom is missing
                args.append(NumberObject(0.0))
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.TOP)],
                self[NameObject("/Zoom")],
            ) = args
        elif len(args) == 0:
            # No arguments given: nothing to store for any other fit type.
            pass
        elif typ == TF.FIT_R:
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.BOTTOM)],
                self[NameObject(TA.RIGHT)],
                self[NameObject(TA.TOP)],
            ) = args
        elif typ in [TF.FIT_H, TF.FIT_BH]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.TOP)],) = args
            except Exception:
                (self[NameObject(TA.TOP)],) = (NullObject(),)
        elif typ in [TF.FIT_V, TF.FIT_BV]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.LEFT)],) = args
            except Exception:
                (self[NameObject(TA.LEFT)],) = (NullObject(),)
        elif typ in [TF.FIT, TF.FIT_B]:
            pass
        else:
            raise PdfReadError(f"Unknown Destination Type: {typ!r}")

    @property
    def dest_array(self) -> "ArrayObject":
        """
        The destination as an array: the raw page entry, the fit type and
        those of left, bottom, right, top and zoom which are set (in this
        order).
        """
        return ArrayObject(
            [self.raw_get("/Page"), self["/Type"]]
            + [
                self[x]
                for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"]
                if x in self
            ]
        )

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the destination as a dictionary with a ``/D`` entry holding
        ``dest_array`` and an ``/S`` entry set to ``/GoTo``.

        Args:
            stream: The output stream.
            encryption_key: Deprecated; a deprecation is reported if it is
                not ``None``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        key = NameObject("/D")
        key.write_to_stream(stream)
        stream.write(b" ")
        value = self.dest_array
        value.write_to_stream(stream)

        key = NameObject("/S")
        key.write_to_stream(stream)
        stream.write(b" ")
        value_s = NameObject("/GoTo")
        value_s.write_to_stream(stream)

        stream.write(b"\n")
        stream.write(b">>")

    @property
    def title(self) -> Optional[str]:
        """Read-only property accessing the destination title."""
        return self.get("/Title")

    @property
    def page(self) -> Optional[IndirectObject]:
        """Read-only property accessing the IndirectObject of the destination page."""
        return self.get("/Page")

    @property
    def typ(self) -> Optional[str]:
        """Read-only property accessing the destination type."""
        return self.get("/Type")

    @property
    def zoom(self) -> Optional[int]:
        """Read-only property accessing the zoom factor."""
        return self.get("/Zoom", None)

    @property
    def left(self) -> Optional[FloatObject]:
        """Read-only property accessing the left horizontal coordinate."""
        return self.get("/Left", None)

    @property
    def right(self) -> Optional[FloatObject]:
        """Read-only property accessing the right horizontal coordinate."""
        return self.get("/Right", None)

    @property
    def top(self) -> Optional[FloatObject]:
        """Read-only property accessing the top vertical coordinate."""
        return self.get("/Top", None)

    @property
    def bottom(self) -> Optional[FloatObject]:
        """Read-only property accessing the bottom vertical coordinate."""
        return self.get("/Bottom", None)

    @property
    def color(self) -> Optional["ArrayObject"]:
        """
        Read-only property accessing the color in (R, G, B) with values 0.0-1.0.

        Defaults to ``[0, 0, 0]`` if no ``/C`` entry is set.
        """
        return self.get(
            "/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)])
        )

    @property
    def font_format(self) -> Optional[OutlineFontFlag]:
        """
        Read-only property accessing the font type.

        1=italic, 2=bold, 3=both

        Defaults to ``0`` if no ``/F`` entry is set.
        """
        return self.get("/F", 0)

    @property
    def outline_count(self) -> Optional[int]:
        """
        Read-only property accessing the outline count.

        positive = expanded
        negative = collapsed
        absolute value = number of visible descendants at all levels
        """
        return self.get("/Count", None)