Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_writer.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) 2006, Mathieu Fenniak
2# Copyright (c) 2007, Ashish Kulkarni <kulkarni.ashish@gmail.com>
3#
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are
8# met:
9#
10# * Redistributions of source code must retain the above copyright notice,
11# this list of conditions and the following disclaimer.
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15# * The name of the author may not be used to endorse or promote products
16# derived from this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28# POSSIBILITY OF SUCH DAMAGE.
30import decimal
31import enum
32import hashlib
33import re
34import struct
35import sys
36import uuid
37from collections.abc import Iterable, Mapping
38from io import BytesIO, FileIO, IOBase
39from itertools import compress
40from pathlib import Path
41from re import Pattern
42from types import TracebackType
43from typing import (
44 IO,
45 Any,
46 Callable,
47 Optional,
48 Union,
49 cast,
50)
52if sys.version_info >= (3, 11):
53 from typing import Self
54else:
55 from typing_extensions import Self
57from ._doc_common import DocumentInformation, PdfDocCommon
58from ._encryption import EncryptAlgorithm, Encryption
59from ._page import PageObject, Transformation
60from ._page_labels import nums_clear_range, nums_insert, nums_next
61from ._reader import PdfReader
62from ._utils import (
63 StrByteType,
64 StreamType,
65 _get_max_pdf_version_header,
66 deprecation_no_replacement,
67 logger_warning,
68)
69from .constants import AnnotationDictionaryAttributes as AA
70from .constants import CatalogAttributes as CA
71from .constants import (
72 CatalogDictionary,
73 GoToActionArguments,
74 ImageType,
75 InteractiveFormDictEntries,
76 OutlineFontFlag,
77 PageLabelStyle,
78 PagesAttributes,
79 TypFitArguments,
80 UserAccessPermissions,
81)
82from .constants import Core as CO
83from .constants import FieldDictionaryAttributes as FA
84from .constants import PageAttributes as PG
85from .constants import TrailerKeys as TK
86from .errors import PdfReadError, PyPdfError
87from .generic import (
88 PAGE_FIT,
89 ArrayObject,
90 BooleanObject,
91 ByteStringObject,
92 ContentStream,
93 Destination,
94 DictionaryObject,
95 EmbeddedFile,
96 Fit,
97 FloatObject,
98 IndirectObject,
99 NameObject,
100 NullObject,
101 NumberObject,
102 PdfObject,
103 RectangleObject,
104 ReferenceLink,
105 StreamObject,
106 TextStringObject,
107 TreeObject,
108 ViewerPreferences,
109 create_string_object,
110 extract_links,
111 hex_to_rgb,
112 is_null_or_none,
113)
114from .generic._appearance_stream import TextStreamAppearance
115from .pagerange import PageRange, PageRangeSpec
116from .types import (
117 AnnotationSubtype,
118 BorderArrayType,
119 LayoutType,
120 OutlineItemType,
121 OutlineType,
122 PagemodeType,
123)
124from .xmp import XmpInformation
126ALL_DOCUMENT_PERMISSIONS = UserAccessPermissions.all()
129class ObjectDeletionFlag(enum.IntFlag):
130 NONE = 0
131 TEXT = enum.auto()
132 LINKS = enum.auto()
133 ATTACHMENTS = enum.auto()
134 OBJECTS_3D = enum.auto()
135 ALL_ANNOTATIONS = enum.auto()
136 XOBJECT_IMAGES = enum.auto()
137 INLINE_IMAGES = enum.auto()
138 DRAWING_IMAGES = enum.auto()
139 IMAGES = XOBJECT_IMAGES | INLINE_IMAGES | DRAWING_IMAGES
142def _rolling_checksum(stream: BytesIO, blocksize: int = 65536) -> str:
143 hash = hashlib.md5(usedforsecurity=False)
144 for block in iter(lambda: stream.read(blocksize), b""):
145 hash.update(block)
146 return hash.hexdigest()
149class PdfWriter(PdfDocCommon):
150 """
151 Write a PDF file out, given pages produced by another class or through
152 cloning a PDF file during initialization.
154 Typically data is added from a :class:`PdfReader<pypdf.PdfReader>`.
156 Args:
157 clone_from: identical to fileobj (for compatibility)
159 incremental: If true, loads the document and set the PdfWriter in incremental mode.
161 When writing incrementally, the original document is written first and new/modified
162 content is appended. To be used for signed document/forms to keep signature valid.
164 full: If true, loads all the objects (always full if incremental = True).
165 This parameter may allow loading large PDFs.
167 strict: If true, pypdf will raise an exception if a PDF does not follow the specification.
168 If false, pypdf will try to be forgiving and do something reasonable, but it will log
169 a warning message. It is a best-effort approach.
171 """
173 def __init__(
174 self,
175 fileobj: Union[None, PdfReader, StrByteType, Path] = "",
176 clone_from: Union[None, PdfReader, StrByteType, Path] = None,
177 incremental: bool = False,
178 full: bool = False,
179 strict: bool = False,
180 ) -> None:
181 self.strict = strict
182 """
183 If true, pypdf will raise an exception if a PDF does not follow the specification.
184 If false, pypdf will try to be forgiving and do something reasonable, but it will log
185 a warning message. It is a best-effort approach.
186 """
188 self.incremental = incremental or full
189 """
190 Returns if the PdfWriter object has been started in incremental mode.
191 """
193 self._objects: list[Optional[PdfObject]] = []
194 """
195 The indirect objects in the PDF.
196 For the incremental case, it will be filled with None
197 in clone_reader_document_root.
198 """
200 self._original_hash: list[int] = []
201 """
202 List of hashes after import; used to identify changes.
203 """
205 self._idnum_hash: dict[bytes, tuple[IndirectObject, list[IndirectObject]]] = {}
206 """
207 Maps hash values of indirect objects to the list of IndirectObjects.
208 This is used for compression.
209 """
211 self._id_translated: dict[int, dict[int, int]] = {}
212 """List of already translated IDs.
213 dict[id(pdf)][(idnum, generation)]
214 """
216 self._info_obj: Optional[PdfObject]
217 """The PDF files's document information dictionary,
218 defined by Info in the PDF file's trailer dictionary."""
220 self._ID: Union[ArrayObject, None] = None
221 """The PDF file identifier,
222 defined by the ID in the PDF file's trailer dictionary."""
224 self._unresolved_links: list[tuple[ReferenceLink, ReferenceLink]] = []
225 "Tracks links in pages added to the writer for resolving later."
226 self._merged_in_pages: dict[Optional[IndirectObject], Optional[IndirectObject]] = {}
227 "Tracks pages added to the writer and what page they turned into."
229 if self.incremental:
230 if isinstance(fileobj, (str, Path)):
231 with open(fileobj, "rb") as f:
232 fileobj = BytesIO(f.read(-1))
233 if isinstance(fileobj, BytesIO):
234 fileobj = PdfReader(fileobj)
235 if not isinstance(fileobj, PdfReader):
236 raise PyPdfError("Invalid type for incremental mode")
237 self._reader = fileobj # prev content is in _reader.stream
238 self._header = fileobj.pdf_header.encode()
239 self._readonly = True # TODO: to be analysed
240 else:
241 self._header = b"%PDF-1.3"
242 self._info_obj = self._add_object(
243 DictionaryObject(
244 {NameObject("/Producer"): create_string_object("pypdf")}
245 )
246 )
248 def _get_clone_from(
249 fileobj: Union[None, PdfReader, str, Path, IO[Any], BytesIO],
250 clone_from: Union[None, PdfReader, str, Path, IO[Any], BytesIO],
251 ) -> Union[None, PdfReader, str, Path, IO[Any], BytesIO]:
252 if isinstance(fileobj, (str, Path, IO, BytesIO)) and (
253 fileobj == "" or clone_from is not None
254 ):
255 return clone_from
256 cloning = True
257 if isinstance(fileobj, (str, Path)) and (
258 not Path(str(fileobj)).exists()
259 or Path(str(fileobj)).stat().st_size == 0
260 ):
261 cloning = False
262 if isinstance(fileobj, (IOBase, BytesIO)):
263 t = fileobj.tell()
264 if fileobj.seek(0, 2) == 0:
265 cloning = False
266 fileobj.seek(t, 0)
267 if cloning:
268 clone_from = fileobj
269 return clone_from
271 clone_from = _get_clone_from(fileobj, clone_from)
272 # To prevent overwriting
273 self.temp_fileobj = fileobj
274 self.fileobj = ""
275 self._with_as_usage = False
276 self._cloned = False
277 # The root of our page tree node
278 pages = DictionaryObject(
279 {
280 NameObject(PagesAttributes.TYPE): NameObject("/Pages"),
281 NameObject(PagesAttributes.COUNT): NumberObject(0),
282 NameObject(PagesAttributes.KIDS): ArrayObject(),
283 }
284 )
285 self.flattened_pages = []
286 self._encryption: Optional[Encryption] = None
287 self._encrypt_entry: Optional[DictionaryObject] = None
289 if clone_from is not None:
290 if not isinstance(clone_from, PdfReader):
291 clone_from = PdfReader(clone_from)
292 self.clone_document_from_reader(clone_from)
293 self._cloned = True
294 else:
295 self._pages = self._add_object(pages)
296 self._root_object = DictionaryObject(
297 {
298 NameObject(PagesAttributes.TYPE): NameObject(CO.CATALOG),
299 NameObject(CO.PAGES): self._pages,
300 }
301 )
302 self._add_object(self._root_object)
303 if full and not incremental:
304 self.incremental = False
305 if isinstance(self._ID, list):
306 if isinstance(self._ID[0], TextStringObject):
307 self._ID[0] = ByteStringObject(self._ID[0].get_original_bytes())
308 if isinstance(self._ID[1], TextStringObject):
309 self._ID[1] = ByteStringObject(self._ID[1].get_original_bytes())
311 # for commonality
312 @property
313 def is_encrypted(self) -> bool:
314 """
315 Read-only boolean property showing whether this PDF file is encrypted.
317 Note that this property, if true, will remain true even after the
318 :meth:`decrypt()<pypdf.PdfReader.decrypt>` method is called.
319 """
320 return False
322 @property
323 def root_object(self) -> DictionaryObject:
324 """
325 Provide direct access to PDF Structure.
327 Note:
328 Recommended only for read access.
330 """
331 return self._root_object
333 @property
334 def _info(self) -> Optional[DictionaryObject]:
335 """
336 Provide access to "/Info". Standardized with PdfReader.
338 Returns:
339 /Info Dictionary; None if the entry does not exist
341 """
342 return (
343 None
344 if self._info_obj is None
345 else cast(DictionaryObject, self._info_obj.get_object())
346 )
348 @_info.setter
349 def _info(self, value: Optional[Union[IndirectObject, DictionaryObject]]) -> None:
350 if value is None:
351 try:
352 self._objects[self._info_obj.indirect_reference.idnum - 1] = None # type: ignore
353 except (KeyError, AttributeError):
354 pass
355 self._info_obj = None
356 else:
357 if self._info_obj is None:
358 self._info_obj = self._add_object(DictionaryObject())
359 obj = cast(DictionaryObject, self._info_obj.get_object())
360 obj.clear()
361 obj.update(cast(DictionaryObject, value.get_object()))
363 @property
364 def xmp_metadata(self) -> Optional[XmpInformation]:
365 """XMP (Extensible Metadata Platform) data."""
366 return cast(XmpInformation, self.root_object.xmp_metadata)
368 @xmp_metadata.setter
369 def xmp_metadata(self, value: Union[XmpInformation, bytes, None]) -> None:
370 """XMP (Extensible Metadata Platform) data."""
371 if value is None:
372 if "/Metadata" in self.root_object:
373 del self.root_object["/Metadata"]
374 return
376 metadata = self.root_object.get("/Metadata", None)
377 if not isinstance(metadata, IndirectObject):
378 if metadata is not None:
379 del self.root_object["/Metadata"]
380 metadata_stream = StreamObject()
381 stream_reference = self._add_object(metadata_stream)
382 self.root_object[NameObject("/Metadata")] = stream_reference
383 else:
384 metadata_stream = cast(StreamObject, metadata.get_object())
386 if isinstance(value, XmpInformation):
387 bytes_data = value.stream.get_data()
388 else:
389 bytes_data = value
390 metadata_stream.set_data(bytes_data)
392 @property
393 def with_as_usage(self) -> bool:
394 deprecation_no_replacement("with_as_usage", "5.0")
395 return self._with_as_usage
397 @with_as_usage.setter
398 def with_as_usage(self, value: bool) -> None:
399 deprecation_no_replacement("with_as_usage", "5.0")
400 self._with_as_usage = value
402 def __enter__(self) -> Self:
403 """Store how writer is initialized by 'with'."""
404 c: bool = self._cloned
405 t = self.temp_fileobj
406 self.__init__() # type: ignore
407 self._cloned = c
408 self._with_as_usage = True
409 self.fileobj = t # type: ignore
410 return self
412 def __exit__(
413 self,
414 exc_type: Optional[type[BaseException]],
415 exc: Optional[BaseException],
416 traceback: Optional[TracebackType],
417 ) -> None:
418 """Write data to the fileobj."""
419 if self.fileobj and not self._cloned:
420 self.write(self.fileobj)
422 @property
423 def pdf_header(self) -> str:
424 """
425 Read/Write property of the PDF header that is written.
427 This should be something like ``'%PDF-1.5'``. It is recommended to set
428 the lowest version that supports all features which are used within the
429 PDF file.
431 Note: `pdf_header` returns a string but accepts bytes or str for writing
432 """
433 return self._header.decode()
435 @pdf_header.setter
436 def pdf_header(self, new_header: Union[str, bytes]) -> None:
437 if isinstance(new_header, str):
438 new_header = new_header.encode()
439 self._header = new_header
441 def _add_object(self, obj: PdfObject) -> IndirectObject:
442 if (
443 getattr(obj, "indirect_reference", None) is not None
444 and obj.indirect_reference.pdf == self # type: ignore
445 ):
446 return obj.indirect_reference # type: ignore
447 # check for /Contents in Pages (/Contents in annotations are strings)
448 if isinstance(obj, DictionaryObject) and isinstance(
449 obj.get(PG.CONTENTS, None), (ArrayObject, DictionaryObject)
450 ):
451 obj[NameObject(PG.CONTENTS)] = self._add_object(obj[PG.CONTENTS])
452 self._objects.append(obj)
453 obj.indirect_reference = IndirectObject(len(self._objects), 0, self)
454 return obj.indirect_reference
456 def get_object(
457 self,
458 indirect_reference: Union[int, IndirectObject],
459 ) -> PdfObject:
460 if isinstance(indirect_reference, int):
461 obj = self._objects[indirect_reference - 1]
462 elif indirect_reference.pdf != self:
463 raise ValueError("PDF must be self")
464 else:
465 obj = self._objects[indirect_reference.idnum - 1]
466 assert obj is not None, "mypy"
467 return obj
469 def _replace_object(
470 self,
471 indirect_reference: Union[int, IndirectObject],
472 obj: PdfObject,
473 ) -> PdfObject:
474 if isinstance(indirect_reference, IndirectObject):
475 if indirect_reference.pdf != self:
476 raise ValueError("PDF must be self")
477 indirect_reference = indirect_reference.idnum
478 gen = self._objects[indirect_reference - 1].indirect_reference.generation # type: ignore
479 if (
480 getattr(obj, "indirect_reference", None) is not None
481 and obj.indirect_reference.pdf != self # type: ignore
482 ):
483 obj = obj.clone(self)
484 self._objects[indirect_reference - 1] = obj
485 obj.indirect_reference = IndirectObject(indirect_reference, gen, self)
487 assert isinstance(obj, PdfObject), "mypy"
488 return obj
490 def _add_page(
491 self,
492 page: PageObject,
493 index: int,
494 excluded_keys: Iterable[str] = (),
495 ) -> PageObject:
496 if not isinstance(page, PageObject) or page.get(PagesAttributes.TYPE, None) != CO.PAGE:
497 raise ValueError("Invalid page object")
498 assert self.flattened_pages is not None, "for mypy"
499 page_org = page
500 excluded_keys = list(excluded_keys)
501 excluded_keys += [PagesAttributes.PARENT, "/StructParents"]
502 # Acrobat does not accept two indirect references pointing on the same
503 # page; therefore in order to add multiple copies of the same
504 # page, we need to create a new dictionary for the page, however the
505 # objects below (including content) are not duplicated:
506 try: # delete an already existing page
507 del self._id_translated[id(page_org.indirect_reference.pdf)][ # type: ignore
508 page_org.indirect_reference.idnum # type: ignore
509 ]
510 except Exception:
511 pass
513 page = cast(
514 "PageObject", page_org.clone(self, False, excluded_keys).get_object()
515 )
516 if page_org.pdf is not None:
517 other = page_org.pdf.pdf_header
518 self.pdf_header = _get_max_pdf_version_header(self.pdf_header, other)
520 node, idx = self._get_page_in_node(index)
521 page[NameObject(PagesAttributes.PARENT)] = node.indirect_reference
523 if idx >= 0:
524 cast(ArrayObject, node[PagesAttributes.KIDS]).insert(idx, page.indirect_reference)
525 self.flattened_pages.insert(index, page)
526 else:
527 cast(ArrayObject, node[PagesAttributes.KIDS]).append(page.indirect_reference)
528 self.flattened_pages.append(page)
529 recurse = 0
530 while not is_null_or_none(node):
531 node = cast(DictionaryObject, node.get_object())
532 node[NameObject(PagesAttributes.COUNT)] = NumberObject(cast(int, node[PagesAttributes.COUNT]) + 1)
533 node = node.get(PagesAttributes.PARENT, None) # type: ignore[assignment] # TODO: Fix.
534 recurse += 1
535 if recurse > 1000:
536 raise PyPdfError("Too many recursive calls!")
538 if page_org.pdf is not None:
539 # the page may contain links to other pages, and those other
540 # pages may or may not already be added. we store the
541 # information we need, so that we can resolve the references
542 # later.
543 self._unresolved_links.extend(extract_links(page, page_org))
544 self._merged_in_pages[page_org.indirect_reference] = page.indirect_reference
546 return page
548 def set_need_appearances_writer(self, state: bool = True) -> None:
549 """
550 Sets the "NeedAppearances" flag in the PDF writer.
552 The "NeedAppearances" flag indicates whether the appearance dictionary
553 for form fields should be automatically generated by the PDF viewer or
554 if the embedded appearance should be used.
556 Args:
557 state: The actual value of the NeedAppearances flag.
559 Returns:
560 None
562 """
563 # See §12.7.2 and §7.7.2 for more information:
564 # https://opensource.adobe.com/dc-acrobat-sdk-docs/pdfstandards/PDF32000_2008.pdf
565 try:
566 # get the AcroForm tree
567 if CatalogDictionary.ACRO_FORM not in self._root_object:
568 self._root_object[
569 NameObject(CatalogDictionary.ACRO_FORM)
570 ] = self._add_object(DictionaryObject())
572 need_appearances = NameObject(InteractiveFormDictEntries.NeedAppearances)
573 cast(DictionaryObject, self._root_object[CatalogDictionary.ACRO_FORM])[
574 need_appearances
575 ] = BooleanObject(state)
576 except Exception as exc: # pragma: no cover
577 logger_warning(
578 f"set_need_appearances_writer({state}) catch : {exc}", __name__
579 )
581 def create_viewer_preferences(self) -> ViewerPreferences:
582 o = ViewerPreferences()
583 self._root_object[
584 NameObject(CatalogDictionary.VIEWER_PREFERENCES)
585 ] = self._add_object(o)
586 return o
588 def add_page(
589 self,
590 page: PageObject,
591 excluded_keys: Iterable[str] = (),
592 ) -> PageObject:
593 """
594 Add a page to this PDF file.
596 Recommended for advanced usage including the adequate excluded_keys.
598 The page is usually acquired from a :class:`PdfReader<pypdf.PdfReader>`
599 instance.
601 Args:
602 page: The page to add to the document. Should be
603 an instance of :class:`PageObject<pypdf._page.PageObject>`
604 excluded_keys:
606 Returns:
607 The added PageObject.
609 """
610 assert self.flattened_pages is not None, "mypy"
611 return self._add_page(page, len(self.flattened_pages), excluded_keys)
613 def insert_page(
614 self,
615 page: PageObject,
616 index: int = 0,
617 excluded_keys: Iterable[str] = (),
618 ) -> PageObject:
619 """
620 Insert a page in this PDF file. The page is usually acquired from a
621 :class:`PdfReader<pypdf.PdfReader>` instance.
623 Args:
624 page: The page to add to the document.
625 index: Position at which the page will be inserted.
626 excluded_keys:
628 Returns:
629 The added PageObject.
631 """
632 assert self.flattened_pages is not None, "mypy"
633 if index < 0:
634 index += len(self.flattened_pages)
635 if index < 0:
636 raise ValueError("Invalid index value")
637 if index >= len(self.flattened_pages):
638 return self.add_page(page, excluded_keys)
639 return self._add_page(page, index, excluded_keys)
641 def _get_page_number_by_indirect(
642 self, indirect_reference: Union[None, int, NullObject, IndirectObject]
643 ) -> Optional[int]:
644 """
645 Generate _page_id2num.
647 Args:
648 indirect_reference:
650 Returns:
651 The page number or None
653 """
654 # To provide same function as in PdfReader
655 if is_null_or_none(indirect_reference):
656 return None
657 assert indirect_reference is not None, "mypy"
658 if isinstance(indirect_reference, int):
659 indirect_reference = IndirectObject(indirect_reference, 0, self)
660 obj = indirect_reference.get_object()
661 if isinstance(obj, PageObject):
662 return obj.page_number
663 return None
665 def add_blank_page(
666 self, width: Optional[float] = None, height: Optional[float] = None
667 ) -> PageObject:
668 """
669 Append a blank page to this PDF file and return it.
671 If no page size is specified, use the size of the last page.
673 Args:
674 width: The width of the new page expressed in default user
675 space units.
676 height: The height of the new page expressed in default
677 user space units.
679 Returns:
680 The newly appended page.
682 Raises:
683 PageSizeNotDefinedError: if width and height are not defined
684 and previous page does not exist.
686 """
687 page = PageObject.create_blank_page(self, width, height)
688 return self.add_page(page)
690 def insert_blank_page(
691 self,
692 width: Optional[Union[float, decimal.Decimal]] = None,
693 height: Optional[Union[float, decimal.Decimal]] = None,
694 index: int = 0,
695 ) -> PageObject:
696 """
697 Insert a blank page to this PDF file and return it.
699 If no page size is specified for a dimension, use the size of the last page.
701 Args:
702 width: The width of the new page expressed in default user
703 space units.
704 height: The height of the new page expressed in default
705 user space units.
706 index: Position to add the page.
708 Returns:
709 The newly inserted page.
711 Raises:
712 PageSizeNotDefinedError: if width and height are not defined
713 and previous page does not exist.
715 """
716 if width is None or (height is None and index < self.get_num_pages()):
717 oldpage = self.pages[index]
718 width = oldpage.mediabox.width
719 height = oldpage.mediabox.height
720 page = PageObject.create_blank_page(self, width, height)
721 self.insert_page(page, index)
722 return page
724 @property
725 def open_destination(
726 self,
727 ) -> Union[None, Destination, TextStringObject, ByteStringObject]:
728 return super().open_destination
730 @open_destination.setter
731 def open_destination(self, dest: Union[None, str, Destination, PageObject]) -> None:
732 if dest is None:
733 try:
734 del self._root_object["/OpenAction"]
735 except KeyError:
736 pass
737 elif isinstance(dest, str):
738 self._root_object[NameObject("/OpenAction")] = TextStringObject(dest)
739 elif isinstance(dest, Destination):
740 self._root_object[NameObject("/OpenAction")] = dest.dest_array
741 elif isinstance(dest, PageObject):
742 self._root_object[NameObject("/OpenAction")] = Destination(
743 "Opening",
744 dest.indirect_reference
745 if dest.indirect_reference is not None
746 else NullObject(),
747 PAGE_FIT,
748 ).dest_array
750 def add_js(self, javascript: str) -> None:
751 """
752 Add JavaScript which will launch upon opening this PDF.
754 Args:
755 javascript: Your JavaScript.
757 Example:
758 This will launch the print window when the PDF is opened.
760 >>> from pypdf import PdfWriter
761 >>> output = PdfWriter()
762 >>> output.add_js("this.print({bUI:true,bSilent:false,bShrinkToFit:true});")
764 """
765 # Names / JavaScript preferred to be able to add multiple scripts
766 if "/Names" not in self._root_object:
767 self._root_object[NameObject(CA.NAMES)] = DictionaryObject()
768 names = cast(DictionaryObject, self._root_object[CA.NAMES])
769 if "/JavaScript" not in names:
770 names[NameObject("/JavaScript")] = DictionaryObject(
771 {NameObject("/Names"): ArrayObject()}
772 )
773 js_list = cast(
774 ArrayObject, cast(DictionaryObject, names["/JavaScript"])["/Names"]
775 )
776 # We need a name for parameterized JavaScript in the PDF file,
777 # but it can be anything.
778 js_list.append(create_string_object(str(uuid.uuid4())))
780 js = DictionaryObject(
781 {
782 NameObject(PagesAttributes.TYPE): NameObject("/Action"),
783 NameObject("/S"): NameObject("/JavaScript"),
784 NameObject("/JS"): TextStringObject(f"{javascript}"),
785 }
786 )
787 js_list.append(self._add_object(js))
789 def add_attachment(self, filename: str, data: Union[str, bytes]) -> "EmbeddedFile":
790 """
791 Embed a file inside the PDF.
793 Reference:
794 https://opensource.adobe.com/dc-acrobat-sdk-docs/pdfstandards/PDF32000_2008.pdf
795 Section 7.11.3
797 Args:
798 filename: The filename to display.
799 data: The data in the file.
801 Returns:
802 EmbeddedFile instance for the newly created embedded file.
804 """
805 return EmbeddedFile._create_new(self, filename, data)
807 def append_pages_from_reader(
808 self,
809 reader: PdfReader,
810 after_page_append: Optional[Callable[[PageObject], None]] = None,
811 ) -> None:
812 """
813 Copy pages from reader to writer. Includes an optional callback
814 parameter which is invoked after pages are appended to the writer.
816 ``append`` should be preferred.
818 Args:
819 reader: a PdfReader object from which to copy page
820 annotations to this writer object. The writer's annots
821 will then be updated.
822 after_page_append:
823 Callback function that is invoked after each page is appended to
824 the writer. Signature includes a reference to the appended page
825 (delegates to append_pages_from_reader). The single parameter of
826 the callback is a reference to the page just appended to the
827 document.
829 """
830 reader_num_pages = len(reader.pages)
831 # Copy pages from reader to writer
832 for reader_page_number in range(reader_num_pages):
833 reader_page = reader.pages[reader_page_number]
834 writer_page = self.add_page(reader_page)
835 # Trigger callback, pass writer page as parameter
836 if callable(after_page_append):
837 after_page_append(writer_page)
839 def _merge_content_stream_to_page(
840 self,
841 page: PageObject,
842 new_content_data: bytes,
843 ) -> None:
844 """
845 Combines existing content stream(s) with new content (as bytes).
847 Args:
848 page: The page to which the new content data will be added.
849 new_content_data: A binary-encoded new content stream, for
850 instance the commands to draw an XObject.
851 """
852 # First resolve the existing page content. This always is an IndirectObject:
853 # PDF Explained by John Whitington
854 # https://www.oreilly.com/library/view/pdf-explained/9781449321581/ch04.html
855 if NameObject("/Contents") in page:
856 existing_content_ref = page[NameObject("/Contents")]
857 existing_content = existing_content_ref.get_object()
859 if isinstance(existing_content, ArrayObject):
860 # Create a new StreamObject for the new_content_data
861 new_stream_obj = StreamObject()
862 new_stream_obj.set_data(new_content_data)
863 existing_content.append(self._add_object(new_stream_obj))
864 page[NameObject("/Contents")] = self._add_object(existing_content)
865 if isinstance(existing_content, StreamObject):
866 # Merge new content to existing StreamObject
867 merged_data = existing_content.get_data() + b"\n" + new_content_data
868 new_stream = StreamObject()
869 new_stream.set_data(merged_data)
870 page[NameObject("/Contents")] = self._add_object(new_stream)
871 else:
872 # If no existing content, then we have an empty page.
873 # Create a new StreamObject in a new /Contents entry.
874 new_stream = StreamObject()
875 new_stream.set_data(new_content_data)
876 page[NameObject("/Contents")] = self._add_object(new_stream)
878 def _add_apstream_object(
879 self,
880 page: PageObject,
881 appearance_stream_obj: StreamObject,
882 object_name: str,
883 x_offset: float,
884 y_offset: float,
885 ) -> None:
886 """
887 Adds an appearance stream to the page content in the form of
888 an XObject.
890 Args:
891 page: The page to which to add the appearance stream.
892 appearance_stream_obj: The appearance stream.
893 object_name: The name of the appearance stream.
894 x_offset: The horizontal offset for the appearance stream.
895 y_offset: The vertical offset for the appearance stream.
896 """
897 # Prepare XObject resource dictionary on the page. This currently
898 # only deals with font resources, but can easily be adapted to also
899 # include other resources.
900 pg_res = cast(DictionaryObject, page[PG.RESOURCES])
901 if "/Resources" in appearance_stream_obj:
902 ap_stream_res = cast(DictionaryObject, appearance_stream_obj["/Resources"])
903 ap_stream_font_dict = cast(DictionaryObject, ap_stream_res.get("/Font", DictionaryObject()))
904 if "/Font" not in pg_res:
905 font_dict_ref = self._add_object(DictionaryObject())
906 pg_res[NameObject("/Font")] = font_dict_ref
907 pg_font_res = cast(DictionaryObject, pg_res["/Font"].get_object())
908 # Merge fonts from the appearance stream into the page's font resources
909 for font_name, font_res in ap_stream_font_dict.items():
910 if font_name not in pg_font_res:
911 font_res_ref = self._add_object(font_res)
912 pg_font_res[font_name] = font_res_ref
913 # Always add the resolved stream object to the writer to get a new IndirectObject.
914 # This ensures we have a valid IndirectObject managed by *this* writer.
915 xobject_ref = self._add_object(appearance_stream_obj)
916 xobject_name = NameObject(f"/Fm_{object_name}")._sanitize()
917 if "/XObject" not in pg_res:
918 pg_res[NameObject("/XObject")] = DictionaryObject()
919 pg_xo_res = cast(DictionaryObject, pg_res["/XObject"])
920 if xobject_name not in pg_xo_res:
921 pg_xo_res[xobject_name] = xobject_ref
922 else:
923 logger_warning(
924 f"XObject {xobject_name!r} already added to page resources. This might be an issue.",
925 __name__
926 )
927 xobject_cm = Transformation().translate(x_offset, y_offset)
928 xobject_drawing_commands = f"q\n{xobject_cm._to_cm()}\n{xobject_name} Do\nQ".encode()
929 self._merge_content_stream_to_page(page, xobject_drawing_commands)
931 FFBITS_NUL = FA.FfBits(0)
933 def update_page_form_field_values(
934 self,
935 page: Union[PageObject, list[PageObject], None],
936 fields: Mapping[str, Union[str, list[str], tuple[str, str, float]]],
937 flags: FA.FfBits = FFBITS_NUL,
938 auto_regenerate: Optional[bool] = True,
939 flatten: bool = False,
940 ) -> None:
941 """
942 Update the form field values for a given page from a fields dictionary.
944 Copy field texts and values from fields to page.
945 If the field links to a parent object, add the information to the parent.
947 Args:
948 page: `PageObject` - references **PDF writer's page** where the
949 annotations and field data will be updated.
950 `List[Pageobject]` - provides list of pages to be processed.
951 `None` - all pages.
952 fields: a Python dictionary of:
954 * field names (/T) as keys and text values (/V) as value
955 * field names (/T) as keys and list of text values (/V) for multiple choice list
956 * field names (/T) as keys and tuple of:
957 * text values (/V)
958 * font id (e.g. /F1, the font id must exist)
959 * font size (0 for autosize)
961 flags: A set of flags from :class:`~pypdf.constants.FieldDictionaryAttributes.FfBits`.
963 auto_regenerate: Set/unset the need_appearances flag;
964 the flag is unchanged if auto_regenerate is None.
966 flatten: Whether or not to flatten the annotation. If True, this adds the annotation's
967 appearance stream to the page contents. Note that this option does not remove the
968 annotation itself.
970 """
971 if CatalogDictionary.ACRO_FORM not in self._root_object:
972 raise PyPdfError("No /AcroForm dictionary in PDF of PdfWriter Object")
973 acro_form = cast(DictionaryObject, self._root_object[CatalogDictionary.ACRO_FORM])
974 if InteractiveFormDictEntries.Fields not in acro_form:
975 raise PyPdfError("No /Fields dictionary in PDF of PdfWriter Object")
976 if isinstance(auto_regenerate, bool):
977 self.set_need_appearances_writer(auto_regenerate)
978 # Iterate through pages, update field values
979 if page is None:
980 page = list(self.pages)
981 if isinstance(page, list):
982 for p in page:
983 if PG.ANNOTS in p: # just to prevent warnings
984 self.update_page_form_field_values(p, fields, flags, None, flatten=flatten)
985 return
986 if PG.ANNOTS not in page:
987 logger_warning("No fields to update on this page", __name__)
988 return
989 appearance_stream_obj: Optional[StreamObject] = None
991 for annotation in page[PG.ANNOTS]: # type: ignore
992 annotation = cast(DictionaryObject, annotation.get_object())
993 if annotation.get("/Subtype", "") != "/Widget":
994 continue
995 if "/FT" in annotation and "/T" in annotation:
996 parent_annotation = annotation
997 else:
998 parent_annotation = annotation.get(
999 PG.PARENT, DictionaryObject()
1000 ).get_object()
1002 for field, value in fields.items():
1003 rectangle = cast(RectangleObject, annotation[AA.Rect])
1004 if not (
1005 self._get_qualified_field_name(parent_annotation) == field
1006 or parent_annotation.get("/T", None) == field
1007 ):
1008 continue
1009 if (
1010 parent_annotation.get("/FT", None) == "/Ch"
1011 and "/I" in parent_annotation
1012 ):
1013 del parent_annotation["/I"]
1014 if flags:
1015 annotation[NameObject(FA.Ff)] = NumberObject(flags)
1016 # Set the field value
1017 if not (value is None and flatten): # Only change values if given by user and not flattening.
1018 if isinstance(value, list):
1019 lst = ArrayObject(TextStringObject(v) for v in value)
1020 parent_annotation[NameObject(FA.V)] = lst
1021 elif isinstance(value, tuple):
1022 annotation[NameObject(FA.V)] = TextStringObject(
1023 value[0],
1024 )
1025 else:
1026 parent_annotation[NameObject(FA.V)] = TextStringObject(value)
1027 # Get or create the field's appearance stream object
1028 if parent_annotation.get(FA.FT) == "/Btn":
1029 # Checkbox button (no /FT found in Radio widgets);
1030 # We can find the associated appearance stream object
1031 # within the annotation.
1032 v = NameObject(value)
1033 ap = cast(DictionaryObject, annotation[NameObject(AA.AP)])
1034 normal_ap = cast(DictionaryObject, ap["/N"])
1035 if v not in normal_ap:
1036 v = NameObject("/Off")
1037 appearance_stream_obj = normal_ap.get(v)
1038 # Other cases will be updated through the for loop
1039 annotation[NameObject(AA.AS)] = v
1040 annotation[NameObject(FA.V)] = v
1041 elif (
1042 parent_annotation.get(FA.FT) == "/Tx"
1043 or parent_annotation.get(FA.FT) == "/Ch"
1044 ):
1045 # Textbox; we need to generate the appearance stream object
1046 if isinstance(value, tuple):
1047 appearance_stream_obj = TextStreamAppearance.from_text_annotation(
1048 acro_form, parent_annotation, annotation, value[1], value[2]
1049 )
1050 else:
1051 appearance_stream_obj = TextStreamAppearance.from_text_annotation(
1052 acro_form, parent_annotation, annotation
1053 )
1054 # Add the appearance stream object
1055 if AA.AP not in annotation:
1056 annotation[NameObject(AA.AP)] = DictionaryObject(
1057 {NameObject("/N"): self._add_object(appearance_stream_obj)}
1058 )
1059 elif "/N" not in (ap:= cast(DictionaryObject, annotation[AA.AP])):
1060 cast(DictionaryObject, annotation[NameObject(AA.AP)])[
1061 NameObject("/N")
1062 ] = self._add_object(appearance_stream_obj)
1063 else: # [/AP][/N] exists
1064 n = annotation[AA.AP]["/N"].indirect_reference.idnum # type: ignore
1065 self._objects[n - 1] = appearance_stream_obj
1066 appearance_stream_obj.indirect_reference = IndirectObject(n, 0, self)
1067 elif (
1068 annotation.get(FA.FT) == "/Sig"
1069 ): # deprecated # not implemented yet
1070 logger_warning("Signature forms not implemented yet", __name__)
1071 if flatten and appearance_stream_obj is not None:
1072 self._add_apstream_object(page, appearance_stream_obj, field, rectangle[0], rectangle[1])
1074 def reattach_fields(
1075 self, page: Optional[PageObject] = None
1076 ) -> list[DictionaryObject]:
1077 """
1078 Parse annotations within the page looking for orphan fields and
1079 reattach then into the Fields Structure.
1081 Args:
1082 page: page to analyze.
1083 If none is provided, all pages will be analyzed.
1085 Returns:
1086 list of reattached fields.
1088 """
1089 lst = []
1090 if page is None:
1091 for p in self.pages:
1092 lst += self.reattach_fields(p)
1093 return lst
1095 try:
1096 af = cast(DictionaryObject, self._root_object[CatalogDictionary.ACRO_FORM])
1097 except KeyError:
1098 af = DictionaryObject()
1099 self._root_object[NameObject(CatalogDictionary.ACRO_FORM)] = af
1100 try:
1101 fields = cast(ArrayObject, af[InteractiveFormDictEntries.Fields])
1102 except KeyError:
1103 fields = ArrayObject()
1104 af[NameObject(InteractiveFormDictEntries.Fields)] = fields
1106 if "/Annots" not in page:
1107 return lst
1108 annotations = cast(ArrayObject, page["/Annots"])
1109 for idx, annotation in enumerate(annotations):
1110 is_indirect = isinstance(annotation, IndirectObject)
1111 annotation = cast(DictionaryObject, annotation.get_object())
1112 if annotation.get("/Subtype", "") == "/Widget" and "/FT" in annotation:
1113 if (
1114 "indirect_reference" in annotation.__dict__
1115 and annotation.indirect_reference in fields
1116 ):
1117 continue
1118 if not is_indirect:
1119 annotations[idx] = self._add_object(annotation)
1120 fields.append(annotation.indirect_reference)
1121 lst.append(annotation)
1122 return lst
1124 def clone_reader_document_root(self, reader: PdfReader) -> None:
1125 """
1126 Copy the reader document root to the writer and all sub-elements,
1127 including pages, threads, outlines,... For partial insertion, ``append``
1128 should be considered.
1130 Args:
1131 reader: PdfReader from which the document root should be copied.
1133 """
1134 self._info_obj = None
1135 if self.incremental:
1136 self._objects = [None] * (cast(int, reader.trailer["/Size"]) - 1)
1137 for i in range(len(self._objects)):
1138 o = reader.get_object(i + 1)
1139 if o is not None:
1140 self._objects[i] = o.replicate(self)
1141 else:
1142 self._objects.clear()
1143 self._root_object = reader.root_object.clone(self)
1144 self._pages = self._root_object.raw_get("/Pages")
1146 if len(self._objects) > cast(int, reader.trailer["/Size"]):
1147 if self.strict:
1148 raise PdfReadError(
1149 f"Object count {len(self._objects)} exceeds defined trailer size {reader.trailer['/Size']}"
1150 )
1151 logger_warning(
1152 f"Object count {len(self._objects)} exceeds defined trailer size {reader.trailer['/Size']}",
1153 __name__
1154 )
1156 # must be done here before rewriting
1157 if self.incremental:
1158 self._original_hash = [
1159 (obj.hash_bin() if obj is not None else 0) for obj in self._objects
1160 ]
1162 try:
1163 self._flatten()
1164 except IndexError:
1165 raise PdfReadError("Got index error while flattening.")
1167 assert self.flattened_pages is not None
1168 for p in self.flattened_pages:
1169 self._replace_object(cast(IndirectObject, p.indirect_reference).idnum, p)
1170 if not self.incremental:
1171 p[NameObject("/Parent")] = self._pages
1172 if not self.incremental:
1173 cast(DictionaryObject, self._pages.get_object())[
1174 NameObject("/Kids")
1175 ] = ArrayObject([p.indirect_reference for p in self.flattened_pages])
1177 def clone_document_from_reader(
1178 self,
1179 reader: PdfReader,
1180 after_page_append: Optional[Callable[[PageObject], None]] = None,
1181 ) -> None:
1182 """
1183 Create a copy (clone) of a document from a PDF file reader cloning
1184 section '/Root' and '/Info' and '/ID' of the pdf.
1186 Args:
1187 reader: PDF file reader instance from which the clone
1188 should be created.
1189 after_page_append:
1190 Callback function that is invoked after each page is appended to
1191 the writer. Signature includes a reference to the appended page
1192 (delegates to append_pages_from_reader). The single parameter of
1193 the callback is a reference to the page just appended to the
1194 document.
1196 """
1197 self.clone_reader_document_root(reader)
1198 inf = reader._info
1199 if self.incremental:
1200 if inf is not None:
1201 self._info_obj = cast(
1202 IndirectObject, inf.clone(self).indirect_reference
1203 )
1204 assert isinstance(self._info, DictionaryObject), "for mypy"
1205 self._original_hash[
1206 self._info_obj.indirect_reference.idnum - 1
1207 ] = self._info.hash_bin()
1208 elif inf is not None:
1209 self._info_obj = self._add_object(
1210 DictionaryObject(cast(DictionaryObject, inf.get_object()))
1211 )
1212 # else: _info_obj = None done in clone_reader_document_root()
1214 try:
1215 self._ID = cast(ArrayObject, reader._ID).clone(self)
1216 except AttributeError:
1217 pass
1219 if callable(after_page_append):
1220 for page in cast(
1221 ArrayObject, cast(DictionaryObject, self._pages.get_object())["/Kids"]
1222 ):
1223 after_page_append(page.get_object())
1225 def _compute_document_identifier(self) -> ByteStringObject:
1226 stream = BytesIO()
1227 self._write_pdf_structure(stream)
1228 stream.seek(0)
1229 return ByteStringObject(_rolling_checksum(stream).encode("utf8"))
1231 def generate_file_identifiers(self) -> None:
1232 """
1233 Generate an identifier for the PDF that will be written.
1235 The only point of this is ensuring uniqueness. Reproducibility is not
1236 required.
1237 When a file is first written, both identifiers shall be set to the same value.
1238 If both identifiers match when a file reference is resolved, it is very
1239 likely that the correct and unchanged file has been found. If only the first
1240 identifier matches, a different version of the correct file has been found.
1241 see §14.4 "File Identifiers".
1242 """
1243 if self._ID:
1244 id1 = self._ID[0]
1245 id2 = self._compute_document_identifier()
1246 else:
1247 id1 = self._compute_document_identifier()
1248 id2 = id1
1249 self._ID = ArrayObject((id1, id2))
1251 def encrypt(
1252 self,
1253 user_password: str,
1254 owner_password: Optional[str] = None,
1255 use_128bit: bool = True,
1256 permissions_flag: UserAccessPermissions = ALL_DOCUMENT_PERMISSIONS,
1257 *,
1258 algorithm: Optional[str] = None,
1259 ) -> None:
1260 """
1261 Encrypt this PDF file with the PDF Standard encryption handler.
1263 Args:
1264 user_password: The password which allows for opening
1265 and reading the PDF file with the restrictions provided.
1266 owner_password: The password which allows for
1267 opening the PDF files without any restrictions. By default,
1268 the owner password is the same as the user password.
1269 use_128bit: flag as to whether to use 128bit
1270 encryption. When false, 40bit encryption will be used.
1271 By default, this flag is on.
1272 permissions_flag: permissions as described in
1273 Table 3.20 of the PDF 1.7 specification. A bit value of 1 means
1274 the permission is granted.
1275 Hence an integer value of -1 will set all flags.
1276 Bit position 3 is for printing, 4 is for modifying content,
1277 5 and 6 control annotations, 9 for form fields,
1278 10 for extraction of text and graphics.
1279 algorithm: encrypt algorithm. Values may be one of "RC4-40", "RC4-128",
1280 "AES-128", "AES-256-R5", "AES-256". If it is valid,
1281 `use_128bit` will be ignored.
1283 """
1284 if owner_password is None:
1285 owner_password = user_password
1287 if algorithm is not None:
1288 try:
1289 alg = getattr(EncryptAlgorithm, algorithm.replace("-", "_"))
1290 except AttributeError:
1291 raise ValueError(f"Algorithm '{algorithm}' NOT supported")
1292 else:
1293 alg = EncryptAlgorithm.RC4_128
1294 if not use_128bit:
1295 alg = EncryptAlgorithm.RC4_40
1296 self.generate_file_identifiers()
1297 assert self._ID
1298 self._encryption = Encryption.make(alg, permissions_flag, self._ID[0])
1299 # in case call `encrypt` again
1300 entry = self._encryption.write_entry(user_password, owner_password)
1301 if self._encrypt_entry:
1302 # replace old encrypt_entry
1303 assert self._encrypt_entry.indirect_reference is not None
1304 entry.indirect_reference = self._encrypt_entry.indirect_reference
1305 self._objects[entry.indirect_reference.idnum - 1] = entry
1306 else:
1307 self._add_object(entry)
1308 self._encrypt_entry = entry
1310 def _resolve_links(self) -> None:
1311 """Patch up links that were added to the document earlier, to
1312 make sure they still point to the same pages.
1313 """
1314 for (new_link, old_link) in self._unresolved_links:
1315 old_page = old_link.find_referenced_page()
1316 if not old_page:
1317 continue
1318 new_page = self._merged_in_pages.get(old_page)
1319 if new_page is None:
1320 continue
1321 new_link.patch_reference(self, new_page)
1323 def write_stream(self, stream: StreamType) -> None:
1324 if hasattr(stream, "mode") and "b" not in stream.mode:
1325 logger_warning(
1326 f"File <{stream.name}> to write to is not in binary mode. "
1327 "It may not be written to correctly.",
1328 __name__,
1329 )
1330 self._resolve_links()
1332 if self.incremental:
1333 self._reader.stream.seek(0)
1334 stream.write(self._reader.stream.read(-1))
1335 if len(self.list_objects_in_increment()) > 0:
1336 self._write_increment(stream) # writes objs, xref stream and startxref
1337 else:
1338 object_positions, free_objects = self._write_pdf_structure(stream)
1339 xref_location = self._write_xref_table(
1340 stream, object_positions, free_objects
1341 )
1342 self._write_trailer(stream, xref_location)
1344 def write(self, stream: Union[Path, StrByteType]) -> tuple[bool, IO[Any]]:
1345 """
1346 Write the collection of pages added to this object out as a PDF file.
1348 Args:
1349 stream: An object to write the file to. The object can support
1350 the write method and the tell method, similar to a file object, or
1351 be a file path, just like the fileobj, just named it stream to keep
1352 existing workflow.
1354 Returns:
1355 A tuple (bool, IO).
1357 """
1358 my_file = False
1360 if stream == "":
1361 raise ValueError(f"Output({stream=}) is empty.")
1363 if isinstance(stream, (str, Path)):
1364 stream = FileIO(stream, "wb")
1365 my_file = True
1367 self.write_stream(stream)
1369 if my_file:
1370 stream.close()
1371 else:
1372 stream.flush()
1374 return my_file, stream
1376 def list_objects_in_increment(self) -> list[IndirectObject]:
1377 """
1378 For analysis or debugging.
1379 Provides the list of new or modified objects that will be written
1380 in the increment.
1381 Deleted objects will not be freed but will become orphans.
1383 Returns:
1384 List of new or modified IndirectObjects
1386 """
1387 original_hash_count = len(self._original_hash)
1388 return [
1389 cast(IndirectObject, obj).indirect_reference
1390 for i, obj in enumerate(self._objects)
1391 if (
1392 obj is not None
1393 and (
1394 i >= original_hash_count
1395 or obj.hash_bin() != self._original_hash[i]
1396 )
1397 )
1398 ]
1400 def _write_increment(self, stream: StreamType) -> None:
1401 object_positions = {}
1402 object_blocks = []
1403 current_start = -1
1404 current_stop = -2
1405 original_hash_count = len(self._original_hash)
1406 for i, obj in enumerate(self._objects):
1407 if obj is not None and (
1408 i >= original_hash_count
1409 or obj.hash_bin() != self._original_hash[i]
1410 ):
1411 idnum = i + 1
1412 assert isinstance(obj, PdfObject), "mypy"
1413 # first write new/modified object
1414 object_positions[idnum] = stream.tell()
1415 stream.write(f"{idnum} 0 obj\n".encode())
1416 """ encryption is not operational
1417 if self._encryption and obj != self._encrypt_entry:
1418 obj = self._encryption.encrypt_object(obj, idnum, 0)
1419 """
1420 obj.write_to_stream(stream)
1421 stream.write(b"\nendobj\n")
1423 # prepare xref
1424 if idnum != current_stop:
1425 if current_start > 0:
1426 object_blocks.append(
1427 [current_start, current_stop - current_start]
1428 )
1429 current_start = idnum
1430 current_stop = idnum + 1
1431 assert current_start > 0, "for pytest only"
1432 object_blocks.append([current_start, current_stop - current_start])
1433 # write incremented xref
1434 xref_location = stream.tell()
1435 xr_id = len(self._objects) + 1
1436 stream.write(f"{xr_id} 0 obj".encode())
1437 init_data = {
1438 NameObject("/Type"): NameObject("/XRef"),
1439 NameObject("/Size"): NumberObject(xr_id + 1),
1440 NameObject("/Root"): self.root_object.indirect_reference,
1441 NameObject("/Filter"): NameObject("/FlateDecode"),
1442 NameObject("/Index"): ArrayObject(
1443 [NumberObject(_it) for _su in object_blocks for _it in _su]
1444 ),
1445 NameObject("/W"): ArrayObject(
1446 [NumberObject(1), NumberObject(4), NumberObject(1)]
1447 ),
1448 "__streamdata__": b"",
1449 }
1450 if self._info is not None and (
1451 self._info.indirect_reference.idnum - 1 # type: ignore
1452 >= len(self._original_hash)
1453 or cast(IndirectObject, self._info).hash_bin() # kept for future
1454 != self._original_hash[
1455 self._info.indirect_reference.idnum - 1 # type: ignore
1456 ]
1457 ):
1458 init_data[NameObject(TK.INFO)] = self._info.indirect_reference
1459 init_data[NameObject(TK.PREV)] = NumberObject(self._reader._startxref)
1460 if self._ID:
1461 init_data[NameObject(TK.ID)] = self._ID
1462 xr = StreamObject.initialize_from_dictionary(init_data)
1463 xr.set_data(
1464 b"".join(
1465 [struct.pack(b">BIB", 1, _pos, 0) for _pos in object_positions.values()]
1466 )
1467 )
1468 xr.write_to_stream(stream)
1469 stream.write(f"\nendobj\nstartxref\n{xref_location}\n%%EOF\n".encode()) # eof
1471 def _write_pdf_structure(self, stream: StreamType) -> tuple[list[int], list[int]]:
1472 object_positions = []
1473 free_objects = []
1474 stream.write(self.pdf_header.encode() + b"\n")
1475 stream.write(b"%\xE2\xE3\xCF\xD3\n")
1477 for idnum, obj in enumerate(self._objects, start=1):
1478 if obj is not None:
1479 object_positions.append(stream.tell())
1480 stream.write(f"{idnum} 0 obj\n".encode())
1481 if self._encryption and obj != self._encrypt_entry:
1482 obj = self._encryption.encrypt_object(obj, idnum, 0)
1483 obj.write_to_stream(stream)
1484 stream.write(b"\nendobj\n")
1485 else:
1486 object_positions.append(-1)
1487 free_objects.append(idnum)
1488 free_objects.append(0) # add 0 to loop in accordance with specification
1489 return object_positions, free_objects
1491 def _write_xref_table(
1492 self, stream: StreamType, object_positions: list[int], free_objects: list[int]
1493 ) -> int:
1494 xref_location = stream.tell()
1495 stream.write(b"xref\n")
1496 stream.write(f"0 {len(self._objects) + 1}\n".encode())
1497 stream.write(f"{free_objects[0]:0>10} {65535:0>5} f \n".encode())
1498 free_idx = 1
1499 for offset in object_positions:
1500 if offset > 0:
1501 stream.write(f"{offset:0>10} {0:0>5} n \n".encode())
1502 else:
1503 stream.write(f"{free_objects[free_idx]:0>10} {1:0>5} f \n".encode())
1504 free_idx += 1
1505 return xref_location
1507 def _write_trailer(self, stream: StreamType, xref_location: int) -> None:
1508 """
1509 Write the PDF trailer to the stream.
1511 To quote the PDF specification:
1512 [The] trailer [gives] the location of the cross-reference table and
1513 of certain special objects within the body of the file.
1514 """
1515 stream.write(b"trailer\n")
1516 trailer = DictionaryObject(
1517 {
1518 NameObject(TK.SIZE): NumberObject(len(self._objects) + 1),
1519 NameObject(TK.ROOT): self.root_object.indirect_reference,
1520 }
1521 )
1522 if self._info is not None:
1523 trailer[NameObject(TK.INFO)] = self._info.indirect_reference
1524 if self._ID is not None:
1525 trailer[NameObject(TK.ID)] = self._ID
1526 if self._encrypt_entry:
1527 trailer[NameObject(TK.ENCRYPT)] = self._encrypt_entry.indirect_reference
1528 trailer.write_to_stream(stream)
1529 stream.write(f"\nstartxref\n{xref_location}\n%%EOF\n".encode()) # eof
1531 @property
1532 def metadata(self) -> Optional[DocumentInformation]:
1533 """
1534 Retrieve/set the PDF file's document information dictionary, if it exists.
1536 Args:
1537 value: dict with the entries to be set. if None : remove the /Info entry from the pdf.
1539 Note that some PDF files use (XMP) metadata streams instead of document
1540 information dictionaries, and these metadata streams will not be
1541 accessed by this function, but by :meth:`~xmp_metadata`.
1543 """
1544 return super().metadata
1546 @metadata.setter
1547 def metadata(
1548 self,
1549 value: Optional[Union[DocumentInformation, DictionaryObject, dict[Any, Any]]],
1550 ) -> None:
1551 if value is None:
1552 self._info = None
1553 else:
1554 if self._info is not None:
1555 self._info.clear()
1557 self.add_metadata(value)
1559 def add_metadata(self, infos: dict[str, Any]) -> None:
1560 """
1561 Add custom metadata to the output.
1563 Args:
1564 infos: a Python dictionary where each key is a field
1565 and each value is your new metadata.
1567 """
1568 args = {}
1569 if isinstance(infos, PdfObject):
1570 infos = cast(DictionaryObject, infos.get_object())
1571 for key, value in list(infos.items()):
1572 if isinstance(value, PdfObject):
1573 value = value.get_object()
1574 args[NameObject(key)] = create_string_object(str(value))
1575 if self._info is None:
1576 self._info = DictionaryObject()
1577 self._info.update(args)
1579 def compress_identical_objects(
1580 self,
1581 remove_identicals: bool = True,
1582 remove_orphans: bool = True,
1583 ) -> None:
1584 """
1585 Parse the PDF file and merge objects that have the same hash.
1586 This will make objects common to multiple pages.
1587 Recommended to be used just before writing output.
1589 Args:
1590 remove_identicals: Remove identical objects.
1591 remove_orphans: Remove unreferenced objects.
1593 """
1595 def replace_in_obj(
1596 obj: PdfObject, crossref: dict[IndirectObject, IndirectObject]
1597 ) -> None:
1598 if isinstance(obj, DictionaryObject):
1599 key_val = obj.items()
1600 elif isinstance(obj, ArrayObject):
1601 key_val = enumerate(obj) # type: ignore
1602 else:
1603 return
1604 assert isinstance(obj, (DictionaryObject, ArrayObject))
1605 for k, v in key_val:
1606 if isinstance(v, IndirectObject):
1607 orphans[v.idnum - 1] = False
1608 if v in crossref:
1609 obj[k] = crossref[v]
1610 else:
1611 """the filtering on DictionaryObject and ArrayObject only
1612 will be performed within replace_in_obj"""
1613 replace_in_obj(v, crossref)
1615 # _idnum_hash :dict[hash]=(1st_ind_obj,[other_indir_objs,...])
1616 self._idnum_hash = {}
1617 orphans = [True] * len(self._objects)
1618 # look for similar objects
1619 for idx, obj in enumerate(self._objects):
1620 if is_null_or_none(obj):
1621 continue
1622 assert obj is not None, "mypy" # mypy: TypeGuard of `is_null_or_none` does not help here.
1623 assert isinstance(obj.indirect_reference, IndirectObject)
1624 h = obj.hash_value()
1625 if remove_identicals and h in self._idnum_hash:
1626 self._idnum_hash[h][1].append(obj.indirect_reference)
1627 self._objects[idx] = None
1628 else:
1629 self._idnum_hash[h] = (obj.indirect_reference, [])
1631 # generate the dict converting others to 1st
1632 cnv = {v[0]: v[1] for v in self._idnum_hash.values() if len(v[1]) > 0}
1633 cnv_rev: dict[IndirectObject, IndirectObject] = {}
1634 for k, v in cnv.items():
1635 cnv_rev.update(zip(v, (k,) * len(v)))
1637 # replace reference to merged objects
1638 for obj in self._objects:
1639 if isinstance(obj, (DictionaryObject, ArrayObject)):
1640 replace_in_obj(obj, cnv_rev)
1642 # remove orphans (if applicable)
1643 orphans[self.root_object.indirect_reference.idnum - 1] = False # type: ignore
1645 if not is_null_or_none(self._info):
1646 orphans[self._info.indirect_reference.idnum - 1] = False # type: ignore
1648 try:
1649 orphans[self._ID.indirect_reference.idnum - 1] = False # type: ignore
1650 except AttributeError:
1651 pass
1652 for i in compress(range(len(self._objects)), orphans):
1653 self._objects[i] = None
1655 def get_reference(self, obj: PdfObject) -> IndirectObject:
1656 idnum = self._objects.index(obj) + 1
1657 ref = IndirectObject(idnum, 0, self)
1658 assert ref.get_object() == obj
1659 return ref
1661 def get_outline_root(self) -> TreeObject:
1662 if CO.OUTLINES in self._root_object:
1663 # Entries in the catalog dictionary
1664 outline = cast(TreeObject, self._root_object[CO.OUTLINES])
1665 if not isinstance(outline, TreeObject):
1666 t = TreeObject(outline)
1667 self._replace_object(outline.indirect_reference.idnum, t)
1668 outline = t
1669 idnum = self._objects.index(outline) + 1
1670 outline_ref = IndirectObject(idnum, 0, self)
1671 assert outline_ref.get_object() == outline
1672 else:
1673 outline = TreeObject()
1674 outline.update({})
1675 outline_ref = self._add_object(outline)
1676 self._root_object[NameObject(CO.OUTLINES)] = outline_ref
1678 return outline
1680 def get_threads_root(self) -> ArrayObject:
1681 """
1682 The list of threads.
1684 See §12.4.3 of the PDF 1.7 or PDF 2.0 specification.
1686 Returns:
1687 An array (possibly empty) of Dictionaries with an ``/F`` key,
1688 and optionally information about the thread in ``/I`` or ``/Metadata`` keys.
1690 """
1691 if CO.THREADS in self._root_object:
1692 # Entries in the catalog dictionary
1693 threads = cast(ArrayObject, self._root_object[CO.THREADS])
1694 else:
1695 threads = ArrayObject()
1696 self._root_object[NameObject(CO.THREADS)] = threads
1697 return threads
1699 @property
1700 def threads(self) -> ArrayObject:
1701 """
1702 Read-only property for the list of threads.
1704 See §12.4.3 of the PDF 1.7 or PDF 2.0 specification.
1706 Each element is a dictionary with an ``/F`` key, and optionally
1707 information about the thread in ``/I`` or ``/Metadata`` keys.
1708 """
1709 return self.get_threads_root()
1711 def add_outline_item_destination(
1712 self,
1713 page_destination: Union[IndirectObject, PageObject, TreeObject],
1714 parent: Union[None, TreeObject, IndirectObject] = None,
1715 before: Union[None, TreeObject, IndirectObject] = None,
1716 is_open: bool = True,
1717 ) -> IndirectObject:
1718 page_destination = cast(PageObject, page_destination.get_object())
1719 if isinstance(page_destination, PageObject):
1720 return self.add_outline_item_destination(
1721 Destination(
1722 f"page #{page_destination.page_number}",
1723 cast(IndirectObject, page_destination.indirect_reference),
1724 Fit.fit(),
1725 )
1726 )
1728 if parent is None:
1729 parent = self.get_outline_root()
1731 page_destination[NameObject("/%is_open%")] = BooleanObject(is_open)
1732 parent = cast(TreeObject, parent.get_object())
1733 page_destination_ref = self._add_object(page_destination)
1734 if before is not None:
1735 before = before.indirect_reference
1736 parent.insert_child(
1737 page_destination_ref,
1738 before,
1739 self,
1740 page_destination.inc_parent_counter_outline
1741 if is_open
1742 else (lambda x, y: 0), # noqa: ARG005
1743 )
1744 if "/Count" not in page_destination:
1745 page_destination[NameObject("/Count")] = NumberObject(0)
1747 return page_destination_ref
1749 def add_outline_item_dict(
1750 self,
1751 outline_item: OutlineItemType,
1752 parent: Union[None, TreeObject, IndirectObject] = None,
1753 before: Union[None, TreeObject, IndirectObject] = None,
1754 is_open: bool = True,
1755 ) -> IndirectObject:
1756 outline_item_object = TreeObject()
1757 outline_item_object.update(outline_item)
1759 """code currently unreachable
1760 if "/A" in outline_item:
1761 action = DictionaryObject()
1762 a_dict = cast(DictionaryObject, outline_item["/A"])
1763 for k, v in list(a_dict.items()):
1764 action[NameObject(str(k))] = v
1765 action_ref = self._add_object(action)
1766 outline_item_object[NameObject("/A")] = action_ref
1767 """
1768 return self.add_outline_item_destination(
1769 outline_item_object, parent, before, is_open
1770 )
1772 def add_outline_item(
1773 self,
1774 title: str,
1775 page_number: Union[None, PageObject, IndirectObject, int],
1776 parent: Union[None, TreeObject, IndirectObject] = None,
1777 before: Union[None, TreeObject, IndirectObject] = None,
1778 color: Optional[Union[tuple[float, float, float], str]] = None,
1779 bold: bool = False,
1780 italic: bool = False,
1781 fit: Fit = PAGE_FIT,
1782 is_open: bool = True,
1783 ) -> IndirectObject:
1784 """
1785 Add an outline item (commonly referred to as a "Bookmark") to the PDF file.
1787 Args:
1788 title: Title to use for this outline item.
1789 page_number: Page number this outline item will point to.
1790 parent: A reference to a parent outline item to create nested
1791 outline items.
1792 before:
1793 color: Color of the outline item's font as a red, green, blue tuple
1794 from 0.0 to 1.0 or as a Hex String (#RRGGBB)
1795 bold: Outline item font is bold
1796 italic: Outline item font is italic
1797 fit: The fit of the destination page.
1799 Returns:
1800 The added outline item as an indirect object.
1802 """
1803 page_ref: Union[None, NullObject, IndirectObject, NumberObject]
1804 if isinstance(italic, Fit): # it means that we are on the old params
1805 if fit is not None and page_number is None:
1806 page_number = fit
1807 return self.add_outline_item(
1808 title, page_number, parent, None, before, color, bold, italic, is_open=is_open
1809 )
1810 if page_number is None:
1811 action_ref = None
1812 else:
1813 if isinstance(page_number, IndirectObject):
1814 page_ref = page_number
1815 elif isinstance(page_number, PageObject):
1816 page_ref = page_number.indirect_reference
1817 elif isinstance(page_number, int):
1818 try:
1819 page_ref = self.pages[page_number].indirect_reference
1820 except IndexError:
1821 page_ref = NumberObject(page_number)
1822 if page_ref is None:
1823 logger_warning(
1824 f"can not find reference of page {page_number}",
1825 __name__,
1826 )
1827 page_ref = NullObject()
1828 dest = Destination(
1829 NameObject("/" + title + " outline item"),
1830 page_ref,
1831 fit,
1832 )
1834 action_ref = self._add_object(
1835 DictionaryObject(
1836 {
1837 NameObject(GoToActionArguments.D): dest.dest_array,
1838 NameObject(GoToActionArguments.S): NameObject("/GoTo"),
1839 }
1840 )
1841 )
1842 outline_item = self._add_object(
1843 _create_outline_item(action_ref, title, color, italic, bold)
1844 )
1846 if parent is None:
1847 parent = self.get_outline_root()
1848 return self.add_outline_item_destination(outline_item, parent, before, is_open)
1850 def add_outline(self) -> None:
1851 raise NotImplementedError(
1852 "This method is not yet implemented. Use :meth:`add_outline_item` instead."
1853 )
1855 def add_named_destination_array(
1856 self, title: TextStringObject, destination: Union[IndirectObject, ArrayObject]
1857 ) -> None:
1858 named_dest = self.get_named_dest_root()
1859 i = 0
1860 while i < len(named_dest):
1861 if title < named_dest[i]:
1862 named_dest.insert(i, destination)
1863 named_dest.insert(i, TextStringObject(title))
1864 return
1865 i += 2
1866 named_dest.extend([TextStringObject(title), destination])
1867 return
1869 def add_named_destination_object(
1870 self,
1871 page_destination: PdfObject,
1872 ) -> IndirectObject:
1873 page_destination_ref = self._add_object(page_destination.dest_array) # type: ignore
1874 self.add_named_destination_array(
1875 cast("TextStringObject", page_destination["/Title"]), page_destination_ref # type: ignore
1876 )
1878 return page_destination_ref
1880 def add_named_destination(
1881 self,
1882 title: str,
1883 page_number: int,
1884 ) -> IndirectObject:
1885 page_ref = self.get_object(self._pages)[PagesAttributes.KIDS][page_number] # type: ignore
1886 dest = DictionaryObject()
1887 dest.update(
1888 {
1889 NameObject(GoToActionArguments.D): ArrayObject(
1890 [page_ref, NameObject(TypFitArguments.FIT_H), NumberObject(826)]
1891 ),
1892 NameObject(GoToActionArguments.S): NameObject("/GoTo"),
1893 }
1894 )
1896 dest_ref = self._add_object(dest)
1897 if not isinstance(title, TextStringObject):
1898 title = TextStringObject(str(title))
1900 self.add_named_destination_array(title, dest_ref)
1901 return dest_ref
1903 def remove_links(self) -> None:
1904 """Remove links and annotations from this output."""
1905 for page in self.pages:
1906 self.remove_objects_from_page(page, ObjectDeletionFlag.ALL_ANNOTATIONS)
1908 def remove_annotations(
1909 self, subtypes: Optional[Union[AnnotationSubtype, Iterable[AnnotationSubtype]]]
1910 ) -> None:
1911 """
1912 Remove annotations by annotation subtype.
1914 Args:
1915 subtypes: subtype or list of subtypes to be removed.
1916 Examples are: "/Link", "/FileAttachment", "/Sound",
1917 "/Movie", "/Screen", ...
1918 If you want to remove all annotations, use subtypes=None.
1920 """
1921 for page in self.pages:
1922 self._remove_annots_from_page(page, subtypes)
1924 def _remove_annots_from_page(
1925 self,
1926 page: Union[IndirectObject, PageObject, DictionaryObject],
1927 subtypes: Optional[Iterable[str]],
1928 ) -> None:
1929 page = cast(DictionaryObject, page.get_object())
1930 if PG.ANNOTS in page:
1931 i = 0
1932 while i < len(cast(ArrayObject, page[PG.ANNOTS])):
1933 an = cast(ArrayObject, page[PG.ANNOTS])[i]
1934 obj = cast(DictionaryObject, an.get_object())
1935 if subtypes is None or cast(str, obj["/Subtype"]) in subtypes:
1936 if isinstance(an, IndirectObject):
1937 self._objects[an.idnum - 1] = NullObject() # to reduce PDF size
1938 del page[PG.ANNOTS][i] # type:ignore
1939 else:
1940 i += 1
1942 def remove_objects_from_page(
1943 self,
1944 page: Union[PageObject, DictionaryObject],
1945 to_delete: Union[ObjectDeletionFlag, Iterable[ObjectDeletionFlag]],
1946 text_filters: Optional[dict[str, Any]] = None
1947 ) -> None:
1948 """
1949 Remove objects specified by ``to_delete`` from the given page.
1951 Args:
1952 page: Page object to clean up.
1953 to_delete: Objects to be deleted; can be a ``ObjectDeletionFlag``
1954 or a list of ObjectDeletionFlag
1955 text_filters: Properties of text to be deleted, if applicable. Optional.
1956 This is a Python dictionary with the following properties:
1958 * font_ids: List of font resource IDs (such as /F1 or /T1_0) to be deleted.
1960 """
1961 if isinstance(to_delete, (list, tuple)):
1962 for to_d in to_delete:
1963 self.remove_objects_from_page(page, to_d)
1964 return None
1965 assert isinstance(to_delete, ObjectDeletionFlag)
1967 if to_delete & ObjectDeletionFlag.LINKS:
1968 return self._remove_annots_from_page(page, ("/Link",))
1969 if to_delete & ObjectDeletionFlag.ATTACHMENTS:
1970 return self._remove_annots_from_page(
1971 page, ("/FileAttachment", "/Sound", "/Movie", "/Screen")
1972 )
1973 if to_delete & ObjectDeletionFlag.OBJECTS_3D:
1974 return self._remove_annots_from_page(page, ("/3D",))
1975 if to_delete & ObjectDeletionFlag.ALL_ANNOTATIONS:
1976 return self._remove_annots_from_page(page, None)
1978 jump_operators = []
1979 if to_delete & ObjectDeletionFlag.DRAWING_IMAGES:
1980 jump_operators = [
1981 b"w", b"J", b"j", b"M", b"d", b"i",
1982 b"W", b"W*",
1983 b"b", b"b*", b"B", b"B*", b"S", b"s", b"f", b"f*", b"F", b"n",
1984 b"m", b"l", b"c", b"v", b"y", b"h", b"re",
1985 b"sh"
1986 ]
1987 if to_delete & ObjectDeletionFlag.TEXT:
1988 jump_operators = [b"Tj", b"TJ", b"'", b'"']
1990 if not isinstance(page, PageObject):
1991 page = PageObject(self, page.indirect_reference) # pragma: no cover
1992 if "/Contents" in page:
1993 content = cast(ContentStream, page.get_contents())
1995 images, forms = self._remove_objects_from_page__clean_forms(
1996 elt=page, stack=[], jump_operators=jump_operators, to_delete=to_delete, text_filters=text_filters,
1997 )
1999 self._remove_objects_from_page__clean(
2000 content=content, images=images, forms=forms,
2001 jump_operators=jump_operators, to_delete=to_delete,
2002 text_filters=text_filters
2003 )
2004 page.replace_contents(content)
2005 return [], [] # type: ignore[return-value]
2007 def _remove_objects_from_page__clean(
2008 self,
2009 content: ContentStream,
2010 images: list[str],
2011 forms: list[str],
2012 jump_operators: list[bytes],
2013 to_delete: ObjectDeletionFlag,
2014 text_filters: Optional[dict[str, Any]] = None,
2015 ) -> None:
2016 font_id = None
2017 font_ids_to_delete = []
2018 if text_filters and to_delete & ObjectDeletionFlag.TEXT:
2019 font_ids_to_delete = text_filters.get("font_ids", [])
2021 i = 0
2022 while i < len(content.operations):
2023 operands, operator = content.operations[i]
2024 if operator == b"Tf":
2025 font_id = operands[0]
2026 if (
2027 (
2028 operator == b"INLINE IMAGE"
2029 and (to_delete & ObjectDeletionFlag.INLINE_IMAGES)
2030 )
2031 or (operator in jump_operators)
2032 or (
2033 operator == b"Do"
2034 and (to_delete & ObjectDeletionFlag.XOBJECT_IMAGES)
2035 and (operands[0] in images)
2036 )
2037 ):
2038 if (
2039 not to_delete & ObjectDeletionFlag.TEXT
2040 or (to_delete & ObjectDeletionFlag.TEXT and not text_filters)
2041 or (to_delete & ObjectDeletionFlag.TEXT and font_id in font_ids_to_delete)
2042 ):
2043 del content.operations[i]
2044 else:
2045 i += 1
2046 else:
2047 i += 1
2048 content.get_data() # this ensures ._data is rebuilt from the .operations
2050 def _remove_objects_from_page__clean_forms(
2051 self,
2052 elt: DictionaryObject,
2053 stack: list[DictionaryObject],
2054 jump_operators: list[bytes],
2055 to_delete: ObjectDeletionFlag,
2056 text_filters: Optional[dict[str, Any]] = None,
2057 ) -> tuple[list[str], list[str]]:
2058 # elt in recursive call is a new ContentStream object, so we have to check the indirect_reference
2059 if (elt in stack) or (
2060 hasattr(elt, "indirect_reference") and any(
2061 elt.indirect_reference == getattr(x, "indirect_reference", -1)
2062 for x in stack
2063 )
2064 ):
2065 # to prevent infinite looping
2066 return [], [] # pragma: no cover
2067 try:
2068 d = cast(
2069 dict[Any, Any],
2070 cast(DictionaryObject, elt["/Resources"])["/XObject"],
2071 )
2072 except KeyError:
2073 d = {}
2074 images = []
2075 forms = []
2076 for k, v in d.items():
2077 o = v.get_object()
2078 try:
2079 content: Any = None
2080 if (
2081 to_delete & ObjectDeletionFlag.XOBJECT_IMAGES
2082 and o["/Subtype"] == "/Image"
2083 ):
2084 content = NullObject() # to delete the image keeping the entry
2085 images.append(k)
2086 if o["/Subtype"] == "/Form":
2087 forms.append(k)
2088 if isinstance(o, ContentStream):
2089 content = o
2090 else:
2091 content = ContentStream(o, self)
2092 content.update(
2093 {
2094 k1: v1
2095 for k1, v1 in o.items()
2096 if k1 not in ["/Length", "/Filter", "/DecodeParms"]
2097 }
2098 )
2099 try:
2100 content.indirect_reference = o.indirect_reference
2101 except AttributeError: # pragma: no cover
2102 pass
2103 stack.append(elt)
2105 # clean subforms
2106 self._remove_objects_from_page__clean_forms(
2107 elt=content, stack=stack, jump_operators=jump_operators, to_delete=to_delete,
2108 text_filters=text_filters,
2109 )
2110 if content is not None:
2111 if isinstance(v, IndirectObject):
2112 self._objects[v.idnum - 1] = content
2113 else:
2114 # should only occur in a PDF not respecting PDF spec
2115 # where streams must be indirected.
2116 d[k] = self._add_object(content) # pragma: no cover
2117 except (TypeError, KeyError):
2118 pass
2119 for im in images:
2120 del d[im] # for clean-up
2121 if isinstance(elt, StreamObject): # for /Form
2122 if not isinstance(elt, ContentStream): # pragma: no cover
2123 e = ContentStream(elt, self)
2124 e.update(elt.items())
2125 elt = e
2126 # clean the content
2127 self._remove_objects_from_page__clean(
2128 content=elt, images=images, forms=forms, jump_operators=jump_operators,
2129 to_delete=to_delete, text_filters=text_filters
2130 )
2131 return images, forms
2133 def remove_images(
2134 self,
2135 to_delete: ImageType = ImageType.ALL,
2136 ) -> None:
2137 """
2138 Remove images from this output.
2140 Args:
2141 to_delete: The type of images to be deleted
2142 (default = all images types)
2144 """
2145 if isinstance(to_delete, bool):
2146 to_delete = ImageType.ALL
2148 i = ObjectDeletionFlag.NONE
2150 for image in ("XOBJECT_IMAGES", "INLINE_IMAGES", "DRAWING_IMAGES"):
2151 if to_delete & ImageType[image]:
2152 i |= ObjectDeletionFlag[image]
2154 for page in self.pages:
2155 self.remove_objects_from_page(page, i)
2157 def remove_text(self, font_names: Optional[list[str]] = None) -> None:
2158 """
2159 Remove text from the PDF.
2161 Args:
2162 font_names: List of font names to remove, such as "Helvetica-Bold".
2163 Optional. If not specified, all text will be removed.
2164 """
2165 if not font_names:
2166 font_names = []
2168 for page in self.pages:
2169 resource_ids_to_remove = []
2171 # Content streams reference fonts and other resources with names like "/F1" or "/T1_0"
2172 # Font names need to be converted to resource names/IDs for easier removal
2173 if font_names:
2174 # Recursively loop through page objects to gather font info
2175 def get_font_info(
2176 obj: Any,
2177 font_info: Optional[dict[str, Any]] = None,
2178 key: Optional[str] = None
2179 ) -> dict[str, Any]:
2180 if font_info is None:
2181 font_info = {}
2182 if isinstance(obj, IndirectObject):
2183 obj = obj.get_object()
2184 if isinstance(obj, dict):
2185 if obj.get("/Type") == "/Font":
2186 font_name = obj.get("/BaseFont", "")
2187 # Normalize font names like "/RRXFFV+Palatino-Bold" to "Palatino-Bold"
2188 normalized_font_name = font_name.lstrip("/").split("+")[-1]
2189 if normalized_font_name not in font_info:
2190 font_info[normalized_font_name] = {
2191 "normalized_font_name": normalized_font_name,
2192 "resource_ids": [],
2193 }
2194 if key not in font_info[normalized_font_name]["resource_ids"]:
2195 font_info[normalized_font_name]["resource_ids"].append(key)
2196 for k in obj:
2197 font_info = get_font_info(obj[k], font_info, k)
2198 elif isinstance(obj, (list, ArrayObject)):
2199 for child_obj in obj:
2200 font_info = get_font_info(child_obj, font_info)
2201 return font_info
2203 # Add relevant resource names for removal
2204 font_info = get_font_info(page.get("/Resources"))
2205 for font_name in font_names:
2206 if font_name in font_info:
2207 resource_ids_to_remove.extend(font_info[font_name]["resource_ids"])
2209 text_filters = {}
2210 if font_names:
2211 text_filters["font_ids"] = resource_ids_to_remove
2212 self.remove_objects_from_page(page, ObjectDeletionFlag.TEXT, text_filters=text_filters)
2214 def add_uri(
2215 self,
2216 page_number: int,
2217 uri: str,
2218 rect: RectangleObject,
2219 border: Optional[ArrayObject] = None,
2220 ) -> None:
2221 """
2222 Add an URI from a rectangular area to the specified page.
2224 Args:
2225 page_number: index of the page on which to place the URI action.
2226 uri: URI of resource to link to.
2227 rect: :class:`RectangleObject<pypdf.generic.RectangleObject>` or
2228 array of four integers specifying the clickable rectangular area
2229 ``[xLL, yLL, xUR, yUR]``, or string in the form
2230 ``"[ xLL yLL xUR yUR ]"``.
2231 border: if provided, an array describing border-drawing
2232 properties. See the PDF spec for details. No border will be
2233 drawn if this argument is omitted.
2235 """
2236 page_link = self.get_object(self._pages)[PagesAttributes.KIDS][page_number] # type: ignore
2237 page_ref = cast(dict[str, Any], self.get_object(page_link))
2239 border_arr: BorderArrayType
2240 if border is not None:
2241 border_arr = [NumberObject(n) for n in border[:3]]
2242 if len(border) == 4:
2243 dash_pattern = ArrayObject([NumberObject(n) for n in border[3]])
2244 border_arr.append(dash_pattern)
2245 else:
2246 border_arr = [NumberObject(2), NumberObject(2), NumberObject(2)]
2248 if isinstance(rect, str):
2249 rect = NumberObject(rect)
2250 elif isinstance(rect, RectangleObject):
2251 pass
2252 else:
2253 rect = RectangleObject(rect)
2255 lnk2 = DictionaryObject()
2256 lnk2.update(
2257 {
2258 NameObject("/S"): NameObject("/URI"),
2259 NameObject("/URI"): TextStringObject(uri),
2260 }
2261 )
2262 lnk = DictionaryObject()
2263 lnk.update(
2264 {
2265 NameObject(AA.Type): NameObject("/Annot"),
2266 NameObject(AA.Subtype): NameObject("/Link"),
2267 NameObject(AA.P): page_link,
2268 NameObject(AA.Rect): rect,
2269 NameObject("/H"): NameObject("/I"),
2270 NameObject(AA.Border): ArrayObject(border_arr),
2271 NameObject("/A"): lnk2,
2272 }
2273 )
2274 lnk_ref = self._add_object(lnk)
2276 if PG.ANNOTS in page_ref:
2277 page_ref[PG.ANNOTS].append(lnk_ref)
2278 else:
2279 page_ref[NameObject(PG.ANNOTS)] = ArrayObject([lnk_ref])
2281 _valid_layouts = (
2282 "/NoLayout",
2283 "/SinglePage",
2284 "/OneColumn",
2285 "/TwoColumnLeft",
2286 "/TwoColumnRight",
2287 "/TwoPageLeft",
2288 "/TwoPageRight",
2289 )
2291 def _get_page_layout(self) -> Optional[LayoutType]:
2292 try:
2293 return cast(LayoutType, self._root_object["/PageLayout"])
2294 except KeyError:
2295 return None
2297 def _set_page_layout(self, layout: Union[NameObject, LayoutType]) -> None:
2298 """
2299 Set the page layout.
2301 Args:
2302 layout: The page layout to be used.
2304 .. list-table:: Valid ``layout`` arguments
2305 :widths: 50 200
2307 * - /NoLayout
2308 - Layout explicitly not specified
2309 * - /SinglePage
2310 - Show one page at a time
2311 * - /OneColumn
2312 - Show one column at a time
2313 * - /TwoColumnLeft
2314 - Show pages in two columns, odd-numbered pages on the left
2315 * - /TwoColumnRight
2316 - Show pages in two columns, odd-numbered pages on the right
2317 * - /TwoPageLeft
2318 - Show two pages at a time, odd-numbered pages on the left
2319 * - /TwoPageRight
2320 - Show two pages at a time, odd-numbered pages on the right
2322 """
2323 if not isinstance(layout, NameObject):
2324 if layout not in self._valid_layouts:
2325 logger_warning(
2326 f"Layout should be one of: {'', ''.join(self._valid_layouts)}",
2327 __name__,
2328 )
2329 layout = NameObject(layout)
2330 self._root_object.update({NameObject("/PageLayout"): layout})
2332 def set_page_layout(self, layout: LayoutType) -> None:
2333 """
2334 Set the page layout.
2336 Args:
2337 layout: The page layout to be used
2339 .. list-table:: Valid ``layout`` arguments
2340 :widths: 50 200
2342 * - /NoLayout
2343 - Layout explicitly not specified
2344 * - /SinglePage
2345 - Show one page at a time
2346 * - /OneColumn
2347 - Show one column at a time
2348 * - /TwoColumnLeft
2349 - Show pages in two columns, odd-numbered pages on the left
2350 * - /TwoColumnRight
2351 - Show pages in two columns, odd-numbered pages on the right
2352 * - /TwoPageLeft
2353 - Show two pages at a time, odd-numbered pages on the left
2354 * - /TwoPageRight
2355 - Show two pages at a time, odd-numbered pages on the right
2357 """
2358 self._set_page_layout(layout)
2360 @property
2361 def page_layout(self) -> Optional[LayoutType]:
2362 """
2363 Page layout property.
2365 .. list-table:: Valid ``layout`` values
2366 :widths: 50 200
2368 * - /NoLayout
2369 - Layout explicitly not specified
2370 * - /SinglePage
2371 - Show one page at a time
2372 * - /OneColumn
2373 - Show one column at a time
2374 * - /TwoColumnLeft
2375 - Show pages in two columns, odd-numbered pages on the left
2376 * - /TwoColumnRight
2377 - Show pages in two columns, odd-numbered pages on the right
2378 * - /TwoPageLeft
2379 - Show two pages at a time, odd-numbered pages on the left
2380 * - /TwoPageRight
2381 - Show two pages at a time, odd-numbered pages on the right
2382 """
2383 return self._get_page_layout()
2385 @page_layout.setter
2386 def page_layout(self, layout: LayoutType) -> None:
2387 self._set_page_layout(layout)
2389 _valid_modes = (
2390 "/UseNone",
2391 "/UseOutlines",
2392 "/UseThumbs",
2393 "/FullScreen",
2394 "/UseOC",
2395 "/UseAttachments",
2396 )
2398 def _get_page_mode(self) -> Optional[PagemodeType]:
2399 try:
2400 return cast(PagemodeType, self._root_object["/PageMode"])
2401 except KeyError:
2402 return None
2404 @property
2405 def page_mode(self) -> Optional[PagemodeType]:
2406 """
2407 Page mode property.
2409 .. list-table:: Valid ``mode`` values
2410 :widths: 50 200
2412 * - /UseNone
2413 - Do not show outline or thumbnails panels
2414 * - /UseOutlines
2415 - Show outline (aka bookmarks) panel
2416 * - /UseThumbs
2417 - Show page thumbnails panel
2418 * - /FullScreen
2419 - Fullscreen view
2420 * - /UseOC
2421 - Show Optional Content Group (OCG) panel
2422 * - /UseAttachments
2423 - Show attachments panel
2424 """
2425 return self._get_page_mode()
2427 @page_mode.setter
2428 def page_mode(self, mode: PagemodeType) -> None:
2429 if isinstance(mode, NameObject):
2430 mode_name: NameObject = mode
2431 else:
2432 if mode not in self._valid_modes:
2433 logger_warning(
2434 f"Mode should be one of: {', '.join(self._valid_modes)}", __name__
2435 )
2436 mode_name = NameObject(mode)
2437 self._root_object.update({NameObject("/PageMode"): mode_name})
2439 def add_annotation(
2440 self,
2441 page_number: Union[int, PageObject],
2442 annotation: dict[str, Any],
2443 ) -> DictionaryObject:
2444 """
2445 Add a single annotation to the page.
2446 The added annotation must be a new annotation.
2447 It cannot be recycled.
2449 Args:
2450 page_number: PageObject or page index.
2451 annotation: Annotation to be added (created with annotation).
2453 Returns:
2454 The inserted object.
2455 This can be used for popup creation, for example.
2457 """
2458 page = page_number
2459 if isinstance(page, int):
2460 page = self.pages[page]
2461 elif not isinstance(page, PageObject):
2462 raise TypeError("page: invalid type")
2464 to_add = cast(DictionaryObject, _pdf_objectify(annotation))
2465 to_add[NameObject("/P")] = page.indirect_reference
2467 if page.annotations is None:
2468 page[NameObject("/Annots")] = ArrayObject()
2469 assert page.annotations is not None
2471 # Internal link annotations need the correct object type for the
2472 # destination
2473 if to_add.get("/Subtype") == "/Link" and "/Dest" in to_add:
2474 tmp = cast(dict[Any, Any], to_add[NameObject("/Dest")])
2475 dest = Destination(
2476 NameObject("/LinkName"),
2477 tmp["target_page_index"],
2478 Fit(
2479 fit_type=tmp["fit"], fit_args=dict(tmp)["fit_args"]
2480 ), # I have no clue why this dict-hack is necessary
2481 )
2482 to_add[NameObject("/Dest")] = dest.dest_array
2484 page.annotations.append(self._add_object(to_add))
2486 if to_add.get("/Subtype") == "/Popup" and NameObject("/Parent") in to_add:
2487 cast(DictionaryObject, to_add["/Parent"].get_object())[
2488 NameObject("/Popup")
2489 ] = to_add.indirect_reference
2491 return to_add
2493 def clean_page(self, page: Union[PageObject, IndirectObject]) -> PageObject:
2494 """
2495 Perform some clean up in the page.
2496 Currently: convert NameObject named destination to TextStringObject
2497 (required for names/dests list)
2499 Args:
2500 page:
2502 Returns:
2503 The cleaned PageObject
2505 """
2506 page = cast("PageObject", page.get_object())
2507 for a in page.get("/Annots", []):
2508 a_obj = a.get_object()
2509 d = a_obj.get("/Dest", None)
2510 act = a_obj.get("/A", None)
2511 if isinstance(d, NameObject):
2512 a_obj[NameObject("/Dest")] = TextStringObject(d)
2513 elif act is not None:
2514 act = act.get_object()
2515 d = act.get("/D", None)
2516 if isinstance(d, NameObject):
2517 act[NameObject("/D")] = TextStringObject(d)
2518 return page
2520 def _create_stream(
2521 self, fileobj: Union[Path, StrByteType, PdfReader]
2522 ) -> tuple[IOBase, Optional[Encryption]]:
2523 # If the fileobj parameter is a string, assume it is a path
2524 # and create a file object at that location. If it is a file,
2525 # copy the file's contents into a BytesIO stream object; if
2526 # it is a PdfReader, copy that reader's stream into a
2527 # BytesIO stream.
2528 # If fileobj is none of the above types, it is not modified
2529 encryption_obj = None
2530 stream: IOBase
2531 if isinstance(fileobj, (str, Path)):
2532 with FileIO(fileobj, "rb") as f:
2533 stream = BytesIO(f.read())
2534 elif isinstance(fileobj, PdfReader):
2535 if fileobj._encryption:
2536 encryption_obj = fileobj._encryption
2537 orig_tell = fileobj.stream.tell()
2538 fileobj.stream.seek(0)
2539 stream = BytesIO(fileobj.stream.read())
2541 # reset the stream to its original location
2542 fileobj.stream.seek(orig_tell)
2543 elif hasattr(fileobj, "seek") and hasattr(fileobj, "read"):
2544 fileobj.seek(0)
2545 filecontent = fileobj.read()
2546 stream = BytesIO(filecontent)
2547 else:
2548 raise NotImplementedError(
2549 "Merging requires an object that PdfReader can parse. "
2550 "Typically, that is a Path or a string representing a Path, "
2551 "a file object, or an object implementing .seek and .read. "
2552 "Passing a PdfReader directly works as well."
2553 )
2554 return stream, encryption_obj
2556 def append(
2557 self,
2558 fileobj: Union[StrByteType, PdfReader, Path],
2559 outline_item: Union[
2560 str, None, PageRange, tuple[int, int], tuple[int, int, int], list[int]
2561 ] = None,
2562 pages: Union[
2563 None,
2564 PageRange,
2565 tuple[int, int],
2566 tuple[int, int, int],
2567 list[int],
2568 list[PageObject],
2569 ] = None,
2570 import_outline: bool = True,
2571 excluded_fields: Optional[Union[list[str], tuple[str, ...]]] = None,
2572 ) -> None:
2573 """
2574 Identical to the :meth:`merge()<merge>` method, but assumes you want to
2575 concatenate all pages onto the end of the file instead of specifying a
2576 position.
2578 Args:
2579 fileobj: A File Object or an object that supports the standard
2580 read and seek methods similar to a File Object. Could also be a
2581 string representing a path to a PDF file.
2582 outline_item: Optionally, you may specify a string to build an
2583 outline (aka 'bookmark') to identify the beginning of the
2584 included file.
2585 pages: Can be a :class:`PageRange<pypdf.pagerange.PageRange>`
2586 or a ``(start, stop[, step])`` tuple
2587 or a list of pages to be processed
2588 to merge only the specified range of pages from the source
2589 document into the output document.
2590 import_outline: You may prevent the source document's
2591 outline (collection of outline items, previously referred to as
2592 'bookmarks') from being imported by specifying this as ``False``.
2593 excluded_fields: Provide the list of fields/keys to be ignored
2594 if ``/Annots`` is part of the list, the annotation will be ignored
2595 if ``/B`` is part of the list, the articles will be ignored
2597 """
2598 if excluded_fields is None:
2599 excluded_fields = ()
2600 if isinstance(outline_item, (tuple, list, PageRange)):
2601 if isinstance(pages, bool):
2602 if not isinstance(import_outline, bool):
2603 excluded_fields = import_outline
2604 import_outline = pages
2605 pages = outline_item
2606 self.merge(
2607 None,
2608 fileobj,
2609 None,
2610 pages,
2611 import_outline,
2612 excluded_fields,
2613 )
2614 else: # if isinstance(outline_item, str):
2615 self.merge(
2616 None,
2617 fileobj,
2618 outline_item,
2619 pages,
2620 import_outline,
2621 excluded_fields,
2622 )
2624 def merge(
2625 self,
2626 position: Optional[int],
2627 fileobj: Union[Path, StrByteType, PdfReader],
2628 outline_item: Optional[str] = None,
2629 pages: Optional[Union[PageRangeSpec, list[PageObject]]] = None,
2630 import_outline: bool = True,
2631 excluded_fields: Optional[Union[list[str], tuple[str, ...]]] = (),
2632 ) -> None:
2633 """
2634 Merge the pages from the given file into the output file at the
2635 specified page number.
2637 Args:
2638 position: The *page number* to insert this file. File will
2639 be inserted after the given number.
2640 fileobj: A File Object or an object that supports the standard
2641 read and seek methods similar to a File Object. Could also be a
2642 string representing a path to a PDF file.
2643 outline_item: Optionally, you may specify a string to build an outline
2644 (aka 'bookmark') to identify the
2645 beginning of the included file.
2646 pages: can be a :class:`PageRange<pypdf.pagerange.PageRange>`
2647 or a ``(start, stop[, step])`` tuple
2648 or a list of pages to be processed
2649 to merge only the specified range of pages from the source
2650 document into the output document.
2651 import_outline: You may prevent the source document's
2652 outline (collection of outline items, previously referred to as
2653 'bookmarks') from being imported by specifying this as ``False``.
2654 excluded_fields: provide the list of fields/keys to be ignored
2655 if ``/Annots`` is part of the list, the annotation will be ignored
2656 if ``/B`` is part of the list, the articles will be ignored
2658 Raises:
2659 TypeError: The pages attribute is not configured properly
2661 """
2662 if isinstance(fileobj, PdfDocCommon):
2663 reader = fileobj
2664 else:
2665 stream, _encryption_obj = self._create_stream(fileobj)
2666 # Create a new PdfReader instance using the stream
2667 # (either file or BytesIO or StringIO) created above
2668 reader = PdfReader(stream, strict=False) # type: ignore[arg-type]
2670 if excluded_fields is None:
2671 excluded_fields = ()
2672 # Find the range of pages to merge.
2673 if pages is None:
2674 pages = list(range(len(reader.pages)))
2675 elif isinstance(pages, PageRange):
2676 pages = list(range(*pages.indices(len(reader.pages))))
2677 elif isinstance(pages, list):
2678 pass # keep unchanged
2679 elif isinstance(pages, tuple) and len(pages) <= 3:
2680 pages = list(range(*pages))
2681 elif not isinstance(pages, tuple):
2682 raise TypeError(
2683 '"pages" must be a tuple of (start, stop[, step]) or a list'
2684 )
2686 srcpages = {}
2687 for page in pages:
2688 if isinstance(page, PageObject):
2689 pg = page
2690 else:
2691 pg = reader.pages[page]
2692 assert pg.indirect_reference is not None
2693 if position is None:
2694 # numbers in the exclude list identifies that the exclusion is
2695 # only applicable to 1st level of cloning
2696 srcpages[pg.indirect_reference.idnum] = self.add_page(
2697 pg, [*list(excluded_fields), 1, "/B", 1, "/Annots"] # type: ignore
2698 )
2699 else:
2700 srcpages[pg.indirect_reference.idnum] = self.insert_page(
2701 pg, position, [*list(excluded_fields), 1, "/B", 1, "/Annots"] # type: ignore
2702 )
2703 position += 1
2704 srcpages[pg.indirect_reference.idnum].original_page = pg
2706 reader._named_destinations = (
2707 reader.named_destinations
2708 ) # need for the outline processing below
2710 arr: Any
2712 for dest in reader._named_destinations.values():
2713 self._merge__process_named_dests(dest=dest, reader=reader, srcpages=srcpages)
2715 outline_item_typ: TreeObject
2716 if outline_item is not None:
2717 outline_item_typ = cast(
2718 "TreeObject",
2719 self.add_outline_item(
2720 TextStringObject(outline_item),
2721 next(iter(srcpages.values())).indirect_reference,
2722 fit=PAGE_FIT,
2723 ).get_object(),
2724 )
2725 else:
2726 outline_item_typ = self.get_outline_root()
2728 _ro = reader.root_object
2729 if import_outline and CO.OUTLINES in _ro:
2730 outline = self._get_filtered_outline(
2731 _ro.get(CO.OUTLINES, None), srcpages, reader
2732 )
2733 self._insert_filtered_outline(
2734 outline, outline_item_typ, None
2735 ) # TODO: use before parameter
2737 if "/Annots" not in excluded_fields:
2738 for pag in srcpages.values():
2739 lst = self._insert_filtered_annotations(
2740 pag.original_page.get("/Annots", []), pag, srcpages, reader
2741 )
2742 if len(lst) > 0:
2743 pag[NameObject("/Annots")] = lst
2744 self.clean_page(pag)
2746 if "/AcroForm" in _ro and not is_null_or_none(_ro["/AcroForm"]):
2747 if "/AcroForm" not in self._root_object:
2748 self._root_object[NameObject("/AcroForm")] = self._add_object(
2749 cast(
2750 DictionaryObject,
2751 reader.root_object["/AcroForm"],
2752 ).clone(self, False, ("/Fields",))
2753 )
2754 arr = ArrayObject()
2755 else:
2756 arr = cast(
2757 ArrayObject,
2758 cast(DictionaryObject, self._root_object["/AcroForm"])["/Fields"],
2759 )
2760 trslat = self._id_translated[id(reader)]
2761 try:
2762 for f in reader.root_object["/AcroForm"]["/Fields"]: # type: ignore
2763 try:
2764 ind = IndirectObject(trslat[f.idnum], 0, self)
2765 if ind not in arr:
2766 arr.append(ind)
2767 except KeyError:
2768 # for trslat[] which mean the field has not be copied
2769 # through the page
2770 pass
2771 except KeyError: # for /Acroform or /Fields are not existing
2772 arr = self._add_object(ArrayObject())
2773 cast(DictionaryObject, self._root_object["/AcroForm"])[
2774 NameObject("/Fields")
2775 ] = arr
2777 if "/B" not in excluded_fields:
2778 self.add_filtered_articles("", srcpages, reader)
2780 def _merge__process_named_dests(self, dest: Any, reader: PdfDocCommon, srcpages: dict[int, PageObject]) -> None:
2781 arr: Any = dest.dest_array
2782 if "/Names" in self._root_object and dest["/Title"] in cast(
2783 list[Any],
2784 cast(
2785 DictionaryObject,
2786 cast(DictionaryObject, self._root_object["/Names"]).get("/Dests", DictionaryObject()),
2787 ).get("/Names", DictionaryObject()),
2788 ):
2789 # already exists: should not duplicate it
2790 pass
2791 elif dest["/Page"] is None or isinstance(dest["/Page"], NullObject):
2792 pass
2793 elif isinstance(dest["/Page"], int):
2794 # the page reference is a page number normally not a PDF Reference
2795 # page numbers as int are normally accepted only in external goto
2796 try:
2797 p = reader.pages[dest["/Page"]]
2798 except IndexError:
2799 return
2800 assert p.indirect_reference is not None
2801 try:
2802 arr[NumberObject(0)] = NumberObject(
2803 srcpages[p.indirect_reference.idnum].page_number
2804 )
2805 self.add_named_destination_array(dest["/Title"], arr)
2806 except KeyError:
2807 pass
2808 elif dest["/Page"].indirect_reference.idnum in srcpages:
2809 arr[NumberObject(0)] = srcpages[
2810 dest["/Page"].indirect_reference.idnum
2811 ].indirect_reference
2812 self.add_named_destination_array(dest["/Title"], arr)
2814 def _add_articles_thread(
2815 self,
2816 thread: DictionaryObject, # thread entry from the reader's array of threads
2817 pages: dict[int, PageObject],
2818 reader: PdfReader,
2819 ) -> IndirectObject:
2820 """
2821 Clone the thread with only the applicable articles.
2823 Args:
2824 thread:
2825 pages:
2826 reader:
2828 Returns:
2829 The added thread as an indirect reference
2831 """
2832 nthread = thread.clone(
2833 self, force_duplicate=True, ignore_fields=("/F",)
2834 ) # use of clone to keep link between reader and writer
2835 self.threads.append(nthread.indirect_reference)
2836 first_article = cast("DictionaryObject", thread["/F"])
2837 current_article: Optional[DictionaryObject] = first_article
2838 new_article: Optional[DictionaryObject] = None
2839 while current_article is not None:
2840 pag = self._get_cloned_page(
2841 cast("PageObject", current_article["/P"]), pages, reader
2842 )
2843 if pag is not None:
2844 if new_article is None:
2845 new_article = cast(
2846 "DictionaryObject",
2847 self._add_object(DictionaryObject()).get_object(),
2848 )
2849 new_first = new_article
2850 nthread[NameObject("/F")] = new_article.indirect_reference
2851 else:
2852 new_article2 = cast(
2853 "DictionaryObject",
2854 self._add_object(
2855 DictionaryObject(
2856 {NameObject("/V"): new_article.indirect_reference}
2857 )
2858 ).get_object(),
2859 )
2860 new_article[NameObject("/N")] = new_article2.indirect_reference
2861 new_article = new_article2
2862 new_article[NameObject("/P")] = pag
2863 new_article[NameObject("/T")] = nthread.indirect_reference
2864 new_article[NameObject("/R")] = current_article["/R"]
2865 pag_obj = cast("PageObject", pag.get_object())
2866 if "/B" not in pag_obj:
2867 pag_obj[NameObject("/B")] = ArrayObject()
2868 cast("ArrayObject", pag_obj["/B"]).append(
2869 new_article.indirect_reference
2870 )
2871 current_article = cast("DictionaryObject", current_article["/N"])
2872 if current_article == first_article:
2873 new_article[NameObject("/N")] = new_first.indirect_reference # type: ignore
2874 new_first[NameObject("/V")] = new_article.indirect_reference # type: ignore
2875 current_article = None
2876 assert nthread.indirect_reference is not None
2877 return nthread.indirect_reference
2879 def add_filtered_articles(
2880 self,
2881 fltr: Union[
2882 Pattern[Any], str
2883 ], # thread entry from the reader's array of threads
2884 pages: dict[int, PageObject],
2885 reader: PdfReader,
2886 ) -> None:
2887 """
2888 Add articles matching the defined criteria.
2890 Args:
2891 fltr:
2892 pages:
2893 reader:
2895 """
2896 if isinstance(fltr, str):
2897 fltr = re.compile(fltr)
2898 elif not isinstance(fltr, Pattern):
2899 fltr = re.compile("")
2900 for p in pages.values():
2901 pp = p.original_page
2902 for a in pp.get("/B", ()):
2903 a_obj = a.get_object()
2904 if is_null_or_none(a_obj):
2905 continue
2906 thr = a_obj.get("/T")
2907 if thr is None:
2908 continue
2909 thr = thr.get_object()
2910 if thr.indirect_reference.idnum not in self._id_translated[
2911 id(reader)
2912 ] and fltr.search((thr.get("/I", {})).get("/Title", "")):
2913 self._add_articles_thread(thr, pages, reader)
2915 def _get_cloned_page(
2916 self,
2917 page: Union[None, IndirectObject, PageObject, NullObject],
2918 pages: dict[int, PageObject],
2919 reader: PdfReader,
2920 ) -> Optional[IndirectObject]:
2921 if isinstance(page, NullObject):
2922 return None
2923 if isinstance(page, DictionaryObject) and page.get("/Type", "") == "/Page":
2924 _i = page.indirect_reference
2925 elif isinstance(page, IndirectObject):
2926 _i = page
2927 try:
2928 return pages[_i.idnum].indirect_reference # type: ignore
2929 except Exception:
2930 return None
2932 def _insert_filtered_annotations(
2933 self,
2934 annots: Union[IndirectObject, list[DictionaryObject], None],
2935 page: PageObject,
2936 pages: dict[int, PageObject],
2937 reader: PdfReader,
2938 ) -> list[Destination]:
2939 outlist = ArrayObject()
2940 if isinstance(annots, IndirectObject):
2941 annots = cast("list[Any]", annots.get_object())
2942 if annots is None:
2943 return outlist
2944 if not isinstance(annots, list):
2945 logger_warning(f"Expected list of annotations, got {annots} of type {annots.__class__.__name__}.", __name__)
2946 return outlist
2947 for an in annots:
2948 ano = cast("DictionaryObject", an.get_object())
2949 if (
2950 ano["/Subtype"] != "/Link"
2951 or "/A" not in ano
2952 or cast("DictionaryObject", ano["/A"])["/S"] != "/GoTo"
2953 or "/Dest" in ano
2954 ):
2955 if "/Dest" not in ano:
2956 outlist.append(self._add_object(ano.clone(self)))
2957 else:
2958 d = ano["/Dest"]
2959 if isinstance(d, str):
2960 # it is a named dest
2961 if str(d) in self.get_named_dest_root():
2962 outlist.append(ano.clone(self).indirect_reference)
2963 else:
2964 d = cast("ArrayObject", d)
2965 p = self._get_cloned_page(d[0], pages, reader)
2966 if p is not None:
2967 anc = ano.clone(self, ignore_fields=("/Dest",))
2968 anc[NameObject("/Dest")] = ArrayObject([p, *d[1:]])
2969 outlist.append(self._add_object(anc))
2970 else:
2971 d = cast("DictionaryObject", ano["/A"]).get("/D", NullObject())
2972 if is_null_or_none(d):
2973 continue
2974 if isinstance(d, str):
2975 # it is a named dest
2976 if str(d) in self.get_named_dest_root():
2977 outlist.append(ano.clone(self).indirect_reference)
2978 else:
2979 d = cast("ArrayObject", d)
2980 p = self._get_cloned_page(d[0], pages, reader)
2981 if p is not None:
2982 anc = ano.clone(self, ignore_fields=("/D",))
2983 cast("DictionaryObject", anc["/A"])[
2984 NameObject("/D")
2985 ] = ArrayObject([p, *d[1:]])
2986 outlist.append(self._add_object(anc))
2987 return outlist
2989 def _get_filtered_outline(
2990 self,
2991 node: Any,
2992 pages: dict[int, PageObject],
2993 reader: PdfReader,
2994 ) -> list[Destination]:
2995 """
2996 Extract outline item entries that are part of the specified page set.
2998 Args:
2999 node:
3000 pages:
3001 reader:
3003 Returns:
3004 A list of destination objects.
3006 """
3007 new_outline = []
3008 if node is None:
3009 node = NullObject()
3010 node = node.get_object()
3011 if is_null_or_none(node):
3012 node = DictionaryObject()
3013 if node.get("/Type", "") == "/Outlines" or "/Title" not in node:
3014 node = node.get("/First", None)
3015 if node is not None:
3016 node = node.get_object()
3017 new_outline += self._get_filtered_outline(node, pages, reader)
3018 else:
3019 v: Union[None, IndirectObject, NullObject]
3020 while node is not None:
3021 node = node.get_object()
3022 o = cast("Destination", reader._build_outline_item(node))
3023 v = self._get_cloned_page(cast("PageObject", o["/Page"]), pages, reader)
3024 if v is None:
3025 v = NullObject()
3026 o[NameObject("/Page")] = v
3027 if "/First" in node:
3028 o._filtered_children = self._get_filtered_outline(
3029 node["/First"], pages, reader
3030 )
3031 else:
3032 o._filtered_children = []
3033 if (
3034 not isinstance(o["/Page"], NullObject)
3035 or len(o._filtered_children) > 0
3036 ):
3037 new_outline.append(o)
3038 node = node.get("/Next", None)
3039 return new_outline
3041 def _clone_outline(self, dest: Destination) -> TreeObject:
3042 n_ol = TreeObject()
3043 self._add_object(n_ol)
3044 n_ol[NameObject("/Title")] = TextStringObject(dest["/Title"])
3045 if not isinstance(dest["/Page"], NullObject):
3046 if dest.node is not None and "/A" in dest.node:
3047 n_ol[NameObject("/A")] = dest.node["/A"].clone(self)
3048 else:
3049 n_ol[NameObject("/Dest")] = dest.dest_array
3050 # TODO: /SE
3051 if dest.node is not None:
3052 n_ol[NameObject("/F")] = NumberObject(dest.node.get("/F", 0))
3053 n_ol[NameObject("/C")] = ArrayObject(
3054 dest.node.get(
3055 "/C", [FloatObject(0.0), FloatObject(0.0), FloatObject(0.0)]
3056 )
3057 )
3058 return n_ol
3060 def _insert_filtered_outline(
3061 self,
3062 outlines: list[Destination],
3063 parent: Union[TreeObject, IndirectObject],
3064 before: Union[None, TreeObject, IndirectObject] = None,
3065 ) -> None:
3066 for dest in outlines:
3067 # TODO: can be improved to keep A and SE entries (ignored for the moment)
3068 # with np=self.add_outline_item_destination(dest,parent,before)
3069 if dest.get("/Type", "") == "/Outlines" or "/Title" not in dest:
3070 np = parent
3071 else:
3072 np = self._clone_outline(dest)
3073 cast(TreeObject, parent.get_object()).insert_child(np, before, self)
3074 self._insert_filtered_outline(dest._filtered_children, np, None)
3076 def close(self) -> None:
3077 """Implemented for API harmonization."""
3078 return
3080 def find_outline_item(
3081 self,
3082 outline_item: dict[str, Any],
3083 root: Optional[OutlineType] = None,
3084 ) -> Optional[list[int]]:
3085 if root is None:
3086 o = self.get_outline_root()
3087 else:
3088 o = cast("TreeObject", root)
3090 i = 0
3091 while o is not None:
3092 if (
3093 o.indirect_reference == outline_item
3094 or o.get("/Title", None) == outline_item
3095 ):
3096 return [i]
3097 if "/First" in o:
3098 res = self.find_outline_item(
3099 outline_item, cast(OutlineType, o["/First"])
3100 )
3101 if res:
3102 return ([i] if "/Title" in o else []) + res
3103 if "/Next" in o:
3104 i += 1
3105 o = cast(TreeObject, o["/Next"])
3106 else:
3107 return None
3108 raise PyPdfError("This line is theoretically unreachable.") # pragma: no cover
3110 def reset_translation(
3111 self, reader: Union[None, PdfReader, IndirectObject] = None
3112 ) -> None:
3113 """
3114 Reset the translation table between reader and the writer object.
3116 Late cloning will create new independent objects.
3118 Args:
3119 reader: PdfReader or IndirectObject referencing a PdfReader object.
3120 if set to None or omitted, all tables will be reset.
3122 """
3123 if reader is None:
3124 self._id_translated = {}
3125 elif isinstance(reader, PdfReader):
3126 try:
3127 del self._id_translated[id(reader)]
3128 except Exception:
3129 pass
3130 elif isinstance(reader, IndirectObject):
3131 try:
3132 del self._id_translated[id(reader.pdf)]
3133 except Exception:
3134 pass
3135 else:
3136 raise Exception("invalid parameter {reader}")
3138 def set_page_label(
3139 self,
3140 page_index_from: int,
3141 page_index_to: int,
3142 style: Optional[PageLabelStyle] = None,
3143 prefix: Optional[str] = None,
3144 start: Optional[int] = 0,
3145 ) -> None:
3146 """
3147 Set a page label to a range of pages.
3149 Page indexes must be given starting from 0.
3150 Labels must have a style, a prefix or both.
3151 If a range is not assigned any page label, a decimal label starting from 1 is applied.
3153 Args:
3154 page_index_from: page index of the beginning of the range starting from 0
3155 page_index_to: page index of the beginning of the range starting from 0
3156 style: The numbering style to be used for the numeric portion of each page label:
3158 * ``/D`` Decimal Arabic numerals
3159 * ``/R`` Uppercase Roman numerals
3160 * ``/r`` Lowercase Roman numerals
3161 * ``/A`` Uppercase letters (A to Z for the first 26 pages,
3162 AA to ZZ for the next 26, and so on)
3163 * ``/a`` Lowercase letters (a to z for the first 26 pages,
3164 aa to zz for the next 26, and so on)
3166 prefix: The label prefix for page labels in this range.
3167 start: The value of the numeric portion for the first page label
3168 in the range.
3169 Subsequent pages are numbered sequentially from this value,
3170 which must be greater than or equal to 1.
3171 Default value: 1.
3173 """
3174 if style is None and prefix is None:
3175 raise ValueError("At least one of style and prefix must be given")
3176 if page_index_from < 0:
3177 raise ValueError("page_index_from must be greater or equal than 0")
3178 if page_index_to < page_index_from:
3179 raise ValueError(
3180 "page_index_to must be greater or equal than page_index_from"
3181 )
3182 if page_index_to >= len(self.pages):
3183 raise ValueError("page_index_to exceeds number of pages")
3184 if start is not None and start != 0 and start < 1:
3185 raise ValueError("If given, start must be greater or equal than one")
3187 self._set_page_label(page_index_from, page_index_to, style, prefix, start)
3189 def _set_page_label(
3190 self,
3191 page_index_from: int,
3192 page_index_to: int,
3193 style: Optional[PageLabelStyle] = None,
3194 prefix: Optional[str] = None,
3195 start: Optional[int] = 0,
3196 ) -> None:
3197 """
3198 Set a page label to a range of pages.
3200 Page indexes must be given starting from 0.
3201 Labels must have a style, a prefix or both.
3202 If a range is not assigned any page label a decimal label starting from 1 is applied.
3204 Args:
3205 page_index_from: page index of the beginning of the range starting from 0
3206 page_index_to: page index of the beginning of the range starting from 0
3207 style: The numbering style to be used for the numeric portion of each page label:
3208 /D Decimal Arabic numerals
3209 /R Uppercase Roman numerals
3210 /r Lowercase Roman numerals
3211 /A Uppercase letters (A to Z for the first 26 pages,
3212 AA to ZZ for the next 26, and so on)
3213 /a Lowercase letters (a to z for the first 26 pages,
3214 aa to zz for the next 26, and so on)
3215 prefix: The label prefix for page labels in this range.
3216 start: The value of the numeric portion for the first page label
3217 in the range.
3218 Subsequent pages are numbered sequentially from this value,
3219 which must be greater than or equal to 1. Default value: 1.
3221 """
3222 default_page_label = DictionaryObject()
3223 default_page_label[NameObject("/S")] = NameObject("/D")
3225 new_page_label = DictionaryObject()
3226 if style is not None:
3227 new_page_label[NameObject("/S")] = NameObject(style)
3228 if prefix is not None:
3229 new_page_label[NameObject("/P")] = TextStringObject(prefix)
3230 if start != 0:
3231 new_page_label[NameObject("/St")] = NumberObject(start)
3233 if NameObject(CatalogDictionary.PAGE_LABELS) not in self._root_object:
3234 nums = ArrayObject()
3235 nums_insert(NumberObject(0), default_page_label, nums)
3236 page_labels = TreeObject()
3237 page_labels[NameObject("/Nums")] = nums
3238 self._root_object[NameObject(CatalogDictionary.PAGE_LABELS)] = page_labels
3240 page_labels = cast(
3241 TreeObject, self._root_object[NameObject(CatalogDictionary.PAGE_LABELS)]
3242 )
3243 nums = cast(ArrayObject, page_labels[NameObject("/Nums")])
3245 nums_insert(NumberObject(page_index_from), new_page_label, nums)
3246 nums_clear_range(NumberObject(page_index_from), page_index_to, nums)
3247 next_label_pos, *_ = nums_next(NumberObject(page_index_from), nums)
3248 if next_label_pos != page_index_to + 1 and page_index_to + 1 < len(self.pages):
3249 nums_insert(NumberObject(page_index_to + 1), default_page_label, nums)
3251 page_labels[NameObject("/Nums")] = nums
3252 self._root_object[NameObject(CatalogDictionary.PAGE_LABELS)] = page_labels
3254 def _repr_mimebundle_(
3255 self,
3256 include: Union[None, Iterable[str]] = None,
3257 exclude: Union[None, Iterable[str]] = None,
3258 ) -> dict[str, Any]:
3259 """
3260 Integration into Jupyter Notebooks.
3262 This method returns a dictionary that maps a mime-type to its
3263 representation.
3265 .. seealso::
3267 https://ipython.readthedocs.io/en/stable/config/integrating.html
3268 """
3269 pdf_data = BytesIO()
3270 self.write(pdf_data)
3271 data = {
3272 "application/pdf": pdf_data,
3273 }
3275 if include is not None:
3276 # Filter representations based on include list
3277 data = {k: v for k, v in data.items() if k in include}
3279 if exclude is not None:
3280 # Remove representations based on exclude list
3281 data = {k: v for k, v in data.items() if k not in exclude}
3283 return data
3286def _pdf_objectify(obj: Union[dict[str, Any], str, float, list[Any]]) -> PdfObject:
3287 if isinstance(obj, PdfObject):
3288 return obj
3289 if isinstance(obj, dict):
3290 to_add = DictionaryObject()
3291 for key, value in obj.items():
3292 to_add[NameObject(key)] = _pdf_objectify(value)
3293 return to_add
3294 if isinstance(obj, str):
3295 if obj.startswith("/"):
3296 return NameObject(obj)
3297 return TextStringObject(obj)
3298 if isinstance(obj, (float, int)):
3299 return FloatObject(obj)
3300 if isinstance(obj, list):
3301 return ArrayObject(_pdf_objectify(i) for i in obj)
3302 raise NotImplementedError(
3303 f"{type(obj)=} could not be cast to a PdfObject"
3304 )
3307def _create_outline_item(
3308 action_ref: Union[None, IndirectObject],
3309 title: str,
3310 color: Union[tuple[float, float, float], str, None],
3311 italic: bool,
3312 bold: bool,
3313) -> TreeObject:
3314 outline_item = TreeObject()
3315 if action_ref is not None:
3316 outline_item[NameObject("/A")] = action_ref
3317 outline_item.update(
3318 {
3319 NameObject("/Title"): create_string_object(title),
3320 }
3321 )
3322 if color:
3323 if isinstance(color, str):
3324 color = hex_to_rgb(color)
3325 outline_item.update(
3326 {NameObject("/C"): ArrayObject([FloatObject(c) for c in color])}
3327 )
3328 if italic or bold:
3329 format_flag = 0
3330 if italic:
3331 format_flag += OutlineFontFlag.italic
3332 if bold:
3333 format_flag += OutlineFontFlag.bold
3334 outline_item.update({NameObject("/F"): NumberObject(format_flag)})
3335 return outline_item