Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_writer.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) 2006, Mathieu Fenniak
2# Copyright (c) 2007, Ashish Kulkarni <kulkarni.ashish@gmail.com>
3#
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are
8# met:
9#
10# * Redistributions of source code must retain the above copyright notice,
11# this list of conditions and the following disclaimer.
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15# * The name of the author may not be used to endorse or promote products
16# derived from this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28# POSSIBILITY OF SUCH DAMAGE.
30import decimal
31import enum
32import hashlib
33import re
34import struct
35import uuid
36from collections.abc import Iterable, Mapping
37from io import BytesIO, FileIO, IOBase
38from itertools import compress
39from pathlib import Path
40from re import Pattern
41from types import TracebackType
42from typing import (
43 IO,
44 Any,
45 Callable,
46 Optional,
47 Union,
48 cast,
49)
51from ._doc_common import DocumentInformation, PdfDocCommon
52from ._encryption import EncryptAlgorithm, Encryption
53from ._page import PageObject, Transformation
54from ._page_labels import nums_clear_range, nums_insert, nums_next
55from ._reader import PdfReader
56from ._utils import (
57 StrByteType,
58 StreamType,
59 _get_max_pdf_version_header,
60 deprecation_no_replacement,
61 logger_warning,
62)
63from .constants import AnnotationDictionaryAttributes as AA
64from .constants import CatalogAttributes as CA
65from .constants import (
66 CatalogDictionary,
67 GoToActionArguments,
68 ImageType,
69 InteractiveFormDictEntries,
70 OutlineFontFlag,
71 PageLabelStyle,
72 PagesAttributes,
73 TypFitArguments,
74 UserAccessPermissions,
75)
76from .constants import Core as CO
77from .constants import FieldDictionaryAttributes as FA
78from .constants import PageAttributes as PG
79from .constants import TrailerKeys as TK
80from .errors import PdfReadError, PyPdfError
81from .generic import (
82 PAGE_FIT,
83 ArrayObject,
84 BooleanObject,
85 ByteStringObject,
86 ContentStream,
87 Destination,
88 DictionaryObject,
89 EmbeddedFile,
90 Fit,
91 FloatObject,
92 IndirectObject,
93 NameObject,
94 NullObject,
95 NumberObject,
96 PdfObject,
97 RectangleObject,
98 ReferenceLink,
99 StreamObject,
100 TextStringObject,
101 TreeObject,
102 ViewerPreferences,
103 create_string_object,
104 extract_links,
105 hex_to_rgb,
106 is_null_or_none,
107)
108from .generic._appearance_stream import TextStreamAppearance
109from .pagerange import PageRange, PageRangeSpec
110from .types import (
111 AnnotationSubtype,
112 BorderArrayType,
113 LayoutType,
114 OutlineItemType,
115 OutlineType,
116 PagemodeType,
117)
118from .xmp import XmpInformation
# Value returned by ``UserAccessPermissions.all()``; presumably the mask with
# every user access permission bit set -- confirm against ``constants``.
ALL_DOCUMENT_PERMISSIONS = UserAccessPermissions.all()
class ObjectDeletionFlag(enum.IntFlag):
    """
    Bit flags naming categories of PDF content.

    Every member except ``NONE`` and ``IMAGES`` occupies a single bit, so
    members can be combined with ``|``. ``IMAGES`` is the union of the three
    image-related flags. The code consuming these flags is outside this
    excerpt (the name suggests selective removal of objects).
    """

    NONE = 0
    TEXT = 1 << 0
    LINKS = 1 << 1
    ATTACHMENTS = 1 << 2
    OBJECTS_3D = 1 << 3
    ALL_ANNOTATIONS = 1 << 4
    XOBJECT_IMAGES = 1 << 5
    INLINE_IMAGES = 1 << 6
    DRAWING_IMAGES = 1 << 7
    IMAGES = XOBJECT_IMAGES | INLINE_IMAGES | DRAWING_IMAGES
def _rolling_checksum(stream: BytesIO, blocksize: int = 65536) -> str:
    """
    Return the hexadecimal MD5 digest of what is left to read in ``stream``.

    The stream is read from its current position in chunks of at most
    ``blocksize`` bytes until ``read`` yields ``b""``. The hash is created
    with ``usedforsecurity=False``, i.e. it serves as a plain checksum.
    """
    digest = hashlib.md5(usedforsecurity=False)
    while True:
        chunk = stream.read(blocksize)
        if chunk == b"":
            break
        digest.update(chunk)
    return digest.hexdigest()
143class PdfWriter(PdfDocCommon):
144 """
145 Write a PDF file out, given pages produced by another class or through
146 cloning a PDF file during initialization.
148 Typically data is added from a :class:`PdfReader<pypdf.PdfReader>`.
150 Args:
151 clone_from: identical to fileobj (for compatibility)
153 incremental: If true, loads the document and set the PdfWriter in incremental mode.
155 When writing incrementally, the original document is written first and new/modified
156 content is appended. To be used for signed document/forms to keep signature valid.
158 full: If true, loads all the objects (always full if incremental = True).
159 This parameter may allow loading large PDFs.
161 strict: If true, pypdf will raise an exception if a PDF does not follow the specification.
162 If false, pypdf will try to be forgiving and do something reasonable, but it will log
163 a warning message. It is a best-effort approach.
165 """
def __init__(
    self,
    fileobj: Union[None, PdfReader, StrByteType, Path] = "",
    clone_from: Union[None, PdfReader, StrByteType, Path] = None,
    incremental: bool = False,
    full: bool = False,
    strict: bool = False,
) -> None:
    # Overview of the steps performed below:
    #   1. initialise the bookkeeping attributes;
    #   2. in incremental/full mode, turn ``fileobj`` into a PdfReader;
    #      otherwise start from a "%PDF-1.3" header and a fresh /Info
    #      dictionary carrying /Producer "pypdf";
    #   3. decide through ``_get_clone_from`` which object (if any) is cloned;
    #   4. either clone that document or create an empty page tree + catalog;
    #   5. normalise the entries of ``_ID`` to byte strings.
    self.strict = strict
    """
    If true, pypdf will raise an exception if a PDF does not follow the specification.
    If false, pypdf will try to be forgiving and do something reasonable, but it will log
    a warning message. It is a best-effort approach.
    """

    # ``full`` reuses the incremental loading path; the flag is reset to
    # False near the end of this method when only ``full`` was requested.
    self.incremental = incremental or full
    """
    Returns if the PdfWriter object has been started in incremental mode.
    """

    self._objects: list[Optional[PdfObject]] = []
    """
    The indirect objects in the PDF.
    For the incremental case, it will be filled with None
    in clone_reader_document_root.
    """

    self._original_hash: list[int] = []
    """
    List of hashes after import; used to identify changes.
    """

    self._idnum_hash: dict[bytes, tuple[IndirectObject, list[IndirectObject]]] = {}
    """
    Maps hash values of indirect objects to the list of IndirectObjects.
    This is used for compression.
    """

    self._id_translated: dict[int, dict[int, int]] = {}
    """List of already translated IDs.
    dict[id(pdf)][(idnum, generation)]
    """

    # Only declared here; it is assigned in the non-incremental branch
    # below (the incremental branch does not assign it in this method).
    self._info_obj: Optional[PdfObject]
    """The PDF files's document information dictionary,
    defined by Info in the PDF file's trailer dictionary."""

    self._ID: Union[ArrayObject, None] = None
    """The PDF file identifier,
    defined by the ID in the PDF file's trailer dictionary."""

    self._unresolved_links: list[tuple[ReferenceLink, ReferenceLink]] = []
    "Tracks links in pages added to the writer for resolving later."
    self._merged_in_pages: dict[Optional[IndirectObject], Optional[IndirectObject]] = {}
    "Tracks pages added to the writer and what page they turned into."

    if self.incremental:
        # Normalise the input step by step: path -> in-memory stream ->
        # PdfReader. Anything that is not a PdfReader afterwards is rejected.
        if isinstance(fileobj, (str, Path)):
            with open(fileobj, "rb") as f:
                fileobj = BytesIO(f.read(-1))
        if isinstance(fileobj, BytesIO):
            fileobj = PdfReader(fileobj)
        if not isinstance(fileobj, PdfReader):
            raise PyPdfError("Invalid type for incremental mode")
        self._reader = fileobj  # prev content is in _reader.stream
        self._header = fileobj.pdf_header.encode()
        self._readonly = True  # TODO: to be analysed
    else:
        self._header = b"%PDF-1.3"
        self._info_obj = self._add_object(
            DictionaryObject(
                {NameObject("/Producer"): create_string_object("pypdf")}
            )
        )

    def _get_clone_from(
        fileobj: Union[None, PdfReader, str, Path, IO[Any], BytesIO],
        clone_from: Union[None, PdfReader, str, Path, IO[Any], BytesIO],
    ) -> Union[None, PdfReader, str, Path, IO[Any], BytesIO]:
        # Decide which object the writer is cloned from.
        # An explicit ``clone_from`` (or the default ``fileobj == ""``) is
        # returned as-is when ``fileobj`` is a path/stream type.
        if isinstance(fileobj, (str, Path, IO, BytesIO)) and (
            fileobj == "" or clone_from is not None
        ):
            return clone_from
        # Otherwise ``fileobj`` itself becomes the source, unless it is a
        # path that does not exist / has size 0, or a stream of length 0.
        # NOTE(review): for a ``fileobj`` that is neither path nor stream
        # (e.g. None or a PdfReader) this branch replaces ``clone_from`` by
        # ``fileobj`` -- confirm that ignoring ``clone_from`` is intended.
        cloning = True
        if isinstance(fileobj, (str, Path)) and (
            not Path(str(fileobj)).exists()
            or Path(str(fileobj)).stat().st_size == 0
        ):
            cloning = False
        if isinstance(fileobj, (IOBase, BytesIO)):
            # Measure the stream by seeking to its end, then restore the
            # original position.
            t = fileobj.tell()
            if fileobj.seek(0, 2) == 0:
                cloning = False
            fileobj.seek(t, 0)
        if cloning:
            clone_from = fileobj
        return clone_from

    clone_from = _get_clone_from(fileobj, clone_from)
    # To prevent overwriting
    self.temp_fileobj = fileobj
    self.fileobj = ""
    self._with_as_usage = False
    self._cloned = False
    # The root of our page tree node
    pages = DictionaryObject(
        {
            NameObject(PagesAttributes.TYPE): NameObject("/Pages"),
            NameObject(PagesAttributes.COUNT): NumberObject(0),
            NameObject(PagesAttributes.KIDS): ArrayObject(),
        }
    )
    self.flattened_pages = []
    self._encryption: Optional[Encryption] = None
    self._encrypt_entry: Optional[DictionaryObject] = None

    if clone_from is not None:
        # Wrap paths/streams in a reader before cloning the document.
        if not isinstance(clone_from, PdfReader):
            clone_from = PdfReader(clone_from)
        self.clone_document_from_reader(clone_from)
        self._cloned = True
    else:
        # Empty document: register the page tree root and a catalog that
        # points at it.
        self._pages = self._add_object(pages)
        self._root_object = DictionaryObject(
            {
                NameObject(PagesAttributes.TYPE): NameObject(CO.CATALOG),
                NameObject(CO.PAGES): self._pages,
            }
        )
        self._add_object(self._root_object)
    # ``full`` without ``incremental``: loading is done, leave incremental mode.
    if full and not incremental:
        self.incremental = False
    # Make sure both identifier entries are byte strings.
    if isinstance(self._ID, list):
        if isinstance(self._ID[0], TextStringObject):
            self._ID[0] = ByteStringObject(self._ID[0].get_original_bytes())
        if isinstance(self._ID[1], TextStringObject):
            self._ID[1] = ByteStringObject(self._ID[1].get_original_bytes())
# for commonality
@property
def is_encrypted(self) -> bool:
    """
    Read-only boolean property telling whether this PDF file is encrypted.

    This implementation unconditionally reports ``False``; the property
    exists so that the attribute is available on the writer as well.
    """
    return False
@property
def root_object(self) -> DictionaryObject:
    """
    Expose the dictionary stored in ``_root_object`` (direct access to the
    PDF structure).

    Note:
        Recommended only for read access.

    """
    root = self._root_object
    return root
@property
def _info(self) -> Optional[DictionaryObject]:
    """
    Provide access to "/Info". Standardized with PdfReader.

    Returns:
        The resolved /Info dictionary, or None when no info object is set.

    """
    info_ref = self._info_obj
    if info_ref is None:
        return None
    return cast(DictionaryObject, info_ref.get_object())

@_info.setter
def _info(self, value: Optional[Union[IndirectObject, DictionaryObject]]) -> None:
    """
    Replace or remove the "/Info" dictionary.

    With a value, the current info dictionary (created on demand) is emptied
    and refilled from the resolved ``value``. With None, the slot of the info
    object in the object list is blanked on a best-effort basis and the
    reference is dropped.
    """
    if value is not None:
        if self._info_obj is None:
            self._info_obj = self._add_object(DictionaryObject())
        target = cast(DictionaryObject, self._info_obj.get_object())
        target.clear()
        target.update(cast(DictionaryObject, value.get_object()))
        return

    # Removal: the lookup may fail (e.g. no info object at all); that is fine.
    try:
        self._objects[self._info_obj.indirect_reference.idnum - 1] = None  # type: ignore
    except (KeyError, AttributeError):
        pass
    self._info_obj = None
@property
def xmp_metadata(self) -> Optional[XmpInformation]:
    """XMP (Extensible Metadata Platform) data, as exposed by the root object."""
    root = self.root_object
    return cast(XmpInformation, root.xmp_metadata)

@xmp_metadata.setter
def xmp_metadata(self, value: Union[XmpInformation, bytes, None]) -> None:
    """
    Set or remove the XMP (Extensible Metadata Platform) data.

    None removes the "/Metadata" entry of the root object. Otherwise the
    bytes (or the stream data of an XmpInformation) are stored in the
    metadata stream; a new stream is registered when the current entry is
    missing or is not an indirect reference.
    """
    root = self.root_object
    if value is None:
        if "/Metadata" in root:
            del root["/Metadata"]
        return

    current = root.get("/Metadata", None)
    if isinstance(current, IndirectObject):
        metadata_stream = cast(StreamObject, current.get_object())
    else:
        # Drop a direct (non-reference) entry before installing a new stream.
        if current is not None:
            del root["/Metadata"]
        metadata_stream = StreamObject()
        root[NameObject("/Metadata")] = self._add_object(metadata_stream)

    payload = value.stream.get_data() if isinstance(value, XmpInformation) else value
    metadata_stream.set_data(payload)
@property
def with_as_usage(self) -> bool:
    """Deprecated accessor for ``_with_as_usage``; reports the deprecation on every read."""
    deprecation_no_replacement("with_as_usage", "5.0")
    flag = self._with_as_usage
    return flag

@with_as_usage.setter
def with_as_usage(self, value: bool) -> None:
    """Deprecated mutator for ``_with_as_usage``; reports the deprecation on every write."""
    deprecation_no_replacement("with_as_usage", "5.0")
    self._with_as_usage = value
def __enter__(self) -> "PdfWriter":
    """
    Store how writer is initialized by 'with'.

    The writer is re-initialised with default arguments; the clone flag seen
    before the reset is restored and the object remembered in
    ``temp_fileobj`` becomes ``fileobj``.
    """
    previously_cloned: bool = self._cloned
    remembered_target = self.temp_fileobj
    self.__init__()  # type: ignore
    self._with_as_usage = True
    self._cloned = previously_cloned
    self.fileobj = remembered_target  # type: ignore
    return self
