1# Copyright (c) 2006, Mathieu Fenniak
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted provided that the following conditions are
6# met:
7#
8# * Redistributions of source code must retain the above copyright notice,
9# this list of conditions and the following disclaimer.
10# * Redistributions in binary form must reproduce the above copyright notice,
11# this list of conditions and the following disclaimer in the documentation
12# and/or other materials provided with the distribution.
13# * The name of the author may not be used to endorse or promote products
14# derived from this software without specific prior written permission.
15#
16# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
19# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
20# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
21# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
22# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
23# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
24# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
25# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
26# POSSIBILITY OF SUCH DAMAGE.
27
28
# Module authorship metadata.
__author__ = "Mathieu Fenniak"
__author_email__ = "biziqe@mathieu.fenniak.net"
31
32import logging
33import re
34import sys
35from io import BytesIO
36from math import ceil
37from typing import (
38 Any,
39 Callable,
40 Dict,
41 Iterable,
42 List,
43 Optional,
44 Sequence,
45 Set,
46 Tuple,
47 Union,
48 cast,
49)
50
51from .._protocols import PdfReaderProtocol, PdfWriterProtocol, XmpInformationProtocol
52from .._utils import (
53 WHITESPACES,
54 StreamType,
55 deprecation_no_replacement,
56 deprecation_with_replacement,
57 logger_warning,
58 read_non_whitespace,
59 read_until_regex,
60 read_until_whitespace,
61 skip_over_comment,
62)
63from ..constants import (
64 CheckboxRadioButtonAttributes,
65 FieldDictionaryAttributes,
66 OutlineFontFlag,
67)
68from ..constants import FilterTypes as FT
69from ..constants import StreamAttributes as SA
70from ..constants import TypArguments as TA
71from ..constants import TypFitArguments as TF
72from ..errors import STREAM_TRUNCATED_PREMATURELY, PdfReadError, PdfStreamError
73from ._base import (
74 BooleanObject,
75 ByteStringObject,
76 FloatObject,
77 IndirectObject,
78 NameObject,
79 NullObject,
80 NumberObject,
81 PdfObject,
82 TextStringObject,
83 is_null_or_none,
84)
85from ._fit import Fit
86from ._image_inline import (
87 extract_inline_A85,
88 extract_inline_AHx,
89 extract_inline_DCT,
90 extract_inline_default,
91 extract_inline_RL,
92)
93from ._utils import read_hex_string_from_stream, read_string_from_stream
94
95if sys.version_info >= (3, 11):
96 from typing import Self
97else:
98 from typing_extensions import Self
99
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Bytes pattern: optional sign, two whitespace-separated digit groups (both
# captured), then "R" followed by one byte that is not an ASCII letter.
IndirectPattern = re.compile(rb"[+-]?(\d+)\s+(\d+)\s+R[^a-zA-Z]")
102
103
class ArrayObject(List[Any], PdfObject):
    """List-backed PDF object, serialized between ``[`` and ``]``."""

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "ArrayObject":
        """
        Build a replica of this array for ``pdf_dest``.

        The target array is obtained through ``_reference_clone``. Elements
        exposing a ``replicate`` method are replicated recursively; any other
        element is appended unchanged.

        Args:
            pdf_dest: Writer passed to ``_reference_clone`` and to the
                ``replicate`` call of each element.

        Returns:
            The array returned by ``_reference_clone``, filled with the
            replicated elements.

        """
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, False),
        )
        for data in self:
            if hasattr(data, "replicate"):
                arr.append(data.replicate(pdf_dest))
            else:
                arr.append(data)
        return arr

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "ArrayObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: Writer the clone is created for.
            force_duplicate: When true, the "already belongs to ``pdf_dest``"
                shortcut is skipped; the flag is also forwarded to
                ``_reference_clone`` and to the element clones.
            ignore_fields: Forwarded unchanged to the ``clone`` call of
                every element.

        Returns:
            ``self`` when its indirect reference already points at
            ``pdf_dest`` and no duplicate is forced, otherwise the clone.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: without a usable indirect reference the shortcut
            # simply does not apply and a clone is built below.
            pass
        arr = cast(
            "ArrayObject",
            self._reference_clone(ArrayObject(), pdf_dest, force_duplicate),
        )
        for data in self:
            if isinstance(data, StreamObject):
                # Streams are stored in the clone by indirect reference.
                dup = data._reference_clone(
                    data.clone(pdf_dest, force_duplicate, ignore_fields),
                    pdf_dest,
                    force_duplicate,
                )
                arr.append(dup.indirect_reference)
            elif hasattr(data, "clone"):
                arr.append(data.clone(pdf_dest, force_duplicate, ignore_fields))
            else:
                arr.append(data)
        return arr

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash((self.__class__, tuple(x.hash_bin() for x in self)))

    def items(self) -> Iterable[Any]:
        """Emulate DictionaryObject.items for a list (index, object)."""
        return enumerate(self)

    def _to_lst(self, lst: Any) -> List[Any]:
        """
        Normalize an operand of ``+``, ``+=`` or ``-=`` into an iterable.

        Args:
            lst: ``list``/``tuple``/``set`` values are returned as they are;
                a ``PdfObject`` or any other scalar is wrapped in a
                one-element list; ``str`` becomes a ``NameObject`` when it
                starts with "/" and a ``TextStringObject`` otherwise;
                ``bytes`` becomes a ``ByteStringObject``.

        Returns:
            The elements to add or remove.

        """
        if isinstance(lst, (list, tuple, set)):
            pass
        elif isinstance(lst, PdfObject):
            lst = [lst]
        elif isinstance(lst, str):
            # startswith() instead of lst[0]: an empty string must not raise
            # IndexError, it is simply an (empty) text string.
            if lst.startswith("/"):
                lst = [NameObject(lst)]
            else:
                lst = [TextStringObject(lst)]
        elif isinstance(lst, bytes):
            lst = [ByteStringObject(lst)]
        else:  # for numbers,...
            lst = [lst]
        return lst

    def __add__(self, lst: Any) -> "ArrayObject":
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        Returns:
            ArrayObject with all elements

        """
        temp = ArrayObject(self)
        temp.extend(self._to_lst(lst))
        return temp

    def __iadd__(self, lst: Any) -> Self:
        """
        Allow extension by adding list or add one element only

        Args:
            lst: any list, tuples are extended the list.
                other types(numbers,...) will be appended.
                if str is passed it will be converted into TextStringObject
                or NameObject (if starting with "/")
                if bytes is passed it will be converted into ByteStringObject

        """
        self.extend(self._to_lst(lst))
        return self

    def __isub__(self, lst: Any) -> Self:
        """
        Allow to remove items.

        For each element of the normalized operand the first equal entry is
        removed; elements that are not present are ignored.
        """
        for item in self._to_lst(lst):
            try:
                self.remove(item)
            except ValueError:
                # Not in the array: nothing to remove.
                pass
        return self

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the array as ``[ elem elem ... ]`` to ``stream``.

        Args:
            stream: Binary stream receiving the output.
            encryption_key: Deprecated; any non-None value is reported through
                ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"[")
        for data in self:
            stream.write(b" ")
            data.write_to_stream(stream)
        stream.write(b" ]")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, List[str], Dict[int, str]] = None,
    ) -> "ArrayObject":
        """
        Parse an array from ``stream``.

        Args:
            stream: Seekable stream positioned on the opening ``[``.
            pdf: Forwarded to ``read_object`` for each element.
            forced_encoding: Forwarded to ``read_object`` for each element.

        Returns:
            The parsed array. Reaching the end of the stream before ``]``
            ends parsing and returns the elements read so far.

        Raises:
            PdfReadError: If the first byte is not ``[``.

        """
        arr = ArrayObject()
        tmp = stream.read(1)
        if tmp != b"[":
            raise PdfReadError("Could not read array")
        while True:
            # skip leading whitespace
            tok = stream.read(1)
            while tok.isspace():
                tok = stream.read(1)
            if tok == b"":
                break
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            # check for array ending; the closing bracket stays consumed
            if tok == b"]":
                break
            # give the byte back so read_object sees the whole element
            stream.seek(-1, 1)
            # read and append object
            arr.append(read_object(stream, pdf, forced_encoding))
        return arr
268
269
class DictionaryObject(Dict[Any, Any], PdfObject):
    """
    Dict-backed PDF object, serialized between ``<<`` and ``>>``.

    Keys and values stored through ``__setitem__`` must be ``PdfObject``
    instances. Item access (``obj[key]``) resolves the stored value through
    ``get_object()``, whereas :meth:`raw_get` returns exactly what is stored.
    """

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "DictionaryObject":
        """
        Build a replica of this dictionary for ``pdf_dest``.

        Keys are always replicated; values are replicated when they expose a
        ``replicate`` method and copied by reference otherwise.

        Args:
            pdf_dest: Writer passed to ``_reference_clone`` and to the nested
                ``replicate`` calls.

        Returns:
            The object returned by ``_reference_clone``, filled with the
            replicated entries.

        """
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def clone(
        self,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool = False,
        ignore_fields: Optional[Sequence[Union[str, int]]] = (),
    ) -> "DictionaryObject":
        """
        Clone object into pdf_dest.

        Args:
            pdf_dest: Writer the clone is created for.
            force_duplicate: When true, the "already belongs to ``pdf_dest``"
                shortcut is skipped; also forwarded to ``_reference_clone``.
            ignore_fields: Keys that must not be copied; ``None`` is treated
                as an empty list. See ``_clone`` for the handling of integer
                entries.

        Returns:
            ``self`` when its indirect reference already points at
            ``pdf_dest`` and no duplicate is forced, otherwise the clone.

        """
        try:
            if self.indirect_reference.pdf == pdf_dest and not force_duplicate:  # type: ignore
                return self
        except Exception:
            # Best effort: without a usable indirect reference the shortcut
            # simply does not apply and a clone is built below.
            pass

        visited: Set[Tuple[int, int]] = set()  # (idnum, generation)
        d__ = cast(
            "DictionaryObject",
            self._reference_clone(self.__class__(), pdf_dest, force_duplicate),
        )
        if ignore_fields is None:
            ignore_fields = []
        # Only populate an object that _reference_clone returned empty.
        if len(d__.keys()) == 0:
            d__._clone(self, pdf_dest, force_duplicate, ignore_fields, visited)
        return d__

    def _clone(
        self,
        src: "DictionaryObject",
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: Set[Tuple[int, int]],  # (idnum, generation)
    ) -> None:
        """
        Update the object from src.

        Args:
            src: Dictionary whose entries are copied into ``self``.
            pdf_dest: Writer the clones are created for.
            force_duplicate: Forwarded to ``_reference_clone`` and to the
                nested ``clone`` calls.
            ignore_fields: Keys that are not copied. An integer entry acts as
                a counter for the entry that follows it: it is decremented on
                each call, and once it is ``<= 0`` both the counter and the
                following entry are dropped from the list.
            visited: ``(idnum, generation)`` pairs of clones already handled
                while walking ``/Next``, ``/Prev``, ``/N`` and ``/V`` chains;
                meeting a known pair stops the walk.

        """
        # first we remove for the ignore_fields
        # that are for a limited number of levels
        x = 0
        assert ignore_fields is not None
        ignore_fields = list(ignore_fields)
        while x < len(ignore_fields):
            if isinstance(ignore_fields[x], int):
                if cast(int, ignore_fields[x]) <= 0:
                    # drop the exhausted counter and the field it guards
                    del ignore_fields[x]
                    del ignore_fields[x]
                    continue
                ignore_fields[x] -= 1  # type:ignore
            x += 1
        # First check if this is a chain list, we need to loop to prevent recur
        if any(
            field not in ignore_fields
            and field in src
            and isinstance(src.raw_get(field), IndirectObject)
            and isinstance(src[field], DictionaryObject)
            and (
                src.get("/Type", None) is None
                or cast(DictionaryObject, src[field]).get("/Type", None) is None
                or src.get("/Type", None)
                == cast(DictionaryObject, src[field]).get("/Type", None)
            )
            for field in ["/Next", "/Prev", "/N", "/V"]
        ):
            ignore_fields = list(ignore_fields)
            for lst in (("/Next", "/Prev"), ("/N", "/V")):
                for k in lst:
                    objs = []
                    if (
                        k in src
                        and k not in self
                        and isinstance(src.raw_get(k), IndirectObject)
                        and isinstance(src[k], DictionaryObject)
                        # IF need to go further the idea is to check
                        # that the types are the same:
                        and (
                            src.get("/Type", None) is None
                            or cast(DictionaryObject, src[k]).get("/Type", None) is None
                            or src.get("/Type", None)
                            == cast(DictionaryObject, src[k]).get("/Type", None)
                        )
                    ):
                        cur_obj: Optional[DictionaryObject] = cast(
                            "DictionaryObject", src[k]
                        )
                        prev_obj: Optional[DictionaryObject] = self
                        # Walk the chain iteratively: create every (still
                        # empty) clone and link it first, fill them afterwards.
                        while cur_obj is not None:
                            clon = cast(
                                "DictionaryObject",
                                cur_obj._reference_clone(
                                    cur_obj.__class__(), pdf_dest, force_duplicate
                                ),
                            )
                            # check to see if we've previously processed our item
                            if clon.indirect_reference is not None:
                                idnum = clon.indirect_reference.idnum
                                generation = clon.indirect_reference.generation
                                if (idnum, generation) in visited:
                                    cur_obj = None
                                    break
                                visited.add((idnum, generation))
                            objs.append((cur_obj, clon))
                            assert prev_obj is not None
                            prev_obj[NameObject(k)] = clon.indirect_reference
                            prev_obj = clon
                            try:
                                if cur_obj == src:
                                    cur_obj = None
                                else:
                                    cur_obj = cast("DictionaryObject", cur_obj[k])
                            except Exception:
                                # no further link under this key: end of chain
                                cur_obj = None
                        for s, c in objs:
                            c._clone(
                                s, pdf_dest, force_duplicate, ignore_fields, visited
                            )

        for k, v in src.items():
            if k not in ignore_fields:
                if isinstance(v, StreamObject):
                    if not hasattr(v, "indirect_reference"):
                        v.indirect_reference = None
                    vv = v.clone(pdf_dest, force_duplicate, ignore_fields)
                    assert vv.indirect_reference is not None
                    # streams are stored by indirect reference
                    self[k.clone(pdf_dest)] = vv.indirect_reference
                elif k not in self:
                    self[NameObject(k)] = (
                        v.clone(pdf_dest, force_duplicate, ignore_fields)
                        if hasattr(v, "clone")
                        else v
                    )

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        return hash(
            (self.__class__, tuple(((k, v.hash_bin()) for k, v in self.items())))
        )

    def raw_get(self, key: Any) -> Any:
        """Return the stored value for ``key`` without calling ``get_object()``."""
        return dict.__getitem__(self, key)

    def get_inherited(self, key: str, default: Any = None) -> Any:
        """
        Returns the value of a key or from the parent if not found.
        If not found returns default.

        Args:
            key: string identifying the field to return

            default: default value to return

        Returns:
            Current key or inherited one, otherwise default value.

        """
        if key in self:
            return self[key]
        if "/Parent" not in self:
            return default
        # Not defined here: delegate to the parent dictionary.
        return cast("DictionaryObject", self["/Parent"].get_object()).get_inherited(
            key, default
        )

    def __setitem__(self, key: Any, value: Any) -> Any:
        """
        Store ``value`` under ``key``.

        Raises:
            ValueError: If ``key`` or ``value`` is not a ``PdfObject``.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.__setitem__(self, key, value)

    def setdefault(self, key: Any, value: Optional[Any] = None) -> Any:
        """
        ``dict.setdefault`` with the same type checks as ``__setitem__``.

        The checks run before the lookup, so ``value`` must be a
        ``PdfObject`` even when ``key`` is already present.

        Raises:
            ValueError: If ``key`` or ``value`` is not a ``PdfObject``.

        """
        if not isinstance(key, PdfObject):
            raise ValueError("Key must be a PdfObject")
        if not isinstance(value, PdfObject):
            raise ValueError("Value must be a PdfObject")
        return dict.setdefault(self, key, value)  # type: ignore

    def __getitem__(self, key: Any) -> PdfObject:
        """Return the value for ``key`` resolved through ``get_object()``."""
        return dict.__getitem__(self, key).get_object()

    @property
    def xmp_metadata(self) -> Optional[XmpInformationProtocol]:
        """
        Retrieve XMP (Extensible Metadata Platform) data relevant to this
        object, if available.

        See Table 347 — Additional entries in a metadata stream dictionary.

        Returns:
            Returns a :class:`~pypdf.xmp.XmpInformation` instance
            that can be used to access XMP metadata from the document. Can also
            return None if no metadata was found on the document root.

        """
        from ..xmp import XmpInformation  # noqa: PLC0415

        metadata = self.get("/Metadata", None)
        if is_null_or_none(metadata):
            return None
        assert metadata is not None, "mypy"
        metadata = metadata.get_object()

        if not isinstance(metadata, XmpInformation):
            # Wrap once and store the wrapper back under /Metadata.
            metadata = XmpInformation(metadata)
            self[NameObject("/Metadata")] = metadata
        return metadata

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the dictionary as ``<< key value ... >>`` to ``stream``.

        Entries whose key is longer than two characters and has ``%`` as both
        its second and its last character (such as ``/%is_open%``) are not
        written.

        Args:
            stream: Binary stream receiving the output.
            encryption_key: Deprecated; any non-None value is reported through
                ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        for key, value in self.items():
            if len(key) > 2 and key[1] == "%" and key[-1] == "%":
                continue
            key.write_to_stream(stream, encryption_key)
            stream.write(b" ")
            value.write_to_stream(stream)
            stream.write(b"\n")
        stream.write(b">>")

    @staticmethod
    def read_from_stream(
        stream: StreamType,
        pdf: Optional[PdfReaderProtocol],
        forced_encoding: Union[None, str, List[str], Dict[int, str]] = None,
    ) -> "DictionaryObject":
        """
        Parse a dictionary, and the stream data that may follow it.

        Args:
            stream: Seekable stream positioned on the opening ``<<``.
            pdf: Reader providing ``strict``, ``xref`` and ``get_object``;
                may be None, which is handled like non-strict mode except
                that the unsized-stream recovery is unavailable.
            forced_encoding: Forwarded to ``read_object`` when reading values.

        Returns:
            A ``DictionaryObject``, or the result of
            ``StreamObject.initialize_from_dictionary`` when the dictionary is
            followed by the ``stream`` keyword. When not strict, a failure
            while reading an entry returns the entries read so far.

        Raises:
            PdfReadError: If the input does not start with ``<<``; in strict
                mode on an invalid key, a failing entry or a duplicated key;
                or when no ``endstream`` marker can be located.
            PdfStreamError: If the input ends inside the dictionary, if the
                ``stream`` keyword is not followed by a newline, or (strict
                mode) if the stream has no length entry.

        """

        def get_next_obj_pos(
            p: int, p1: int, rem_gens: List[int], pdf: PdfReaderProtocol
        ) -> int:
            # Smallest xref offset in (p, p1] over the given generations,
            # or p1 when there is none.
            out = p1
            for gen in rem_gens:
                loc = pdf.xref[gen]
                try:
                    values = [x for x in loc.values() if p < x <= p1]
                    if values:
                        out = min(out, *values)
                except ValueError:
                    pass
            return out

        def read_unsized_from_stream(
            stream: StreamType, pdf: PdfReaderProtocol
        ) -> bytes:
            # we are just pointing at beginning of the stream
            eon = get_next_obj_pos(stream.tell(), 2**32, list(pdf.xref), pdf) - 1
            curr = stream.tell()
            rw = stream.read(eon - stream.tell())
            p = rw.find(b"endstream")
            if p < 0:
                raise PdfReadError(
                    f"Unable to find 'endstream' marker for obj starting at {curr}."
                )
            # leave the stream just after the 9-byte marker
            stream.seek(curr + p + 9)
            # Drop the single byte preceding the marker. Clamp the bound so
            # that a marker found at offset 0 yields empty data instead of
            # rw[:-1] (almost the whole buffer).
            return rw[: max(p - 1, 0)]

        tmp = stream.read(2)
        if tmp != b"<<":
            raise PdfReadError(
                f"Dictionary read error at byte {hex(stream.tell())}: "
                "stream must begin with '<<'"
            )
        data: Dict[Any, Any] = {}
        while True:
            tok = read_non_whitespace(stream)
            if tok == b"\x00":
                continue
            if tok == b"%":
                stream.seek(-1, 1)
                skip_over_comment(stream)
                continue
            if not tok:
                raise PdfStreamError(STREAM_TRUNCATED_PREMATURELY)

            if tok == b">":
                # consume the second ">" of the closing ">>"
                stream.read(1)
                break
            stream.seek(-1, 1)
            try:
                try:
                    key = read_object(stream, pdf)
                    if isinstance(key, NullObject):
                        break
                    if not isinstance(key, NameObject):
                        raise PdfReadError(
                            f"Expecting a NameObject for key but found {key!r}"
                        )
                except PdfReadError as exc:
                    if pdf is not None and pdf.strict:
                        raise
                    logger_warning(exc.__repr__(), __name__)
                    continue
                tok = read_non_whitespace(stream)
                stream.seek(-1, 1)
                value = read_object(stream, pdf, forced_encoding)
            except Exception as exc:
                if pdf is not None and pdf.strict:
                    raise PdfReadError(exc.__repr__())
                logger_warning(exc.__repr__(), __name__)
                retval = DictionaryObject()
                retval.update(data)
                return retval  # return partial data

            if not data.get(key):
                data[key] = value
            else:
                # multiple definitions of key not permitted
                msg = (
                    f"Multiple definitions in dictionary at byte "
                    f"{hex(stream.tell())} for key {key}"
                )
                if pdf is not None and pdf.strict:
                    raise PdfReadError(msg)
                logger_warning(msg, __name__)

        pos = stream.tell()
        s = read_non_whitespace(stream)
        if s == b"s" and stream.read(5) == b"tream":
            eol = stream.read(1)
            # Occasional PDF file output has spaces after 'stream' keyword but before EOL.
            # patch provided by Danial Sandler
            while eol == b" ":
                eol = stream.read(1)
            if eol not in (b"\n", b"\r"):
                raise PdfStreamError("Stream data must be followed by a newline")
            if eol == b"\r" and stream.read(1) != b"\n":
                # lone "\r": the byte just read already belongs to the data
                stream.seek(-1, 1)
            # this is a stream object, not a dictionary
            if SA.LENGTH not in data:
                if pdf is not None and pdf.strict:
                    raise PdfStreamError("Stream length not defined")
                logger_warning(
                    f"Stream length not defined @pos={stream.tell()}", __name__
                )
                data[NameObject(SA.LENGTH)] = NumberObject(-1)
            length = data[SA.LENGTH]
            if isinstance(length, IndirectObject):
                # resolving the reference may move the stream: restore it
                t = stream.tell()
                assert pdf is not None, "mypy"
                length = pdf.get_object(length)
                stream.seek(t, 0)
            if length is None:  # if the PDF is damaged
                length = -1
            pstart = stream.tell()
            if length > 0:
                data["__streamdata__"] = stream.read(length)
            else:
                data["__streamdata__"] = read_until_regex(
                    stream, re.compile(b"endstream")
                )
            e = read_non_whitespace(stream)
            ndstream = stream.read(8)
            if (e + ndstream) != b"endstream":
                # the odd PDF file has a length that is too long, so
                # we need to read backwards to find the "endstream" ending.
                # ReportLab (unknown version) generates files with this bug,
                # and Python users into PDF files tend to be our audience.
                # we need to do this to correct the streamdata and chop off
                # an extra character.
                pos = stream.tell()
                stream.seek(-10, 1)
                end = stream.read(9)
                if end == b"endstream":
                    # we found it by looking back one character further.
                    data["__streamdata__"] = data["__streamdata__"][:-1]
                elif pdf is not None and not pdf.strict:
                    # ignore the declared length and search for the marker
                    stream.seek(pstart, 0)
                    data["__streamdata__"] = read_unsized_from_stream(stream, pdf)
                    pos = stream.tell()
                else:
                    stream.seek(pos, 0)
                    raise PdfReadError(
                        "Unable to find 'endstream' marker after stream at byte "
                        f"{hex(stream.tell())} (nd='{ndstream!r}', end='{end!r}')."
                    )
        else:
            # plain dictionary: give back whatever was read past it
            stream.seek(pos, 0)
        if "__streamdata__" in data:
            return StreamObject.initialize_from_dictionary(data)
        retval = DictionaryObject()
        retval.update(data)
        return retval
684
685
class TreeObject(DictionaryObject):
    """
    Dictionary acting as the parent of a doubly linked list of children.

    The children are reached through ``/First`` and ``/Last`` and chained
    with ``/Next`` and ``/Prev``; each child points back through ``/Parent``
    and ``/Count`` is maintained on the parent. Iterating a ``TreeObject``
    yields its children, not its dictionary keys.
    """

    def __init__(self, dct: Optional[DictionaryObject] = None) -> None:
        """
        Create a tree node, optionally pre-filled.

        Args:
            dct: Dictionary whose entries are copied into the new node;
                ignored when ``None`` or empty.

        """
        DictionaryObject.__init__(self)
        if dct:
            self.update(dct)

    def has_children(self) -> bool:
        """Return whether the node has a ``/First`` entry."""
        return "/First" in self

    def __iter__(self) -> Any:
        """Iterate over the children (see :meth:`children`)."""
        return self.children()

    def children(self) -> Iterable[Any]:
        """
        Yield the children, from ``/First`` following ``/Next`` links.

        Iteration stops after the child equal to ``/Last`` or at the first
        child whose ``/Next`` is missing or null.
        """
        if not self.has_children():
            return

        child_ref = self[NameObject("/First")]
        child = child_ref.get_object()
        while True:
            yield child
            if child == self[NameObject("/Last")]:
                return
            child_ref = child.get(NameObject("/Next"))  # type: ignore
            if is_null_or_none(child_ref):
                return
            child = child_ref.get_object()

    def add_child(self, child: Any, pdf: PdfWriterProtocol) -> None:
        """
        Insert ``child`` through ``insert_child`` with ``before=None``.

        Args:
            child: Object to insert.
            pdf: Forwarded to ``insert_child``.

        """
        self.insert_child(child, None, pdf)

    def inc_parent_counter_default(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to ``/Count`` of ``parent`` and of its ancestors.

        The count never goes below 0. The walk up the ``/Parent`` chain stops
        at the first node that is null/None or has no ``/Count`` entry.

        Args:
            parent: Node where the update starts.
            n: Amount to add (may be negative).

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        if "/Count" in parent:
            parent[NameObject("/Count")] = NumberObject(
                max(0, cast(int, parent[NameObject("/Count")]) + n)
            )
            self.inc_parent_counter_default(parent.get("/Parent", None), n)

    def inc_parent_counter_outline(
        self, parent: Union[None, IndirectObject, "TreeObject"], n: int
    ) -> None:
        """
        Add ``n`` to the magnitude of ``/Count`` along the parent chain.

        The stored count is positive when the node's ``/%is_open%`` entry
        (default true) is true and negative otherwise. The walk up the
        ``/Parent`` chain stops after the first node that is not open.

        Args:
            parent: Node where the update starts.
            n: Amount added to the absolute value of ``/Count``.

        """
        if is_null_or_none(parent):
            return
        assert parent is not None, "mypy"
        parent = cast("TreeObject", parent.get_object())
        # BooleanObject requires comparison with == not is
        opn = parent.get("/%is_open%", True) == True  # noqa: E712
        c = cast(int, parent.get("/Count", 0))
        if c < 0:
            c = abs(c)
        parent[NameObject("/Count")] = NumberObject((c + n) * (1 if opn else -1))
        if not opn:
            return
        self.inc_parent_counter_outline(parent.get("/Parent", None), n)

    def insert_child(
        self,
        child: Any,
        before: Any,
        pdf: PdfWriterProtocol,
        inc_parent_counter: Optional[Callable[..., Any]] = None,
    ) -> IndirectObject:
        """
        Insert ``child`` among the children of this node.

        The search for ``before`` starts at the current ``/Last`` node and
        follows ``/Next`` links; when no node whose indirect reference equals
        ``before`` is reached, the child is appended at the end.

        Args:
            child: Object to insert; its ``get_object()`` and
                ``indirect_reference`` are used.
            before: Indirect reference of the node the child is inserted
                before, or ``None`` to append.
            pdf: Not used by this method.
            inc_parent_counter: Callable invoked as
                ``inc_parent_counter(self, count)`` to update the counters;
                defaults to :meth:`inc_parent_counter_default`.

        Returns:
            The indirect reference of the inserted child.

        """
        if inc_parent_counter is None:
            inc_parent_counter = self.inc_parent_counter_default
        child_obj = child.get_object()
        child = child.indirect_reference  # get_reference(child_obj)

        prev: Optional[DictionaryObject]
        if "/First" not in self:  # no child yet
            self[NameObject("/First")] = child
            self[NameObject("/Count")] = NumberObject(0)
            self[NameObject("/Last")] = child
            child_obj[NameObject("/Parent")] = self.indirect_reference
            inc_parent_counter(self, child_obj.get("/Count", 1))
            if "/Next" in child_obj:
                del child_obj["/Next"]
            if "/Prev" in child_obj:
                del child_obj["/Prev"]
            return child
        prev = cast("DictionaryObject", self["/Last"])

        while prev.indirect_reference != before:
            if "/Next" in prev:
                prev = cast("TreeObject", prev["/Next"])
            else:  # append at the end
                prev[NameObject("/Next")] = cast("TreeObject", child)
                child_obj[NameObject("/Prev")] = prev.indirect_reference
                child_obj[NameObject("/Parent")] = self.indirect_reference
                if "/Next" in child_obj:
                    del child_obj["/Next"]
                self[NameObject("/Last")] = child
                inc_parent_counter(self, child_obj.get("/Count", 1))
                return child
        try:  # insert as first or in the middle
            assert isinstance(prev["/Prev"], DictionaryObject)
            prev["/Prev"][NameObject("/Next")] = child
            child_obj[NameObject("/Prev")] = prev["/Prev"]
        except Exception:  # it means we are inserting in first position
            # The child has no predecessor: drop stale sibling links without
            # assuming they exist (a fresh child has none, and an unguarded
            # ``del`` would raise KeyError), and make the tree start at the
            # new child so that iteration from /First reaches it.
            if "/Next" in child_obj:
                del child_obj["/Next"]
            if "/Prev" in child_obj:
                del child_obj["/Prev"]
            self[NameObject("/First")] = child
        child_obj[NameObject("/Next")] = prev
        prev[NameObject("/Prev")] = child
        child_obj[NameObject("/Parent")] = self.indirect_reference
        inc_parent_counter(self, child_obj.get("/Count", 1))
        return child

    def _remove_node_from_tree(
        self, prev: Any, prev_ref: Any, cur: Any, last: Any
    ) -> None:
        """
        Adjust the pointers of the linked list and tree node count.

        Args:
            prev: Node preceding ``cur``, or ``None`` when ``cur`` is the
                first node.
            prev_ref: Reference to ``prev`` stored in the updated links.
            cur: Node being removed.
            last: Last node of the list; only used to assert that a node
                without ``/Next`` is the last one.

        """
        next_ref = cur.get(NameObject("/Next"), None)
        if prev is None:
            if next_ref:
                # Removing first tree node
                next_obj = next_ref.get_object()
                del next_obj[NameObject("/Prev")]
                self[NameObject("/First")] = next_ref
                self[NameObject("/Count")] = NumberObject(
                    self[NameObject("/Count")] - 1  # type: ignore
                )

            else:
                # Removing only tree node
                self[NameObject("/Count")] = NumberObject(0)
                del self[NameObject("/First")]
                if NameObject("/Last") in self:
                    del self[NameObject("/Last")]
        else:
            if next_ref:
                # Removing middle tree node
                next_obj = next_ref.get_object()
                next_obj[NameObject("/Prev")] = prev_ref
                prev[NameObject("/Next")] = next_ref
            else:
                # Removing last tree node
                assert cur == last
                del prev[NameObject("/Next")]
                self[NameObject("/Last")] = prev_ref
            self[NameObject("/Count")] = NumberObject(self[NameObject("/Count")] - 1)  # type: ignore

    def remove_child(self, child: Any) -> None:
        """
        Remove ``child`` from the children of this node.

        Args:
            child: Child to remove; resolved through ``get_object()``.

        Raises:
            ValueError: If the child has no ``/Parent``, if its ``/Parent``
                is not this node, or if it is not found in the list.

        """
        child_obj = child.get_object()
        child = child_obj.indirect_reference

        if NameObject("/Parent") not in child_obj:
            raise ValueError("Removed child does not appear to be a tree item")
        if child_obj[NameObject("/Parent")] != self:
            raise ValueError("Removed child is not a member of this tree")

        found = False
        prev_ref = None
        prev = None
        cur_ref: Optional[Any] = self[NameObject("/First")]
        cur: Optional[Dict[str, Any]] = cur_ref.get_object()  # type: ignore
        last_ref = self[NameObject("/Last")]
        last = last_ref.get_object()
        while cur is not None:
            if cur == child_obj:
                self._remove_node_from_tree(prev, prev_ref, cur, last)
                found = True
                break

            # Go to the next node
            prev_ref = cur_ref
            prev = cur
            if NameObject("/Next") in cur:
                cur_ref = cur[NameObject("/Next")]
                cur = cur_ref.get_object()
            else:
                cur_ref = None
                cur = None

        if not found:
            raise ValueError("Removal couldn't find item in tree")

        _reset_node_tree_relationship(child_obj)

    def remove_from_tree(self) -> None:
        """
        Remove the object from the tree it is in.

        Raises:
            ValueError: If the object has no ``/Parent`` entry.

        """
        if NameObject("/Parent") not in self:
            raise ValueError("Removed child does not appear to be a tree item")
        cast("TreeObject", self["/Parent"]).remove_child(self)

    def empty_tree(self) -> None:
        """Detach every child and drop ``/Count``, ``/First`` and ``/Last``."""
        # Snapshot the children first: resetting a child deletes its /Next
        # link, which the children() generator needs to reach the following
        # node, so resetting while iterating would stop after the first child.
        for child in list(self.children()):
            child_obj = child.get_object()
            _reset_node_tree_relationship(child_obj)

        if NameObject("/Count") in self:
            del self[NameObject("/Count")]
        if NameObject("/First") in self:
            del self[NameObject("/First")]
        if NameObject("/Last") in self:
            del self[NameObject("/Last")]
893
894
def _reset_node_tree_relationship(child_obj: Any) -> None:
    """
    Strip the tree bookkeeping entries from a node that left its tree.

    ``/Parent`` is deleted unconditionally, so the entry has to exist;
    ``/Next`` and ``/Prev`` are deleted only when present.

    Args:
        child_obj: Node being detached; modified in place.

    """
    del child_obj[NameObject("/Parent")]
    for sibling_key in ("/Next", "/Prev"):
        link = NameObject(sibling_key)
        if link in child_obj:
            del child_obj[link]
910
911
class StreamObject(DictionaryObject):
    """
    Dictionary paired with a byte payload.

    The payload is kept in ``_data``; ``decoded_self`` optionally holds a
    ``DecodedStreamObject`` counterpart of this stream.
    """

    def __init__(self) -> None:
        """Create an empty stream with no payload and no decoded counterpart."""
        self._data: bytes = b""
        self.decoded_self: Optional[DecodedStreamObject] = None

    def replicate(
        self,
        pdf_dest: PdfWriterProtocol,
    ) -> "StreamObject":
        """
        Build a replica of this stream for ``pdf_dest``.

        The payload is shared, the decoded counterpart (if any) and the
        dictionary entries are replicated. The source object is left
        untouched.

        Args:
            pdf_dest: Writer passed to ``_reference_clone`` and to the nested
                ``replicate`` calls.

        Returns:
            The object returned by ``_reference_clone``, filled with the
            replicated content.

        """
        d__ = cast(
            "StreamObject",
            self._reference_clone(self.__class__(), pdf_dest, False),
        )
        d__._data = self._data
        try:
            decoded_self = self.decoded_self
            # The replicated counterpart belongs to the replica; assigning it
            # to ``self`` would make the source point into ``pdf_dest`` and
            # leave the replica without it.
            if decoded_self is None:
                d__.decoded_self = None
            else:
                d__.decoded_self = cast(
                    "DecodedStreamObject", decoded_self.replicate(pdf_dest)
                )
        except Exception:
            # Best effort: a stream lacking the attribute (or failing to
            # replicate it) is still replicated without decoded counterpart.
            pass
        for k, v in self.items():
            d__[k.replicate(pdf_dest)] = (
                v.replicate(pdf_dest) if hasattr(v, "replicate") else v
            )
        return d__

    def _clone(
        self,
        src: DictionaryObject,
        pdf_dest: PdfWriterProtocol,
        force_duplicate: bool,
        ignore_fields: Optional[Sequence[Union[str, int]]],
        visited: Set[Tuple[int, int]],
    ) -> None:
        """
        Update the object from src.

        Copies the payload and clones the decoded counterpart of ``src``
        before delegating the dictionary entries to the parent class.

        Args:
            src: Stream whose content is copied into ``self``.
            pdf_dest: Writer the clones are created for.
            force_duplicate: Forwarded to the nested ``clone`` calls.
            ignore_fields: Forwarded to the nested ``clone`` calls.
            visited: Forwarded to ``DictionaryObject._clone``.

        """
        self._data = cast("StreamObject", src)._data
        try:
            decoded_self = cast("StreamObject", src).decoded_self
            if decoded_self is None:
                self.decoded_self = None
            else:
                self.decoded_self = cast(
                    "DecodedStreamObject",
                    decoded_self.clone(pdf_dest, force_duplicate, ignore_fields),
                )
        except Exception:
            # Best effort: keep going without a decoded counterpart.
            pass
        super()._clone(src, pdf_dest, force_duplicate, ignore_fields, visited)

    def hash_bin(self) -> int:
        """
        Used to detect modified object.

        Returns:
            Hash considering type and value.

        """
        # Use _data to prevent errors on non-decoded streams.
        return hash((super().hash_bin(), self._data))

    def get_data(self) -> bytes:
        """Return the stored payload."""
        return self._data

    def set_data(self, data: bytes) -> None:
        """Replace the stored payload with ``data``."""
        self._data = data

    def hash_value_data(self) -> bytes:
        """Return the parent's ``hash_value_data()`` followed by ``get_data()``."""
        data = super().hash_value_data()
        data += self.get_data()
        return data

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the dictionary followed by the payload to ``stream``.

        The length entry is computed from the payload for the duration of
        the dictionary output only.

        Args:
            stream: Binary stream receiving the output.
            encryption_key: Deprecated; any non-None value is reported through
                ``deprecation_no_replacement``.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        self[NameObject(SA.LENGTH)] = NumberObject(len(self._data))
        try:
            DictionaryObject.write_to_stream(self, stream)
        finally:
            # The length entry is a serialization artifact: remove it even
            # when writing the dictionary fails.
            del self[SA.LENGTH]
        stream.write(b"\nstream\n")
        stream.write(self._data)
        stream.write(b"\nendstream")

    @staticmethod
    def initializeFromDictionary(data: Dict[str, Any]) -> None:
        """Deprecated name; only reports the replacement to use."""
        deprecation_with_replacement(
            "initializeFromDictionary", "initialize_from_dictionary", "5.0.0"
        )  # pragma: no cover

    @staticmethod
    def initialize_from_dictionary(
        data: Dict[str, Any]
    ) -> Union["EncodedStreamObject", "DecodedStreamObject"]:
        """
        Build a stream object from parsed dictionary entries.

        Args:
            data: Parsed entries, including the payload under
                ``"__streamdata__"``. The mapping is modified: the payload
                entry and the length entry are removed from it.

        Returns:
            An ``EncodedStreamObject`` when ``data`` has a filter entry,
            otherwise a ``DecodedStreamObject``, holding the payload and the
            remaining entries.

        """
        retval: Union[EncodedStreamObject, DecodedStreamObject]
        if SA.FILTER in data:
            retval = EncodedStreamObject()
        else:
            retval = DecodedStreamObject()
        retval._data = data["__streamdata__"]
        del data["__streamdata__"]
        if SA.LENGTH in data:
            del data[SA.LENGTH]
        retval.update(data)
        return retval

    def flate_encode(self, level: int = -1) -> "EncodedStreamObject":
        """
        Return a copy of this stream with a Flate filter applied on top.

        The Flate filter is placed first in the filter list, and a null entry
        is placed first in the decode parameters when filters already exist.

        Args:
            level: Forwarded to ``FlateDecode.encode``.

        Returns:
            A new ``EncodedStreamObject``; ``self`` is not modified.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        if SA.FILTER in self:
            f = self[SA.FILTER]
            if isinstance(f, ArrayObject):
                f = ArrayObject([NameObject(FT.FLATE_DECODE), *f])
                try:
                    params = ArrayObject(
                        [NullObject(), *self.get(SA.DECODE_PARMS, ArrayObject())]
                    )
                except TypeError:
                    # case of error where the * operator is not working
                    # (not an array)
                    params = ArrayObject(
                        [NullObject(), self.get(SA.DECODE_PARMS, ArrayObject())]
                    )
            else:
                f = ArrayObject([NameObject(FT.FLATE_DECODE), f])
                params = ArrayObject(
                    [NullObject(), self.get(SA.DECODE_PARMS, NullObject())]
                )
        else:
            f = NameObject(FT.FLATE_DECODE)
            params = None
        retval = EncodedStreamObject()
        retval.update(self)
        retval[NameObject(SA.FILTER)] = f
        if params is not None:
            retval[NameObject(SA.DECODE_PARMS)] = params
        retval._data = FlateDecode.encode(self._data, level)
        return retval

    def decode_as_image(self) -> Any:
        """
        Try to decode the stream object as an image.

        A warning is logged when ``/Subtype`` is not ``/Image``; decoding is
        attempted anyway.

        Returns:
            A PIL image if proper decoding has been found, otherwise None.

        Raises:
            Exception: Any error raised during decoding, e.g. due to an
                invalid object, is propagated. It is recommended to catch
                exceptions to prevent stops in your program.

        """
        from ..filters import _xobj_to_image  # noqa: PLC0415

        if self.get("/Subtype", "") != "/Image":
            try:
                msg = f"{self.indirect_reference} does not seem to be an Image"  # pragma: no cover
            except AttributeError:
                msg = f"{self.__repr__()} object does not seem to be an Image"  # pragma: no cover
            logger_warning(msg, __name__)
        extension, byte_stream, img = _xobj_to_image(self)
        if extension is None:
            return None  # pragma: no cover
        return img
1089
1090
class DecodedStreamObject(StreamObject):
    """
    Stream object that adds nothing to :class:`StreamObject`.

    In this file it is the type created for stream dictionaries without a
    filter entry and for the decoded counterpart of an encoded stream.
    """
1093
1094
class EncodedStreamObject(StreamObject):
    """
    Stream object whose stored data is filter-encoded.

    The decoded counterpart is built on first access and cached in
    ``decoded_self``.
    """

    def __init__(self) -> None:
        # Lazily filled by get_data().
        self.decoded_self: Optional[DecodedStreamObject] = None

    # This overrides the parent method
    def get_data(self) -> bytes:
        """
        Return the decoded data of this stream.

        On the first call a :class:`DecodedStreamObject` is created from the
        decoded data and from every entry of this stream except the length,
        filter and decode-parameter entries; later calls reuse it.

        Returns:
            The decoded bytes.

        """
        from ..filters import decode_stream_data  # noqa: PLC0415

        cached = self.decoded_self
        if cached is None:
            cached = DecodedStreamObject()
            cached.set_data(decode_stream_data(self))
            encoding_keys = (SA.LENGTH, SA.FILTER, SA.DECODE_PARMS)
            for key, value in list(self.items()):
                if key in encoding_keys:
                    continue
                cached[key] = value
            self.decoded_self = cached
        return cached.get_data()

    # This overrides the parent method:
    def set_data(self, data: bytes) -> None:
        """
        Replace the content of a FlateDecode-only stream.

        The decoded counterpart receives ``data`` as is, while this object
        stores the Flate-encoded form.

        Args:
            data: The new, decoded content.

        Raises:
            PdfReadError: If the filter entry is anything but FlateDecode.
            TypeError: If ``data`` is not ``bytes``.

        """
        from ..filters import FlateDecode  # noqa: PLC0415

        if self.get(SA.FILTER, "") not in (FT.FLATE_DECODE, [FT.FLATE_DECODE]):
            raise PdfReadError(
                "Streams encoded with a filter different from FlateDecode are not supported"
            )
        if not isinstance(data, bytes):
            raise TypeError("Data must be bytes")
        if self.decoded_self is None:
            self.get_data()  # to create self.decoded_self
        assert self.decoded_self is not None, "mypy"
        self.decoded_self.set_data(data)
        super().set_data(FlateDecode.encode(data))
1132
1133
1134class ContentStream(DecodedStreamObject):
1135 """
    In order to be fast, this data structure can contain either:

    * raw data in ._data
    * parsed stream operations in ._operations.

    At any time, a ContentStream object can either have both of those fields
    populated, or one field populated and the other one empty (``b""`` for
    ._data, ``[]`` for ._operations).

    These fields are "rebuilt" lazily, when accessed:

    * when .get_data() is called, if ._data is empty, it is rebuilt from ._operations.
    * when .operations is read, if ._operations is empty, it is rebuilt from ._data.

    Conversely, these fields can be invalidated:

    * when .set_data() is called, ._operations is reset to an empty list.
    * when .operations is set, ._data is reset to ``b""``.
1153 """
1154
def __init__(
    self,
    stream: Any,
    pdf: Any,
    forced_encoding: Union[None, str, List[str], Dict[int, str]] = None,
) -> None:
    """
    Create a content stream from a stream object or an array of them.

    Args:
        stream: ``None`` for an empty content stream, otherwise an object
            whose ``get_object()`` yields either a single stream or an
            ``ArrayObject`` of streams to be concatenated.
        pdf: Stored as ``self.pdf`` and handed to ``read_object`` when
            inline images are parsed.
        forced_encoding: Stored as ``self.forced_encoding`` and handed to
            ``read_object`` when operands are parsed.

    """
    self.pdf = pdf
    self._operations: List[Tuple[Any, bytes]] = []

    # stream may be a StreamObject or an ArrayObject containing
    # StreamObjects to be concatenated together.
    if stream is None:
        super().set_data(b"")
    else:
        stream = stream.get_object()
        if isinstance(stream, ArrayObject):
            # A bytearray keeps the concatenation linear in the total size.
            data = bytearray()
            for s in stream:
                s_resolved = s.get_object()
                if isinstance(s_resolved, NullObject):
                    continue
                if not isinstance(s_resolved, StreamObject):
                    # No need to emit an exception here for now - the PDF structure
                    # seems to already be broken beforehand in these cases.
                    logger_warning(
                        f"Expected StreamObject, got {type(s_resolved).__name__} instead. Data might be wrong.",
                        __name__
                    )
                    continue
                data += s_resolved.get_data()
                # Separate consecutive streams with a newline. Indexing
                # bytes yields an int, so the former `data[-1] != b"\n"`
                # test was always true; endswith() compares bytes properly.
                if not data.endswith(b"\n"):
                    data += b"\n"
            super().set_data(bytes(data))
        else:
            stream_data = stream.get_data()
            assert stream_data is not None
            super().set_data(stream_data)
    self.forced_encoding = forced_encoding
1193
def replicate(
    self,
    pdf_dest: PdfWriterProtocol,
) -> "ContentStream":
    """
    Replicate this content stream into ``pdf_dest``.

    Besides the raw data and the dictionary entries, the parsed operations,
    the forced encoding and the ``pdf`` back-reference are carried over.
    These assignments used to sit behind an early ``return`` and were never
    executed, so a stream whose content only lived in ``_operations`` (raw
    data already invalidated) was replicated as an empty stream.

    Args:
        pdf_dest: Destination the replica is registered in.

    Returns:
        The replicated ContentStream.

    """
    d__ = cast(
        "ContentStream",
        self._reference_clone(self.__class__(None, None), pdf_dest, False),
    )
    d__._data = self._data
    try:
        decoded_self = self.decoded_self
        if decoded_self is not None:
            # NOTE(review): the replica of decoded_self is stored back on
            # self, not on d__ - kept as is, confirm the intent.
            self.decoded_self = cast(
                "DecodedStreamObject", decoded_self.replicate(pdf_dest)
            )
    except Exception:
        # Deliberate best effort: any failure while handling decoded_self
        # (including the attribute being absent) is ignored.
        pass
    for k, v in self.items():
        d__[k.replicate(pdf_dest)] = (
            v.replicate(pdf_dest) if hasattr(v, "replicate") else v
        )
    # ContentStream-specific state, mirroring what _clone() copies.
    d__.pdf = pdf_dest
    d__._operations = list(self._operations)
    d__.forced_encoding = self.forced_encoding
    return d__
1223
def clone(
    self,
    pdf_dest: Any,
    force_duplicate: bool = False,
    ignore_fields: Optional[Sequence[Union[str, int]]] = (),
) -> "ContentStream":
    """
    Clone object into pdf_dest.

    The object itself is returned when it already belongs to ``pdf_dest``
    and no duplicate is forced.

    Args:
        pdf_dest: Destination of the clone.
        force_duplicate: Create a copy even if the object already belongs
            to ``pdf_dest``.
        ignore_fields: Handed on to ``_clone``; ``None`` is treated as an
            empty list.

    Returns:
        The cloned ContentStream

    """
    try:
        already_in_dest = bool(
            self.indirect_reference.pdf == pdf_dest and not force_duplicate  # type: ignore
        )
    except Exception:
        # Any failure while checking the owner means "not there yet".
        already_in_dest = False
    if already_in_dest:
        return self

    seen: Set[Tuple[int, int]] = set()
    duplicate = cast(
        "ContentStream",
        self._reference_clone(
            self.__class__(None, None), pdf_dest, force_duplicate
        ),
    )
    fields_to_skip = [] if ignore_fields is None else ignore_fields
    duplicate._clone(self, pdf_dest, force_duplicate, fields_to_skip, seen)
    return duplicate
1259
def _clone(
    self,
    src: DictionaryObject,
    pdf_dest: PdfWriterProtocol,
    force_duplicate: bool,
    ignore_fields: Optional[Sequence[Union[str, int]]],
    visited: Set[Tuple[int, int]],
) -> None:
    """
    Update the object from src.

    Only the content stream state is taken over: the raw data, a shallow
    copy of the parsed operations and the forced encoding. ``pdf_dest``
    becomes the new ``pdf`` attribute.

    Args:
        src: Content stream to copy from.
        pdf_dest: Stored as ``self.pdf``.
        force_duplicate: Not used here.
        ignore_fields: Not used here.
        visited: Not used here.

    """
    source = cast("ContentStream", src)
    # The dictionary-level _clone of the parent classes is intentionally
    # not invoked; only the parent's set_data() is used.
    super().set_data(source._data)
    self.forced_encoding = source.forced_encoding
    self._operations = [*source._operations]
    self.pdf = pdf_dest
1285
def _parse_content_stream(self, stream: StreamType) -> None:
    """
    Parse ``stream`` from its start and append what is found to
    ``self._operations``.

    Operands are collected until an operator is met; every operator then
    yields one ``(operands, operator)`` entry. A ``BI`` operator is stored
    as ``(inline_image, b"INLINE IMAGE")`` instead.

    Args:
        stream: Seekable stream holding the content stream bytes.

    """
    # 7.8.2 Content Streams
    stream.seek(0, 0)
    pending: List[Union[int, str, PdfObject]] = []
    while True:
        lead = read_non_whitespace(stream)
        if lead in (b"", 0):
            return
        stream.seek(-1, 1)
        if lead.isalpha() or lead in (b"'", b'"'):
            operator = read_until_regex(stream, NameObject.delimiter_pattern)
            if operator != b"BI":
                self._operations.append((pending, operator))
                pending = []
                continue
            # Begin of an inline image: it needs a completely different
            # parsing mechanism.
            assert pending == []
            inline_image = self._read_inline_image(stream)
            self._operations.append((inline_image, b"INLINE IMAGE"))
        elif lead == b"%":
            # Comments have to be skipped here: read_object would handle
            # them too, but it assumes that an object follows the comment,
            # whereas in a content stream it could be an operator.
            while lead not in (b"\r", b"\n", b""):
                lead = stream.read(1)
        else:
            pending.append(read_object(stream, None, self.forced_encoding))
1316
def _read_inline_image(self, stream: StreamType) -> Dict[str, Any]:
    """
    Read an inline image; ``stream`` is positioned just after ``BI``.

    The key/value settings are read up to ``ID``, then the image data is
    extracted with a method chosen from the filter entry (``/F`` or
    ``/Filter``). If no ``EI`` followed by whitespace is found behind the
    data, extraction is retried with ``extract_inline_default``.

    Args:
        stream: Seekable stream holding the content stream bytes.

    Returns:
        A dictionary with the image ``"settings"`` and its raw ``"data"``.

    Raises:
        PdfStreamError: If the ``EI`` marker is still missing after the
            fallback extraction.

    """

    def ei_follows() -> Tuple[bool, bytes]:
        # Read three bytes, step back by one, and tell whether they are
        # "EI" followed by a whitespace.
        marker = stream.read(3)
        stream.seek(-1, 1)
        return (marker[:2] == b"EI" and marker[2:3] in WHITESPACES), marker

    # begin reading just after the "BI" - begin image
    # first read the dictionary of settings.
    settings = DictionaryObject()
    while True:
        lead = read_non_whitespace(stream)
        stream.seek(-1, 1)
        if lead == b"I":
            # "ID" - begin of image data
            break
        key = read_object(stream, self.pdf)
        read_non_whitespace(stream)
        stream.seek(-1, 1)
        settings[key] = read_object(stream, self.pdf)

    # left at beginning of ID
    id_marker = stream.read(3)
    assert id_marker[:2] == b"ID"
    filtr = settings.get("/F", settings.get("/Filter", "not set"))
    data_start = stream.tell()
    if isinstance(filtr, list):
        filtr = filtr[0]  # used for encoding

    dedicated_extractors = (
        ("AHx", "ASCIIHexDecode", extract_inline_AHx),
        ("A85", "ASCII85Decode", extract_inline_A85),
        ("RL", "RunLengthDecode", extract_inline_RL),
        ("DCT", "DCTDecode", extract_inline_DCT),
    )
    extractor = None
    for short_name, long_name, candidate in dedicated_extractors:
        if short_name in filtr or long_name in filtr:
            extractor = candidate
            break

    if extractor is not None:
        data = extractor(stream)
    elif filtr == "not set":
        cs = settings.get("/CS", "")
        if isinstance(cs, list):
            cs = cs[0]
        if "RGB" in cs:
            width_factor = 3
        elif "CMYK" in cs:
            width_factor = 4
        else:
            default_bits = (
                8 if cs in {"/I", "/G", "/Indexed", "/DeviceGray"} else -1
            )
            bits = settings.get("/BPC", default_bits)
            if bits > 0:
                width_factor = bits / 8.0
            else:
                data = extract_inline_default(stream)
                width_factor = -1
        if width_factor > 0:
            row_length = ceil(cast(int, settings["/W"]) * width_factor)
            data = stream.read(row_length * cast(int, settings["/H"]))
        # Move to the `EI` if possible.
        read_non_whitespace(stream)
        stream.seek(-1, 1)
    else:
        data = extract_inline_default(stream)

    found, marker = ei_follows()
    if not found:
        # Deal with wrong/missing `EI` tags. Example: Wrong dimensions specified above.
        stream.seek(data_start, 0)
        data = extract_inline_default(stream)
        found, marker = ei_follows()
        if not found:  # pragma: no cover
            # Check the same condition again. This should never fail as
            # edge cases are covered by `extract_inline_default` above,
            # but check this to make sure that we are behind the `EI` afterwards.
            raise PdfStreamError(
                f"Could not extract inline image, even using fallback. Expected 'EI', got {marker!r}"
            )
    return {"settings": settings, "data": data}
1391
1392 # This overrides the parent method
def get_data(self) -> bytes:
    """
    Return the raw bytes of the content stream.

    When ``_data`` is empty it is rebuilt from ``_operations`` and stored
    before being returned.

    Returns:
        The content stream as bytes.

    """
    if self._data:
        return self._data

    buffer = BytesIO()
    for operands, operator in self._operations:
        if operator == b"INLINE IMAGE":
            buffer.write(b"BI")
            settings_text = BytesIO()
            operands["settings"].write_to_stream(settings_text)
            # The first and last two bytes are cut off - presumably the
            # "<<" and ">>" enclosing the serialized dictionary.
            buffer.write(settings_text.getvalue()[2:-2])
            buffer.write(b"ID ")
            buffer.write(operands["data"])
            buffer.write(b"EI")
        else:
            for operand in operands:
                operand.write_to_stream(buffer)
                buffer.write(b" ")
            buffer.write(operator)
        buffer.write(b"\n")
    self._data = buffer.getvalue()
    return self._data
1413
1414 # This overrides the parent method
def set_data(self, data: bytes) -> None:
    """
    Store ``data`` through the parent class and discard the parsed
    operations.

    Args:
        data: New raw content of the stream.

    """
    super().set_data(data)
    # The parsed form is stale now; it is rebuilt when .operations is read.
    self._operations = []
1418
@property
def operations(self) -> List[Tuple[Any, bytes]]:
    """
    Parsed operations of the content stream.

    When no operations are held but raw data is available, the raw data is
    parsed first and then cleared.
    """
    if self._data and not self._operations:
        self._parse_content_stream(BytesIO(self._data))
        self._data = b""
    return self._operations

@operations.setter
def operations(self, operations: List[Tuple[Any, bytes]]) -> None:
    """Replace the operations and clear the raw data."""
    self._data = b""
    self._operations = operations
1430
def isolate_graphics_state(self) -> None:
    """
    Wrap the content in a ``q`` / ``Q`` operator pair.

    If parsed operations are held, the pair is added to them and the raw
    data is cleared, so that bytes built earlier (for instance by
    ``get_data()``) are not served without the pair. Otherwise, non-empty
    raw data is wrapped directly. An empty content stream is left as is.
    """
    if self._operations:
        self._operations.insert(0, ([], b"q"))
        self._operations.append(([], b"Q"))
        # Raw data may be populated alongside the operations; it no longer
        # matches them, so drop it and let it be rebuilt on demand.
        self._data = b""
    elif self._data:
        self._data = b"q\n" + self._data + b"\nQ\n"
1437
1438 # This overrides the parent method
def write_to_stream(
    self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
) -> None:
    """
    Write the content stream through the parent class.

    Args:
        stream: Destination to write to.
        encryption_key: Handed on unchanged to the parent implementation.

    """
    only_operations_held = bool(self._operations) and not self._data
    if only_operations_held:
        self.get_data()  # this ensures ._data is rebuilt
    super().write_to_stream(stream, encryption_key)
1445
1446
def read_object(
    stream: StreamType,
    pdf: Optional[PdfReaderProtocol],
    forced_encoding: Union[None, str, List[str], Dict[int, str]] = None,
) -> Union[PdfObject, int, str, ContentStream]:
    """
    Read the next object from ``stream``, dispatching on its first byte.

    Args:
        stream: Seekable stream positioned at the first byte of the object.
        pdf: Handed on to the object parsers; must not be ``None`` when the
            object turns out to be an indirect reference.
        forced_encoding: Handed on to the string, array and dictionary
            parsers.

    Returns:
        The parsed object.

    Raises:
        PdfReadError: If the first byte does not start any known object.

    """
    first = stream.read(1)
    stream.seek(-1, 1)  # reset to start
    if first == b"/":
        return NameObject.read_from_stream(stream, pdf)
    if first == b"<":
        # hexadecimal string OR dictionary
        opening = stream.read(2)
        stream.seek(-2, 1)  # reset to start
        if opening == b"<<":
            return DictionaryObject.read_from_stream(stream, pdf, forced_encoding)
        return read_hex_string_from_stream(stream, forced_encoding)
    if first == b"[":
        return ArrayObject.read_from_stream(stream, pdf, forced_encoding)
    if first in (b"t", b"f"):
        return BooleanObject.read_from_stream(stream)
    if first == b"(":
        return read_string_from_stream(stream, forced_encoding)
    # Note: the six bytes are consumed even when they are not "endobj".
    if first == b"e" and stream.read(6) == b"endobj":
        return NullObject()
    if first == b"n":
        return NullObject.read_from_stream(stream)
    if first == b"%":
        # comment: skip it, then read whatever object follows
        skip_over_comment(stream)
        read_non_whitespace(stream)
        stream.seek(-1, 1)
        return read_object(stream, pdf, forced_encoding)
    if first in b"0123456789+-.":
        # number object OR indirect reference
        lookahead = stream.read(20)
        stream.seek(-len(lookahead), 1)  # reset to start
        if IndirectPattern.match(lookahead) is not None:
            assert pdf is not None, "mypy"
            return IndirectObject.read_from_stream(stream, pdf)
        return NumberObject.read_from_stream(stream)

    # Nothing matched: gather some surrounding bytes for the error message
    # and move past the offending token before raising.
    pos = stream.tell()
    stream.seek(-20, 1)
    stream_extract = stream.read(80)
    stream.seek(pos)
    read_until_whitespace(stream)
    raise PdfReadError(
        f"Invalid Elementary Object starting with {first!r} @{pos}: {stream_extract!r}"
    )
1495
1496
class Field(TreeObject):
    """
    A class representing a field dictionary.

    This class is accessed through
    :meth:`get_fields()<pypdf.PdfReader.get_fields>`
    """

    def __init__(self, data: DictionaryObject) -> None:
        """
        Copy the known field attributes of ``data`` into this object.

        Args:
            data: Field dictionary to copy from; its ``indirect_reference``
                is taken over as well.

        """
        DictionaryObject.__init__(self)
        known_attributes = (
            FieldDictionaryAttributes.attributes()
            + CheckboxRadioButtonAttributes.attributes()
        )
        self.indirect_reference = data.indirect_reference
        for attribute in known_attributes:
            try:
                self[NameObject(attribute)] = data[attribute]
            except KeyError:
                # Attribute not present on this field: nothing to copy.
                pass

        if not isinstance(self.get("/V"), EncodedStreamObject):
            return
        # A value held in an encoded stream is replaced by its decoded text.
        raw = cast(EncodedStreamObject, self[NameObject("/V")]).get_data()
        if isinstance(raw, bytes):
            text = raw.decode()
        elif raw is None:
            text = ""
        else:
            raise Exception("Should never happen")
        self[NameObject("/V")] = TextStringObject(text)

    # TABLE 8.69 Entries common to all field dictionaries
    @property
    def field_type(self) -> Optional[NameObject]:
        """Type of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.FT)

    @property
    def parent(self) -> Optional[DictionaryObject]:
        """Parent of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Parent)

    @property
    def kids(self) -> Optional["ArrayObject"]:
        """Kids of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.Kids)

    @property
    def name(self) -> Optional[str]:
        """Name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.T)

    @property
    def alternate_name(self) -> Optional[str]:
        """Alternate name of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.TU)

    @property
    def mapping_name(self) -> Optional[str]:
        """
        Mapping name of this field (read-only).

        This name is used by pypdf as a key in the dictionary returned by
        :meth:`get_fields()<pypdf.PdfReader.get_fields>`
        """
        return self.get(FieldDictionaryAttributes.TM)

    @property
    def flags(self) -> Optional[int]:
        """
        Field flags (read-only), specifying various characteristics of the
        field (see Table 8.70 of the PDF 1.7 reference).
        """
        return self.get(FieldDictionaryAttributes.Ff)

    @property
    def value(self) -> Optional[Any]:
        """
        Value of this field (read-only).

        Format varies based on field type.
        """
        return self.get(FieldDictionaryAttributes.V)

    @property
    def default_value(self) -> Optional[Any]:
        """Default value of this field (read-only)."""
        return self.get(FieldDictionaryAttributes.DV)

    @property
    def additional_actions(self) -> Optional[DictionaryObject]:
        """
        Additional actions dictionary (read-only).

        This dictionary defines the field's behavior in response to trigger
        events. See Section 8.5.2 of the PDF 1.7 reference.
        """
        return self.get(FieldDictionaryAttributes.AA)
1594
1595
class Destination(TreeObject):
    """
    A class representing a destination within a PDF file.

    See section 12.3.2 of the PDF 2.0 reference.

    Args:
        title: Title of this destination.
        page: Reference to the page of this destination. Should
            be an instance of :class:`IndirectObject<pypdf.generic.IndirectObject>`.
        fit: How the destination is displayed.

    Raises:
        PdfReadError: If destination type is invalid.

    """

    # node provides access to the original object
    node: Optional[DictionaryObject] = None

    def __init__(
        self,
        title: str,
        page: Union[NumberObject, IndirectObject, NullObject, DictionaryObject],
        fit: Fit,
    ) -> None:
        self._filtered_children: List[Any] = []  # used in PdfWriter

        typ = fit.fit_type
        # Work on a copy: the padding below must not alter the caller's
        # Fit object (its fit_args list used to be extended in place).
        args = list(fit.fit_args)

        DictionaryObject.__init__(self)
        self[NameObject("/Title")] = TextStringObject(title)
        self[NameObject("/Page")] = page
        self[NameObject("/Type")] = typ

        # from table 8.2 of the PDF 1.7 reference.
        if typ == "/XYZ":
            if len(args) < 1:  # left is missing : should never occur
                args.append(NumberObject(0.0))
            if len(args) < 2:  # top is missing
                args.append(NumberObject(0.0))
            if len(args) < 3:  # zoom is missing
                args.append(NumberObject(0.0))
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.TOP)],
                self[NameObject("/Zoom")],
            ) = args
        elif len(args) == 0:
            # Without arguments there is nothing to store; note that the
            # type is not validated on this path.
            pass
        elif typ == TF.FIT_R:
            (
                self[NameObject(TA.LEFT)],
                self[NameObject(TA.BOTTOM)],
                self[NameObject(TA.RIGHT)],
                self[NameObject(TA.TOP)],
            ) = args
        elif typ in [TF.FIT_H, TF.FIT_BH]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.TOP)],) = args
            except Exception:
                (self[NameObject(TA.TOP)],) = (NullObject(),)
        elif typ in [TF.FIT_V, TF.FIT_BV]:
            try:  # Prefer to be more robust not only to null parameters
                (self[NameObject(TA.LEFT)],) = args
            except Exception:
                (self[NameObject(TA.LEFT)],) = (NullObject(),)
        elif typ in [TF.FIT, TF.FIT_B]:
            pass
        else:
            raise PdfReadError(f"Unknown Destination Type: {typ!r}")

    @property
    def dest_array(self) -> "ArrayObject":
        """
        Destination as an array: the raw page entry, the type, then those
        of ``/Left``, ``/Bottom``, ``/Right``, ``/Top`` and ``/Zoom`` that
        are present, in this order.
        """
        return ArrayObject(
            [self.raw_get("/Page"), self["/Type"]]
            + [
                self[x]
                for x in ["/Left", "/Bottom", "/Right", "/Top", "/Zoom"]
                if x in self
            ]
        )

    def write_to_stream(
        self, stream: StreamType, encryption_key: Union[None, str, bytes] = None
    ) -> None:
        """
        Write the destination as a dictionary with a ``/D`` entry holding
        :attr:`dest_array` and an ``/S`` entry set to ``/GoTo``.

        Args:
            stream: Destination to write to.
            encryption_key: Deprecated; passing a value other than ``None``
                triggers the deprecation handling.

        """
        if encryption_key is not None:  # deprecated
            deprecation_no_replacement(
                "the encryption_key parameter of write_to_stream", "5.0.0"
            )
        stream.write(b"<<\n")
        key = NameObject("/D")
        key.write_to_stream(stream)
        stream.write(b" ")
        value = self.dest_array
        value.write_to_stream(stream)

        key = NameObject("/S")
        key.write_to_stream(stream)
        stream.write(b" ")
        value_s = NameObject("/GoTo")
        value_s.write_to_stream(stream)

        stream.write(b"\n")
        stream.write(b">>")

    @property
    def title(self) -> Optional[str]:
        """Read-only property accessing the destination title."""
        return self.get("/Title")

    @property
    def page(self) -> Optional[int]:
        """Read-only property accessing the value stored under ``/Page``."""
        return self.get("/Page")

    @property
    def typ(self) -> Optional[str]:
        """Read-only property accessing the destination type."""
        return self.get("/Type")

    @property
    def zoom(self) -> Optional[int]:
        """Read-only property accessing the zoom factor."""
        return self.get("/Zoom", None)

    @property
    def left(self) -> Optional[FloatObject]:
        """Read-only property accessing the left horizontal coordinate."""
        return self.get("/Left", None)

    @property
    def right(self) -> Optional[FloatObject]:
        """Read-only property accessing the right horizontal coordinate."""
        return self.get("/Right", None)

    @property
    def top(self) -> Optional[FloatObject]:
        """Read-only property accessing the top vertical coordinate."""
        return self.get("/Top", None)

    @property
    def bottom(self) -> Optional[FloatObject]:
        """Read-only property accessing the bottom vertical coordinate."""
        return self.get("/Bottom", None)

    @property
    def color(self) -> Optional["ArrayObject"]:
        """
        Read-only property accessing the color in (R, G, B) with values 0.0-1.0.

        Defaults to ``[0, 0, 0]`` when no ``/C`` entry is present.
        """
        return self.get(
            "/C", ArrayObject([FloatObject(0), FloatObject(0), FloatObject(0)])
        )

    @property
    def font_format(self) -> Optional[OutlineFontFlag]:
        """
        Read-only property accessing the font type.

        1=italic, 2=bold, 3=both; defaults to 0 when no ``/F`` entry is
        present.
        """
        return self.get("/F", 0)

    @property
    def outline_count(self) -> Optional[int]:
        """
        Read-only property accessing the outline count.

        positive = expanded
        negative = collapsed
        absolute value = number of visible descendants at all levels
        """
        return self.get("/Count", None)