def __exit__(
    self,
    exc_type: Optional[type[BaseException]],
    exc: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> None:
    """
    Write data to the fileobj.

    Nothing is written when ``fileobj`` is falsy or when the writer was
    created by cloning. The exception arguments are not inspected.
    """
    if not self.fileobj or self._cloned:
        return
    self.write(self.fileobj)
@property
def pdf_header(self) -> str:
    """
    Read/Write property of the PDF header that is written.

    This should be something like ``'%PDF-1.5'``. It is recommended to set
    the lowest version that supports all features which are used within the
    PDF file.

    Note: `pdf_header` returns a string but accepts bytes or str for writing
    """
    raw_header = self._header
    return raw_header.decode()

@pdf_header.setter
def pdf_header(self, new_header: Union[str, bytes]) -> None:
    # The header is kept as bytes internally; str input is encoded first.
    self._header = new_header.encode() if isinstance(new_header, str) else new_header
def _add_object(self, obj: PdfObject) -> IndirectObject:
    """
    Register ``obj`` in the object list and return its indirect reference.

    An object that already carries a reference belonging to this writer is
    not registered again; its existing reference is returned. A direct
    array/dictionary stored under the contents key of a dictionary is
    registered first and replaced by the resulting reference.
    """
    already_registered = (
        getattr(obj, "indirect_reference", None) is not None
        and obj.indirect_reference.pdf == self  # type: ignore
    )
    if already_registered:
        return obj.indirect_reference  # type: ignore

    # check for /Contents in Pages (/Contents in annotations are strings)
    if isinstance(obj, DictionaryObject):
        direct_contents = obj.get(PG.CONTENTS, None)
        if isinstance(direct_contents, (ArrayObject, DictionaryObject)):
            obj[NameObject(PG.CONTENTS)] = self._add_object(obj[PG.CONTENTS])

    self._objects.append(obj)
    # Object numbers are 1-based: the new number equals the new list length.
    obj.indirect_reference = IndirectObject(len(self._objects), 0, self)
    return obj.indirect_reference
def get_object(
    self,
    indirect_reference: Union[int, IndirectObject],
) -> PdfObject:
    """
    Return the object stored for an object number or an indirect reference.

    Args:
        indirect_reference: A 1-based object number, or a reference whose
            ``pdf`` is this writer.

    Raises:
        ValueError: If the reference belongs to another document.
    """
    if isinstance(indirect_reference, int):
        object_number = indirect_reference
    elif indirect_reference.pdf != self:
        raise ValueError("PDF must be self")
    else:
        object_number = indirect_reference.idnum
    found = self._objects[object_number - 1]
    assert found is not None, "mypy"
    return found
def _replace_object(
    self,
    indirect_reference: Union[int, IndirectObject],
    obj: PdfObject,
) -> PdfObject:
    """
    Store ``obj`` in the slot designated by ``indirect_reference``.

    The generation number is taken from the object currently occupying the
    slot. An ``obj`` that is referenced from another document is cloned into
    this writer first.

    Returns:
        The object now stored in the slot (the clone, if one was made).

    Raises:
        ValueError: If an IndirectObject of another document is passed.
    """
    if isinstance(indirect_reference, IndirectObject):
        if indirect_reference.pdf != self:
            raise ValueError("PDF must be self")
        indirect_reference = indirect_reference.idnum
    slot = indirect_reference - 1
    gen = self._objects[slot].indirect_reference.generation  # type: ignore

    foreign = (
        getattr(obj, "indirect_reference", None) is not None
        and obj.indirect_reference.pdf != self  # type: ignore
    )
    if foreign:
        obj = obj.clone(self)

    self._objects[slot] = obj
    obj.indirect_reference = IndirectObject(indirect_reference, gen, self)

    assert isinstance(obj, PdfObject), "mypy"
    return obj
def _add_page(
    self,
    page: PageObject,
    index: int,
    excluded_keys: Iterable[str] = (),
) -> PageObject:
    # Clone ``page`` into this writer and hook the clone into the page tree
    # at position ``index``.
    #
    # Args:
    #     page: Page to add; must be a PageObject whose type entry equals
    #         ``CO.PAGE``.
    #     index: Position in the flattened page list, resolved to a tree node
    #         by ``_get_page_in_node``.
    #     excluded_keys: Keys left out when cloning; the parent entry and
    #         "/StructParents" are always excluded in addition.
    #
    # Returns:
    #     The cloned page that now belongs to this writer.
    #
    # Raises:
    #     ValueError: If ``page`` is not a valid page object.
    #     PyPdfError: If more than 1000 ancestors are walked while updating
    #         the page counts.
    if not isinstance(page, PageObject) or page.get(PagesAttributes.TYPE, None) != CO.PAGE:
        raise ValueError("Invalid page object")
    assert self.flattened_pages is not None, "for mypy"
    page_org = page
    excluded_keys = list(excluded_keys)
    excluded_keys += [PagesAttributes.PARENT, "/StructParents"]
    # Acrobat does not accept two indirect references pointing on the same
    # page; therefore in order to add multiple copies of the same
    # page, we need to create a new dictionary for the page, however the
    # objects below (including content) are not duplicated:
    try:  # delete an already existing page
        del self._id_translated[id(page_org.indirect_reference.pdf)][  # type: ignore
            page_org.indirect_reference.idnum  # type: ignore
        ]
    except Exception:
        # Best effort: there may be no previous translation (or no
        # indirect reference at all) for this page.
        pass

    page = cast(
        "PageObject", page_org.clone(self, False, excluded_keys).get_object()
    )
    if page_org.pdf is not None:
        # Update the writer's header from both headers via
        # ``_get_max_pdf_version_header``.
        other = page_org.pdf.pdf_header
        self.pdf_header = _get_max_pdf_version_header(self.pdf_header, other)

    node, idx = self._get_page_in_node(index)
    page[NameObject(PagesAttributes.PARENT)] = node.indirect_reference

    # A non-negative ``idx`` is an insertion position inside the node's kids;
    # a negative one means "append".
    if idx >= 0:
        cast(ArrayObject, node[PagesAttributes.KIDS]).insert(idx, page.indirect_reference)
        self.flattened_pages.insert(index, page)
    else:
        cast(ArrayObject, node[PagesAttributes.KIDS]).append(page.indirect_reference)
        self.flattened_pages.append(page)
    # Increment the page count of the node and of every ancestor; the
    # counter guards against cyclic parent chains.
    recurse = 0
    while not is_null_or_none(node):
        node = cast(DictionaryObject, node.get_object())
        node[NameObject(PagesAttributes.COUNT)] = NumberObject(cast(int, node[PagesAttributes.COUNT]) + 1)
        node = node.get(PagesAttributes.PARENT, None)  # type: ignore[assignment]  # TODO: Fix.
        recurse += 1
        if recurse > 1000:
            raise PyPdfError("Too many recursive calls!")

    if page_org.pdf is not None:
        # the page may contain links to other pages, and those other
        # pages may or may not already be added. we store the
        # information we need, so that we can resolve the references
        # later.
        self._unresolved_links.extend(extract_links(page, page_org))
        self._merged_in_pages[page_org.indirect_reference] = page.indirect_reference

    return page
def set_need_appearances_writer(self, state: bool = True) -> None:
    """
    Set the "NeedAppearances" flag of the interactive form dictionary.

    The flag tells whether the appearance of form fields should be generated
    by the PDF viewer or whether the embedded appearance should be used. The
    form dictionary is created in the root object when it does not exist yet.
    Any exception raised on the way is logged as a warning, not propagated.

    Args:
        state: The actual value of the NeedAppearances flag.

    Returns:
        None

    """
    # See §12.7.2 and §7.7.2 for more information:
    # https://opensource.adobe.com/dc-acrobat-sdk-docs/pdfstandards/PDF32000_2008.pdf
    try:
        root = self._root_object
        # Make sure the AcroForm tree exists before touching it.
        if CatalogDictionary.ACRO_FORM not in root:
            root[NameObject(CatalogDictionary.ACRO_FORM)] = self._add_object(DictionaryObject())

        flag_key = NameObject(InteractiveFormDictEntries.NeedAppearances)
        acro_form = cast(DictionaryObject, root[CatalogDictionary.ACRO_FORM])
        acro_form[flag_key] = BooleanObject(state)
    except Exception as exc:  # pragma: no cover
        logger_warning(
            f"set_need_appearances_writer({state}) catch : {exc}", __name__
        )
def create_viewer_preferences(self) -> ViewerPreferences:
    """
    Create an empty ViewerPreferences object, register it with the writer,
    store its reference under the viewer-preferences key of the root object
    and return the object itself.
    """
    preferences = ViewerPreferences()
    preferences_ref = self._add_object(preferences)
    self._root_object[NameObject(CatalogDictionary.VIEWER_PREFERENCES)] = preferences_ref
    return preferences
def add_page(
    self,
    page: PageObject,
    excluded_keys: Iterable[str] = (),
) -> PageObject:
    """
    Append a page at the end of this PDF file.

    Recommended for advanced usage including the adequate excluded_keys.

    The page is usually acquired from a :class:`PdfReader<pypdf.PdfReader>`
    instance.

    Args:
        page: The page to add to the document. Should be
            an instance of :class:`PageObject<pypdf._page.PageObject>`
        excluded_keys: Keys forwarded to ``_add_page`` to be left out
            of the added page.

    Returns:
        The added PageObject.

    """
    assert self.flattened_pages is not None, "mypy"
    end_position = len(self.flattened_pages)
    return self._add_page(page, end_position, excluded_keys)
def insert_page(
    self,
    page: PageObject,
    index: int = 0,
    excluded_keys: Iterable[str] = (),
) -> PageObject:
    """
    Insert a page in this PDF file. The page is usually acquired from a
    :class:`PdfReader<pypdf.PdfReader>` instance.

    Args:
        page: The page to add to the document.
        index: Position at which the page will be inserted. Negative values
            count from the end; positions at or past the end append.
        excluded_keys: Keys forwarded to the page-adding helpers.

    Returns:
        The added PageObject.

    Raises:
        ValueError: If a negative index reaches before the first page.

    """
    assert self.flattened_pages is not None, "mypy"
    page_count = len(self.flattened_pages)
    if index < 0:
        index += page_count
        if index < 0:
            raise ValueError("Invalid index value")
    if index < page_count:
        return self._add_page(page, index, excluded_keys)
    return self.add_page(page, excluded_keys)
def _get_page_number_by_indirect(
    self, indirect_reference: Union[None, int, NullObject, IndirectObject]
) -> Optional[int]:
    """
    Look up the page number of the page an indirect reference points to.

    Args:
        indirect_reference: A reference, a plain object number (turned into
            a generation-0 reference of this writer), or a null/None value.

    Returns:
        The page number or None (null input, or the target is not a page)

    """
    # To provide same function as in PdfReader
    if is_null_or_none(indirect_reference):
        return None
    assert indirect_reference is not None, "mypy"
    if isinstance(indirect_reference, int):
        indirect_reference = IndirectObject(indirect_reference, 0, self)
    target = indirect_reference.get_object()
    return target.page_number if isinstance(target, PageObject) else None
def add_blank_page(
    self, width: Optional[float] = None, height: Optional[float] = None
) -> PageObject:
    """
    Append a blank page to this PDF file and return it.

    If no page size is specified, use the size of the last page.

    Args:
        width: The width of the new page expressed in default user
            space units.
        height: The height of the new page expressed in default
            user space units.

    Returns:
        The newly appended page.

    Raises:
        PageSizeNotDefinedError: if width and height are not defined
            and previous page does not exist.

    """
    # Size resolution is delegated to PageObject.create_blank_page.
    return self.add_page(PageObject.create_blank_page(self, width, height))
def insert_blank_page(
    self,
    width: Optional[Union[float, decimal.Decimal]] = None,
    height: Optional[Union[float, decimal.Decimal]] = None,
    index: int = 0,
) -> PageObject:
    """
    Insert a blank page to this PDF file and return it.

    If a dimension is missing and a page already exists at ``index``, the
    size of that page is used. If no such page exists (``index`` is at or
    past the end), the size resolution is left to
    ``PageObject.create_blank_page``.

    Args:
        width: The width of the new page expressed in default user
            space units.
        height: The height of the new page expressed in default
            user space units.
        index: Position to add the page.

    Returns:
        The newly inserted page, i.e. the page object returned by
        :meth:`insert_page` (consistent with :meth:`add_blank_page`).

    Raises:
        PageSizeNotDefinedError: if width and height are not defined
            and previous page does not exist.

    """
    # Only look at ``self.pages[index]`` when that page exists. The previous
    # condition (``width is None or (height is None and index < n)``) also
    # indexed past the end whenever ``width`` was None, failing with an
    # IndexError instead of the documented behaviour.
    if (width is None or height is None) and index < self.get_num_pages():
        oldpage = self.pages[index]
        width = oldpage.mediabox.width
        height = oldpage.mediabox.height
    page = PageObject.create_blank_page(self, width, height)
    # Return what insert_page hands back: _add_page clones the page, so the
    # temporary ``page`` is not necessarily the object stored in the document.
    return self.insert_page(page, index)
@property
def open_destination(
    self,
) -> Union[None, Destination, TextStringObject, ByteStringObject]:
    """Destination used when the document is opened, as provided by the parent class."""
    return super().open_destination

@open_destination.setter
def open_destination(self, dest: Union[None, str, Destination, PageObject]) -> None:
    """
    Set or remove the "/OpenAction" entry of the root object.

    None removes the entry (a missing entry is ignored); a str is stored as
    text string; a Destination contributes its destination array; a
    PageObject is wrapped in a page-fit destination named "Opening". Other
    types leave the root object untouched.
    """
    if dest is None:
        try:
            del self._root_object["/OpenAction"]
        except KeyError:
            pass
        return

    if isinstance(dest, str):
        self._root_object[NameObject("/OpenAction")] = TextStringObject(dest)
        return

    if isinstance(dest, Destination):
        self._root_object[NameObject("/OpenAction")] = dest.dest_array
        return

    if isinstance(dest, PageObject):
        # A page without reference is represented by a null object.
        page_ref = (
            dest.indirect_reference
            if dest.indirect_reference is not None
            else NullObject()
        )
        opening = Destination("Opening", page_ref, PAGE_FIT)
        self._root_object[NameObject("/OpenAction")] = opening.dest_array
def add_js(self, javascript: str) -> None:
    """
    Add JavaScript which will launch upon opening this PDF.

    The script is stored as a JavaScript action and appended, together with
    a randomly generated name, to the names array of the "/JavaScript" entry
    of the root object's names dictionary. Missing dictionaries are created.

    Args:
        javascript: Your JavaScript.

    Example:
        This will launch the print window when the PDF is opened.

        >>> from pypdf import PdfWriter
        >>> output = PdfWriter()
        >>> output.add_js("this.print({bUI:true,bSilent:false,bShrinkToFit:true});")

    """
    # Names / JavaScript preferred to be able to add multiple scripts
    root = self._root_object
    if "/Names" not in root:
        root[NameObject(CA.NAMES)] = DictionaryObject()
    names = cast(DictionaryObject, root[CA.NAMES])

    if "/JavaScript" not in names:
        names[NameObject("/JavaScript")] = DictionaryObject(
            {NameObject("/Names"): ArrayObject()}
        )
    js_tree = cast(DictionaryObject, names["/JavaScript"])
    js_list = cast(ArrayObject, js_tree["/Names"])

    # We need a name for parameterized JavaScript in the PDF file,
    # but it can be anything.
    script_name = create_string_object(str(uuid.uuid4()))
    js_list.append(script_name)

    action = DictionaryObject(
        {
            NameObject(PagesAttributes.TYPE): NameObject("/Action"),
            NameObject("/S"): NameObject("/JavaScript"),
            NameObject("/JS"): TextStringObject(f"{javascript}"),
        }
    )
    js_list.append(self._add_object(action))
def add_attachment(self, filename: str, data: Union[str, bytes]) -> "EmbeddedFile":
    """
    Embed a file inside the PDF.

    The work is delegated to ``EmbeddedFile._create_new``.

    Reference:
    https://opensource.adobe.com/dc-acrobat-sdk-docs/pdfstandards/PDF32000_2008.pdf
    Section 7.11.3

    Args:
        filename: The filename to display.
        data: The data in the file.

    Returns:
        EmbeddedFile instance for the newly created embedded file.

    """
    embedded = EmbeddedFile._create_new(self, filename, data)
    return embedded
def append_pages_from_reader(
    self,
    reader: PdfReader,
    after_page_append: Optional[Callable[[PageObject], None]] = None,
) -> None:
    """
    Copy all pages of ``reader`` to this writer, in order.

    ``append`` should be preferred.

    Args:
        reader: The PdfReader whose pages are added through ``add_page``.
        after_page_append:
            Optional callback invoked after each page has been added. Its
            single parameter is the page as returned by ``add_page``, i.e.
            the page just appended to this document. Non-callable values
            are ignored.

    """
    page_total = len(reader.pages)
    for position in range(page_total):
        appended = self.add_page(reader.pages[position])
        # Hand the writer-side page to the callback, if there is one.
        if callable(after_page_append):
            after_page_append(appended)
def _merge_content_stream_to_page(
    self,
    page: PageObject,
    new_content_data: bytes,
) -> None:
    """
    Add ``new_content_data`` to the content of ``page``.

    * No "/Contents" entry: a new stream holding the data is installed.
    * Content is an array: a new stream is appended to the array, which is
      then (re-)registered as the page content.
    * Content is a single stream: a new stream holding the old data, a
      newline and the new data replaces it.

    Args:
        page: The page to which the new content data will be added.
        new_content_data: A binary-encoded new content stream, for
            instance the commands to draw an XObject.
    """

    def _stream_with(data: bytes) -> StreamObject:
        # Small factory: a fresh stream object carrying ``data``.
        stream = StreamObject()
        stream.set_data(data)
        return stream

    if NameObject("/Contents") not in page:
        # If no existing content, then we have an empty page.
        # Create a new StreamObject in a new /Contents entry.
        page[NameObject("/Contents")] = self._add_object(_stream_with(new_content_data))
        return

    # First resolve the existing page content. This always is an IndirectObject:
    # PDF Explained by John Whitington
    # https://www.oreilly.com/library/view/pdf-explained/9781449321581/ch04.html
    existing_content = page[NameObject("/Contents")].get_object()

    if isinstance(existing_content, ArrayObject):
        # The new data becomes one more element of the content array.
        existing_content.append(self._add_object(_stream_with(new_content_data)))
        page[NameObject("/Contents")] = self._add_object(existing_content)
    if isinstance(existing_content, StreamObject):
        # Old and new data are joined into one replacement stream.
        combined = existing_content.get_data() + b"\n" + new_content_data
        page[NameObject("/Contents")] = self._add_object(_stream_with(combined))
def _add_apstream_object(
    self,
    page: PageObject,
    appearance_stream_obj: StreamObject,
    object_name: str,
    x_offset: float,
    y_offset: float,
) -> None:
    """
    Draw an appearance stream on a page by registering it as an XObject of
    the page and appending the drawing commands to the page content.

    Args:
        page: The page to which to add the appearance stream.
        appearance_stream_obj: The appearance stream.
        object_name: The name of the appearance stream; the XObject is
            named ``/Fm_<object_name>`` after sanitizing.
        x_offset: The horizontal offset for the appearance stream.
        y_offset: The vertical offset for the appearance stream.
    """
    # Prepare XObject resource dictionary on the page. This currently
    # only deals with font resources, but can easily be adapted to also
    # include other resources.
    page_resources = cast(DictionaryObject, page[PG.RESOURCES])
    if "/Resources" in appearance_stream_obj:
        stream_resources = cast(DictionaryObject, appearance_stream_obj["/Resources"])
        stream_fonts = cast(DictionaryObject, stream_resources.get("/Font", DictionaryObject()))
        if "/Font" not in page_resources:
            page_resources[NameObject("/Font")] = DictionaryObject()
        page_fonts = cast(DictionaryObject, page_resources["/Font"])
        # Copy over the fonts the page does not know yet; existing page
        # entries win.
        for font_name, font_ref in stream_fonts.items():
            if font_name in page_fonts:
                continue
            page_fonts[font_name] = font_ref

    # Always add the resolved stream object to the writer to get a new IndirectObject.
    # This ensures we have a valid IndirectObject managed by *this* writer.
    xobject_ref = self._add_object(appearance_stream_obj)
    xobject_name = NameObject(f"/Fm_{object_name}")._sanitize()
    if "/XObject" not in page_resources:
        page_resources[NameObject("/XObject")] = DictionaryObject()
    page_xobjects = cast(DictionaryObject, page_resources["/XObject"])
    if xobject_name in page_xobjects:
        logger_warning(
            f"XObject {xobject_name!r} already added to page resources. This might be an issue.",
            __name__
        )
    else:
        page_xobjects[xobject_name] = xobject_ref

    # Translate to the requested offset and paint the XObject inside a
    # saved/restored graphics state.
    placement = Transformation().translate(x_offset, y_offset)
    drawing_commands = f"q\n{placement._to_cm()}\n{xobject_name} Do\nQ".encode()
    self._merge_content_stream_to_page(page, drawing_commands)
# ``FA.FfBits(0)``: the field-flag value with no bit set. It is the default of
# the ``flags`` parameter of ``update_page_form_field_values``.
FFBITS_NUL = FA.FfBits(0)
def update_page_form_field_values(
    self,
    page: Union[PageObject, list[PageObject], None],
    fields: Mapping[str, Union[str, list[str], tuple[str, str, float]]],
    flags: FA.FfBits = FFBITS_NUL,
    auto_regenerate: Optional[bool] = True,
    flatten: bool = False,
) -> None:
    """
    Update the form field values for a given page from a fields dictionary.

    Copy field texts and values from fields to page.
    If the field links to a parent object, add the information to the parent.

    Args:
        page: `PageObject` - references **PDF writer's page** where the
            annotations and field data will be updated.
            `List[Pageobject]` - provides list of pages to be processed.
            `None` - all pages.
        fields: a Python dictionary of:

            * field names (/T) as keys and text values (/V) as value
            * field names (/T) as keys and list of text values (/V) for multiple choice list
            * field names (/T) as keys and tuple of:
                * text values (/V)
                * font id (e.g. /F1, the font id must exist)
                * font size (0 for autosize)

        flags: A set of flags from :class:`~pypdf.constants.FieldDictionaryAttributes.FfBits`.

        auto_regenerate: Set/unset the need_appearances flag;
            the flag is unchanged if auto_regenerate is None.

        flatten: Whether or not to flatten the annotation. If True, this adds the annotation's
            appearance stream to the page contents. Note that this option does not remove the
            annotation itself.

    Raises:
        PyPdfError: If the writer has no /AcroForm dictionary, or the
            /AcroForm dictionary has no /Fields entry.

    """
    if CatalogDictionary.ACRO_FORM not in self._root_object:
        raise PyPdfError("No /AcroForm dictionary in PDF of PdfWriter Object")
    acro_form = cast(DictionaryObject, self._root_object[CatalogDictionary.ACRO_FORM])
    if InteractiveFormDictEntries.Fields not in acro_form:
        raise PyPdfError("No /Fields dictionary in PDF of PdfWriter Object")
    if isinstance(auto_regenerate, bool):
        self.set_need_appearances_writer(auto_regenerate)
    # Iterate through pages, update field values
    if page is None:
        page = list(self.pages)
    if isinstance(page, list):
        for p in page:
            if PG.ANNOTS in p:  # just to prevent warnings
                # auto_regenerate=None: the flag was already handled above.
                self.update_page_form_field_values(p, fields, flags, None, flatten=flatten)
        return
    if PG.ANNOTS not in page:
        logger_warning("No fields to update on this page", __name__)
        return
    for annotation in page[PG.ANNOTS]:  # type: ignore
        annotation = cast(DictionaryObject, annotation.get_object())
        if annotation.get("/Subtype", "") != "/Widget":
            continue
        # A widget carrying both /FT and /T is its own field; otherwise the
        # field data lives in the parent dictionary.
        if "/FT" in annotation and "/T" in annotation:
            parent_annotation = annotation
        else:
            parent_annotation = annotation.get(
                PG.PARENT, DictionaryObject()
            ).get_object()

        for field, value in fields.items():
            rectangle = cast(RectangleObject, annotation[AA.Rect])
            if not (
                self._get_qualified_field_name(parent_annotation) == field
                or parent_annotation.get("/T", None) == field
            ):
                continue
            # Reset for every matching field: without this, a field type that
            # produces no appearance stream (e.g. /Sig) would either hit an
            # unbound local or flatten the stream left over from a previous field.
            appearance_stream_obj = None
            if (
                parent_annotation.get("/FT", None) == "/Ch"
                and "/I" in parent_annotation
            ):
                del parent_annotation["/I"]
            if flags:
                annotation[NameObject(FA.Ff)] = NumberObject(flags)
            # Set the field value
            if not (value is None and flatten):  # Only change values if given by user and not flattening.
                if isinstance(value, list):
                    lst = ArrayObject(TextStringObject(v) for v in value)
                    parent_annotation[NameObject(FA.V)] = lst
                elif isinstance(value, tuple):
                    annotation[NameObject(FA.V)] = TextStringObject(
                        value[0],
                    )
                else:
                    parent_annotation[NameObject(FA.V)] = TextStringObject(value)
            # Get or create the field's appearance stream object
            if parent_annotation.get(FA.FT) == "/Btn":
                # Checkbox button (no /FT found in Radio widgets);
                # We can find the associated appearance stream object
                # within the annotation.
                v = NameObject(value)
                ap = cast(DictionaryObject, annotation[NameObject(AA.AP)])
                normal_ap = cast(DictionaryObject, ap["/N"])
                if v not in normal_ap:
                    v = NameObject("/Off")
                appearance_stream_obj = normal_ap.get(v)
                # Other cases will be updated through the for loop
                annotation[NameObject(AA.AS)] = v
                annotation[NameObject(FA.V)] = v
            elif (
                parent_annotation.get(FA.FT) == "/Tx"
                or parent_annotation.get(FA.FT) == "/Ch"
            ):
                # Textbox; we need to generate the appearance stream object
                if isinstance(value, tuple):
                    appearance_stream_obj = TextStreamAppearance.from_text_annotation(
                        acro_form, parent_annotation, annotation, value[1], value[2]
                    )
                else:
                    appearance_stream_obj = TextStreamAppearance.from_text_annotation(
                        acro_form, parent_annotation, annotation
                    )
                # Add the appearance stream object
                if AA.AP not in annotation:
                    annotation[NameObject(AA.AP)] = DictionaryObject(
                        {NameObject("/N"): self._add_object(appearance_stream_obj)}
                    )
                elif "/N" not in (ap := cast(DictionaryObject, annotation[AA.AP])):
                    ap[NameObject("/N")] = self._add_object(appearance_stream_obj)
                else:  # [/AP][/N] exists
                    # Reuse the object number of the existing normal appearance.
                    n = annotation[AA.AP]["/N"].indirect_reference.idnum  # type: ignore
                    self._objects[n - 1] = appearance_stream_obj
                    appearance_stream_obj.indirect_reference = IndirectObject(n, 0, self)
            elif (
                annotation.get(FA.FT) == "/Sig"
            ):  # deprecated  # not implemented yet
                logger_warning("Signature forms not implemented yet", __name__)
            if flatten and appearance_stream_obj is not None:
                self._add_apstream_object(page, appearance_stream_obj, field, rectangle[0], rectangle[1])
def reattach_fields(
    self, page: Optional[PageObject] = None
) -> list[DictionaryObject]:
    """
    Look for orphan form fields among the page annotations and register
    them in the /AcroForm /Fields array.

    Args:
        page: page to analyze.
            If none is provided, all pages will be analyzed.

    Returns:
        list of reattached fields.

    """
    reattached = []
    if page is None:
        # No page given: process every page and concatenate the results.
        for single_page in self.pages:
            reattached += self.reattach_fields(single_page)
        return reattached

    # Fetch the /AcroForm dictionary, creating it when absent.
    try:
        acro_form = cast(DictionaryObject, self._root_object[CatalogDictionary.ACRO_FORM])
    except KeyError:
        acro_form = DictionaryObject()
        self._root_object[NameObject(CatalogDictionary.ACRO_FORM)] = acro_form
    # Fetch the /Fields array, creating it when absent.
    try:
        known_fields = cast(ArrayObject, acro_form[InteractiveFormDictEntries.Fields])
    except KeyError:
        known_fields = ArrayObject()
        acro_form[NameObject(InteractiveFormDictEntries.Fields)] = known_fields

    if "/Annots" not in page:
        return reattached
    page_annotations = cast(ArrayObject, page["/Annots"])
    for position, entry in enumerate(page_annotations):
        was_indirect = isinstance(entry, IndirectObject)
        widget = cast(DictionaryObject, entry.get_object())
        if widget.get("/Subtype", "") != "/Widget" or "/FT" not in widget:
            continue
        already_registered = (
            "indirect_reference" in widget.__dict__
            and widget.indirect_reference in known_fields
        )
        if already_registered:
            continue
        if not was_indirect:
            # Direct annotation: register it so it gets an indirect reference.
            page_annotations[position] = self._add_object(widget)
        known_fields.append(widget.indirect_reference)
        reattached.append(widget)
    return reattached
def clone_reader_document_root(self, reader: PdfReader) -> None:
    """
    Copy the reader document root to the writer and all sub-elements,
    including pages, threads, outlines,... For partial insertion, ``append``
    should be considered.

    Args:
        reader: PdfReader from which the document root should be copied.

    Raises:
        PdfReadError: In strict mode, if the number of objects exceeds the
            reader's trailer /Size; also if flattening the page tree
            raises an IndexError.

    """
    # Any previous /Info reference is dropped.
    self._info_obj = None
    if self.incremental:
        # Keep the reader's object numbering: slot i holds object i + 1.
        self._objects = [None] * (cast(int, reader.trailer["/Size"]) - 1)
        for i in range(len(self._objects)):
            o = reader.get_object(i + 1)
            if o is not None:
                self._objects[i] = o.replicate(self)
    else:
        self._objects.clear()
    self._root_object = reader.root_object.clone(self)
    self._pages = self._root_object.raw_get("/Pages")

    if len(self._objects) > cast(int, reader.trailer["/Size"]):
        if self.strict:
            raise PdfReadError(
                f"Object count {len(self._objects)} exceeds defined trailer size {reader.trailer['/Size']}"
            )
        logger_warning(
            f"Object count {len(self._objects)} exceeds defined trailer size {reader.trailer['/Size']}",
            __name__
        )

    # must be done here before rewriting
    if self.incremental:
        # Hash of every object as loaded (0 for empty slots); compared later
        # to find the objects that belong in the increment.
        self._original_hash = [
            (obj.hash_bin() if obj is not None else 0) for obj in self._objects
        ]

    try:
        self._flatten()
    except IndexError:
        raise PdfReadError("Got index error while flattening.")

    assert self.flattened_pages is not None
    for p in self.flattened_pages:
        self._replace_object(cast(IndirectObject, p.indirect_reference).idnum, p)
        if not self.incremental:
            # Attach every page directly to the root /Pages node.
            p[NameObject("/Parent")] = self._pages
    if not self.incremental:
        # Replace /Kids with the flat list of page references.
        cast(DictionaryObject, self._pages.get_object())[
            NameObject("/Kids")
        ] = ArrayObject([p.indirect_reference for p in self.flattened_pages])
def clone_document_from_reader(
    self,
    reader: PdfReader,
    after_page_append: Optional[Callable[[PageObject], None]] = None,
) -> None:
    """
    Clone a whole document from a PDF reader: the '/Root', '/Info' and
    '/ID' sections of the pdf are copied into this writer.

    Args:
        reader: PDF file reader instance from which the clone
            should be created.
        after_page_append:
            Callback function that is invoked once for each page of the
            cloned document. Its single parameter is a reference to the
            page object.

    """
    self.clone_reader_document_root(reader)

    source_info = reader._info
    if source_info is not None:
        if self.incremental:
            self._info_obj = cast(
                IndirectObject, source_info.clone(self).indirect_reference
            )
            assert isinstance(self._info, DictionaryObject), "for mypy"
            # Record the hash of the cloned /Info as its original hash.
            info_hash = self._info.hash_bin()
            slot = self._info_obj.indirect_reference.idnum - 1
            self._original_hash[slot] = info_hash
        else:
            info_copy = DictionaryObject(cast(DictionaryObject, source_info.get_object()))
            self._info_obj = self._add_object(info_copy)
    # otherwise _info_obj was already reset by clone_reader_document_root()

    try:
        self._ID = cast(ArrayObject, reader._ID).clone(self)
    except AttributeError:
        pass

    if not callable(after_page_append):
        return
    pages_node = cast(DictionaryObject, self._pages.get_object())
    for kid in cast(ArrayObject, pages_node["/Kids"]):
        after_page_append(kid.get_object())
def _compute_document_identifier(self) -> ByteStringObject:
    """Return a checksum of the serialized PDF structure as a byte string object."""
    buffer = BytesIO()
    self._write_pdf_structure(buffer)
    buffer.seek(0)
    checksum = _rolling_checksum(buffer)
    return ByteStringObject(checksum.encode("utf8"))
def generate_file_identifiers(self) -> None:
    """
    Generate the identifier pair for the PDF that will be written.

    The only purpose is to ensure uniqueness; reproducibility is not
    required.
    When a file is first written, both identifiers shall be set to the same value.
    If both identifiers match when a file reference is resolved, it is very
    likely that the correct and unchanged file has been found. If only the first
    identifier matches, a different version of the correct file has been found.
    see §14.4 "File Identifiers".
    """
    current = self._ID
    if current:
        # Keep the existing first identifier; refresh only the second one.
        first = current[0]
        second = self._compute_document_identifier()
    else:
        first = second = self._compute_document_identifier()
    self._ID = ArrayObject((first, second))
def encrypt(
    self,
    user_password: str,
    owner_password: Optional[str] = None,
    use_128bit: bool = True,
    permissions_flag: UserAccessPermissions = ALL_DOCUMENT_PERMISSIONS,
    *,
    algorithm: Optional[str] = None,
) -> None:
    """
    Encrypt this PDF file with the PDF Standard encryption handler.

    Args:
        user_password: The password which allows for opening
            and reading the PDF file with the restrictions provided.
        owner_password: The password which allows for
            opening the PDF files without any restrictions. By default,
            the owner password is the same as the user password.
        use_128bit: flag as to whether to use 128bit
            encryption. When false, 40bit encryption will be used.
            By default, this flag is on.
        permissions_flag: permissions as described in
            Table 3.20 of the PDF 1.7 specification. A bit value of 1 means
            the permission is granted.
            Hence an integer value of -1 will set all flags.
            Bit position 3 is for printing, 4 is for modifying content,
            5 and 6 control annotations, 9 for form fields,
            10 for extraction of text and graphics.
        algorithm: encrypt algorithm. Values may be one of "RC4-40", "RC4-128",
            "AES-128", "AES-256-R5", "AES-256". If it is valid,
            `use_128bit` will be ignored.

    Raises:
        ValueError: If ``algorithm`` does not name a supported algorithm.

    """
    if owner_password is None:
        owner_password = user_password

    if algorithm is None:
        # Legacy selection through the use_128bit flag.
        chosen = EncryptAlgorithm.RC4_128 if use_128bit else EncryptAlgorithm.RC4_40
    else:
        member_name = algorithm.replace("-", "_")
        try:
            chosen = getattr(EncryptAlgorithm, member_name)
        except AttributeError:
            raise ValueError(f"Algorithm '{algorithm}' NOT supported")

    self.generate_file_identifiers()
    assert self._ID
    self._encryption = Encryption.make(chosen, permissions_flag, self._ID[0])
    new_entry = self._encryption.write_entry(user_password, owner_password)

    previous_entry = self._encrypt_entry
    if previous_entry:
        # `encrypt` was called before: the new entry takes over the old slot.
        assert previous_entry.indirect_reference is not None
        new_entry.indirect_reference = previous_entry.indirect_reference
        self._objects[new_entry.indirect_reference.idnum - 1] = new_entry
    else:
        self._add_object(new_entry)
    self._encrypt_entry = new_entry
def _resolve_links(self) -> None:
    """
    Re-point links recorded earlier so that they target the merged-in
    copy of the page they originally referenced.

    Links whose original page cannot be determined, or whose page has no
    merged-in counterpart, are left untouched.
    """
    for patched_link, source_link in self._unresolved_links:
        source_page = source_link.find_referenced_page()
        if source_page:
            target_page = self._merged_in_pages.get(source_page)
            if target_page is not None:
                patched_link.patch_reference(self, target_page)
def write_stream(self, stream: StreamType) -> None:
    """
    Serialize the document to an already opened stream.

    In incremental mode the reader's original bytes are copied first and an
    increment is appended only when some object is new or modified.
    Otherwise the full structure, xref table and trailer are written.

    Args:
        stream: Destination stream. A warning is logged when it exposes a
            ``mode`` attribute that does not contain ``"b"``.

    """
    text_mode = hasattr(stream, "mode") and "b" not in stream.mode
    if text_mode:
        logger_warning(
            f"File <{stream.name}> to write to is not in binary mode. It may not be written to correctly.",
            __name__,
        )
    self._resolve_links()

    if not self.incremental:
        positions, free_ids = self._write_pdf_structure(stream)
        xref_start = self._write_xref_table(stream, positions, free_ids)
        self._write_trailer(stream, xref_start)
        return

    self._reader.stream.seek(0)
    stream.write(self._reader.stream.read(-1))
    if self.list_objects_in_increment():
        self._write_increment(stream)  # writes objs, xref stream and startxref
def write(self, stream: Union[Path, StrByteType]) -> tuple[bool, IO[Any]]:
    """
    Write the collection of pages added to this object out as a PDF file.

    Args:
        stream: An object to write the file to. The object can support
            the write method and the tell method, similar to a file object, or
            be a file path, just like the fileobj, just named it stream to keep
            existing workflow.

    Returns:
        A tuple (bool, IO): whether the file was opened by this method,
        and the stream that was written to. A stream opened by this method
        is already closed when returned.

    Raises:
        ValueError: If ``stream`` is an empty string.

    """
    my_file = False

    if stream == "":
        raise ValueError(f"Output({stream=}) is empty.")

    if isinstance(stream, (str, Path)):
        stream = FileIO(stream, "wb")
        my_file = True

    try:
        self.write_stream(stream)
    finally:
        # A file opened here must be closed even when writing fails;
        # otherwise the handle leaks.
        if my_file:
            stream.close()

    if not my_file:
        # Caller-owned stream: leave it open, only flush.
        stream.flush()

    return my_file, stream
def list_objects_in_increment(self) -> list[IndirectObject]:
    """
    For analysis or debugging.
    Report the new or modified objects that would be written in the
    increment.
    Deleted objects will not be freed but will become orphans.

    Returns:
        List of new or modified IndirectObjects

    """
    known_count = len(self._original_hash)
    changed = []
    for position, candidate in enumerate(self._objects):
        if candidate is None:
            continue
        # Unchanged: existed originally and still hashes to the same value.
        if position < known_count and candidate.hash_bin() == self._original_hash[position]:
            continue
        changed.append(cast(IndirectObject, candidate).indirect_reference)
    return changed
def _write_increment(self, stream: StreamType) -> None:
    """
    Append an incremental update to ``stream``.

    Writes every object that is new or whose hash differs from the one
    recorded in ``self._original_hash``, followed by a cross-reference
    stream object and the ``startxref`` / ``%%EOF`` footer.

    Args:
        stream: Stream to write to; ``tell()`` is used to record offsets.

    """
    object_positions = {}
    # Runs of consecutive object numbers, as [first id, count] pairs (/Index).
    object_blocks = []
    current_start = -1
    current_stop = -2
    original_hash_count = len(self._original_hash)
    for i, obj in enumerate(self._objects):
        if obj is not None and (
            i >= original_hash_count
            or obj.hash_bin() != self._original_hash[i]
        ):
            idnum = i + 1
            assert isinstance(obj, PdfObject), "mypy"
            # first write new/modified object
            object_positions[idnum] = stream.tell()
            stream.write(f"{idnum} 0 obj\n".encode())
            """ encryption is not operational
            if self._encryption and obj != self._encrypt_entry:
                obj = self._encryption.encrypt_object(obj, idnum, 0)
            """
            obj.write_to_stream(stream)
            stream.write(b"\nendobj\n")

            # prepare xref
            if idnum != current_stop:
                # Not contiguous with the current run: close it, start a new one.
                if current_start > 0:
                    object_blocks.append(
                        [current_start, current_stop - current_start]
                    )
                current_start = idnum
            current_stop = idnum + 1
    assert current_start > 0, "for pytest only"
    object_blocks.append([current_start, current_stop - current_start])
    # write incremented xref
    xref_location = stream.tell()
    # The xref stream takes the first object number after the existing ones.
    xr_id = len(self._objects) + 1
    stream.write(f"{xr_id} 0 obj".encode())
    init_data = {
        NameObject("/Type"): NameObject("/XRef"),
        NameObject("/Size"): NumberObject(xr_id + 1),
        NameObject("/Root"): self.root_object.indirect_reference,
        NameObject("/Filter"): NameObject("/FlateDecode"),
        NameObject("/Index"): ArrayObject(
            [NumberObject(_it) for _su in object_blocks for _it in _su]
        ),
        NameObject("/W"): ArrayObject(
            [NumberObject(1), NumberObject(4), NumberObject(1)]
        ),
        "__streamdata__": b"",
    }
    # /Info is referenced only when it is new or its hash has changed.
    if self._info is not None and (
        self._info.indirect_reference.idnum - 1  # type: ignore
        >= len(self._original_hash)
        or cast(IndirectObject, self._info).hash_bin()  # kept for future
        != self._original_hash[
            self._info.indirect_reference.idnum - 1  # type: ignore
        ]
    ):
        init_data[NameObject(TK.INFO)] = self._info.indirect_reference
    init_data[NameObject(TK.PREV)] = NumberObject(self._reader._startxref)
    if self._ID:
        init_data[NameObject(TK.ID)] = self._ID
    xr = StreamObject.initialize_from_dictionary(init_data)
    # One entry per written object, packed to match /W [1 4 1]:
    # type byte 1, 4-byte big-endian offset, final byte 0.
    xr.set_data(
        b"".join(
            [struct.pack(b">BIB", 1, _pos, 0) for _pos in object_positions.values()]
        )
    )
    xr.write_to_stream(stream)
    stream.write(f"\nendobj\nstartxref\n{xref_location}\n%%EOF\n".encode())  # eof
def _write_pdf_structure(self, stream: StreamType) -> tuple[list[int], list[int]]:
    """
    Write the PDF header and every object of the document to ``stream``.

    Args:
        stream: Stream to write to; ``tell()`` is used to record offsets.

    Returns:
        A tuple ``(object_positions, free_objects)``: the byte offset of
        each object (``-1`` for empty slots), and the ids of the empty
        slots followed by a closing ``0``.

    """
    offsets = []
    free_ids = []
    stream.write(self.pdf_header.encode() + b"\n")
    stream.write(b"%\xE2\xE3\xCF\xD3\n")

    for object_id, item in enumerate(self._objects, start=1):
        if item is None:
            offsets.append(-1)
            free_ids.append(object_id)
            continue
        offsets.append(stream.tell())
        stream.write(f"{object_id} 0 obj\n".encode())
        if self._encryption and item != self._encrypt_entry:
            item = self._encryption.encrypt_object(item, object_id, 0)
        item.write_to_stream(stream)
        stream.write(b"\nendobj\n")
    free_ids.append(0)  # add 0 to loop in accordance with specification
    return offsets, free_ids
def _write_xref_table(
    self, stream: StreamType, object_positions: list[int], free_objects: list[int]
) -> int:
    """
    Write the classic cross-reference table to ``stream``.

    Args:
        stream: Stream to write to.
        object_positions: Byte offset of each object; non-positive values
            mark free entries.
        free_objects: Ids used to chain the free entries.

    Returns:
        The offset at which the ``xref`` keyword was written.

    """
    table_start = stream.tell()
    entry_count = len(self._objects) + 1
    stream.write(b"xref\n")
    stream.write(f"0 {entry_count}\n".encode())
    # Entry 0 is the head of the free list.
    stream.write(f"{free_objects[0]:0>10} 65535 f \n".encode())
    next_free = 1
    for position in object_positions:
        if position <= 0:
            line = f"{free_objects[next_free]:0>10} 00001 f \n"
            stream.write(line.encode())
            next_free += 1
        else:
            line = f"{position:0>10} 00000 n \n"
            stream.write(line.encode())
    return table_start
def _write_trailer(self, stream: StreamType, xref_location: int) -> None:
    """
    Write the PDF trailer to the stream.

    To quote the PDF specification:
        [The] trailer [gives] the location of the cross-reference table and
        of certain special objects within the body of the file.

    Args:
        stream: Stream to write to.
        xref_location: Offset written after the ``startxref`` keyword.

    """
    stream.write(b"trailer\n")
    mandatory_entries = {
        NameObject(TK.SIZE): NumberObject(len(self._objects) + 1),
        NameObject(TK.ROOT): self.root_object.indirect_reference,
    }
    trailer_dict = DictionaryObject(mandatory_entries)
    # Optional entries are emitted only when the writer holds them.
    if self._info is not None:
        trailer_dict[NameObject(TK.INFO)] = self._info.indirect_reference
    if self._ID is not None:
        trailer_dict[NameObject(TK.ID)] = self._ID
    if self._encrypt_entry:
        trailer_dict[NameObject(TK.ENCRYPT)] = self._encrypt_entry.indirect_reference
    trailer_dict.write_to_stream(stream)
    footer = f"\nstartxref\n{xref_location}\n%%EOF\n"
    stream.write(footer.encode())  # eof
@property
def metadata(self) -> Optional[DocumentInformation]:
    """
    Retrieve/set the PDF file's document information dictionary, if it exists.

    Reading delegates to the parent class implementation.

    Args:
        value: (setter only) dict with the entries to be set. if None : remove the /Info entry from the pdf.

    Note that some PDF files use (XMP) metadata streams instead of document
    information dictionaries, and these metadata streams will not be
    accessed by this function, but by :meth:`~xmp_metadata`.

    """
    return super().metadata
@metadata.setter
def metadata(
    self,
    value: Optional[Union[DocumentInformation, DictionaryObject, dict[Any, Any]]],
) -> None:
    # None removes /Info; anything else replaces the current entries.
    if value is None:
        self._info = None
        return
    if self._info is not None:
        self._info.clear()

    self.add_metadata(value)
def add_metadata(self, infos: dict[str, Any]) -> None:
    """
    Add custom metadata to the output.

    Every value is stored as a string object under its key; existing
    entries with the same key are overwritten.

    Args:
        infos: a Python dictionary where each key is a field
            and each value is your new metadata.

    """
    if isinstance(infos, PdfObject):
        infos = cast(DictionaryObject, infos.get_object())

    converted = {}
    for field_name, raw_value in list(infos.items()):
        resolved = raw_value.get_object() if isinstance(raw_value, PdfObject) else raw_value
        text_object = create_string_object(str(resolved))
        converted[NameObject(field_name)] = text_object

    if self._info is None:
        self._info = DictionaryObject()
    self._info.update(converted)
def compress_identical_objects(
    self,
    remove_identicals: bool = True,
    remove_orphans: bool = True,
) -> None:
    """
    Parse the PDF file and merge objects that have the same hash.
    This will make objects common to multiple pages.
    Recommended to be used just before writing output.

    Args:
        remove_identicals: Remove identical objects.
        remove_orphans: Remove unreferenced objects.

    """

    def replace_in_obj(
        obj: PdfObject, crossref: dict[IndirectObject, IndirectObject]
    ) -> None:
        # Walk dictionaries/arrays recursively: mark every referenced object
        # as non-orphan and redirect references to merged duplicates.
        if isinstance(obj, DictionaryObject):
            key_val = obj.items()
        elif isinstance(obj, ArrayObject):
            key_val = enumerate(obj)  # type: ignore
        else:
            return
        assert isinstance(obj, (DictionaryObject, ArrayObject))
        for k, v in key_val:
            if isinstance(v, IndirectObject):
                orphans[v.idnum - 1] = False
                if v in crossref:
                    obj[k] = crossref[v]
            else:
                # the filtering on DictionaryObject and ArrayObject only
                # will be performed within replace_in_obj
                replace_in_obj(v, crossref)

    # _idnum_hash :dict[hash]=(1st_ind_obj,[other_indir_objs,...])
    self._idnum_hash = {}
    orphans = [True] * len(self._objects)
    # look for similar objects
    for idx, obj in enumerate(self._objects):
        if is_null_or_none(obj):
            continue
        assert obj is not None, "mypy"  # mypy: TypeGuard of `is_null_or_none` does not help here.
        assert isinstance(obj.indirect_reference, IndirectObject)
        h = obj.hash_value()
        if remove_identicals and h in self._idnum_hash:
            self._idnum_hash[h][1].append(obj.indirect_reference)
            self._objects[idx] = None
        else:
            self._idnum_hash[h] = (obj.indirect_reference, [])

    # generate the dict converting others to 1st
    cnv = {v[0]: v[1] for v in self._idnum_hash.values() if len(v[1]) > 0}
    cnv_rev: dict[IndirectObject, IndirectObject] = {}
    for k, v in cnv.items():
        cnv_rev.update(zip(v, (k,) * len(v)))

    # replace reference to merged objects
    for obj in self._objects:
        if isinstance(obj, (DictionaryObject, ArrayObject)):
            replace_in_obj(obj, cnv_rev)

    # Honor the documented flag: without it there is nothing left to do.
    if not remove_orphans:
        return

    # remove orphans; the trailer-level objects are never orphans
    orphans[self.root_object.indirect_reference.idnum - 1] = False  # type: ignore

    # The document may have no /Info dictionary at all.
    if not is_null_or_none(self._info):
        orphans[self._info.indirect_reference.idnum - 1] = False  # type: ignore

    try:
        orphans[self._ID.indirect_reference.idnum - 1] = False  # type: ignore
    except AttributeError:
        # No /ID, or /ID is a direct object without an indirect reference.
        pass
    for i in compress(range(len(self._objects)), orphans):
        self._objects[i] = None
def get_reference(self, obj: PdfObject) -> IndirectObject:
    """
    Return the indirect reference under which ``obj`` is stored in this writer.

    Raises:
        ValueError: If ``obj`` is not among the writer's objects
            (raised by ``list.index``).

    """
    position = self._objects.index(obj)
    reference = IndirectObject(position + 1, 0, self)
    assert reference.get_object() == obj
    return reference
def get_outline_root(self) -> TreeObject:
    """
    Return the outline root of the document as a ``TreeObject``.

    A missing /Outlines entry is created and registered in the catalog;
    an existing entry that is not a ``TreeObject`` is converted in place.
    """
    if CO.OUTLINES not in self._root_object:
        root = TreeObject()
        root.update({})
        root_ref = self._add_object(root)
        self._root_object[NameObject(CO.OUTLINES)] = root_ref
        return root

    # Entries in the catalog dictionary
    root = cast(TreeObject, self._root_object[CO.OUTLINES])
    if not isinstance(root, TreeObject):
        converted = TreeObject(root)
        self._replace_object(root.indirect_reference.idnum, converted)
        root = converted
    position = self._objects.index(root)
    root_ref = IndirectObject(position + 1, 0, self)
    assert root_ref.get_object() == root

    return root
def get_threads_root(self) -> ArrayObject:
    """
    The list of threads.

    See §12.4.3 of the PDF 1.7 or PDF 2.0 specification.

    An empty array is created and stored in the catalog when the document
    has no /Threads entry yet.

    Returns:
        An array (possibly empty) of Dictionaries with an ``/F`` key,
        and optionally information about the thread in ``/I`` or ``/Metadata`` keys.

    """
    catalog = self._root_object
    if CO.THREADS not in catalog:
        created = ArrayObject()
        catalog[NameObject(CO.THREADS)] = created
        return created
    # Entries in the catalog dictionary
    return cast(ArrayObject, catalog[CO.THREADS])
@property
def threads(self) -> ArrayObject:
    """
    Read-only property giving the list of threads.

    See §12.4.3 of the PDF 1.7 or PDF 2.0 specification.

    Each element is a dictionary with an ``/F`` key, and optionally
    information about the thread in ``/I`` or ``/Metadata`` keys.
    """
    thread_array = self.get_threads_root()
    return thread_array
def add_outline_item_destination(
    self,
    page_destination: Union[IndirectObject, PageObject, TreeObject],
    parent: Union[None, TreeObject, IndirectObject] = None,
    before: Union[None, TreeObject, IndirectObject] = None,
    is_open: bool = True,
) -> IndirectObject:
    """
    Insert an outline item object into the outline tree.

    Args:
        page_destination: The outline item to insert. A ``PageObject`` is
            wrapped into a ``Destination`` titled ``page #<n>`` that
            targets the page.
        parent: Outline item under which the new item is inserted;
            the outline root when None.
        before: Passed to ``insert_child`` (as its indirect reference) to
            position the new item; None leaves placement to ``insert_child``.
        is_open: Stored on the item under ``/%is_open%``; when False the
            parent counters are not incremented on insertion.

    Returns:
        The indirect reference of the inserted outline item.

    """
    page_destination = cast(PageObject, page_destination.get_object())
    if isinstance(page_destination, PageObject):
        # Forward parent/before/is_open: they were previously dropped, so a
        # page passed directly always landed at the end of the outline root.
        return self.add_outline_item_destination(
            Destination(
                f"page #{page_destination.page_number}",
                cast(IndirectObject, page_destination.indirect_reference),
                Fit.fit(),
            ),
            parent,
            before,
            is_open,
        )

    if parent is None:
        parent = self.get_outline_root()

    page_destination[NameObject("/%is_open%")] = BooleanObject(is_open)
    parent = cast(TreeObject, parent.get_object())
    page_destination_ref = self._add_object(page_destination)
    if before is not None:
        before = before.indirect_reference
    parent.insert_child(
        page_destination_ref,
        before,
        self,
        page_destination.inc_parent_counter_outline
        if is_open
        else (lambda x, y: 0),  # noqa: ARG005
    )
    if "/Count" not in page_destination:
        page_destination[NameObject("/Count")] = NumberObject(0)

    return page_destination_ref
def add_outline_item_dict(
    self,
    outline_item: OutlineItemType,
    parent: Union[None, TreeObject, IndirectObject] = None,
    before: Union[None, TreeObject, IndirectObject] = None,
    is_open: bool = True,
) -> IndirectObject:
    """
    Insert an outline item given as a dictionary.

    The entries are copied into a fresh ``TreeObject`` which is then handed
    to :meth:`add_outline_item_destination` together with the remaining
    arguments.

    Returns:
        The indirect reference returned by ``add_outline_item_destination``.

    """
    tree_item = TreeObject()
    tree_item.update(outline_item)

    # code currently unreachable (kept for reference):
    # if "/A" in outline_item:
    #     action = DictionaryObject()
    #     a_dict = cast(DictionaryObject, outline_item["/A"])
    #     for k, v in list(a_dict.items()):
    #         action[NameObject(str(k))] = v
    #     action_ref = self._add_object(action)
    #     outline_item_object[NameObject("/A")] = action_ref

    inserted_ref = self.add_outline_item_destination(tree_item, parent, before, is_open)
    return inserted_ref
def add_outline_item(
    self,
    title: str,
    page_number: Union[None, PageObject, IndirectObject, int],
    parent: Union[None, TreeObject, IndirectObject] = None,
    before: Union[None, TreeObject, IndirectObject] = None,
    color: Optional[Union[tuple[float, float, float], str]] = None,
    bold: bool = False,
    italic: bool = False,
    fit: Fit = PAGE_FIT,
    is_open: bool = True,
) -> IndirectObject:
    """
    Add an outline item (commonly referred to as a "Bookmark") to the PDF file.

    Args:
        title: Title to use for this outline item.
        page_number: Page number this outline item will point to.
            A page object or an indirect reference is accepted as well;
            None creates an item without a GoTo action.
        parent: A reference to a parent outline item to create nested
            outline items.
        before: Forwarded to :meth:`add_outline_item_destination`
            (presumably the sibling in front of which the item is
            inserted; confirm against ``insert_child``).
        color: Color of the outline item's font as a red, green, blue tuple
            from 0.0 to 1.0 or as a Hex String (#RRGGBB)
        bold: Outline item font is bold
        italic: Outline item font is italic
        fit: The fit of the destination page.
        is_open: Forwarded to :meth:`add_outline_item_destination`.

    Returns:
        The added outline item as an indirect object.

    """
    page_ref: Union[None, NullObject, IndirectObject, NumberObject]
    if isinstance(italic, Fit):  # it means that we are on the old params
        # Legacy positional layout: re-dispatch with the arguments shifted
        # by one position (``before`` is passed as None).
        if fit is not None and page_number is None:
            page_number = fit
        return self.add_outline_item(
            title, page_number, parent, None, before, color, bold, italic, is_open=is_open
        )
    if page_number is None:
        action_ref = None
    else:
        if isinstance(page_number, IndirectObject):
            page_ref = page_number
        elif isinstance(page_number, PageObject):
            page_ref = page_number.indirect_reference
        elif isinstance(page_number, int):
            try:
                page_ref = self.pages[page_number].indirect_reference
            except IndexError:
                # Page does not exist: keep the raw number as the target.
                page_ref = NumberObject(page_number)
        if page_ref is None:
            logger_warning(
                f"can not find reference of page {page_number}",
                __name__,
            )
            page_ref = NullObject()
        dest = Destination(
            NameObject("/" + title + " outline item"),
            page_ref,
            fit,
        )

        # GoTo action carrying the destination array.
        action_ref = self._add_object(
            DictionaryObject(
                {
                    NameObject(GoToActionArguments.D): dest.dest_array,
                    NameObject(GoToActionArguments.S): NameObject("/GoTo"),
                }
            )
        )
    outline_item = self._add_object(
        _create_outline_item(action_ref, title, color, italic, bold)
    )

    if parent is None:
        parent = self.get_outline_root()
    return self.add_outline_item_destination(outline_item, parent, before, is_open)
def add_outline(self) -> None:
    """
    Placeholder: always raises.

    Raises:
        NotImplementedError: Always; use :meth:`add_outline_item` instead.

    """
    message = "This method is not yet implemented. Use :meth:`add_outline_item` instead."
    raise NotImplementedError(message)
def add_named_destination_array(
    self, title: TextStringObject, destination: Union[IndirectObject, ArrayObject]
) -> None:
    """
    Insert a ``(title, destination)`` pair into the named destinations array.

    The array holds alternating titles and destinations; the pair is placed
    in front of the first title that compares greater than ``title``, or
    appended when there is none.
    """
    names = self.get_named_dest_root()
    # Titles sit at the even positions.
    for position in range(0, len(names), 2):
        if title < names[position]:
            names.insert(position, destination)
            names.insert(position, TextStringObject(title))
            break
    else:
        names.extend([TextStringObject(title), destination])
def add_named_destination_object(
    self,
    page_destination: PdfObject,
) -> IndirectObject:
    """
    Register the destination array of ``page_destination`` and file it in
    the named destinations under its ``/Title``.

    Returns:
        The indirect reference of the registered destination array.

    """
    destination_ref = self._add_object(page_destination.dest_array)  # type: ignore
    destination_title = cast("TextStringObject", page_destination["/Title"])  # type: ignore
    self.add_named_destination_array(destination_title, destination_ref)

    return destination_ref
def add_named_destination(
    self,
    title: str,
    page_number: int,
) -> IndirectObject:
    """
    Create a GoTo action targeting a page and register it as a named destination.

    Args:
        title: Name of the destination; converted to a ``TextStringObject``
            when it is not one already.
        page_number: Index into the /Kids array of the pages root.

    Returns:
        The indirect reference of the created action dictionary.

    """
    kids = self.get_object(self._pages)[PagesAttributes.KIDS]  # type: ignore
    target_page = kids[page_number]
    target_array = ArrayObject(
        [target_page, NameObject(TypFitArguments.FIT_H), NumberObject(826)]
    )
    action = DictionaryObject()
    action.update(
        {
            NameObject(GoToActionArguments.D): target_array,
            NameObject(GoToActionArguments.S): NameObject("/GoTo"),
        }
    )

    action_ref = self._add_object(action)
    if not isinstance(title, TextStringObject):
        title = TextStringObject(str(title))

    self.add_named_destination_array(title, action_ref)
    return action_ref
def remove_links(self) -> None:
    """Remove links and annotations from this output."""
    # Every page gets the ALL_ANNOTATIONS treatment, not only link annotations.
    for current_page in self.pages:
        self.remove_objects_from_page(
            current_page, ObjectDeletionFlag.ALL_ANNOTATIONS
        )
def remove_annotations(
    self, subtypes: Optional[Union[AnnotationSubtype, Iterable[AnnotationSubtype]]]
) -> None:
    """
    Remove annotations by annotation subtype.

    Args:
        subtypes: subtype or list of subtypes to be removed.
            Examples are: "/Link", "/FileAttachment", "/Sound",
            "/Movie", "/Screen", ...
            If you want to remove all annotations, use subtypes=None.

    """
    if isinstance(subtypes, str):
        # A single subtype must be wrapped: the per-page helper tests
        # ``subtype in subtypes``, which on a bare string is a substring
        # test ("/Text" in "/FreeText" is True) and would remove
        # annotations of unrelated subtypes.
        subtypes = (subtypes,)
    for page in self.pages:
        self._remove_annots_from_page(page, subtypes)
def _remove_annots_from_page(
    self,
    page: Union[IndirectObject, PageObject, DictionaryObject],
    subtypes: Optional[Iterable[str]],
) -> None:
    """
    Delete, in place, the annotations of ``page`` matching ``subtypes``.

    Args:
        page: Page, or reference to a page, whose annotation array is edited.
        subtypes: Subtypes to remove; ``None`` selects every annotation.

    """
    page = cast(DictionaryObject, page.get_object())
    if PG.ANNOTS not in page:
        return

    index = 0
    while index < len(cast(ArrayObject, page[PG.ANNOTS])):
        entry = cast(ArrayObject, page[PG.ANNOTS])[index]
        annotation = cast(DictionaryObject, entry.get_object())
        selected = subtypes is None or cast(str, annotation["/Subtype"]) in subtypes
        if not selected:
            index += 1
            continue
        if isinstance(entry, IndirectObject):
            self._objects[entry.idnum - 1] = NullObject()  # to reduce PDF size
        # The following entries shift down, so the index is not advanced.
        del page[PG.ANNOTS][index]  # type:ignore
def remove_objects_from_page(
    self,
    page: Union[PageObject, DictionaryObject],
    to_delete: Union[ObjectDeletionFlag, Iterable[ObjectDeletionFlag]],
    text_filters: Optional[dict[str, Any]] = None
) -> None:
    """
    Remove objects specified by ``to_delete`` from the given page.

    Annotation-related flags (links, attachments, 3D objects, all
    annotations) are handled first and end the call; only the first one
    that matches is applied. The remaining flags are applied to the page
    content stream and, recursively, to the form XObjects of its resources.

    Args:
        page: Page object to clean up.
        to_delete: Objects to be deleted; can be a ``ObjectDeletionFlag``
            or a list of ObjectDeletionFlag
        text_filters: Properties of text to be deleted, if applicable. Optional.
            This is a Python dictionary with the following properties:

            * font_ids: List of font resource IDs (such as /F1 or /T1_0) to be deleted.

    """
    if isinstance(to_delete, (list, tuple)):
        for to_d in to_delete:
            # Forward the filters as well; otherwise a TEXT flag given in
            # a sequence would ignore them and remove all text.
            self.remove_objects_from_page(page, to_d, text_filters)
        return None
    assert isinstance(to_delete, ObjectDeletionFlag)

    if to_delete & ObjectDeletionFlag.LINKS:
        return self._remove_annots_from_page(page, ("/Link",))
    if to_delete & ObjectDeletionFlag.ATTACHMENTS:
        return self._remove_annots_from_page(
            page, ("/FileAttachment", "/Sound", "/Movie", "/Screen")
        )
    if to_delete & ObjectDeletionFlag.OBJECTS_3D:
        return self._remove_annots_from_page(page, ("/3D",))
    if to_delete & ObjectDeletionFlag.ALL_ANNOTATIONS:
        return self._remove_annots_from_page(page, None)

    # Content stream operators that are dropped unconditionally
    # (subject to the text filter check in ``clean``).
    jump_operators = []
    if to_delete & ObjectDeletionFlag.DRAWING_IMAGES:
        jump_operators = (
            [
                b"w", b"J", b"j", b"M", b"d", b"i",
                b"W", b"W*",
                b"b", b"b*", b"B", b"B*", b"S", b"s", b"f", b"f*", b"F", b"n",
                b"m", b"l", b"c", b"v", b"y", b"h", b"re",
                b"sh"
            ]
        )
    if to_delete & ObjectDeletionFlag.TEXT:
        # NOTE: replaces (does not extend) the drawing operators above.
        jump_operators = [b"Tj", b"TJ", b"'", b'"']

    def clean(
        content: ContentStream,
        images: list[str],
        forms: list[str],
        text_filters: Optional[dict[str, Any]] = None
    ) -> None:
        """Delete the selected operations from ``content`` in place."""
        nonlocal jump_operators, to_delete

        font_id = None
        font_ids_to_delete = []
        if text_filters and to_delete & ObjectDeletionFlag.TEXT:
            font_ids_to_delete = text_filters.get("font_ids", [])

        i = 0
        while i < len(content.operations):
            operands, operator = content.operations[i]
            if operator == b"Tf":
                # remember the font resource selected most recently
                font_id = operands[0]
            if (
                (
                    operator == b"INLINE IMAGE"
                    and (to_delete & ObjectDeletionFlag.INLINE_IMAGES)
                )
                or (operator in jump_operators)
                or (
                    operator == b"Do"
                    and (to_delete & ObjectDeletionFlag.XOBJECT_IMAGES)
                    and (operands[0] in images)
                )
            ):
                if (
                    not to_delete & ObjectDeletionFlag.TEXT
                    or (to_delete & ObjectDeletionFlag.TEXT and not text_filters)
                    or (to_delete & ObjectDeletionFlag.TEXT and font_id in font_ids_to_delete)
                ):
                    # the next operation moves to index i: do not advance
                    del content.operations[i]
                else:
                    i += 1
            else:
                i += 1
        content.get_data()  # this ensures ._data is rebuilt from the .operations

    def clean_forms(
        elt: DictionaryObject, stack: list[DictionaryObject]
    ) -> tuple[list[str], list[str]]:
        """Clean the XObjects of ``elt``; return the image and form names found."""
        nonlocal to_delete
        # elt in recursive call is a new ContentStream object, so we have to check the indirect_reference
        if (elt in stack) or (
            hasattr(elt, "indirect_reference")
            and any(
                elt.indirect_reference == getattr(x, "indirect_reference", -1)
                for x in stack
            )
        ):
            # to prevent infinite looping
            return [], []  # pragma: no cover
        try:
            d = cast(
                dict[Any, Any],
                cast(DictionaryObject, elt["/Resources"])["/XObject"],
            )
        except KeyError:
            d = {}
        images = []
        forms = []
        for k, v in d.items():
            o = v.get_object()
            try:
                content: Any = None
                if (
                    to_delete & ObjectDeletionFlag.XOBJECT_IMAGES
                    and o["/Subtype"] == "/Image"
                ):
                    content = NullObject()  # to delete the image keeping the entry
                    images.append(k)
                if o["/Subtype"] == "/Form":
                    forms.append(k)
                    if isinstance(o, ContentStream):
                        content = o
                    else:
                        content = ContentStream(o, self)
                        content.update(
                            {
                                k1: v1
                                for k1, v1 in o.items()
                                if k1 not in ["/Length", "/Filter", "/DecodeParms"]
                            }
                        )
                        try:
                            content.indirect_reference = o.indirect_reference
                        except AttributeError:  # pragma: no cover
                            pass
                    stack.append(elt)
                    clean_forms(content, stack)  # clean subforms
                if content is not None:
                    if isinstance(v, IndirectObject):
                        self._objects[v.idnum - 1] = content
                    else:
                        # should only occur in a PDF not respecting PDF spec
                        # where streams must be indirected.
                        d[k] = self._add_object(content)  # pragma: no cover
            except (TypeError, KeyError):
                pass
        for im in images:
            del d[im]  # for clean-up
        if isinstance(elt, StreamObject):  # for /Form
            if not isinstance(elt, ContentStream):  # pragma: no cover
                e = ContentStream(elt, self)
                e.update(elt.items())
                elt = e
            clean(elt, images, forms, text_filters)  # clean the content
        return images, forms

    if not isinstance(page, PageObject):
        page = PageObject(self, page.indirect_reference)  # pragma: no cover
    if "/Contents" in page:
        content = cast(ContentStream, page.get_contents())

        images, forms = clean_forms(page, [])

        clean(content, images, forms, text_filters)
        page.replace_contents(content)
    return [], []  # type: ignore[return-value]
def remove_images(
    self,
    to_delete: ImageType = ImageType.ALL,
) -> None:
    """
    Remove images from this output.

    Args:
        to_delete: The type of images to be deleted
            (default = all images types)

    """
    # A boolean argument is treated as "all image types".
    if isinstance(to_delete, bool):
        to_delete = ImageType.ALL

    # Translate the ImageType bits into same-named ObjectDeletionFlag bits.
    flags = ObjectDeletionFlag.NONE
    for kind in ("XOBJECT_IMAGES", "INLINE_IMAGES", "DRAWING_IMAGES"):
        if to_delete & ImageType[kind]:
            flags |= ObjectDeletionFlag[kind]

    for current_page in self.pages:
        self.remove_objects_from_page(current_page, flags)
def remove_text(self, font_names: Optional[list[str]] = None) -> None:
    """
    Remove text from the PDF.

    Args:
        font_names: List of font names to remove, such as "Helvetica-Bold".
            Optional. If not specified, all text will be removed.

    """
    if not font_names:
        font_names = []

    def collect_fonts(
        node: Any,
        found: Optional[dict[str, Any]] = None,
        key: Optional[str] = None
    ) -> dict[str, Any]:
        """Recursively gather, per normalized font name, the keys it is stored under."""
        if found is None:
            found = {}
        if isinstance(node, IndirectObject):
            node = node.get_object()
        if isinstance(node, dict):
            if node.get("/Type") == "/Font":
                base_font = node.get("/BaseFont", "")
                # Normalize font names like "/RRXFFV+Palatino-Bold" to "Palatino-Bold"
                normalized = base_font.lstrip("/").split("+")[-1]
                entry = found.setdefault(
                    normalized,
                    {"normalized_font_name": normalized, "resource_ids": []},
                )
                if key not in entry["resource_ids"]:
                    entry["resource_ids"].append(key)
            for child_key in node:
                found = collect_fonts(node[child_key], found, child_key)
        elif isinstance(node, (list, ArrayObject)):
            for child in node:
                found = collect_fonts(child, found)
        return found

    for page in self.pages:
        resource_ids_to_remove = []
        text_filters = {}

        # Content streams reference fonts and other resources with names like "/F1" or "/T1_0"
        # Font names need to be converted to resource names/IDs for easier removal
        if font_names:
            fonts_on_page = collect_fonts(page.get("/Resources"))
            for wanted in font_names:
                if wanted in fonts_on_page:
                    resource_ids_to_remove.extend(fonts_on_page[wanted]["resource_ids"])
            text_filters["font_ids"] = resource_ids_to_remove

        self.remove_objects_from_page(page, ObjectDeletionFlag.TEXT, text_filters=text_filters)
def add_uri(
    self,
    page_number: int,
    uri: str,
    rect: RectangleObject,
    border: Optional[ArrayObject] = None,
) -> None:
    """
    Add an URI from a rectangular area to the specified page.

    Args:
        page_number: index of the page on which to place the URI action.
        uri: URI of resource to link to.
        rect: :class:`RectangleObject<pypdf.generic.RectangleObject>` or
            array of four integers specifying the clickable rectangular area
            ``[xLL, yLL, xUR, yUR]``, or string in the form
            ``"[ xLL yLL xUR yUR ]"``.
        border: if provided, an array describing border-drawing
            properties: its first three entries are used, plus an optional
            dash pattern as fourth entry. See the PDF spec for details.
            When omitted, ``[2 2 2]`` is written as the ``/Border`` entry.

    Raises:
        ValueError: ``rect`` is a string containing a non-numeric coordinate.

    """
    page_link = self.get_object(self._pages)[PagesAttributes.KIDS][page_number]  # type: ignore
    page_ref = cast(dict[str, Any], self.get_object(page_link))

    border_arr: BorderArrayType
    if border is not None:
        border_arr = [NumberObject(n) for n in border[:3]]
        if len(border) == 4:
            dash_pattern = ArrayObject([NumberObject(n) for n in border[3]])
            border_arr.append(dash_pattern)
    else:
        border_arr = [NumberObject(2), NumberObject(2), NumberObject(2)]

    if isinstance(rect, str):
        # "[ xLL yLL xUR yUR ]" -> four coordinates. The whole string is
        # not a single number, so it must be split before conversion.
        coordinates = rect.replace(",", " ").strip().strip("[]").split()
        rect = RectangleObject(tuple(float(value) for value in coordinates))  # type: ignore
    elif not isinstance(rect, RectangleObject):
        rect = RectangleObject(rect)

    # the URI action triggered by the link
    lnk2 = DictionaryObject()
    lnk2.update(
        {
            NameObject("/S"): NameObject("/URI"),
            NameObject("/URI"): TextStringObject(uri),
        }
    )
    # the link annotation itself
    lnk = DictionaryObject()
    lnk.update(
        {
            NameObject(AA.Type): NameObject("/Annot"),
            NameObject(AA.Subtype): NameObject("/Link"),
            NameObject(AA.P): page_link,
            NameObject(AA.Rect): rect,
            NameObject("/H"): NameObject("/I"),
            NameObject(AA.Border): ArrayObject(border_arr),
            NameObject("/A"): lnk2,
        }
    )
    lnk_ref = self._add_object(lnk)

    if PG.ANNOTS in page_ref:
        page_ref[PG.ANNOTS].append(lnk_ref)
    else:
        page_ref[NameObject(PG.ANNOTS)] = ArrayObject([lnk_ref])
# Layout names accepted without a warning by ``_set_page_layout``.
_valid_layouts = (
    "/NoLayout",
    "/SinglePage",
    "/OneColumn",
    "/TwoColumnLeft",
    "/TwoColumnRight",
    "/TwoPageLeft",
    "/TwoPageRight",
)
def _get_page_layout(self) -> Optional[LayoutType]:
    """Return the ``/PageLayout`` entry of the root object, or ``None`` if absent."""
    try:
        layout = self._root_object["/PageLayout"]
    except KeyError:
        return None
    return cast(LayoutType, layout)
def _set_page_layout(self, layout: Union[NameObject, LayoutType]) -> None:
    """
    Set the page layout.

    A value that is not already a ``NameObject`` is converted to one; when
    it is not one of ``_valid_layouts`` a warning is logged, but the value
    is stored nevertheless.

    Args:
        layout: The page layout to be used.

    .. list-table:: Valid ``layout`` arguments
       :widths: 50 200

       * - /NoLayout
         - Layout explicitly not specified
       * - /SinglePage
         - Show one page at a time
       * - /OneColumn
         - Show one column at a time
       * - /TwoColumnLeft
         - Show pages in two columns, odd-numbered pages on the left
       * - /TwoColumnRight
         - Show pages in two columns, odd-numbered pages on the right
       * - /TwoPageLeft
         - Show two pages at a time, odd-numbered pages on the left
       * - /TwoPageRight
         - Show two pages at a time, odd-numbered pages on the right

    """
    if not isinstance(layout, NameObject):
        if layout not in self._valid_layouts:
            # ``', '.join`` (not ``'', ''.join``, which builds a tuple) so
            # that the message lists the valid names separated by commas.
            logger_warning(
                f"Layout should be one of: {', '.join(self._valid_layouts)}",
                __name__,
            )
        layout = NameObject(layout)
    self._root_object.update({NameObject("/PageLayout"): layout})
def set_page_layout(self, layout: LayoutType) -> None:
    """
    Set the page layout.

    Thin public wrapper delegating to ``_set_page_layout``.

    Args:
        layout: The page layout to be used

    .. list-table:: Valid ``layout`` arguments
       :widths: 50 200

       * - /NoLayout
         - Layout explicitly not specified
       * - /SinglePage
         - Show one page at a time
       * - /OneColumn
         - Show one column at a time
       * - /TwoColumnLeft
         - Show pages in two columns, odd-numbered pages on the left
       * - /TwoColumnRight
         - Show pages in two columns, odd-numbered pages on the right
       * - /TwoPageLeft
         - Show two pages at a time, odd-numbered pages on the left
       * - /TwoPageRight
         - Show two pages at a time, odd-numbered pages on the right

    """
    requested_layout = layout
    self._set_page_layout(requested_layout)
@property
def page_layout(self) -> Optional[LayoutType]:
    """
    Page layout property.

    Reading it returns the result of ``_get_page_layout()``.

    .. list-table:: Valid ``layout`` values
       :widths: 50 200

       * - /NoLayout
         - Layout explicitly not specified
       * - /SinglePage
         - Show one page at a time
       * - /OneColumn
         - Show one column at a time
       * - /TwoColumnLeft
         - Show pages in two columns, odd-numbered pages on the left
       * - /TwoColumnRight
         - Show pages in two columns, odd-numbered pages on the right
       * - /TwoPageLeft
         - Show two pages at a time, odd-numbered pages on the left
       * - /TwoPageRight
         - Show two pages at a time, odd-numbered pages on the right
    """
    current_layout = self._get_page_layout()
    return current_layout
@page_layout.setter
def page_layout(self, layout: LayoutType) -> None:
    # Assignment goes through the same helper as ``set_page_layout``.
    requested_layout = layout
    self._set_page_layout(requested_layout)
# Page mode names accepted without a warning by the ``page_mode`` setter.
_valid_modes = (
    "/UseNone",
    "/UseOutlines",
    "/UseThumbs",
    "/FullScreen",
    "/UseOC",
    "/UseAttachments",
)
def _get_page_mode(self) -> Optional[PagemodeType]:
    """Return the ``/PageMode`` entry of the root object, or ``None`` if absent."""
    try:
        mode = self._root_object["/PageMode"]
    except KeyError:
        return None
    return cast(PagemodeType, mode)
@property
def page_mode(self) -> Optional[PagemodeType]:
    """
    Page mode property.

    Reading it returns the result of ``_get_page_mode()``.

    .. list-table:: Valid ``mode`` values
       :widths: 50 200

       * - /UseNone
         - Do not show outline or thumbnails panels
       * - /UseOutlines
         - Show outline (aka bookmarks) panel
       * - /UseThumbs
         - Show page thumbnails panel
       * - /FullScreen
         - Fullscreen view
       * - /UseOC
         - Show Optional Content Group (OCG) panel
       * - /UseAttachments
         - Show attachments panel
    """
    current_mode = self._get_page_mode()
    return current_mode
@page_mode.setter
def page_mode(self, mode: PagemodeType) -> None:
    # Plain values are converted to a NameObject; an unknown mode only
    # triggers a warning and is stored nevertheless.
    if not isinstance(mode, NameObject):
        if mode not in self._valid_modes:
            logger_warning(
                f"Mode should be one of: {', '.join(self._valid_modes)}", __name__
            )
        mode = NameObject(mode)
    self._root_object.update({NameObject("/PageMode"): mode})
def add_annotation(
    self,
    page_number: Union[int, PageObject],
    annotation: dict[str, Any],
) -> DictionaryObject:
    """
    Add a single annotation to the page.
    The added annotation must be a new annotation.
    It cannot be recycled.

    Args:
        page_number: PageObject or page index.
        annotation: Annotation to be added (created with annotation).

    Returns:
        The inserted object.
        This can be used for popup creation, for example.

    Raises:
        TypeError: ``page_number`` is neither an ``int`` nor a ``PageObject``.

    """
    page = page_number
    if isinstance(page, int):
        page = self.pages[page]
    elif not isinstance(page, PageObject):
        raise TypeError("page: invalid type")

    # Convert the plain dictionary and point its /P entry at the page.
    to_add = cast(DictionaryObject, _pdf_objectify(annotation))
    to_add[NameObject("/P")] = page.indirect_reference

    # Make sure the page has an annotation array to append to.
    if page.annotations is None:
        page[NameObject("/Annots")] = ArrayObject()
        assert page.annotations is not None

    # Internal link annotations need the correct object type for the
    # destination
    if to_add.get("/Subtype") == "/Link" and "/Dest" in to_add:
        tmp = cast(dict[Any, Any], to_add[NameObject("/Dest")])
        dest = Destination(
            NameObject("/LinkName"),
            tmp["target_page_index"],
            Fit(
                fit_type=tmp["fit"], fit_args=dict(tmp)["fit_args"]
            ),  # I have no clue why this dict-hack is necessary
        )
        # Replace the descriptive dictionary by the destination array.
        to_add[NameObject("/Dest")] = dest.dest_array

    page.annotations.append(self._add_object(to_add))

    # A popup is also registered on its parent annotation.
    if to_add.get("/Subtype") == "/Popup" and NameObject("/Parent") in to_add:
        cast(DictionaryObject, to_add["/Parent"].get_object())[
            NameObject("/Popup")
        ] = to_add.indirect_reference

    return to_add
def clean_page(self, page: Union[PageObject, IndirectObject]) -> PageObject:
    """
    Perform some clean up in the page.
    Currently: convert NameObject named destination to TextStringObject
    (required for names/dests list)

    For each annotation, a ``/Dest`` given as a name is converted; otherwise
    the ``/D`` entry of its ``/A`` action, if any, is converted.

    Args:
        page: The page, or a reference to it.

    Returns:
        The cleaned PageObject

    """
    page = cast("PageObject", page.get_object())
    for annot_ref in page.get("/Annots", []):
        annot = annot_ref.get_object()
        dest = annot.get("/Dest", None)
        action = annot.get("/A", None)
        if isinstance(dest, NameObject):
            annot[NameObject("/Dest")] = TextStringObject(dest)
            continue
        if action is None:
            continue
        action = action.get_object()
        action_dest = action.get("/D", None)
        if isinstance(action_dest, NameObject):
            action[NameObject("/D")] = TextStringObject(action_dest)
    return page
2492 def _create_stream(
2493 self, fileobj: Union[Path, StrByteType, PdfReader]
2494 ) -> tuple[IOBase, Optional[Encryption]]:
2495 # If the fileobj parameter is a string, assume it is a path
2496 # and create a file object at that location. If it is a file,
2497 # copy the file's contents into a BytesIO stream object; if
2498 # it is a PdfReader, copy that reader's stream into a
2499 # BytesIO stream.
2500 # If fileobj is none of the above types, it is not modified
2501 encryption_obj = None
2502 stream: IOBase
2503 if isinstance(fileobj, (str, Path)):
2504 with FileIO(fileobj, "rb") as f:
2505 stream = BytesIO(f.read())
2506 elif isinstance(fileobj, PdfReader):
2507 if fileobj._encryption:
2508 encryption_obj = fileobj._encryption
2509 orig_tell = fileobj.stream.tell()
2510 fileobj.stream.seek(0)
2511 stream = BytesIO(fileobj.stream.read())
2513 # reset the stream to its original location
2514 fileobj.stream.seek(orig_tell)
2515 elif hasattr(fileobj, "seek") and hasattr(fileobj, "read"):
2516 fileobj.seek(0)
2517 filecontent = fileobj.read()
2518 stream = BytesIO(filecontent)
2519 else:
2520 raise NotImplementedError(
2521 "Merging requires an object that PdfReader can parse. "
2522 "Typically, that is a Path or a string representing a Path, "
2523 "a file object, or an object implementing .seek and .read. "
2524 "Passing a PdfReader directly works as well."
2525 )
2526 return stream, encryption_obj
def append(
    self,
    fileobj: Union[StrByteType, PdfReader, Path],
    outline_item: Union[
        str, None, PageRange, tuple[int, int], tuple[int, int, int], list[int]
    ] = None,
    pages: Union[
        None,
        PageRange,
        tuple[int, int],
        tuple[int, int, int],
        list[int],
        list[PageObject],
    ] = None,
    import_outline: bool = True,
    excluded_fields: Optional[Union[list[str], tuple[str, ...]]] = None,
) -> None:
    """
    Identical to the :meth:`merge()<merge>` method, but assumes you want to
    concatenate all pages onto the end of the file instead of specifying a
    position.

    When ``outline_item`` is a tuple, list or ``PageRange`` it is taken as
    the page selection, and the following positional arguments are shifted
    accordingly.

    Args:
        fileobj: A File Object or an object that supports the standard
            read and seek methods similar to a File Object. Could also be a
            string representing a path to a PDF file.
        outline_item: Optionally, you may specify a string to build an
            outline (aka 'bookmark') to identify the beginning of the
            included file.
        pages: Can be a :class:`PageRange<pypdf.pagerange.PageRange>`
            or a ``(start, stop[, step])`` tuple
            or a list of pages to be processed
            to merge only the specified range of pages from the source
            document into the output document.
        import_outline: You may prevent the source document's
            outline (collection of outline items, previously referred to as
            'bookmarks') from being imported by specifying this as ``False``.
        excluded_fields: Provide the list of fields/keys to be ignored
            if ``/Annots`` is part of the list, the annotation will be ignored
            if ``/B`` is part of the list, the articles will be ignored

    """
    if excluded_fields is None:
        excluded_fields = ()

    outline_title = outline_item
    if isinstance(outline_item, (tuple, list, PageRange)):
        # The second positional argument holds the page selection: shift
        # the remaining arguments one position to the left.
        if isinstance(pages, bool):
            if not isinstance(import_outline, bool):
                excluded_fields = import_outline
            import_outline = pages
        pages = outline_item
        outline_title = None

    self.merge(
        None,
        fileobj,
        outline_title,
        pages,
        import_outline,
        excluded_fields,
    )
def merge(
    self,
    position: Optional[int],
    fileobj: Union[Path, StrByteType, PdfReader],
    outline_item: Optional[str] = None,
    pages: Optional[Union[PageRangeSpec, list[PageObject]]] = None,
    import_outline: bool = True,
    excluded_fields: Optional[Union[list[str], tuple[str, ...]]] = (),
) -> None:
    """
    Merge the pages from the given file into the output file at the
    specified page number.

    Besides the pages themselves, the named destinations, the outline,
    the annotations, the AcroForm fields and the article threads of the
    source are carried over (subject to the arguments below).

    Args:
        position: The *page number* to insert this file. File will
            be inserted after the given number. With ``None`` the pages
            are added through ``add_page``.
        fileobj: A File Object or an object that supports the standard
            read and seek methods similar to a File Object. Could also be a
            string representing a path to a PDF file.
        outline_item: Optionally, you may specify a string to build an outline
            (aka 'bookmark') to identify the
            beginning of the included file.
        pages: can be a :class:`PageRange<pypdf.pagerange.PageRange>`
            or a ``(start, stop[, step])`` tuple
            or a list of pages to be processed
            to merge only the specified range of pages from the source
            document into the output document.
        import_outline: You may prevent the source document's
            outline (collection of outline items, previously referred to as
            'bookmarks') from being imported by specifying this as ``False``.
        excluded_fields: provide the list of fields/keys to be ignored
            if ``/Annots`` is part of the list, the annotation will be ignored
            if ``/B`` is part of the list, the articles will be ignored

    Raises:
        TypeError: The pages attribute is not configured properly

    """
    if isinstance(fileobj, PdfDocCommon):
        reader = fileobj
    else:
        stream, _encryption_obj = self._create_stream(fileobj)
        # Create a new PdfReader instance using the stream
        # (either file or BytesIO or StringIO) created above
        reader = PdfReader(stream, strict=False)  # type: ignore[arg-type]

    if excluded_fields is None:
        excluded_fields = ()
    # Find the range of pages to merge.
    if pages is None:
        pages = list(range(len(reader.pages)))
    elif isinstance(pages, PageRange):
        pages = list(range(*pages.indices(len(reader.pages))))
    elif isinstance(pages, list):
        pass  # keep unchanged
    elif isinstance(pages, tuple) and len(pages) <= 3:
        pages = list(range(*pages))
    elif not isinstance(pages, tuple):
        # NOTE(review): a tuple with more than three items matches no
        # branch and is iterated below as-is - confirm this is intended.
        raise TypeError(
            '"pages" must be a tuple of (start, stop[, step]) or a list'
        )

    # Maps the object number of each source page to its copy in this writer.
    srcpages = {}
    for page in pages:
        if isinstance(page, PageObject):
            pg = page
        else:
            pg = reader.pages[page]
        assert pg.indirect_reference is not None
        if position is None:
            # numbers in the exclude list identifies that the exclusion is
            # only applicable to 1st level of cloning
            srcpages[pg.indirect_reference.idnum] = self.add_page(
                pg, [*list(excluded_fields), 1, "/B", 1, "/Annots"]  # type: ignore
            )
        else:
            srcpages[pg.indirect_reference.idnum] = self.insert_page(
                pg, position, [*list(excluded_fields), 1, "/B", 1, "/Annots"]  # type: ignore
            )
            position += 1
        # Keep a link to the source page; used below for annotations.
        srcpages[pg.indirect_reference.idnum].original_page = pg

    reader._named_destinations = (
        reader.named_destinations
    )  # need for the outline processing below

    arr: Any

    def _process_named_dests(dest: Any) -> None:
        # Register ``dest`` in this writer when it targets a merged page
        # and its title is not already listed.
        arr = dest.dest_array
        if "/Names" in self._root_object and dest["/Title"] in cast(
            list[Any],
            cast(
                DictionaryObject,
                cast(DictionaryObject, self._root_object["/Names"]).get("/Dests", DictionaryObject()),
            ).get("/Names", DictionaryObject()),
        ):
            # already exists: should not duplicate it
            pass
        elif dest["/Page"] is None or isinstance(dest["/Page"], NullObject):
            pass
        elif isinstance(dest["/Page"], int):
            # the page reference is a page number normally not a PDF Reference
            # page numbers as int are normally accepted only in external goto
            try:
                p = reader.pages[dest["/Page"]]
            except IndexError:
                return
            assert p.indirect_reference is not None
            try:
                arr[NumberObject(0)] = NumberObject(
                    srcpages[p.indirect_reference.idnum].page_number
                )
                self.add_named_destination_array(dest["/Title"], arr)
            except KeyError:
                # the page has not been merged
                pass
        elif dest["/Page"].indirect_reference.idnum in srcpages:
            # point the destination at the copy of the page
            arr[NumberObject(0)] = srcpages[
                dest["/Page"].indirect_reference.idnum
            ].indirect_reference
            self.add_named_destination_array(dest["/Title"], arr)

    for dest in reader._named_destinations.values():
        _process_named_dests(dest)

    # Parent under which the source outline is inserted.
    outline_item_typ: TreeObject
    if outline_item is not None:
        # NOTE(review): ``next(iter(...))`` presumably raises StopIteration
        # when no page was merged - confirm whether that can happen here.
        outline_item_typ = cast(
            "TreeObject",
            self.add_outline_item(
                TextStringObject(outline_item),
                next(iter(srcpages.values())).indirect_reference,
                fit=PAGE_FIT,
            ).get_object(),
        )
    else:
        outline_item_typ = self.get_outline_root()

    _ro = reader.root_object
    if import_outline and CO.OUTLINES in _ro:
        outline = self._get_filtered_outline(
            _ro.get(CO.OUTLINES, None), srcpages, reader
        )
        self._insert_filtered_outline(
            outline, outline_item_typ, None
        )  # TODO: use before parameter

    if "/Annots" not in excluded_fields:
        for pag in srcpages.values():
            lst = self._insert_filtered_annotations(
                pag.original_page.get("/Annots", []), pag, srcpages, reader
            )
            if len(lst) > 0:
                pag[NameObject("/Annots")] = lst
            self.clean_page(pag)

    if "/AcroForm" in _ro and _ro["/AcroForm"] is not None:
        if "/AcroForm" not in self._root_object:
            # First form merged: clone it without its /Fields entry.
            self._root_object[NameObject("/AcroForm")] = self._add_object(
                cast(
                    DictionaryObject,
                    reader.root_object["/AcroForm"],
                ).clone(self, False, ("/Fields",))
            )
            arr = ArrayObject()
        else:
            arr = cast(
                ArrayObject,
                cast(DictionaryObject, self._root_object["/AcroForm"])["/Fields"],
            )
        # Lookup from source object numbers to object numbers in this writer.
        trslat = self._id_translated[id(reader)]
        try:
            for f in reader.root_object["/AcroForm"]["/Fields"]:  # type: ignore
                try:
                    ind = IndirectObject(trslat[f.idnum], 0, self)
                    if ind not in arr:
                        arr.append(ind)
                except KeyError:
                    # for trslat[] which mean the field has not be copied
                    # through the page
                    pass
        except KeyError:  # for /Acroform or /Fields are not existing
            arr = self._add_object(ArrayObject())
        cast(DictionaryObject, self._root_object["/AcroForm"])[
            NameObject("/Fields")
        ] = arr

    if "/B" not in excluded_fields:
        # The empty pattern matches every thread title.
        self.add_filtered_articles("", srcpages, reader)
def _add_articles_thread(
    self,
    thread: DictionaryObject,  # thread entry from the reader's array of threads
    pages: dict[int, PageObject],
    reader: PdfReader,
) -> IndirectObject:
    """
    Clone the thread with only the applicable articles.

    The articles of ``thread`` are followed through their ``/N`` entries
    starting at ``/F``; for each article whose page has a clone (see
    ``_get_cloned_page``) a new article is created, chained to the
    previous one and appended to the ``/B`` array of the cloned page.

    Args:
        thread: Thread dictionary of the source document.
        pages: Passed to ``_get_cloned_page`` to find the cloned pages.
        reader: The source document.

    Returns:
        The added thread as an indirect reference

    """
    nthread = thread.clone(
        self, force_duplicate=True, ignore_fields=("/F",)
    )  # use of clone to keep link between reader and writer
    self.threads.append(nthread.indirect_reference)
    first_article = cast("DictionaryObject", thread["/F"])
    current_article: Optional[DictionaryObject] = first_article
    new_article: Optional[DictionaryObject] = None
    while current_article is not None:
        pag = self._get_cloned_page(
            cast("PageObject", current_article["/P"]), pages, reader
        )
        if pag is not None:
            if new_article is None:
                # first retained article: it becomes /F of the new thread
                new_article = cast(
                    "DictionaryObject",
                    self._add_object(DictionaryObject()).get_object(),
                )
                new_first = new_article
                nthread[NameObject("/F")] = new_article.indirect_reference
            else:
                # following articles: /V points back, /N of the previous
                # article points forward
                new_article2 = cast(
                    "DictionaryObject",
                    self._add_object(
                        DictionaryObject(
                            {NameObject("/V"): new_article.indirect_reference}
                        )
                    ).get_object(),
                )
                new_article[NameObject("/N")] = new_article2.indirect_reference
                new_article = new_article2
            new_article[NameObject("/P")] = pag
            new_article[NameObject("/T")] = nthread.indirect_reference
            new_article[NameObject("/R")] = current_article["/R"]
            pag_obj = cast("PageObject", pag.get_object())
            if "/B" not in pag_obj:
                pag_obj[NameObject("/B")] = ArrayObject()
            cast("ArrayObject", pag_obj["/B"]).append(
                new_article.indirect_reference
            )
        current_article = cast("DictionaryObject", current_article["/N"])
        if current_article == first_article:
            # Back at the start: close the chain and stop.
            # NOTE(review): assumes at least one article was retained,
            # otherwise new_article is still None here - confirm with callers.
            new_article[NameObject("/N")] = new_first.indirect_reference  # type: ignore
            new_first[NameObject("/V")] = new_article.indirect_reference  # type: ignore
            current_article = None
    assert nthread.indirect_reference is not None
    return nthread.indirect_reference
def add_filtered_articles(
    self,
    fltr: Union[
        Pattern[Any], str
    ],  # thread entry from the reader's array of threads
    pages: dict[int, PageObject],
    reader: PdfReader,
) -> None:
    """
    Add articles matching the defined criteria.

    Every article listed under ``/B`` of the original of each page is
    inspected; its thread is added when it has not been translated for
    ``reader`` yet and its ``/I`` ``/Title`` matches ``fltr``.

    Args:
        fltr: Pattern (or pattern source) searched in the thread title;
            anything else is replaced by the empty pattern.
        pages: Cloned pages carrying an ``original_page`` attribute.
        reader: The source document.

    """
    if isinstance(fltr, str):
        fltr = re.compile(fltr)
    elif not isinstance(fltr, Pattern):
        fltr = re.compile("")

    for cloned_page in pages.values():
        source_page = cloned_page.original_page
        for article_ref in source_page.get("/B", ()):
            article = article_ref.get_object()
            if is_null_or_none(article):
                continue
            thread = article.get("/T")
            if thread is None:
                continue
            thread = thread.get_object()
            if thread.indirect_reference.idnum in self._id_translated[id(reader)]:
                # this thread has been handled already
                continue
            thread_title = (thread.get("/I", {})).get("/Title", "")
            if fltr.search(thread_title):
                self._add_articles_thread(thread, pages, reader)
def _get_cloned_page(
    self,
    page: Union[None, IndirectObject, PageObject, NullObject],
    pages: dict[int, PageObject],
    reader: PdfReader,
) -> Optional[IndirectObject]:
    """
    Look up the entry of ``pages`` that corresponds to ``page``.

    Args:
        page: A page dictionary (``/Type`` equal to ``/Page``), an indirect
            reference, ``None`` or a ``NullObject``.
        pages: Mapping queried with the ``idnum`` of the page reference.
        reader: Not used by this method.

    Returns:
        The ``indirect_reference`` of the matching entry of ``pages``, or
        ``None`` when ``page`` cannot be resolved to a reference or no
        entry exists for it.

    """
    if isinstance(page, NullObject):
        return None

    # Resolve the reference explicitly instead of leaving the name unbound
    # and relying on a blanket ``except`` to hide the resulting error.
    source_ref = None
    if isinstance(page, DictionaryObject) and page.get("/Type", "") == "/Page":
        source_ref = page.indirect_reference
    elif isinstance(page, IndirectObject):
        source_ref = page
    if source_ref is None:
        return None

    try:
        return pages[source_ref.idnum].indirect_reference  # type: ignore
    except (KeyError, AttributeError):
        # Unknown page number, or an entry without a usable reference.
        return None
def _insert_filtered_annotations(
    self,
    annots: Union[IndirectObject, list[DictionaryObject], None],
    page: PageObject,
    pages: dict[int, PageObject],
    reader: PdfReader,
) -> list[Destination]:
    """
    Clone the annotations whose target is still available in this writer.

    Annotations without any destination are cloned unchanged. Annotations
    pointing to a named destination are kept only when that name is found
    in ``get_named_dest_root()``. Annotations pointing to an explicit
    destination array are kept only when ``_get_cloned_page`` resolves the
    first array element; the destination is then rewritten to use the
    resolved page reference.

    Args:
        annots: The annotations, given as a list or as an indirect
            reference to one; ``None`` yields an empty result.
        page: Not used by this method.
        pages: Forwarded to ``_get_cloned_page``.
        reader: Forwarded to ``_get_cloned_page``.

    Returns:
        An ``ArrayObject`` collecting the references of the kept clones.

    """
    kept = ArrayObject()
    if isinstance(annots, IndirectObject):
        annots = cast("list[Any]", annots.get_object())
    if annots is None:
        return kept
    if not isinstance(annots, list):
        logger_warning(f"Expected list of annotations, got {annots} of type {annots.__class__.__name__}.", __name__)
        return kept

    for entry in annots:
        annot = cast("DictionaryObject", entry.get_object())
        # False only for a /Link annotation carrying a /GoTo action and no
        # direct /Dest entry.
        uses_direct_dest = (
            annot["/Subtype"] != "/Link"
            or "/A" not in annot
            or cast("DictionaryObject", annot["/A"])["/S"] != "/GoTo"
            or "/Dest" in annot
        )

        if uses_direct_dest:
            if "/Dest" not in annot:
                # Nothing to translate: plain copy.
                kept.append(self._add_object(annot.clone(self)))
                continue
            target = annot["/Dest"]
            skipped_field = "/Dest"
        else:
            target = cast("DictionaryObject", annot["/A"]).get("/D", NullObject())
            if target is None or isinstance(target, NullObject):
                continue
            skipped_field = "/D"

        if isinstance(target, str):
            # it is a named dest
            if str(target) in self.get_named_dest_root():
                kept.append(annot.clone(self).indirect_reference)
            continue

        target = cast("ArrayObject", target)
        cloned_page = self._get_cloned_page(target[0], pages, reader)
        if cloned_page is None:
            continue

        # Clone without the old destination, then store the translated one.
        copy = annot.clone(self, ignore_fields=(skipped_field,))
        translated = ArrayObject([cloned_page, *target[1:]])
        if uses_direct_dest:
            copy[NameObject("/Dest")] = translated
        else:
            cast("DictionaryObject", copy["/A"])[NameObject("/D")] = translated
        kept.append(self._add_object(copy))
    return kept
def _get_filtered_outline(
    self,
    node: Any,
    pages: dict[int, PageObject],
    reader: PdfReader,
) -> list[Destination]:
    """
    Extract outline item entries that are part of the specified page set.

    A node that is an ``/Outlines`` dictionary, or that has no ``/Title``,
    is treated as a container: the result is the filtered chain starting
    at its ``/First`` entry. Otherwise the ``/Next`` chain starting at
    ``node`` is walked; an item is kept when ``_get_cloned_page`` resolves
    its page or when at least one of its children is kept. The kept
    children are stored on the item as ``_filtered_children``.

    Args:
        node: Outline node (or a reference to it) to start from; ``None``
            and null objects yield an empty list.
        pages: Forwarded to ``_get_cloned_page``.
        reader: Reader used to build the outline items.

    Returns:
        A list of destination objects.

    """
    kept = []
    if node is None:
        node = NullObject()
    node = node.get_object()
    if is_null_or_none(node):
        node = DictionaryObject()

    if node.get("/Type", "") == "/Outlines" or "/Title" not in node:
        first_child = node.get("/First", None)
        if first_child is not None:
            kept += self._get_filtered_outline(
                first_child.get_object(), pages, reader
            )
        return kept

    cloned_ref: Union[None, IndirectObject, NullObject]
    while node is not None:
        node = node.get_object()
        item = cast("Destination", reader._build_outline_item(node))
        cloned_ref = self._get_cloned_page(
            cast("PageObject", item["/Page"]), pages, reader
        )
        # An unresolved page is recorded as a null object.
        item[NameObject("/Page")] = NullObject() if cloned_ref is None else cloned_ref
        item._filtered_children = (
            self._get_filtered_outline(node["/First"], pages, reader)
            if "/First" in node
            else []
        )
        if not isinstance(item["/Page"], NullObject) or item._filtered_children:
            kept.append(item)
        node = node.get("/Next", None)
    return kept
def _clone_outline(self, dest: Destination) -> TreeObject:
    """
    Create a new outline entry in this writer from ``dest``.

    The entry is registered through ``_add_object`` and receives the title
    of ``dest``. When the page of ``dest`` is not a null object, either
    the cloned ``/A`` entry of the source node or the destination array of
    ``dest`` is attached. When a source node exists, its ``/F`` and ``/C``
    entries are copied, using 0 and three 0.0 values as fallbacks.

    Args:
        dest: The destination to copy.

    Returns:
        The newly created outline entry.

    """
    entry = TreeObject()
    self._add_object(entry)
    entry[NameObject("/Title")] = TextStringObject(dest["/Title"])

    has_page = not isinstance(dest["/Page"], NullObject)
    source_node = dest.node
    if has_page:
        if source_node is not None and "/A" in source_node:
            entry[NameObject("/A")] = source_node["/A"].clone(self)
        else:
            entry[NameObject("/Dest")] = dest.dest_array
    # TODO: /SE
    if source_node is None:
        return entry

    entry[NameObject("/F")] = NumberObject(source_node.get("/F", 0))
    fallback_color = [FloatObject(0.0), FloatObject(0.0), FloatObject(0.0)]
    entry[NameObject("/C")] = ArrayObject(source_node.get("/C", fallback_color))
    return entry
def _insert_filtered_outline(
    self,
    outlines: list[Destination],
    parent: Union[TreeObject, IndirectObject],
    before: Union[None, TreeObject, IndirectObject] = None,
) -> None:
    """
    Insert filtered outline items, with their children, below ``parent``.

    Entries that are ``/Outlines`` dictionaries or have no ``/Title`` are
    not inserted themselves; their children are attached to ``parent``.
    Every other entry is cloned with ``_clone_outline`` and inserted as a
    child of ``parent``, and its ``_filtered_children`` are inserted below
    the clone.

    Args:
        outlines: Items to insert; each must carry ``_filtered_children``.
        parent: The outline node receiving the items.
        before: Passed to ``insert_child`` for the items of this level;
            nested levels always use ``None``.

    """
    for item in outlines:
        # TODO: can be improved to keep A and SE entries (ignored for the moment)
        # with np=self.add_outline_item_destination(dest,parent,before)
        is_container = item.get("/Type", "") == "/Outlines" or "/Title" not in item
        if is_container:
            attach_to = parent
        else:
            attach_to = self._clone_outline(item)
            parent_node = cast(TreeObject, parent.get_object())
            parent_node.insert_child(attach_to, before, self)
        self._insert_filtered_outline(item._filtered_children, attach_to, None)
def close(self) -> None:
    """No-op; implemented for API harmonization."""
def find_outline_item(
    self,
    outline_item: dict[str, Any],
    root: Optional[OutlineType] = None,
) -> Optional[list[int]]:
    """
    Search the outline for an item and return its position.

    A node matches when its ``indirect_reference`` or its ``/Title``
    compares equal to ``outline_item``. Siblings are followed through
    ``/Next`` and children through ``/First``.

    Args:
        outline_item: The value to look for.
        root: Node to start from; the outline root of the writer is used
            when omitted.

    Returns:
        The list of sibling indexes leading to the match, one index per
        level (levels whose node has no ``/Title`` contribute no index),
        or ``None`` when nothing matches.

    """
    current = self.get_outline_root() if root is None else cast("TreeObject", root)

    position = 0
    while current is not None:
        matches = (
            current.indirect_reference == outline_item
            or current.get("/Title", None) == outline_item
        )
        if matches:
            return [position]
        if "/First" in current:
            below = self.find_outline_item(
                outline_item, cast(OutlineType, current["/First"])
            )
            if below:
                # An untitled node does not add a level of its own.
                own_level = [position] if "/Title" in current else []
                return own_level + below
        if "/Next" not in current:
            return None
        position += 1
        current = cast(TreeObject, current["/Next"])
    raise PyPdfError("This line is theoretically unreachable.")  # pragma: no cover
def reset_translation(
    self, reader: Union[None, PdfReader, IndirectObject] = None
) -> None:
    """
    Reset the translation table between reader and the writer object.

    Late cloning will create new independent objects.

    Args:
        reader: PdfReader or IndirectObject referencing a PdfReader object.
            if set to None or omitted, all tables will be reset.

    Raises:
        Exception: If ``reader`` is none of the accepted types.

    """
    if reader is None:
        self._id_translated = {}
    elif isinstance(reader, PdfReader):
        # A reader without a table is silently ignored.
        self._id_translated.pop(id(reader), None)
    elif isinstance(reader, IndirectObject):
        # The table is keyed by the document owning the referenced object.
        self._id_translated.pop(id(reader.pdf), None)
    else:
        raise Exception(f"invalid parameter {reader}")
def set_page_label(
    self,
    page_index_from: int,
    page_index_to: int,
    style: Optional[PageLabelStyle] = None,
    prefix: Optional[str] = None,
    start: Optional[int] = 0,
) -> None:
    """
    Set a page label to a range of pages.

    Page indexes must be given starting from 0.
    Labels must have a style, a prefix or both.
    If a range is not assigned any page label, a decimal label starting from 1 is applied.

    Args:
        page_index_from: page index of the beginning of the range starting from 0
        page_index_to: page index of the end of the range starting from 0
        style: The numbering style to be used for the numeric portion of each page label:

                   * ``/D`` Decimal Arabic numerals
                   * ``/R`` Uppercase Roman numerals
                   * ``/r`` Lowercase Roman numerals
                   * ``/A`` Uppercase letters (A to Z for the first 26 pages,
                     AA to ZZ for the next 26, and so on)
                   * ``/a`` Lowercase letters (a to z for the first 26 pages,
                     aa to zz for the next 26, and so on)

        prefix: The label prefix for page labels in this range.
        start: The value of the numeric portion for the first page label
               in the range.
               Subsequent pages are numbered sequentially from this value,
               which must be greater than or equal to 1.
               The value 0 (the default) is accepted and means that no
               explicit start value is stored.

    Raises:
        ValueError: If neither style nor prefix is given, if the page
            range is invalid, or if start is negative.

    """
    if prefix is None and style is None:
        raise ValueError("At least one of style and prefix must be given")
    if page_index_from < 0:
        raise ValueError("page_index_from must be greater or equal than 0")
    if page_index_from > page_index_to:
        raise ValueError(
            "page_index_to must be greater or equal than page_index_from"
        )
    page_count = len(self.pages)
    if page_index_to >= page_count:
        raise ValueError("page_index_to exceeds number of pages")
    # 0 is the "not specified" marker and therefore passes the check.
    start_is_invalid = start is not None and start != 0 and start < 1
    if start_is_invalid:
        raise ValueError("If given, start must be greater or equal than one")

    self._set_page_label(
        page_index_from, page_index_to, style=style, prefix=prefix, start=start
    )
def _set_page_label(
    self,
    page_index_from: int,
    page_index_to: int,
    style: Optional[PageLabelStyle] = None,
    prefix: Optional[str] = None,
    start: Optional[int] = 0,
) -> None:
    """
    Set a page label to a range of pages.

    Page indexes must be given starting from 0.
    Labels must have a style, a prefix or both.
    If a range is not assigned any page label a decimal label starting from 1 is applied.

    No validation is done here.

    Args:
        page_index_from: page index of the beginning of the range starting from 0
        page_index_to: page index of the end of the range starting from 0
        style: The numbering style to be used for the numeric portion of each page label:
                    /D Decimal Arabic numerals
                    /R Uppercase Roman numerals
                    /r Lowercase Roman numerals
                    /A Uppercase letters (A to Z for the first 26 pages,
                       AA to ZZ for the next 26, and so on)
                    /a Lowercase letters (a to z for the first 26 pages,
                       aa to zz for the next 26, and so on)
        prefix: The label prefix for page labels in this range.
        start: The value of the numeric portion for the first page label
               in the range.
               Subsequent pages are numbered sequentially from this value,
               which must be greater than or equal to 1.
               ``None`` and 0 both mean that no ``/St`` entry is written.

    """
    default_page_label = DictionaryObject()
    default_page_label[NameObject("/S")] = NameObject("/D")

    new_page_label = DictionaryObject()
    if style is not None:
        new_page_label[NameObject("/S")] = NameObject(style)
    if prefix is not None:
        new_page_label[NameObject("/P")] = TextStringObject(prefix)
    # ``start`` is optional: None must not be handed to NumberObject.
    if start is not None and start != 0:
        new_page_label[NameObject("/St")] = NumberObject(start)

    if NameObject(CatalogDictionary.PAGE_LABELS) not in self._root_object:
        # First label of the document: begin with the default label on
        # page index 0.
        nums = ArrayObject()
        nums_insert(NumberObject(0), default_page_label, nums)
        page_labels = TreeObject()
        page_labels[NameObject("/Nums")] = nums
        self._root_object[NameObject(CatalogDictionary.PAGE_LABELS)] = page_labels

    page_labels = cast(
        TreeObject, self._root_object[NameObject(CatalogDictionary.PAGE_LABELS)]
    )
    nums = cast(ArrayObject, page_labels[NameObject("/Nums")])

    nums_insert(NumberObject(page_index_from), new_page_label, nums)
    nums_clear_range(NumberObject(page_index_from), page_index_to, nums)
    next_label_pos, *_ = nums_next(NumberObject(page_index_from), nums)
    # Add the default label right after the range unless a label already
    # starts there or the range ends on the last page.
    if next_label_pos != page_index_to + 1 and page_index_to + 1 < len(self.pages):
        nums_insert(NumberObject(page_index_to + 1), default_page_label, nums)

    page_labels[NameObject("/Nums")] = nums
    self._root_object[NameObject(CatalogDictionary.PAGE_LABELS)] = page_labels
def _repr_mimebundle_(
    self,
    include: Union[None, Iterable[str]] = None,
    exclude: Union[None, Iterable[str]] = None,
) -> dict[str, Any]:
    """
    Integration into Jupyter Notebooks.

    The document is written into an in-memory buffer, which becomes the
    ``application/pdf`` entry of the returned mapping from mime-type to
    representation.

    Args:
        include: When given, only mime-types contained in it are kept.
        exclude: When given, mime-types contained in it are removed.

    Returns:
        The mapping from mime-type to representation after filtering.

    .. seealso::

        https://ipython.readthedocs.io/en/stable/config/integrating.html
    """
    buffer = BytesIO()
    self.write(buffer)
    bundle: dict[str, Any] = {"application/pdf": buffer}

    if include is not None:
        # Drop every representation missing from the include list
        for mime_type in list(bundle):
            if mime_type not in include:
                del bundle[mime_type]

    if exclude is not None:
        # Drop every representation named in the exclude list
        for mime_type in list(bundle):
            if mime_type in exclude:
                del bundle[mime_type]

    return bundle
def _pdf_objectify(obj: Union[dict[str, Any], str, float, list[Any]]) -> PdfObject:
    """
    Convert a plain Python value into the corresponding PDF object.

    PDF objects are returned unchanged. Dictionaries and lists are
    converted recursively. Strings starting with ``/`` become names, other
    strings become text strings. Both ``float`` and ``int`` values become
    ``FloatObject``.

    Args:
        obj: The value to convert.

    Returns:
        The PDF object representing ``obj``.

    Raises:
        NotImplementedError: If the type of ``obj`` is not supported.

    """
    # Must stay first: some PDF objects are themselves str/dict/list based.
    if isinstance(obj, PdfObject):
        return obj
    if isinstance(obj, dict):
        converted = DictionaryObject()
        for name, item in obj.items():
            converted[NameObject(name)] = _pdf_objectify(item)
        return converted
    if isinstance(obj, str):
        return NameObject(obj) if obj.startswith("/") else TextStringObject(obj)
    if isinstance(obj, (int, float)):
        return FloatObject(obj)
    if isinstance(obj, list):
        return ArrayObject(map(_pdf_objectify, obj))
    raise NotImplementedError(
        f"{type(obj)=} could not be cast to a PdfObject"
    )
def _create_outline_item(
    action_ref: Union[None, IndirectObject],
    title: str,
    color: Union[tuple[float, float, float], str, None],
    italic: bool,
    bold: bool,
) -> TreeObject:
    """
    Build the dictionary of a new outline item.

    Args:
        action_ref: Stored as ``/A`` when not ``None``.
        title: Text stored as ``/Title``.
        color: Stored as ``/C`` when truthy; a string is first converted
            with ``hex_to_rgb``.
        italic: Adds the italic flag to ``/F``.
        bold: Adds the bold flag to ``/F``.

    Returns:
        The new outline item; ``/F`` is only present when at least one of
        ``italic`` and ``bold`` is set.

    """
    item = TreeObject()
    if action_ref is not None:
        item[NameObject("/A")] = action_ref
    item.update({NameObject("/Title"): create_string_object(title)})

    if color:
        components = hex_to_rgb(color) if isinstance(color, str) else color
        item.update(
            {NameObject("/C"): ArrayObject([FloatObject(c) for c in components])}
        )

    if italic or bold:
        italic_bits = OutlineFontFlag.italic if italic else 0
        bold_bits = OutlineFontFlag.bold if bold else 0
        item.update({NameObject("/F"): NumberObject(italic_bits + bold_bits)})
    return item