Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_writer.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) 2006, Mathieu Fenniak
2# Copyright (c) 2007, Ashish Kulkarni <kulkarni.ashish@gmail.com>
3#
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are
8# met:
9#
10# * Redistributions of source code must retain the above copyright notice,
11# this list of conditions and the following disclaimer.
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15# * The name of the author may not be used to endorse or promote products
16# derived from this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28# POSSIBILITY OF SUCH DAMAGE.
30import decimal
31import enum
32import hashlib
33import re
34import struct
35import sys
36import uuid
37from collections.abc import Iterable, Mapping
38from io import BytesIO, FileIO, IOBase
39from itertools import compress
40from pathlib import Path
41from re import Pattern
42from types import TracebackType
43from typing import (
44 IO,
45 Any,
46 Callable,
47 Optional,
48 Union,
49 cast,
50)
52if sys.version_info >= (3, 11):
53 from typing import Self
54else:
55 from typing_extensions import Self
57from ._doc_common import DocumentInformation, PdfDocCommon
58from ._encryption import EncryptAlgorithm, Encryption
59from ._page import PageObject, Transformation
60from ._page_labels import nums_clear_range, nums_insert, nums_next
61from ._reader import PdfReader
62from ._utils import (
63 StrByteType,
64 StreamType,
65 _get_max_pdf_version_header,
66 deprecation_no_replacement,
67 logger_warning,
68)
69from .constants import AnnotationDictionaryAttributes as AA
70from .constants import CatalogAttributes as CA
71from .constants import (
72 CatalogDictionary,
73 GoToActionArguments,
74 ImageType,
75 InteractiveFormDictEntries,
76 OutlineFontFlag,
77 PageLabelStyle,
78 PagesAttributes,
79 TypFitArguments,
80 UserAccessPermissions,
81)
82from .constants import Core as CO
83from .constants import FieldDictionaryAttributes as FA
84from .constants import PageAttributes as PG
85from .constants import TrailerKeys as TK
86from .errors import PdfReadError, PyPdfError
87from .generic import (
88 PAGE_FIT,
89 ArrayObject,
90 BooleanObject,
91 ByteStringObject,
92 ContentStream,
93 Destination,
94 DictionaryObject,
95 EmbeddedFile,
96 Fit,
97 FloatObject,
98 IndirectObject,
99 NameObject,
100 NullObject,
101 NumberObject,
102 PdfObject,
103 RectangleObject,
104 ReferenceLink,
105 StreamObject,
106 TextStringObject,
107 TreeObject,
108 ViewerPreferences,
109 create_string_object,
110 extract_links,
111 hex_to_rgb,
112 is_null_or_none,
113)
114from .generic._appearance_stream import TextStreamAppearance
115from .pagerange import PageRange, PageRangeSpec
116from .types import (
117 AnnotationSubtype,
118 BorderArrayType,
119 LayoutType,
120 OutlineItemType,
121 OutlineType,
122 PagemodeType,
123)
124from .xmp import XmpInformation
126ALL_DOCUMENT_PERMISSIONS = UserAccessPermissions.all()
class ObjectDeletionFlag(enum.IntFlag):
    """
    Bit flags naming categories of PDF content.

    Every member except ``NONE`` and ``IMAGES`` occupies a single bit, so
    members can be combined with ``|``.
    """

    NONE = 0
    TEXT = 1 << 0
    LINKS = 1 << 1
    ATTACHMENTS = 1 << 2
    OBJECTS_3D = 1 << 3
    ALL_ANNOTATIONS = 1 << 4
    XOBJECT_IMAGES = 1 << 5
    INLINE_IMAGES = 1 << 6
    DRAWING_IMAGES = 1 << 7
    # Mask combining the three image-related flags above.
    IMAGES = XOBJECT_IMAGES | INLINE_IMAGES | DRAWING_IMAGES
142def _rolling_checksum(stream: BytesIO, blocksize: int = 65536) -> str:
143 hash = hashlib.md5(usedforsecurity=False)
144 for block in iter(lambda: stream.read(blocksize), b""):
145 hash.update(block)
146 return hash.hexdigest()
149class PdfWriter(PdfDocCommon):
150 """
151 Write a PDF file out, given pages produced by another class or through
152 cloning a PDF file during initialization.
154 Typically data is added from a :class:`PdfReader<pypdf.PdfReader>`.
156 Args:
157 clone_from: identical to fileobj (for compatibility)
159 incremental: If true, loads the document and set the PdfWriter in incremental mode.
161 When writing incrementally, the original document is written first and new/modified
162 content is appended. To be used for signed document/forms to keep signature valid.
164 full: If true, loads all the objects (always full if incremental = True).
165 This parameter may allow loading large PDFs.
167 strict: If true, pypdf will raise an exception if a PDF does not follow the specification.
168 If false, pypdf will try to be forgiving and do something reasonable, but it will log
169 a warning message. It is a best-effort approach.
171 """
def __init__(
    self,
    fileobj: Union[None, PdfReader, StrByteType, Path] = "",
    clone_from: Union[None, PdfReader, StrByteType, Path] = None,
    incremental: bool = False,
    full: bool = False,
    strict: bool = False,
) -> None:
    """
    Set up the writer state and optionally clone an existing document.

    Args:
        fileobj: Source to clone from and/or the object remembered in
            ``temp_fileobj``.
        clone_from: Explicit source to clone. It wins over ``fileobj``
            when ``fileobj`` is a path or stream.
        incremental: Load the document and keep the writer in
            incremental mode.
        full: Load through the same path as ``incremental`` but reset
            ``self.incremental`` to False at the end.
        strict: Stored on ``self.strict``.

    Raises:
        PyPdfError: If ``incremental`` or ``full`` is set and ``fileobj``
            is neither a path, a ``BytesIO`` nor a ``PdfReader``.
    """
    # If true, pypdf raises an exception when a PDF does not follow the
    # specification. If false, pypdf tries to be forgiving and logs a
    # warning instead (best-effort approach).
    self.strict = strict

    # Whether the writer has been started in incremental mode. ``full``
    # temporarily switches it on as well; see the reset further down.
    self.incremental = incremental or full

    # The indirect objects in the PDF. For the incremental case, it will
    # be filled with None in clone_reader_document_root.
    self._objects: list[Optional[PdfObject]] = []

    # List of hashes after import; used to identify changes.
    self._original_hash: list[int] = []

    # Maps hash values of indirect objects to the list of IndirectObjects.
    # This is used for compression.
    self._idnum_hash: dict[bytes, tuple[IndirectObject, list[IndirectObject]]] = {}

    # Already translated IDs: dict[id(pdf)][(idnum, generation)]
    self._id_translated: dict[int, dict[int, int]] = {}

    # The PDF file's document information dictionary, defined by Info in
    # the PDF file's trailer dictionary.
    self._info_obj: Optional[PdfObject]

    # The PDF file identifier, defined by the ID in the PDF file's
    # trailer dictionary.
    self._ID: Union[ArrayObject, None] = None

    # Tracks links in pages added to the writer for resolving later.
    self._unresolved_links: list[tuple[ReferenceLink, ReferenceLink]] = []
    # Tracks pages added to the writer and what page they turned into.
    self._merged_in_pages: dict[Optional[IndirectObject], Optional[IndirectObject]] = {}

    if not self.incremental:
        self._header = b"%PDF-1.3"
        self._info_obj = self._add_object(
            DictionaryObject(
                {NameObject("/Producer"): create_string_object("pypdf")}
            )
        )
    else:
        # Normalise path -> BytesIO -> PdfReader, one step at a time.
        if isinstance(fileobj, (str, Path)):
            with open(fileobj, "rb") as source:
                fileobj = BytesIO(source.read(-1))
        if isinstance(fileobj, BytesIO):
            fileobj = PdfReader(fileobj)
        if not isinstance(fileobj, PdfReader):
            raise PyPdfError("Invalid type for incremental mode")
        self._reader = fileobj  # prev content is in _reader.stream
        self._header = fileobj.pdf_header.encode()
        self._readonly = True  # TODO: to be analysed

    def _get_clone_from(
        fileobj: Union[None, PdfReader, str, Path, IO[Any], BytesIO],
        clone_from: Union[None, PdfReader, str, Path, IO[Any], BytesIO],
    ) -> Union[None, PdfReader, str, Path, IO[Any], BytesIO]:
        # Decide which of the two arguments (if any) is cloned.
        if isinstance(fileobj, (str, Path, IO, BytesIO)) and (
            fileobj == "" or clone_from is not None
        ):
            return clone_from
        has_content = True
        if isinstance(fileobj, (str, Path)):
            candidate = Path(str(fileobj))
            # A missing or zero-length file has nothing to clone.
            if not candidate.exists() or candidate.stat().st_size == 0:
                has_content = False
        if isinstance(fileobj, (IOBase, BytesIO)):
            # Probe the stream length, then restore the read position.
            position = fileobj.tell()
            if fileobj.seek(0, 2) == 0:
                has_content = False
            fileobj.seek(position, 0)
        return fileobj if has_content else clone_from

    clone_from = _get_clone_from(fileobj, clone_from)
    # To prevent overwriting
    self.temp_fileobj = fileobj
    self.fileobj = ""
    self._with_as_usage = False
    self._cloned = False
    # The root of our page tree node
    pages = DictionaryObject(
        {
            NameObject(PagesAttributes.TYPE): NameObject("/Pages"),
            NameObject(PagesAttributes.COUNT): NumberObject(0),
            NameObject(PagesAttributes.KIDS): ArrayObject(),
        }
    )
    self.flattened_pages = []
    self._encryption: Optional[Encryption] = None
    self._encrypt_entry: Optional[DictionaryObject] = None

    if clone_from is None:
        # Nothing to clone: start from an empty page tree and catalog.
        self._pages = self._add_object(pages)
        self._root_object = DictionaryObject(
            {
                NameObject(PagesAttributes.TYPE): NameObject(CO.CATALOG),
                NameObject(CO.PAGES): self._pages,
            }
        )
        self._add_object(self._root_object)
    else:
        if not isinstance(clone_from, PdfReader):
            clone_from = PdfReader(clone_from)
        self.clone_document_from_reader(clone_from)
        self._cloned = True

    # ``full`` only borrowed the incremental loading path above.
    if full and not incremental:
        self.incremental = False
    if isinstance(self._ID, list):
        # Store both ID entries as byte strings.
        for slot in (0, 1):
            if isinstance(self._ID[slot], TextStringObject):
                self._ID[slot] = ByteStringObject(self._ID[slot].get_original_bytes())
311 # for commonality
@property
def is_encrypted(self) -> bool:
    """
    Read-only flag that is always ``False`` on a writer.

    The property is kept for commonality with the reader-side API.
    """
    encrypted = False
    return encrypted
@property
def root_object(self) -> DictionaryObject:
    """
    Give direct access to the dictionary held in ``_root_object``.

    Note:
        Recommended for read access only.

    """
    root = self._root_object
    return root
@property
def _info(self) -> Optional[DictionaryObject]:
    """
    Access the "/Info" dictionary (same accessor name as on PdfReader).

    Returns:
        The resolved /Info dictionary, or None when ``_info_obj`` is None.

    """
    if self._info_obj is None:
        return None
    return cast(DictionaryObject, self._info_obj.get_object())

@_info.setter
def _info(self, value: Optional[Union[IndirectObject, DictionaryObject]]) -> None:
    if value is not None:
        # Reuse the existing info object (creating one if needed) and
        # replace its content with a copy of the entries in ``value``.
        if self._info_obj is None:
            self._info_obj = self._add_object(DictionaryObject())
        target = cast(DictionaryObject, self._info_obj.get_object())
        target.clear()
        target.update(cast(DictionaryObject, value.get_object()))
        return

    # Removing: blank the slot of the current info object, if there is one.
    try:
        self._objects[self._info_obj.indirect_reference.idnum - 1] = None  # type: ignore
    except (KeyError, AttributeError):
        pass
    self._info_obj = None
@property
def xmp_metadata(self) -> Optional[XmpInformation]:
    """XMP (Extensible Metadata Platform) data, read from the root object."""
    return cast(XmpInformation, self.root_object.xmp_metadata)

@xmp_metadata.setter
def xmp_metadata(self, value: Union[XmpInformation, bytes, None]) -> None:
    """Replace, create or (with ``None``) remove the "/Metadata" entry."""
    root = self.root_object
    if value is None:
        if "/Metadata" in root:
            del root["/Metadata"]
        return

    current = root.get("/Metadata", None)
    if isinstance(current, IndirectObject):
        # Reuse the stream the catalog already points to.
        metadata_stream = cast(StreamObject, current.get_object())
    else:
        # Drop any direct (non-indirect) entry and register a new stream.
        if current is not None:
            del root["/Metadata"]
        metadata_stream = StreamObject()
        stream_reference = self._add_object(metadata_stream)
        root[NameObject("/Metadata")] = stream_reference

    payload = value.stream.get_data() if isinstance(value, XmpInformation) else value
    metadata_stream.set_data(payload)
@property
def with_as_usage(self) -> bool:
    """Deprecated accessor for ``_with_as_usage``; emits the deprecation notice."""
    deprecation_no_replacement("with_as_usage", "5.0")
    current = self._with_as_usage
    return current

@with_as_usage.setter
def with_as_usage(self, value: bool) -> None:
    """Deprecated setter for ``_with_as_usage``; emits the deprecation notice."""
    deprecation_no_replacement("with_as_usage", "5.0")
    self._with_as_usage = value
def __enter__(self) -> Self:
    """
    Re-initialise the writer for use in a ``with`` block.

    ``_cloned`` and the file object remembered in ``temp_fileobj`` are
    carried across the re-initialisation; the latter becomes ``fileobj``.
    """
    was_cloned: bool = self._cloned
    remembered_target = self.temp_fileobj
    self.__init__()  # type: ignore
    self._cloned = was_cloned
    self._with_as_usage = True
    self.fileobj = remembered_target  # type: ignore
    return self
def __exit__(
    self,
    exc_type: Optional[type[BaseException]],
    exc: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> None:
    """Write to ``fileobj`` on exit, unless it is empty or the writer was cloned."""
    if not self.fileobj or self._cloned:
        return
    self.write(self.fileobj)
@property
def pdf_header(self) -> str:
    """
    Read/write property holding the PDF header that is written.

    The value should look like ``'%PDF-1.5'``. It is recommended to set the
    lowest version that supports all features used within the PDF file.

    Note: reading returns ``str``; assignment accepts ``str`` or ``bytes``.
    """
    header_text = self._header.decode()
    return header_text

@pdf_header.setter
def pdf_header(self, new_header: Union[str, bytes]) -> None:
    # The header is stored as bytes; text input is encoded first.
    self._header = new_header.encode() if isinstance(new_header, str) else new_header
def _add_object(self, obj: PdfObject) -> IndirectObject:
    """
    Register ``obj`` as an indirect object of this writer.

    Args:
        obj: The object to register.

    Returns:
        The object's existing reference when it already belongs to this
        writer, otherwise a newly assigned reference (generation 0).
    """
    known_ref = getattr(obj, "indirect_reference", None)
    if known_ref is not None and known_ref.pdf == self:
        return known_ref  # type: ignore

    # check for /Contents in Pages (/Contents in annotations are strings)
    if isinstance(obj, DictionaryObject):
        contents = obj.get(PG.CONTENTS, None)
        if isinstance(contents, (ArrayObject, DictionaryObject)):
            contents_ref = self._add_object(obj[PG.CONTENTS])
            obj[NameObject(PG.CONTENTS)] = contents_ref

    self._objects.append(obj)
    # Object numbers are 1-based: the new number equals the list length.
    obj.indirect_reference = IndirectObject(len(self._objects), 0, self)
    return obj.indirect_reference
def get_object(
    self,
    indirect_reference: Union[int, IndirectObject],
) -> PdfObject:
    """
    Look up an indirect object of this writer.

    Args:
        indirect_reference: A 1-based object number or a reference owned
            by this writer.

    Returns:
        The stored object.

    Raises:
        ValueError: If the reference belongs to another document.
    """
    if isinstance(indirect_reference, int):
        idnum = indirect_reference
    else:
        if indirect_reference.pdf != self:
            raise ValueError("PDF must be self")
        idnum = indirect_reference.idnum
    found = self._objects[idnum - 1]
    assert found is not None, "mypy"
    return found
def _replace_object(
    self,
    indirect_reference: Union[int, IndirectObject],
    obj: PdfObject,
) -> PdfObject:
    """
    Put ``obj`` into the slot designated by ``indirect_reference``.

    Args:
        indirect_reference: A 1-based object number or a reference owned
            by this writer.
        obj: The replacement; it is cloned into this writer first when it
            carries a reference to another document.

    Returns:
        The object now stored in the slot.

    Raises:
        ValueError: If the reference belongs to another document.
    """
    if isinstance(indirect_reference, IndirectObject):
        if indirect_reference.pdf != self:
            raise ValueError("PDF must be self")
        indirect_reference = indirect_reference.idnum
    slot = indirect_reference - 1
    # Keep the generation number of the object being replaced.
    gen = self._objects[slot].indirect_reference.generation  # type: ignore

    foreign_ref = getattr(obj, "indirect_reference", None)
    if foreign_ref is not None and foreign_ref.pdf != self:
        obj = obj.clone(self)

    self._objects[slot] = obj
    obj.indirect_reference = IndirectObject(indirect_reference, gen, self)

    assert isinstance(obj, PdfObject), "mypy"
    return obj
def _add_page(
    self,
    page: PageObject,
    index: int,
    excluded_keys: Iterable[str] = (),
) -> PageObject:
    """
    Clone ``page`` into this writer and link it into the page tree.

    Args:
        page: The page to add; must be a ``PageObject`` whose type entry
            is the page type.
        index: Position handed to ``_get_page_in_node`` and used for
            ``flattened_pages``.
        excluded_keys: Keys not copied by the clone; the parent key and
            "/StructParents" are always added to them.

    Returns:
        The cloned page that now belongs to this writer.

    Raises:
        ValueError: If ``page`` is not a valid page object.
        PyPdfError: If more than 1000 ancestors are visited while
            updating the page counts.
    """
    if not isinstance(page, PageObject) or page.get(PagesAttributes.TYPE, None) != CO.PAGE:
        raise ValueError("Invalid page object")
    assert self.flattened_pages is not None, "for mypy"
    source_page = page
    skipped_keys = [*excluded_keys, PagesAttributes.PARENT, "/StructParents"]

    # Acrobat does not accept two indirect references pointing on the same
    # page; therefore in order to add multiple copies of the same
    # page, we need to create a new dictionary for the page, however the
    # objects below (including content) are not duplicated:
    try:  # delete an already existing page
        del self._id_translated[id(source_page.indirect_reference.pdf)][  # type: ignore
            source_page.indirect_reference.idnum  # type: ignore
        ]
    except Exception:
        pass

    page = cast(
        "PageObject", source_page.clone(self, False, skipped_keys).get_object()
    )
    if source_page.pdf is not None:
        source_header = source_page.pdf.pdf_header
        self.pdf_header = _get_max_pdf_version_header(self.pdf_header, source_header)

    node, idx = self._get_page_in_node(index)
    page[NameObject(PagesAttributes.PARENT)] = node.indirect_reference

    kids = cast(ArrayObject, node[PagesAttributes.KIDS])
    if idx < 0:
        kids.append(page.indirect_reference)
        self.flattened_pages.append(page)
    else:
        kids.insert(idx, page.indirect_reference)
        self.flattened_pages.insert(index, page)

    # Bump the page count on the node and on each of its ancestors.
    depth = 0
    while not is_null_or_none(node):
        node = cast(DictionaryObject, node.get_object())
        node[NameObject(PagesAttributes.COUNT)] = NumberObject(cast(int, node[PagesAttributes.COUNT]) + 1)
        node = node.get(PagesAttributes.PARENT, None)  # type: ignore[assignment]  # TODO: Fix.
        depth += 1
        if depth > 1000:
            raise PyPdfError("Too many recursive calls!")

    if source_page.pdf is not None:
        # the page may contain links to other pages, and those other
        # pages may or may not already be added. we store the
        # information we need, so that we can resolve the references
        # later.
        self._unresolved_links.extend(extract_links(page, source_page))
        self._merged_in_pages[source_page.indirect_reference] = page.indirect_reference

    return page
def set_need_appearances_writer(self, state: bool = True) -> None:
    """
    Set the "NeedAppearances" flag in the writer's AcroForm dictionary.

    The flag indicates whether the appearance dictionary for form fields
    should be generated by the PDF viewer or whether the embedded
    appearance should be used. An AcroForm dictionary is created when the
    root object has none. Any failure is logged as a warning instead of
    being raised.

    Args:
        state: The value to store for the NeedAppearances flag.

    Returns:
        None

    """
    # See §12.7.2 and §7.7.2 for more information:
    # https://opensource.adobe.com/dc-acrobat-sdk-docs/pdfstandards/PDF32000_2008.pdf
    try:
        # Make sure the AcroForm tree exists.
        if CatalogDictionary.ACRO_FORM not in self._root_object:
            acro_form_key = NameObject(CatalogDictionary.ACRO_FORM)
            self._root_object[acro_form_key] = self._add_object(DictionaryObject())

        flag_key = NameObject(InteractiveFormDictEntries.NeedAppearances)
        acro_form = cast(DictionaryObject, self._root_object[CatalogDictionary.ACRO_FORM])
        acro_form[flag_key] = BooleanObject(state)
    except Exception as exc:  # pragma: no cover
        logger_warning(
            f"set_need_appearances_writer({state}) catch : {exc}", __name__
        )
def create_viewer_preferences(self) -> ViewerPreferences:
    """
    Create a new ``ViewerPreferences`` object and store it in the root object.

    Returns:
        The newly created object.
    """
    preferences = ViewerPreferences()
    preferences_ref = self._add_object(preferences)
    self._root_object[NameObject(CatalogDictionary.VIEWER_PREFERENCES)] = preferences_ref
    return preferences
def add_page(
    self,
    page: PageObject,
    excluded_keys: Iterable[str] = (),
) -> PageObject:
    """
    Append a page at the end of this PDF file.

    Recommended for advanced usage including the adequate excluded_keys.

    The page is usually acquired from a :class:`PdfReader<pypdf.PdfReader>`
    instance.

    Args:
        page: The page to add to the document. Should be
            an instance of :class:`PageObject<pypdf._page.PageObject>`
        excluded_keys: Keys passed on unchanged to ``_add_page``.

    Returns:
        The added PageObject.

    """
    assert self.flattened_pages is not None, "mypy"
    end_position = len(self.flattened_pages)
    return self._add_page(page, end_position, excluded_keys)
def insert_page(
    self,
    page: PageObject,
    index: int = 0,
    excluded_keys: Iterable[str] = (),
) -> PageObject:
    """
    Insert a page into this PDF file. The page is usually acquired from a
    :class:`PdfReader<pypdf.PdfReader>` instance.

    Args:
        page: The page to add to the document.
        index: Position at which the page will be inserted. A negative
            value counts from the end; a value at or past the end appends.
        excluded_keys: Keys passed on unchanged to the page-adding helpers.

    Returns:
        The added PageObject.

    Raises:
        ValueError: If a negative ``index`` still lies before the first
            page after being counted from the end.

    """
    assert self.flattened_pages is not None, "mypy"
    page_count = len(self.flattened_pages)
    if index < 0:
        index += page_count
    if index < 0:
        raise ValueError("Invalid index value")
    if index < page_count:
        return self._add_page(page, index, excluded_keys)
    return self.add_page(page, excluded_keys)
def _get_page_number_by_indirect(
    self, indirect_reference: Union[None, int, NullObject, IndirectObject]
) -> Optional[int]:
    """
    Resolve a reference and report the page number of the page it points to.

    Offered so the writer provides the same function as PdfReader.

    Args:
        indirect_reference: Object number, reference, or a null/None value.

    Returns:
        The page number, or None when the reference is null/None or does
        not resolve to a ``PageObject``.

    """
    if is_null_or_none(indirect_reference):
        return None
    assert indirect_reference is not None, "mypy"
    if isinstance(indirect_reference, int):
        # A bare number is treated as generation 0 of this writer.
        indirect_reference = IndirectObject(indirect_reference, 0, self)
    resolved = indirect_reference.get_object()
    return resolved.page_number if isinstance(resolved, PageObject) else None
def add_blank_page(
    self, width: Optional[float] = None, height: Optional[float] = None
) -> PageObject:
    """
    Append a blank page to this PDF file and return it.

    The page is built by ``PageObject.create_blank_page``; according to the
    existing documentation, the size of the last page is used when no page
    size is specified.

    Args:
        width: The width of the new page expressed in default user
            space units.
        height: The height of the new page expressed in default
            user space units.

    Returns:
        The newly appended page.

    Raises:
        PageSizeNotDefinedError: if width and height are not defined
            and previous page does not exist.

    """
    blank = PageObject.create_blank_page(self, width, height)
    return self.add_page(blank)
def insert_blank_page(
    self,
    width: Optional[Union[float, decimal.Decimal]] = None,
    height: Optional[Union[float, decimal.Decimal]] = None,
    index: int = 0,
) -> PageObject:
    """
    Insert a blank page into this PDF file and return it.

    When a dimension is missing and a page exists at ``index``, both
    dimensions are taken from that page's media box. When no page exists
    at ``index``, the given values (possibly ``None``) are handed to
    ``PageObject.create_blank_page``, which decides the fallback.

    Args:
        width: The width of the new page expressed in default user
            space units.
        height: The height of the new page expressed in default
            user space units.
        index: Position to add the page.

    Returns:
        The newly created blank page.

    Raises:
        PageSizeNotDefinedError: if width and height are not defined
            and previous page does not exist (raised by
            ``PageObject.create_blank_page``).

    """
    # The bounds check must guard BOTH missing-dimension cases. Grouped as
    # ``width is None or (height is None and index < n)``, a missing width
    # with ``index`` past the last page would index ``self.pages`` out of
    # range instead of reaching the documented fallback.
    if (width is None or height is None) and index < self.get_num_pages():
        reference_page = self.pages[index]
        width = reference_page.mediabox.width
        height = reference_page.mediabox.height
    page = PageObject.create_blank_page(self, width, height)
    self.insert_page(page, index)
    return page
@property
def open_destination(
    self,
) -> Union[None, Destination, TextStringObject, ByteStringObject]:
    """Return the open destination as provided by the parent class."""
    return super().open_destination

@open_destination.setter
def open_destination(self, dest: Union[None, str, Destination, PageObject]) -> None:
    """
    Set or remove the "/OpenAction" entry of the root object.

    ``None`` removes the entry, a string is stored as text, a
    ``Destination`` contributes its destination array, and a page is
    wrapped in a destination named "Opening" using ``PAGE_FIT``. Any other
    type leaves the root object untouched.
    """
    if dest is None:
        try:
            del self._root_object["/OpenAction"]
        except KeyError:
            pass
        return
    if isinstance(dest, str):
        self._root_object[NameObject("/OpenAction")] = TextStringObject(dest)
        return
    if isinstance(dest, Destination):
        self._root_object[NameObject("/OpenAction")] = dest.dest_array
        return
    if isinstance(dest, PageObject):
        self._root_object[NameObject("/OpenAction")] = Destination(
            "Opening",
            NullObject()
            if dest.indirect_reference is None
            else dest.indirect_reference,
            PAGE_FIT,
        ).dest_array
def add_js(self, javascript: str) -> None:
    """
    Add JavaScript which will launch upon opening this PDF.

    The script is registered under a random UUID name in the
    "/JavaScript" name list of the root object, so several scripts can
    be added.

    Args:
        javascript: Your JavaScript.

    Example:
        This will launch the print window when the PDF is opened.

        >>> from pypdf import PdfWriter
        >>> output = PdfWriter()
        >>> output.add_js("this.print({bUI:true,bSilent:false,bShrinkToFit:true});")

    """
    # Names / JavaScript preferred to be able to add multiple scripts
    if "/Names" not in self._root_object:
        self._root_object[NameObject(CA.NAMES)] = DictionaryObject()
    names = cast(DictionaryObject, self._root_object[CA.NAMES])

    if "/JavaScript" not in names:
        empty_tree = DictionaryObject({NameObject("/Names"): ArrayObject()})
        names[NameObject("/JavaScript")] = empty_tree
    js_tree = cast(DictionaryObject, names["/JavaScript"])
    js_list = cast(ArrayObject, js_tree["/Names"])

    # We need a name for parameterized JavaScript in the PDF file,
    # but it can be anything.
    script_name = create_string_object(str(uuid.uuid4()))
    js_list.append(script_name)

    action = DictionaryObject(
        {
            NameObject(PagesAttributes.TYPE): NameObject("/Action"),
            NameObject("/S"): NameObject("/JavaScript"),
            NameObject("/JS"): TextStringObject(f"{javascript}"),
        }
    )
    js_list.append(self._add_object(action))
def add_attachment(self, filename: str, data: Union[str, bytes]) -> "EmbeddedFile":
    """
    Embed a file inside the PDF.

    The work is delegated to ``EmbeddedFile._create_new``.

    Reference:
    https://opensource.adobe.com/dc-acrobat-sdk-docs/pdfstandards/PDF32000_2008.pdf
    Section 7.11.3

    Args:
        filename: The filename to display.
        data: The data in the file.

    Returns:
        EmbeddedFile instance for the newly created embedded file.

    """
    embedded = EmbeddedFile._create_new(self, filename, data)
    return embedded
def append_pages_from_reader(
    self,
    reader: PdfReader,
    after_page_append: Optional[Callable[[PageObject], None]] = None,
) -> None:
    """
    Copy every page of ``reader`` to the end of this writer, in order.

    ``append`` should be preferred.

    Args:
        reader: The PdfReader whose pages are added to this writer via
            :meth:`add_page`.
        after_page_append:
            Optional callback invoked after each page has been added. Its
            single argument is the page as returned by :meth:`add_page`,
            i.e. the writer's page, not the reader's. Non-callable values
            are ignored.

    """
    # Iterate the pages directly instead of indexing through range(len()).
    for reader_page in reader.pages:
        writer_page = self.add_page(reader_page)
        # Trigger callback, pass writer page as parameter
        if callable(after_page_append):
            after_page_append(writer_page)
def _merge_content_stream_to_page(
    self,
    page: PageObject,
    new_content_data: bytes,
) -> None:
    """
    Add ``new_content_data`` after the content the page already has.

    Args:
        page: The page to which the new content data will be added.
        new_content_data: A binary-encoded new content stream, for
            instance the commands to draw an XObject.
    """
    contents_key = NameObject("/Contents")

    if contents_key not in page:
        # If no existing content, then we have an empty page.
        # Create a new StreamObject in a new /Contents entry.
        fresh_stream = StreamObject()
        fresh_stream.set_data(new_content_data)
        page[contents_key] = self._add_object(fresh_stream)
        return

    # Resolve whatever the page currently stores under /Contents.
    existing_content = page[contents_key].get_object()

    if isinstance(existing_content, ArrayObject):
        # An array of streams: register one more stream and append it.
        extra_stream = StreamObject()
        extra_stream.set_data(new_content_data)
        existing_content.append(self._add_object(extra_stream))
        page[contents_key] = self._add_object(existing_content)
    elif isinstance(existing_content, StreamObject):
        # A single stream: build a replacement holding old + new data.
        combined = existing_content.get_data() + b"\n" + new_content_data
        replacement = StreamObject()
        replacement.set_data(combined)
        page[contents_key] = self._add_object(replacement)
def _add_apstream_object(
    self,
    page: PageObject,
    appearance_stream_obj: StreamObject,
    object_name: str,
    x_offset: float,
    y_offset: float,
) -> None:
    """
    Draw an appearance stream on the page as an XObject.

    Args:
        page: The page to which to add the appearance stream.
        appearance_stream_obj: The appearance stream.
        object_name: The name of the appearance stream; used to build the
            XObject resource name ``/Fm_<object_name>``.
        x_offset: The horizontal offset for the appearance stream.
        y_offset: The vertical offset for the appearance stream.
    """
    # Prepare XObject resource dictionary on the page. This currently
    # only deals with font resources, but can easily be adapted to also
    # include other resources.
    page_resources = cast(DictionaryObject, page[PG.RESOURCES])
    if "/Resources" in appearance_stream_obj:
        stream_resources = cast(DictionaryObject, appearance_stream_obj["/Resources"])
        stream_fonts = cast(DictionaryObject, stream_resources.get("/Font", DictionaryObject()))
        if "/Font" not in page_resources:
            page_resources[NameObject("/Font")] = self._add_object(DictionaryObject())
        page_fonts = cast(DictionaryObject, page_resources["/Font"].get_object())
        # Copy over only the fonts the page does not already know by name.
        for font_name, font_res in stream_fonts.items():
            if font_name in pg_font_names(page_fonts):
                continue
            page_fonts[font_name] = self._add_object(font_res)

    # Always add the resolved stream object to the writer to get a new IndirectObject.
    # This ensures we have a valid IndirectObject managed by *this* writer.
    xobject_ref = self._add_object(appearance_stream_obj)
    xobject_name = NameObject(f"/Fm_{object_name}")._sanitize()
    if "/XObject" not in page_resources:
        page_resources[NameObject("/XObject")] = DictionaryObject()
    page_xobjects = cast(DictionaryObject, page_resources["/XObject"])
    if xobject_name in page_xobjects:
        logger_warning(
            f"XObject {xobject_name!r} already added to page resources. This might be an issue.",
            __name__
        )
    else:
        page_xobjects[xobject_name] = xobject_ref

    # Emit "save state, translate, paint XObject, restore state".
    placement = Transformation().translate(x_offset, y_offset)
    drawing_commands = f"q\n{placement._to_cm()}\n{xobject_name} Do\nQ".encode()
    self._merge_content_stream_to_page(page, drawing_commands)


def pg_font_names(font_dict: DictionaryObject) -> DictionaryObject:
    # Identity helper: membership is tested against the font dictionary itself.
    return font_dict
931 FFBITS_NUL = FA.FfBits(0)
def update_page_form_field_values(
    self,
    page: Union[PageObject, list[PageObject], None],
    fields: Mapping[str, Union[str, list[str], tuple[str, str, float]]],
    flags: FA.FfBits = FFBITS_NUL,
    auto_regenerate: Optional[bool] = True,
    flatten: bool = False,
) -> None:
    """
    Update the form field values for a given page from a fields dictionary.

    Copy field texts and values from fields to page.
    If the field links to a parent object, add the information to the parent.

    Args:
        page: `PageObject` - references **PDF writer's page** where the
            annotations and field data will be updated.
            `List[Pageobject]` - provides list of pages to be processed.
            `None` - all pages.
        fields: a Python dictionary of:

            * field names (/T) as keys and text values (/V) as value
            * field names (/T) as keys and list of text values (/V) for multiple choice list
            * field names (/T) as keys and tuple of:
                * text values (/V)
                * font id (e.g. /F1, the font id must exist)
                * font size (0 for autosize)

        flags: A set of flags from :class:`~pypdf.constants.FieldDictionaryAttributes.FfBits`.

        auto_regenerate: Set/unset the need_appearances flag;
            the flag is unchanged if auto_regenerate is None.

        flatten: Whether or not to flatten the annotation. If True, this adds the annotation's
            appearance stream to the page contents. Note that this option does not remove the
            annotation itself.

    Raises:
        PyPdfError: If the document root has no /AcroForm dictionary, or the
            /AcroForm dictionary has no /Fields entry.

    """
    if CatalogDictionary.ACRO_FORM not in self._root_object:
        raise PyPdfError("No /AcroForm dictionary in PDF of PdfWriter Object")
    acro_form = cast(DictionaryObject, self._root_object[CatalogDictionary.ACRO_FORM])
    if InteractiveFormDictEntries.Fields not in acro_form:
        raise PyPdfError("No /Fields dictionary in PDF of PdfWriter Object")
    if isinstance(auto_regenerate, bool):
        self.set_need_appearances_writer(auto_regenerate)
    # Iterate through pages, update field values
    if page is None:
        page = list(self.pages)
    if isinstance(page, list):
        for p in page:
            if PG.ANNOTS in p:  # just to prevent warnings
                # need_appearances was already handled above, hence None here.
                self.update_page_form_field_values(p, fields, flags, None, flatten=flatten)
        return
    if PG.ANNOTS not in page:
        logger_warning("No fields to update on this page", __name__)
        return
    for annotation in page[PG.ANNOTS]:  # type: ignore
        annotation = cast(DictionaryObject, annotation.get_object())
        if annotation.get("/Subtype", "") != "/Widget":
            continue
        if "/FT" in annotation and "/T" in annotation:
            parent_annotation = annotation
        else:
            parent_annotation = annotation.get(
                PG.PARENT, DictionaryObject()
            ).get_object()

        for field, value in fields.items():
            rectangle = cast(RectangleObject, annotation[AA.Rect])
            if not (
                self._get_qualified_field_name(parent_annotation) == field
                or parent_annotation.get("/T", None) == field
            ):
                continue
            # Reset per matching field: without this, a field type that yields no
            # appearance stream (e.g. /Sig) would either hit an unbound local or
            # flatten the stream left over from a previously processed field.
            appearance_stream_obj = None
            if (
                parent_annotation.get("/FT", None) == "/Ch"
                and "/I" in parent_annotation
            ):
                del parent_annotation["/I"]
            if flags:
                annotation[NameObject(FA.Ff)] = NumberObject(flags)
            # Set the field value
            if not (value is None and flatten):  # Only change values if given by user and not flattening.
                if isinstance(value, list):
                    lst = ArrayObject(TextStringObject(v) for v in value)
                    parent_annotation[NameObject(FA.V)] = lst
                elif isinstance(value, tuple):
                    annotation[NameObject(FA.V)] = TextStringObject(
                        value[0],
                    )
                else:
                    parent_annotation[NameObject(FA.V)] = TextStringObject(value)
            # Get or create the field's appearance stream object
            if parent_annotation.get(FA.FT) == "/Btn":
                # Checkbox button (no /FT found in Radio widgets);
                # We can find the associated appearance stream object
                # within the annotation.
                v = NameObject(value)
                ap = cast(DictionaryObject, annotation[NameObject(AA.AP)])
                normal_ap = cast(DictionaryObject, ap["/N"])
                if v not in normal_ap:
                    v = NameObject("/Off")
                appearance_stream_obj = normal_ap.get(v)
                # Other cases will be updated through the for loop
                annotation[NameObject(AA.AS)] = v
                annotation[NameObject(FA.V)] = v
            elif (
                parent_annotation.get(FA.FT) == "/Tx"
                or parent_annotation.get(FA.FT) == "/Ch"
            ):
                # Textbox; we need to generate the appearance stream object
                if isinstance(value, tuple):
                    appearance_stream_obj = TextStreamAppearance.from_text_annotation(
                        acro_form, parent_annotation, annotation, value[1], value[2]
                    )
                else:
                    appearance_stream_obj = TextStreamAppearance.from_text_annotation(
                        acro_form, parent_annotation, annotation
                    )
                # Add the appearance stream object
                if AA.AP not in annotation:
                    annotation[NameObject(AA.AP)] = DictionaryObject(
                        {NameObject("/N"): self._add_object(appearance_stream_obj)}
                    )
                elif "/N" not in cast(DictionaryObject, annotation[AA.AP]):
                    cast(DictionaryObject, annotation[NameObject(AA.AP)])[
                        NameObject("/N")
                    ] = self._add_object(appearance_stream_obj)
                else:  # [/AP][/N] exists
                    # Overwrite the existing object slot so the reference stays valid.
                    n = annotation[AA.AP]["/N"].indirect_reference.idnum  # type: ignore
                    self._objects[n - 1] = appearance_stream_obj
                    appearance_stream_obj.indirect_reference = IndirectObject(n, 0, self)
            elif (
                annotation.get(FA.FT) == "/Sig"
            ):  # deprecated  # not implemented yet
                logger_warning("Signature forms not implemented yet", __name__)
            if flatten and appearance_stream_obj is not None:
                self._add_apstream_object(page, appearance_stream_obj, field, rectangle[0], rectangle[1])
def reattach_fields(
    self, page: Optional[PageObject] = None
) -> list[DictionaryObject]:
    """
    Register widget annotations that carry a field type (``/FT``) but are
    not yet listed in the AcroForm ``/Fields`` array.

    The ``/AcroForm`` dictionary and its ``/Fields`` array are created when
    they do not exist yet.

    Args:
        page: page to analyze.
            If none is provided, all pages will be analyzed.

    Returns:
        list of reattached fields.

    """
    if page is None:
        collected = []
        for single_page in self.pages:
            collected += self.reattach_fields(single_page)
        return collected

    # Fetch /AcroForm, creating an empty one on first use.
    try:
        acro_form = cast(DictionaryObject, self._root_object[CatalogDictionary.ACRO_FORM])
    except KeyError:
        acro_form = DictionaryObject()
        self._root_object[NameObject(CatalogDictionary.ACRO_FORM)] = acro_form

    # Same for the /Fields array inside it.
    try:
        form_fields = cast(ArrayObject, acro_form[InteractiveFormDictEntries.Fields])
    except KeyError:
        form_fields = ArrayObject()
        acro_form[NameObject(InteractiveFormDictEntries.Fields)] = form_fields

    reattached = []
    if "/Annots" not in page:
        return reattached

    page_annots = cast(ArrayObject, page["/Annots"])
    for position, entry in enumerate(page_annots):
        widget = cast(DictionaryObject, entry.get_object())
        if widget.get("/Subtype", "") != "/Widget" or "/FT" not in widget:
            continue
        already_listed = (
            "indirect_reference" in widget.__dict__
            and widget.indirect_reference in form_fields
        )
        if already_listed:
            continue
        if not isinstance(entry, IndirectObject):
            # Inline annotation: register it so it can be referenced from /Fields.
            page_annots[position] = self._add_object(widget)
        form_fields.append(widget.indirect_reference)
        reattached.append(widget)
    return reattached
def clone_reader_document_root(self, reader: PdfReader) -> None:
    """
    Copy the reader document root to the writer and all sub-elements,
    including pages, threads, outlines,... For partial insertion, ``append``
    should be considered.

    Args:
        reader: PdfReader from which the document root should be copied.

    Raises:
        PdfReadError: In strict mode when more objects exist than the
            trailer ``/Size`` announces, or when flattening the page tree
            raises an IndexError.

    """
    self._info_obj = None
    if not self.incremental:
        self._objects.clear()
    else:
        # Mirror the reader's object numbering: one slot per id below /Size.
        self._objects = [None] * (cast(int, reader.trailer["/Size"]) - 1)
        for idnum in range(1, len(self._objects) + 1):
            source_obj = reader.get_object(idnum)
            if source_obj is not None:
                self._objects[idnum - 1] = source_obj.replicate(self)
    self._root_object = reader.root_object.clone(self)
    self._pages = self._root_object.raw_get("/Pages")

    if len(self._objects) > cast(int, reader.trailer["/Size"]):
        message = f"Object count {len(self._objects)} exceeds defined trailer size {reader.trailer['/Size']}"
        if self.strict:
            raise PdfReadError(message)
        logger_warning(message, __name__)

    # must be done here before rewriting
    if self.incremental:
        self._original_hash = [
            0 if obj is None else obj.hash_bin() for obj in self._objects
        ]

    try:
        self._flatten()
    except IndexError:
        raise PdfReadError("Got index error while flattening.")

    assert self.flattened_pages is not None
    rebuild_tree = not self.incremental
    for flat_page in self.flattened_pages:
        self._replace_object(
            cast(IndirectObject, flat_page.indirect_reference).idnum, flat_page
        )
        if rebuild_tree:
            flat_page[NameObject("/Parent")] = self._pages
    if rebuild_tree:
        # Non-incremental output gets a single-level page tree.
        pages_root = cast(DictionaryObject, self._pages.get_object())
        pages_root[NameObject("/Kids")] = ArrayObject(
            [flat_page.indirect_reference for flat_page in self.flattened_pages]
        )
def clone_document_from_reader(
    self,
    reader: PdfReader,
    after_page_append: Optional[Callable[[PageObject], None]] = None,
) -> None:
    """
    Create a copy (clone) of a document from a PDF file reader cloning
    section '/Root' and '/Info' and '/ID' of the pdf.

    Args:
        reader: PDF file reader instance from which the clone
            should be created.
        after_page_append:
            Callback invoked once the clone is complete, one call per entry
            of the page tree root's ``/Kids`` array. Its single argument is
            the resolved page object.

    """
    self.clone_reader_document_root(reader)

    source_info = reader._info
    if source_info is not None:
        if self.incremental:
            self._info_obj = cast(
                IndirectObject, source_info.clone(self).indirect_reference
            )
            assert isinstance(self._info, DictionaryObject), "for mypy"
            # Record the cloned /Info hash so it is not reported as modified.
            info_slot = self._info_obj.indirect_reference.idnum - 1
            self._original_hash[info_slot] = self._info.hash_bin()
        else:
            info_copy = DictionaryObject(cast(DictionaryObject, source_info.get_object()))
            self._info_obj = self._add_object(info_copy)
    # otherwise _info_obj stays None, as set by clone_reader_document_root()

    try:
        self._ID = cast(ArrayObject, reader._ID).clone(self)
    except AttributeError:
        # No usable /ID on the reader side; nothing to copy.
        pass

    if callable(after_page_append):
        pages_root = cast(DictionaryObject, self._pages.get_object())
        for kid in cast(ArrayObject, pages_root["/Kids"]):
            after_page_append(kid.get_object())
def _compute_document_identifier(self) -> ByteStringObject:
    """Serialize the document body into memory and return its rolling checksum as a byte string."""
    buffer = BytesIO()
    self._write_pdf_structure(buffer)
    buffer.seek(0)
    checksum = _rolling_checksum(buffer)
    return ByteStringObject(checksum.encode("utf8"))
def generate_file_identifiers(self) -> None:
    """
    Generate an identifier for the PDF that will be written.

    The only point of this is ensuring uniqueness. Reproducibility is not
    required.
    When a file is first written, both identifiers shall be set to the same value.
    If both identifiers match when a file reference is resolved, it is very
    likely that the correct and unchanged file has been found. If only the first
    identifier matches, a different version of the correct file has been found.
    see §14.4 "File Identifiers".

    An existing first identifier is kept and only the second one is
    recomputed; otherwise both are set to the freshly computed value.
    """
    existing = self._ID
    if existing:
        pair = (existing[0], self._compute_document_identifier())
    else:
        fresh = self._compute_document_identifier()
        pair = (fresh, fresh)
    self._ID = ArrayObject(pair)
def encrypt(
    self,
    user_password: str,
    owner_password: Optional[str] = None,
    use_128bit: bool = True,
    permissions_flag: UserAccessPermissions = ALL_DOCUMENT_PERMISSIONS,
    *,
    algorithm: Optional[str] = None,
) -> None:
    """
    Encrypt this PDF file with the PDF Standard encryption handler.

    Args:
        user_password: The password which allows for opening
            and reading the PDF file with the restrictions provided.
        owner_password: The password which allows for
            opening the PDF files without any restrictions. By default,
            the owner password is the same as the user password.
        use_128bit: flag as to whether to use 128bit
            encryption. When false, 40bit encryption will be used.
            By default, this flag is on.
        permissions_flag: permissions as described in
            Table 3.20 of the PDF 1.7 specification. A bit value of 1 means
            the permission is granted.
            Hence an integer value of -1 will set all flags.
            Bit position 3 is for printing, 4 is for modifying content,
            5 and 6 control annotations, 9 for form fields,
            10 for extraction of text and graphics.
        algorithm: encrypt algorithm. Values may be one of "RC4-40", "RC4-128",
            "AES-128", "AES-256-R5", "AES-256". If it is valid,
            `use_128bit` will be ignored.

    Raises:
        ValueError: If ``algorithm`` does not name a member of ``EncryptAlgorithm``.

    """
    if owner_password is None:
        owner_password = user_password

    if algorithm is None:
        alg = EncryptAlgorithm.RC4_128 if use_128bit else EncryptAlgorithm.RC4_40
    else:
        try:
            # "AES-256-R5" -> EncryptAlgorithm.AES_256_R5
            alg = getattr(EncryptAlgorithm, algorithm.replace("-", "_"))
        except AttributeError:
            raise ValueError(f"Algorithm '{algorithm}' NOT supported")

    self.generate_file_identifiers()
    assert self._ID
    self._encryption = Encryption.make(alg, permissions_flag, self._ID[0])
    new_entry = self._encryption.write_entry(user_password, owner_password)

    previous_entry = self._encrypt_entry
    if previous_entry:
        # `encrypt` was called before: reuse the object slot of the old entry.
        assert previous_entry.indirect_reference is not None
        new_entry.indirect_reference = previous_entry.indirect_reference
        self._objects[new_entry.indirect_reference.idnum - 1] = new_entry
    else:
        self._add_object(new_entry)
    self._encrypt_entry = new_entry
1308 def _resolve_links(self) -> None:
1309 """Patch up links that were added to the document earlier, to
1310 make sure they still point to the same pages.
1311 """
1312 for (new_link, old_link) in self._unresolved_links:
1313 old_page = old_link.find_referenced_page()
1314 if not old_page:
1315 continue
1316 new_page = self._merged_in_pages.get(old_page)
1317 if new_page is None:
1318 continue
1319 new_link.patch_reference(self, new_page)
def write_stream(self, stream: StreamType) -> None:
    """
    Serialize the document into an already opened stream.

    A warning is logged when the stream exposes a ``mode`` without ``"b"``.
    In incremental mode the reader's original bytes are copied first and an
    increment is appended only if some object is new or modified; otherwise
    the body, cross-reference table and trailer are written.

    Args:
        stream: Destination stream to write to.

    """
    opened_as_text = hasattr(stream, "mode") and "b" not in stream.mode
    if opened_as_text:
        logger_warning(
            f"File <{stream.name}> to write to is not in binary mode. "
            "It may not be written to correctly.",
            __name__,
        )
    self._resolve_links()

    if not self.incremental:
        object_positions, free_objects = self._write_pdf_structure(stream)
        xref_location = self._write_xref_table(
            stream, object_positions, free_objects
        )
        self._write_trailer(stream, xref_location)
        return

    self._reader.stream.seek(0)
    stream.write(self._reader.stream.read(-1))
    if len(self.list_objects_in_increment()) > 0:
        self._write_increment(stream)  # writes objs, xref stream and startxref
def write(self, stream: Union[Path, StrByteType]) -> tuple[bool, IO[Any]]:
    """
    Write the collection of pages added to this object out as a PDF file.

    Args:
        stream: An object to write the file to. The object can support
            the write method and the tell method, similar to a file object, or
            be a file path, just like the fileobj, just named it stream to keep
            existing workflow.

    Returns:
        A tuple (bool, IO): whether the file was opened by this method, and
        the stream that was written to. A stream opened here is closed
        before returning; a caller-supplied stream is only flushed.

    Raises:
        ValueError: If ``stream`` is an empty string.

    """
    my_file = False

    if stream == "":
        raise ValueError(f"Output({stream=}) is empty.")

    if isinstance(stream, (str, Path)):
        stream = FileIO(stream, "wb")
        my_file = True

    try:
        self.write_stream(stream)
    finally:
        # Release a handle we opened ourselves even when writing fails,
        # so a failed write does not leak the file descriptor.
        if my_file:
            stream.close()

    if not my_file:
        stream.flush()

    return my_file, stream
def list_objects_in_increment(self) -> list[IndirectObject]:
    """
    For analysis or debugging.
    Provides the list of new or modified objects that will be written
    in the increment.
    Deleted objects will not be freed but will become orphans.

    An object counts as new when its slot lies beyond the recorded original
    hashes, and as modified when its current hash differs from the recorded one.

    Returns:
        List of new or modified IndirectObjects

    """
    known_count = len(self._original_hash)
    changed = []
    for slot, obj in enumerate(self._objects):
        if obj is None:
            continue
        if slot >= known_count or obj.hash_bin() != self._original_hash[slot]:
            changed.append(cast(IndirectObject, obj).indirect_reference)
    return changed
def _write_increment(self, stream: StreamType) -> None:
    """
    Append an incremental update to ``stream``.

    Every object that is new or whose hash differs from the recorded
    original is written, followed by a cross-reference stream object
    covering exactly those objects and the ``startxref``/``%%EOF`` footer.

    Args:
        stream: Destination stream; must support ``tell`` and ``write``.

    """
    offsets_by_idnum = {}
    index_ranges = []  # [first idnum, count] pairs, flattened into /Index below
    run_first = -1
    run_end = -2  # idnum that would extend the current run of consecutive ids
    known_count = len(self._original_hash)
    for idnum, obj in enumerate(self._objects, start=1):
        if obj is None:
            continue
        slot = idnum - 1
        if slot < known_count and obj.hash_bin() == self._original_hash[slot]:
            continue  # unchanged since the original was loaded
        assert isinstance(obj, PdfObject), "mypy"
        # first write new/modified object
        offsets_by_idnum[idnum] = stream.tell()
        stream.write(f"{idnum} 0 obj\n".encode())
        # Encryption is not operational here. Disabled code kept for reference:
        #     if self._encryption and obj != self._encrypt_entry:
        #         obj = self._encryption.encrypt_object(obj, idnum, 0)
        obj.write_to_stream(stream)
        stream.write(b"\nendobj\n")

        # prepare xref
        if idnum != run_end:
            if run_first > 0:
                index_ranges.append([run_first, run_end - run_first])
            run_first = idnum
        run_end = idnum + 1
    assert run_first > 0, "for pytest only"
    index_ranges.append([run_first, run_end - run_first])

    # write incremented xref
    xref_location = stream.tell()
    xr_id = len(self._objects) + 1
    stream.write(f"{xr_id} 0 obj".encode())
    flat_index = ArrayObject()
    for block in index_ranges:
        flat_index.extend(NumberObject(number) for number in block)
    init_data = {
        NameObject("/Type"): NameObject("/XRef"),
        NameObject("/Size"): NumberObject(xr_id + 1),
        NameObject("/Root"): self.root_object.indirect_reference,
        NameObject("/Filter"): NameObject("/FlateDecode"),
        NameObject("/Index"): flat_index,
        NameObject("/W"): ArrayObject(
            [NumberObject(1), NumberObject(4), NumberObject(1)]
        ),
        "__streamdata__": b"",
    }
    info = self._info
    if info is not None:
        info_slot = info.indirect_reference.idnum - 1  # type: ignore
        # /Info is only referenced when it is new or has changed.
        if (
            info_slot >= len(self._original_hash)
            or cast(IndirectObject, info).hash_bin()  # kept for future
            != self._original_hash[info_slot]
        ):
            init_data[NameObject(TK.INFO)] = info.indirect_reference
    init_data[NameObject(TK.PREV)] = NumberObject(self._reader._startxref)
    if self._ID:
        init_data[NameObject(TK.ID)] = self._ID
    xr = StreamObject.initialize_from_dictionary(init_data)
    # One entry per written object, packed per /W [1 4 1]: type 1, offset, generation 0.
    packed_entries = [
        struct.pack(b">BIB", 1, offset, 0) for offset in offsets_by_idnum.values()
    ]
    xr.set_data(b"".join(packed_entries))
    xr.write_to_stream(stream)
    stream.write(f"\nendobj\nstartxref\n{xref_location}\n%%EOF\n".encode())  # eof
1469 def _write_pdf_structure(self, stream: StreamType) -> tuple[list[int], list[int]]:
1470 object_positions = []
1471 free_objects = []
1472 stream.write(self.pdf_header.encode() + b"\n")
1473 stream.write(b"%\xE2\xE3\xCF\xD3\n")
1475 for idnum, obj in enumerate(self._objects, start=1):
1476 if obj is not None:
1477 object_positions.append(stream.tell())
1478 stream.write(f"{idnum} 0 obj\n".encode())
1479 if self._encryption and obj != self._encrypt_entry:
1480 obj = self._encryption.encrypt_object(obj, idnum, 0)
1481 obj.write_to_stream(stream)
1482 stream.write(b"\nendobj\n")
1483 else:
1484 object_positions.append(-1)
1485 free_objects.append(idnum)
1486 free_objects.append(0) # add 0 to loop in accordance with specification
1487 return object_positions, free_objects
1489 def _write_xref_table(
1490 self, stream: StreamType, object_positions: list[int], free_objects: list[int]
1491 ) -> int:
1492 xref_location = stream.tell()
1493 stream.write(b"xref\n")
1494 stream.write(f"0 {len(self._objects) + 1}\n".encode())
1495 stream.write(f"{free_objects[0]:0>10} {65535:0>5} f \n".encode())
1496 free_idx = 1
1497 for offset in object_positions:
1498 if offset > 0:
1499 stream.write(f"{offset:0>10} {0:0>5} n \n".encode())
1500 else:
1501 stream.write(f"{free_objects[free_idx]:0>10} {1:0>5} f \n".encode())
1502 free_idx += 1
1503 return xref_location
def _write_trailer(self, stream: StreamType, xref_location: int) -> None:
    """
    Write the PDF trailer to the stream.

    To quote the PDF specification:
        [The] trailer [gives] the location of the cross-reference table and
        of certain special objects within the body of the file.

    The trailer always carries the size and root entries; the info, ID and
    encrypt entries are added only when the writer has them.

    Args:
        stream: Destination stream.
        xref_location: Offset of the cross-reference table, emitted after ``startxref``.

    """
    stream.write(b"trailer\n")
    trailer_dict = DictionaryObject(
        {
            NameObject(TK.SIZE): NumberObject(len(self._objects) + 1),
            NameObject(TK.ROOT): self.root_object.indirect_reference,
        }
    )
    info = self._info
    if info is not None:
        trailer_dict[NameObject(TK.INFO)] = info.indirect_reference
    document_id = self._ID
    if document_id is not None:
        trailer_dict[NameObject(TK.ID)] = document_id
    encrypt_entry = self._encrypt_entry
    if encrypt_entry:
        trailer_dict[NameObject(TK.ENCRYPT)] = encrypt_entry.indirect_reference
    trailer_dict.write_to_stream(stream)
    stream.write(f"\nstartxref\n{xref_location}\n%%EOF\n".encode())  # eof
@property
def metadata(self) -> Optional[DocumentInformation]:
    """
    Retrieve/set the PDF file's document information dictionary, if it exists.

    Args:
        value: dict with the entries to be set. if None : remove the /Info entry from the pdf.

    Note that some PDF files use (XMP) metadata streams instead of document
    information dictionaries, and these metadata streams will not be
    accessed by this function, but by :meth:`~xmp_metadata`.

    """
    return super().metadata

@metadata.setter
def metadata(
    self,
    value: Optional[Union[DocumentInformation, DictionaryObject, dict[Any, Any]]],
) -> None:
    # None drops the information dictionary altogether.
    if value is None:
        self._info = None
        return
    # Otherwise replace the current entries: empty the existing dictionary
    # (if any), then add the new values.
    current = self._info
    if current is not None:
        current.clear()
    self.add_metadata(value)
def add_metadata(self, infos: dict[str, Any]) -> None:
    """
    Add custom metadata to the output.

    Values are resolved when they are PDF objects and stored as string
    objects built from ``str(value)``. The information dictionary is created
    when the writer has none yet.

    Args:
        infos: a Python dictionary where each key is a field
            and each value is your new metadata.

    """
    if isinstance(infos, PdfObject):
        infos = cast(DictionaryObject, infos.get_object())
    converted = {}
    for key, raw_value in list(infos.items()):
        resolved = raw_value.get_object() if isinstance(raw_value, PdfObject) else raw_value
        converted[NameObject(key)] = create_string_object(str(resolved))
    if self._info is None:
        self._info = DictionaryObject()
    self._info.update(converted)
def compress_identical_objects(
    self,
    remove_identicals: bool = True,
    remove_orphans: bool = True,
) -> None:
    """
    Parse the PDF file and merge objects that have the same hash.
    This will make objects common to multiple pages.
    Recommended to be used just before writing output.

    Args:
        remove_identicals: Remove identical objects.
        remove_orphans: Remove unreferenced objects.

    """

    def replace_in_obj(
        obj: PdfObject, crossref: dict[IndirectObject, IndirectObject]
    ) -> None:
        # Walk dictionaries and arrays recursively: mark every referenced
        # object as "not orphan" and redirect references to merged objects.
        if isinstance(obj, DictionaryObject):
            key_val = obj.items()
        elif isinstance(obj, ArrayObject):
            key_val = enumerate(obj)  # type: ignore
        else:
            return
        assert isinstance(obj, (DictionaryObject, ArrayObject))
        for k, v in key_val:
            if isinstance(v, IndirectObject):
                orphans[v.idnum - 1] = False
                if v in crossref:
                    obj[k] = crossref[v]
            else:
                # the filtering on DictionaryObject and ArrayObject only
                # will be performed within replace_in_obj
                replace_in_obj(v, crossref)

    # _idnum_hash :dict[hash]=(1st_ind_obj,[other_indir_objs,...])
    self._idnum_hash = {}
    orphans = [True] * len(self._objects)
    # look for similar objects
    for idx, obj in enumerate(self._objects):
        if is_null_or_none(obj):
            continue
        assert obj is not None, "mypy"  # mypy: TypeGuard of `is_null_or_none` does not help here.
        assert isinstance(obj.indirect_reference, IndirectObject)
        h = obj.hash_value()
        if remove_identicals and h in self._idnum_hash:
            self._idnum_hash[h][1].append(obj.indirect_reference)
            self._objects[idx] = None
        else:
            self._idnum_hash[h] = (obj.indirect_reference, [])

    # map every duplicate reference to the first object with the same hash
    cnv_rev: dict[IndirectObject, IndirectObject] = {}
    for first_ref, duplicates in self._idnum_hash.values():
        for duplicate_ref in duplicates:
            cnv_rev[duplicate_ref] = first_ref

    # replace reference to merged objects
    for obj in self._objects:
        if isinstance(obj, (DictionaryObject, ArrayObject)):
            replace_in_obj(obj, cnv_rev)

    # remove orphans (if applicable); previously the flag was ignored and
    # unreferenced objects were always dropped.
    if not remove_orphans:
        return

    # Objects reachable only from the trailer are never orphans.
    orphans[self.root_object.indirect_reference.idnum - 1] = False  # type: ignore

    if not is_null_or_none(self._info):
        orphans[self._info.indirect_reference.idnum - 1] = False  # type: ignore

    try:
        orphans[self._ID.indirect_reference.idnum - 1] = False  # type: ignore
    except AttributeError:
        # /ID is absent or stored inline: no object slot to protect.
        pass
    for i in compress(range(len(self._objects)), orphans):
        self._objects[i] = None
def get_reference(self, obj: PdfObject) -> IndirectObject:
    """
    Return an indirect reference for an object stored in this writer.

    Args:
        obj: Object to look up in the writer's object list.

    Returns:
        A generation-0 reference whose id is the object's position plus one.

    """
    position = self._objects.index(obj)
    reference = IndirectObject(position + 1, 0, self)
    assert reference.get_object() == obj
    return reference
def get_outline_root(self) -> TreeObject:
    """
    Return the document outline root, creating it when the catalog has none.

    An existing outline entry that is not a ``TreeObject`` is converted to
    one, and the converted object replaces the original in the writer.

    Returns:
        The outline root as a ``TreeObject``.

    """
    if CO.OUTLINES not in self._root_object:
        fresh_root = TreeObject()
        fresh_root.update({})
        fresh_ref = self._add_object(fresh_root)
        self._root_object[NameObject(CO.OUTLINES)] = fresh_ref
        return fresh_root

    # Entries in the catalog dictionary
    outline = cast(TreeObject, self._root_object[CO.OUTLINES])
    if not isinstance(outline, TreeObject):
        converted = TreeObject(outline)
        self._replace_object(outline.indirect_reference.idnum, converted)
        outline = converted
    # Sanity check: the outline must be one of this writer's objects.
    idnum = self._objects.index(outline) + 1
    outline_ref = IndirectObject(idnum, 0, self)
    assert outline_ref.get_object() == outline
    return outline
def get_threads_root(self) -> ArrayObject:
    """
    The list of threads.

    See §12.4.3 of the PDF 1.7 or PDF 2.0 specification.

    An empty array is stored in the catalog when it has no threads entry yet.

    Returns:
        An array (possibly empty) of Dictionaries with an ``/F`` key,
        and optionally information about the thread in ``/I`` or ``/Metadata`` keys.

    """
    if CO.THREADS not in self._root_object:
        empty_threads = ArrayObject()
        self._root_object[NameObject(CO.THREADS)] = empty_threads
        return empty_threads
    # Entries in the catalog dictionary
    return cast(ArrayObject, self._root_object[CO.THREADS])
@property
def threads(self) -> ArrayObject:
    """
    Read-only property for the list of threads.

    See §12.4.3 of the PDF 1.7 or PDF 2.0 specification.

    Each element is a dictionary with an ``/F`` key, and optionally
    information about the thread in ``/I`` or ``/Metadata`` keys.
    """
    threads_root = self.get_threads_root()
    return threads_root
def add_outline_item_destination(
    self,
    page_destination: Union[IndirectObject, PageObject, TreeObject],
    parent: Union[None, TreeObject, IndirectObject] = None,
    before: Union[None, TreeObject, IndirectObject] = None,
    is_open: bool = True,
) -> IndirectObject:
    """
    Insert an outline item into the outline tree.

    Args:
        page_destination: The outline item to insert. A ``PageObject`` is
            wrapped in a ``Destination`` titled ``"page #<page_number>"``
            using ``Fit.fit()``.
        parent: Outline item receiving the new child; the outline root when None.
        before: Sibling in front of which the item is inserted; its indirect
            reference is passed to ``insert_child``.
        is_open: Stored on the item as ``/%is_open%``; also selects whether
            the parent counters are updated on insertion.

    Returns:
        The indirect reference of the inserted outline item.

    """
    page_destination = cast(PageObject, page_destination.get_object())
    if isinstance(page_destination, PageObject):
        # Forward parent/before/is_open as well: they used to be dropped here,
        # so a page passed with a parent silently ended up at the outline root.
        return self.add_outline_item_destination(
            Destination(
                f"page #{page_destination.page_number}",
                cast(IndirectObject, page_destination.indirect_reference),
                Fit.fit(),
            ),
            parent,
            before,
            is_open,
        )

    if parent is None:
        parent = self.get_outline_root()

    page_destination[NameObject("/%is_open%")] = BooleanObject(is_open)
    parent = cast(TreeObject, parent.get_object())
    page_destination_ref = self._add_object(page_destination)
    if before is not None:
        before = before.indirect_reference
    parent.insert_child(
        page_destination_ref,
        before,
        self,
        page_destination.inc_parent_counter_outline
        if is_open
        else (lambda x, y: 0),  # noqa: ARG005
    )
    if "/Count" not in page_destination:
        page_destination[NameObject("/Count")] = NumberObject(0)

    return page_destination_ref
def add_outline_item_dict(
    self,
    outline_item: OutlineItemType,
    parent: Union[None, TreeObject, IndirectObject] = None,
    before: Union[None, TreeObject, IndirectObject] = None,
    is_open: bool = True,
) -> IndirectObject:
    """
    Copy ``outline_item`` into a new ``TreeObject`` and insert it through
    :meth:`add_outline_item_destination`.

    Args:
        outline_item: Mapping whose entries are copied into the new item.
        parent: Forwarded to :meth:`add_outline_item_destination`.
        before: Forwarded to :meth:`add_outline_item_destination`.
        is_open: Forwarded to :meth:`add_outline_item_destination`.

    Returns:
        The indirect reference returned by :meth:`add_outline_item_destination`.

    """
    tree_item = TreeObject()
    tree_item.update(outline_item)

    # Disabled code kept for reference (currently unreachable):
    #     if "/A" in outline_item:
    #         action = DictionaryObject()
    #         a_dict = cast(DictionaryObject, outline_item["/A"])
    #         for k, v in list(a_dict.items()):
    #             action[NameObject(str(k))] = v
    #         action_ref = self._add_object(action)
    #         outline_item_object[NameObject("/A")] = action_ref
    inserted_ref = self.add_outline_item_destination(
        tree_item, parent, before, is_open
    )
    return inserted_ref
def add_outline_item(
    self,
    title: str,
    page_number: Union[None, PageObject, IndirectObject, int],
    parent: Union[None, TreeObject, IndirectObject] = None,
    before: Union[None, TreeObject, IndirectObject] = None,
    color: Optional[Union[tuple[float, float, float], str]] = None,
    bold: bool = False,
    italic: bool = False,
    fit: Fit = PAGE_FIT,
    is_open: bool = True,
) -> IndirectObject:
    """
    Add an outline item (commonly referred to as a "Bookmark") to the PDF file.

    Args:
        title: Title to use for this outline item.
        page_number: Page number this outline item will point to.
        parent: A reference to a parent outline item to create nested
            outline items.
        before: Forwarded to :meth:`add_outline_item_destination`
            (presumably the sibling the new item is placed in front of).
        color: Color of the outline item's font as a red, green, blue tuple
            from 0.0 to 1.0 or as a Hex String (#RRGGBB)
        bold: Outline item font is bold
        italic: Outline item font is italic
        fit: The fit of the destination page.
        is_open: Forwarded to :meth:`add_outline_item_destination`.

    Returns:
        The added outline item as an indirect object.

    """
    page_ref: Union[None, NullObject, IndirectObject, NumberObject]
    if isinstance(italic, Fit):  # it means that we are on the old params
        if fit is not None and page_number is None:
            page_number = fit
        return self.add_outline_item(
            title, page_number, parent, None, before, color, bold, italic, is_open=is_open
        )

    action_ref = None
    if page_number is not None:
        if isinstance(page_number, IndirectObject):
            page_ref = page_number
        elif isinstance(page_number, PageObject):
            page_ref = page_number.indirect_reference
        elif isinstance(page_number, int):
            try:
                page_ref = self.pages[page_number].indirect_reference
            except IndexError:
                # Unknown page index: keep the raw number as the target.
                page_ref = NumberObject(page_number)
        if page_ref is None:
            logger_warning(
                f"can not find reference of page {page_number}",
                __name__,
            )
            page_ref = NullObject()
        destination = Destination(
            NameObject("/" + title + " outline item"),
            page_ref,
            fit,
        )
        goto_action = DictionaryObject(
            {
                NameObject(GoToActionArguments.D): destination.dest_array,
                NameObject(GoToActionArguments.S): NameObject("/GoTo"),
            }
        )
        action_ref = self._add_object(goto_action)

    item_ref = self._add_object(
        _create_outline_item(action_ref, title, color, italic, bold)
    )

    if parent is None:
        parent = self.get_outline_root()
    return self.add_outline_item_destination(item_ref, parent, before, is_open)
def add_outline(self) -> None:
    """
    Placeholder that always raises.

    Raises:
        NotImplementedError: Always; use ``add_outline_item`` instead.

    """
    message = "This method is not yet implemented. Use :meth:`add_outline_item` instead."
    raise NotImplementedError(message)
def add_named_destination_array(
    self, title: TextStringObject, destination: Union[IndirectObject, ArrayObject]
) -> None:
    """
    Insert a (title, destination) pair into the named destination array.

    The array is scanned two entries at a time; the pair is inserted in
    front of the first title that compares greater than ``title``, or
    appended at the end when there is none.

    Args:
        title: Name of the destination.
        destination: Destination array, or a reference to one.

    """
    names = self.get_named_dest_root()
    for slot in range(0, len(names), 2):
        if title < names[slot]:
            # Insert the destination first so the title lands in front of it.
            names.insert(slot, destination)
            names.insert(slot, TextStringObject(title))
            return
    names.extend([TextStringObject(title), destination])
def add_named_destination_object(
    self,
    page_destination: PdfObject,
) -> IndirectObject:
    """
    Register a destination object as a named destination.

    Args:
        page_destination: Object providing ``dest_array`` and a ``/Title`` entry.

    Returns:
        The indirect reference of the stored destination array.

    """
    array_ref = self._add_object(page_destination.dest_array)  # type: ignore
    dest_title = cast("TextStringObject", page_destination["/Title"])  # type: ignore
    self.add_named_destination_array(dest_title, array_ref)

    return array_ref
def add_named_destination(
    self,
    title: str,
    page_number: int,
) -> IndirectObject:
    """
    Create a ``/GoTo`` action targeting a page and register it as a named destination.

    Args:
        title: Name of the destination; converted to a ``TextStringObject`` if needed.
        page_number: Index into the ``/Kids`` array of the page tree root.

    Returns:
        The indirect reference of the created action dictionary.

    """
    target_page = self.get_object(self._pages)[PagesAttributes.KIDS][page_number]  # type: ignore
    goto_action = DictionaryObject(
        {
            NameObject(GoToActionArguments.D): ArrayObject(
                [target_page, NameObject(TypFitArguments.FIT_H), NumberObject(826)]
            ),
            NameObject(GoToActionArguments.S): NameObject("/GoTo"),
        }
    )

    action_ref = self._add_object(goto_action)
    if not isinstance(title, TextStringObject):
        title = TextStringObject(str(title))

    self.add_named_destination_array(title, action_ref)
    return action_ref
def remove_links(self) -> None:
    """Remove links and annotations from this output."""
    everything = ObjectDeletionFlag.ALL_ANNOTATIONS
    for current_page in self.pages:
        self.remove_objects_from_page(current_page, everything)
def remove_annotations(
    self, subtypes: Optional[Union[AnnotationSubtype, Iterable[AnnotationSubtype]]]
) -> None:
    """
    Remove annotations of the given subtype(s) from every page.

    Args:
        subtypes: subtype or list of subtypes to be removed.
            Examples are: "/Link", "/FileAttachment", "/Sound",
            "/Movie", "/Screen", ...
            Pass ``None`` to remove all annotations.

    """
    for current_page in self.pages:
        # The per-page helper does the actual filtering and deletion.
        self._remove_annots_from_page(current_page, subtypes)
def _remove_annots_from_page(
    self,
    page: Union[IndirectObject, PageObject, DictionaryObject],
    subtypes: Optional[Iterable[str]],
) -> None:
    """
    Delete annotations of the given subtypes from one page.

    Args:
        page: Page (or reference to it) whose annotation array is filtered
            in place.
        subtypes: Subtypes to delete; ``None`` deletes every annotation.

    """
    page_dict = cast(DictionaryObject, page.get_object())
    if PG.ANNOTS not in page_dict:
        return

    index = 0
    # The array shrinks while we walk it, so the index only advances when
    # the current entry is kept.
    while index < len(cast(ArrayObject, page_dict[PG.ANNOTS])):
        entry = cast(ArrayObject, page_dict[PG.ANNOTS])[index]
        annotation = cast(DictionaryObject, entry.get_object())
        selected = subtypes is None or cast(str, annotation["/Subtype"]) in subtypes
        if not selected:
            index += 1
            continue
        if isinstance(entry, IndirectObject):
            self._objects[entry.idnum - 1] = NullObject()  # to reduce PDF size
        del page_dict[PG.ANNOTS][index]  # type:ignore
def remove_objects_from_page(
    self,
    page: Union[PageObject, DictionaryObject],
    to_delete: Union[ObjectDeletionFlag, Iterable[ObjectDeletionFlag]],
    text_filters: Optional[dict[str, Any]] = None
) -> None:
    """
    Remove objects specified by ``to_delete`` from the given page.

    Annotation-related flags (links, attachments, 3D objects, all
    annotations) are handled by deleting matching annotations and return
    immediately. The remaining flags are handled by filtering operations
    out of the page content stream and of its form XObjects.

    Args:
        page: Page object to clean up.
        to_delete: Objects to be deleted; can be a ``ObjectDeletionFlag``
            or a list of ObjectDeletionFlag
        text_filters: Properties of text to be deleted, if applicable. Optional.
            This is a Python dictionary with the following properties:

            * font_ids: List of font resource IDs (such as /F1 or /T1_0) to be deleted.

    """
    if isinstance(to_delete, (list, tuple)):
        for to_d in to_delete:
            # Forward the filters so that a TEXT flag given inside a list is
            # restricted exactly like a TEXT flag given on its own.
            self.remove_objects_from_page(page, to_d, text_filters)
        return None
    assert isinstance(to_delete, ObjectDeletionFlag)

    if to_delete & ObjectDeletionFlag.LINKS:
        return self._remove_annots_from_page(page, ("/Link",))
    if to_delete & ObjectDeletionFlag.ATTACHMENTS:
        return self._remove_annots_from_page(
            page, ("/FileAttachment", "/Sound", "/Movie", "/Screen")
        )
    if to_delete & ObjectDeletionFlag.OBJECTS_3D:
        return self._remove_annots_from_page(page, ("/3D",))
    if to_delete & ObjectDeletionFlag.ALL_ANNOTATIONS:
        return self._remove_annots_from_page(page, None)

    # Content-stream operators that are dropped for the requested flag.
    jump_operators = []
    if to_delete & ObjectDeletionFlag.DRAWING_IMAGES:
        jump_operators = [
            b"w", b"J", b"j", b"M", b"d", b"i",
            b"W", b"W*",
            b"b", b"b*", b"B", b"B*", b"S", b"s", b"f", b"f*", b"F", b"n",
            b"m", b"l", b"c", b"v", b"y", b"h", b"re",
            b"sh"
        ]
    if to_delete & ObjectDeletionFlag.TEXT:
        jump_operators = [b"Tj", b"TJ", b"'", b'"']

    if not isinstance(page, PageObject):
        page = PageObject(self, page.indirect_reference)  # pragma: no cover
    if "/Contents" in page:
        content = cast(ContentStream, page.get_contents())

        # First clean the XObjects referenced by the page resources, then
        # the page content itself using the names collected there.
        images, forms = self._remove_objects_from_page__clean_forms(
            elt=page, stack=[], jump_operators=jump_operators, to_delete=to_delete, text_filters=text_filters,
        )

        self._remove_objects_from_page__clean(
            content=content, images=images, forms=forms,
            jump_operators=jump_operators, to_delete=to_delete,
            text_filters=text_filters
        )
        page.replace_contents(content)
    # NOTE(review): the annotation says None, yet this path returns a pair of
    # empty lists; kept unchanged for backward compatibility.
    return [], []  # type: ignore[return-value]
def _remove_objects_from_page__clean(
    self,
    content: ContentStream,
    images: list[str],
    forms: list[str],
    jump_operators: list[bytes],
    to_delete: ObjectDeletionFlag,
    text_filters: Optional[dict[str, Any]] = None,
) -> None:
    """
    Drop the operations selected by ``to_delete`` from a content stream.

    Args:
        content: Content stream whose ``operations`` list is edited in place.
        images: XObject names; a ``Do`` on one of them is removed when
            ``XOBJECT_IMAGES`` is requested.
        forms: Form XObject names (not used by this method).
        jump_operators: Operators that are removal candidates as such.
        to_delete: Deletion flag.
        text_filters: Optional filters; with the ``TEXT`` flag, ``font_ids``
            limits removal to text drawn with one of those font resources.

    """
    current_font = None
    targeted_fonts = []
    if text_filters and to_delete & ObjectDeletionFlag.TEXT:
        targeted_fonts = text_filters.get("font_ids", [])

    pos = 0
    while pos < len(content.operations):
        operands, operator = content.operations[pos]
        if operator == b"Tf":
            # Remember the font selected most recently in the stream.
            current_font = operands[0]

        is_candidate = (
            (
                operator == b"INLINE IMAGE"
                and (to_delete & ObjectDeletionFlag.INLINE_IMAGES)
            )
            or (operator in jump_operators)
            or (
                operator == b"Do"
                and (to_delete & ObjectDeletionFlag.XOBJECT_IMAGES)
                and (operands[0] in images)
            )
        )
        # A candidate is always removed unless text removal is restricted to
        # specific fonts and the current font is not one of them.
        if is_candidate and (
            not to_delete & ObjectDeletionFlag.TEXT
            or not text_filters
            or current_font in targeted_fonts
        ):
            del content.operations[pos]
        else:
            pos += 1
    content.get_data()  # this ensures ._data is rebuilt from the .operations
def _remove_objects_from_page__clean_forms(
    self,
    elt: DictionaryObject,
    stack: list[DictionaryObject],
    jump_operators: list[bytes],
    to_delete: ObjectDeletionFlag,
    text_filters: Optional[dict[str, Any]] = None,
) -> tuple[list[str], list[str]]:
    """
    Clean the XObjects referenced by ``elt`` and, for streams, ``elt`` itself.

    Walks ``elt["/Resources"]["/XObject"]``: image XObjects are replaced by
    a null object when ``XOBJECT_IMAGES`` is requested, form XObjects are
    turned into content streams and cleaned recursively.

    Args:
        elt: Page or form whose resources are inspected.
        stack: Elements already being processed; used to stop recursion.
        jump_operators: Operators to drop, forwarded to the cleaning step.
        to_delete: Deletion flag.
        text_filters: Optional text filters, forwarded unchanged.

    Returns:
        The names of the image XObjects and of the form XObjects found in
        the resources of ``elt``.

    """
    # elt in recursive call is a new ContentStream object, so we have to check the indirect_reference
    if (elt in stack) or (
        hasattr(elt, "indirect_reference") and any(
            elt.indirect_reference == getattr(x, "indirect_reference", -1)
            for x in stack
        )
    ):
        # to prevent infinite looping
        return [], []  # pragma: no cover
    try:
        d = cast(
            dict[Any, Any],
            cast(DictionaryObject, elt["/Resources"])["/XObject"],
        )
    except KeyError:
        # No resources or no XObjects: nothing to walk below.
        d = {}
    images = []
    forms = []
    for k, v in d.items():
        o = v.get_object()
        try:
            # Replacement for the XObject; stays None when it is left as is.
            content: Any = None
            if (
                to_delete & ObjectDeletionFlag.XOBJECT_IMAGES
                and o["/Subtype"] == "/Image"
            ):
                content = NullObject()  # to delete the image keeping the entry
                images.append(k)
            if o["/Subtype"] == "/Form":
                forms.append(k)
                if isinstance(o, ContentStream):
                    content = o
                else:
                    # Build a ContentStream from the form and carry over its
                    # entries except the stream encoding ones.
                    content = ContentStream(o, self)
                    content.update(
                        {
                            k1: v1
                            for k1, v1 in o.items()
                            if k1 not in ["/Length", "/Filter", "/DecodeParms"]
                        }
                    )
                    try:
                        content.indirect_reference = o.indirect_reference
                    except AttributeError:  # pragma: no cover
                        pass
                stack.append(elt)

                # clean subforms
                self._remove_objects_from_page__clean_forms(
                    elt=content, stack=stack, jump_operators=jump_operators, to_delete=to_delete,
                    text_filters=text_filters,
                )
            if content is not None:
                if isinstance(v, IndirectObject):
                    # Overwrite the writer's object slot so every reference
                    # to it sees the replacement.
                    self._objects[v.idnum - 1] = content
                else:
                    # should only occur in a PDF not respecting PDF spec
                    # where streams must be indirected.
                    d[k] = self._add_object(content)  # pragma: no cover
        except (TypeError, KeyError):
            # Entries that cannot be inspected (e.g. no "/Subtype") are skipped.
            pass
    for im in images:
        del d[im]  # for clean-up
    if isinstance(elt, StreamObject):  # for /Form
        if not isinstance(elt, ContentStream):  # pragma: no cover
            e = ContentStream(elt, self)
            e.update(elt.items())
            elt = e
        # clean the content
        self._remove_objects_from_page__clean(
            content=elt, images=images, forms=forms, jump_operators=jump_operators,
            to_delete=to_delete, text_filters=text_filters
        )
    return images, forms
def remove_images(
    self,
    to_delete: ImageType = ImageType.ALL,
) -> None:
    """
    Remove images from this output.

    Args:
        to_delete: The type of images to be deleted
            (default = all images types). A boolean is treated as
            ``ImageType.ALL``.

    """
    if isinstance(to_delete, bool):
        to_delete = ImageType.ALL

    # Translate the requested image types into the deletion flags that carry
    # the same member names.
    deletion_flags = ObjectDeletionFlag.NONE
    for member in ("XOBJECT_IMAGES", "INLINE_IMAGES", "DRAWING_IMAGES"):
        if to_delete & ImageType[member]:
            deletion_flags |= ObjectDeletionFlag[member]

    for current_page in self.pages:
        self.remove_objects_from_page(current_page, deletion_flags)
def remove_text(self, font_names: Optional[list[str]] = None) -> None:
    """
    Remove text from the PDF.

    Args:
        font_names: List of font names to remove, such as "Helvetica-Bold".
            Optional. If not specified, all text will be removed.

    """

    def gather_fonts(
        node: Any,
        found: Optional[dict[str, Any]] = None,
        key: Optional[str] = None
    ) -> dict[str, Any]:
        """Recursively collect fonts below ``node``, keyed by normalized name."""
        if found is None:
            found = {}
        if isinstance(node, IndirectObject):
            node = node.get_object()
        if isinstance(node, dict):
            if node.get("/Type") == "/Font":
                base_font = node.get("/BaseFont", "")
                # Normalize font names like "/RRXFFV+Palatino-Bold" to "Palatino-Bold"
                short_name = base_font.lstrip("/").split("+")[-1]
                if short_name not in found:
                    found[short_name] = {
                        "normalized_font_name": short_name,
                        "resource_ids": [],
                    }
                if key not in found[short_name]["resource_ids"]:
                    found[short_name]["resource_ids"].append(key)
            for child_key in node:
                found = gather_fonts(node[child_key], found, child_key)
        elif isinstance(node, (list, ArrayObject)):
            for child in node:
                found = gather_fonts(child, found)
        return found

    wanted_fonts = font_names or []

    for page in self.pages:
        resource_ids_to_remove = []

        # Content streams reference fonts and other resources with names like "/F1" or "/T1_0"
        # Font names need to be converted to resource names/IDs for easier removal
        if wanted_fonts:
            fonts_on_page = gather_fonts(page.get("/Resources"))
            for wanted in wanted_fonts:
                if wanted in fonts_on_page:
                    resource_ids_to_remove.extend(fonts_on_page[wanted]["resource_ids"])

        # An empty filter dictionary means "remove all text".
        text_filters = {}
        if wanted_fonts:
            text_filters["font_ids"] = resource_ids_to_remove
        self.remove_objects_from_page(page, ObjectDeletionFlag.TEXT, text_filters=text_filters)
def add_uri(
    self,
    page_number: int,
    uri: str,
    rect: RectangleObject,
    border: Optional[ArrayObject] = None,
) -> None:
    """
    Add an URI from a rectangular area to the specified page.

    Args:
        page_number: index of the page on which to place the URI action.
        uri: URI of resource to link to.
        rect: :class:`RectangleObject<pypdf.generic.RectangleObject>` or
            array of four integers specifying the clickable rectangular area
            ``[xLL, yLL, xUR, yUR]``, or string in the form
            ``"[ xLL yLL xUR yUR ]"``.
        border: if provided, an array describing border-drawing
            properties. See the PDF spec for details. No border will be
            drawn if this argument is omitted.

    Raises:
        ValueError: ``rect`` is a string that does not hold exactly four
            numbers.

    """
    page_link = self.get_object(self._pages)[PagesAttributes.KIDS][page_number]  # type: ignore
    page_ref = cast(dict[str, Any], self.get_object(page_link))

    border_arr: BorderArrayType
    if border is not None:
        border_arr = [NumberObject(n) for n in border[:3]]
        if len(border) == 4:
            # The optional fourth entry is itself an array (dash pattern).
            dash_pattern = ArrayObject([NumberObject(n) for n in border[3]])
            border_arr.append(dash_pattern)
    else:
        border_arr = [NumberObject(2), NumberObject(2), NumberObject(2)]

    if isinstance(rect, str):
        # Parse the documented "[ xLL yLL xUR yUR ]" form. Wrapping the whole
        # string in a NumberObject cannot represent four coordinates.
        tokens = rect.replace("[", " ").replace("]", " ").split()
        if len(tokens) != 4:
            raise ValueError(
                f"rect must contain exactly four numbers, got {rect!r}"
            )
        rect = RectangleObject(tuple(float(token) for token in tokens))
    elif not isinstance(rect, RectangleObject):
        rect = RectangleObject(rect)

    # Action dictionary: open the given URI.
    lnk2 = DictionaryObject()
    lnk2.update(
        {
            NameObject("/S"): NameObject("/URI"),
            NameObject("/URI"): TextStringObject(uri),
        }
    )
    # Link annotation carrying the action.
    lnk = DictionaryObject()
    lnk.update(
        {
            NameObject(AA.Type): NameObject("/Annot"),
            NameObject(AA.Subtype): NameObject("/Link"),
            NameObject(AA.P): page_link,
            NameObject(AA.Rect): rect,
            NameObject("/H"): NameObject("/I"),
            NameObject(AA.Border): ArrayObject(border_arr),
            NameObject("/A"): lnk2,
        }
    )
    lnk_ref = self._add_object(lnk)

    if PG.ANNOTS in page_ref:
        page_ref[PG.ANNOTS].append(lnk_ref)
    else:
        page_ref[NameObject(PG.ANNOTS)] = ArrayObject([lnk_ref])
# Layout names accepted without a warning by _set_page_layout when the
# layout is given as a plain string.
_valid_layouts = (
    "/NoLayout",
    "/SinglePage",
    "/OneColumn",
    "/TwoColumnLeft",
    "/TwoColumnRight",
    "/TwoPageLeft",
    "/TwoPageRight",
)
def _get_page_layout(self) -> Optional[LayoutType]:
    """Return the ``/PageLayout`` entry of the root object, or ``None`` if unset."""
    try:
        layout = self._root_object["/PageLayout"]
    except KeyError:
        return None
    return cast(LayoutType, layout)
def _set_page_layout(self, layout: Union[NameObject, LayoutType]) -> None:
    """
    Set the page layout.

    A plain string that is not one of the valid layouts only triggers a
    warning; it is still written to the root object.

    Args:
        layout: The page layout to be used.

    .. list-table:: Valid ``layout`` arguments
       :widths: 50 200

       * - /NoLayout
         - Layout explicitly not specified
       * - /SinglePage
         - Show one page at a time
       * - /OneColumn
         - Show one column at a time
       * - /TwoColumnLeft
         - Show pages in two columns, odd-numbered pages on the left
       * - /TwoColumnRight
         - Show pages in two columns, odd-numbered pages on the right
       * - /TwoPageLeft
         - Show two pages at a time, odd-numbered pages on the left
       * - /TwoPageRight
         - Show two pages at a time, odd-numbered pages on the right

    """
    if not isinstance(layout, NameObject):
        if layout not in self._valid_layouts:
            # Join with ", " so the message lists the names; the previous
            # expression built a tuple and printed its repr instead.
            logger_warning(
                f"Layout should be one of: {', '.join(self._valid_layouts)}",
                __name__,
            )
        layout = NameObject(layout)
    self._root_object.update({NameObject("/PageLayout"): layout})
def set_page_layout(self, layout: LayoutType) -> None:
    """
    Set the page layout.

    Thin public wrapper that forwards to ``_set_page_layout``.

    Args:
        layout: The page layout to be used

    .. list-table:: Valid ``layout`` arguments
       :widths: 50 200

       * - /NoLayout
         - Layout explicitly not specified
       * - /SinglePage
         - Show one page at a time
       * - /OneColumn
         - Show one column at a time
       * - /TwoColumnLeft
         - Show pages in two columns, odd-numbered pages on the left
       * - /TwoColumnRight
         - Show pages in two columns, odd-numbered pages on the right
       * - /TwoPageLeft
         - Show two pages at a time, odd-numbered pages on the left
       * - /TwoPageRight
         - Show two pages at a time, odd-numbered pages on the right

    """
    requested_layout = layout
    self._set_page_layout(requested_layout)
@property
def page_layout(self) -> Optional[LayoutType]:
    """
    Page layout property.

    Reading returns the current layout or ``None`` when none is set;
    assigning stores the given layout.

    .. list-table:: Valid ``layout`` values
       :widths: 50 200

       * - /NoLayout
         - Layout explicitly not specified
       * - /SinglePage
         - Show one page at a time
       * - /OneColumn
         - Show one column at a time
       * - /TwoColumnLeft
         - Show pages in two columns, odd-numbered pages on the left
       * - /TwoColumnRight
         - Show pages in two columns, odd-numbered pages on the right
       * - /TwoPageLeft
         - Show two pages at a time, odd-numbered pages on the left
       * - /TwoPageRight
         - Show two pages at a time, odd-numbered pages on the right
    """
    current_layout = self._get_page_layout()
    return current_layout

@page_layout.setter
def page_layout(self, layout: LayoutType) -> None:
    # Validation and storage are shared with set_page_layout().
    self._set_page_layout(layout)
# Mode names accepted without a warning by the page_mode setter when the
# mode is given as a plain string.
_valid_modes = (
    "/UseNone",
    "/UseOutlines",
    "/UseThumbs",
    "/FullScreen",
    "/UseOC",
    "/UseAttachments",
)
def _get_page_mode(self) -> Optional[PagemodeType]:
    """Return the ``/PageMode`` entry of the root object, or ``None`` if unset."""
    try:
        mode = self._root_object["/PageMode"]
    except KeyError:
        return None
    return cast(PagemodeType, mode)
@property
def page_mode(self) -> Optional[PagemodeType]:
    """
    Page mode property.

    Reading returns the current mode or ``None`` when none is set;
    assigning stores the given mode. A plain string outside the valid
    modes only triggers a warning.

    .. list-table:: Valid ``mode`` values
       :widths: 50 200

       * - /UseNone
         - Do not show outline or thumbnails panels
       * - /UseOutlines
         - Show outline (aka bookmarks) panel
       * - /UseThumbs
         - Show page thumbnails panel
       * - /FullScreen
         - Fullscreen view
       * - /UseOC
         - Show Optional Content Group (OCG) panel
       * - /UseAttachments
         - Show attachments panel
    """
    current_mode = self._get_page_mode()
    return current_mode

@page_mode.setter
def page_mode(self, mode: PagemodeType) -> None:
    if not isinstance(mode, NameObject):
        if mode not in self._valid_modes:
            logger_warning(
                f"Mode should be one of: {', '.join(self._valid_modes)}", __name__
            )
        # Plain strings are wrapped so a NameObject is always stored.
        mode = NameObject(mode)
    self._root_object.update({NameObject("/PageMode"): mode})
def add_annotation(
    self,
    page_number: Union[int, PageObject],
    annotation: dict[str, Any],
) -> DictionaryObject:
    """
    Add a single annotation to the page.
    The added annotation must be a new annotation.
    It cannot be recycled.

    Args:
        page_number: PageObject or page index.
        annotation: Annotation to be added (created with annotation).

    Returns:
        The inserted object.
        This can be used for popup creation, for example.

    Raises:
        TypeError: ``page_number`` is neither an int nor a PageObject.

    """
    if isinstance(page_number, int):
        target_page = self.pages[page_number]
    elif isinstance(page_number, PageObject):
        target_page = page_number
    else:
        raise TypeError("page: invalid type")

    new_annot = cast(DictionaryObject, _pdf_objectify(annotation))
    new_annot[NameObject("/P")] = target_page.indirect_reference

    # Make sure the page has an annotation array to append to.
    if target_page.annotations is None:
        target_page[NameObject("/Annots")] = ArrayObject()
    assert target_page.annotations is not None

    # Internal link annotations need the correct object type for the
    # destination
    is_internal_link = new_annot.get("/Subtype") == "/Link" and "/Dest" in new_annot
    if is_internal_link:
        raw_dest = cast(dict[Any, Any], new_annot[NameObject("/Dest")])
        link_dest = Destination(
            NameObject("/LinkName"),
            raw_dest["target_page_index"],
            Fit(
                fit_type=raw_dest["fit"], fit_args=dict(raw_dest)["fit_args"]
            ),  # I have no clue why this dict-hack is necessary
        )
        new_annot[NameObject("/Dest")] = link_dest.dest_array

    target_page.annotations.append(self._add_object(new_annot))

    # A popup is also registered on its parent annotation.
    if new_annot.get("/Subtype") == "/Popup" and NameObject("/Parent") in new_annot:
        parent_annot = cast(DictionaryObject, new_annot["/Parent"].get_object())
        parent_annot[NameObject("/Popup")] = new_annot.indirect_reference

    return new_annot
def clean_page(self, page: Union[PageObject, IndirectObject]) -> PageObject:
    """
    Perform some clean up in the page.
    Currently: convert NameObject named destination to TextStringObject
    (required for names/dests list)

    Args:
        page: Page, or reference to the page, to clean.

    Returns:
        The cleaned PageObject

    """
    page_obj = cast("PageObject", page.get_object())
    for annot_ref in page_obj.get("/Annots", []):
        annot = annot_ref.get_object()
        direct_dest = annot.get("/Dest", None)
        action = annot.get("/A", None)
        if isinstance(direct_dest, NameObject):
            annot[NameObject("/Dest")] = TextStringObject(direct_dest)
            continue
        if action is None:
            continue
        # No name destination on the annotation itself: check its action.
        action = action.get_object()
        action_dest = action.get("/D", None)
        if isinstance(action_dest, NameObject):
            action[NameObject("/D")] = TextStringObject(action_dest)
    return page_obj
2518 def _create_stream(
2519 self, fileobj: Union[Path, StrByteType, PdfReader]
2520 ) -> tuple[IOBase, Optional[Encryption]]:
2521 # If the fileobj parameter is a string, assume it is a path
2522 # and create a file object at that location. If it is a file,
2523 # copy the file's contents into a BytesIO stream object; if
2524 # it is a PdfReader, copy that reader's stream into a
2525 # BytesIO stream.
2526 # If fileobj is none of the above types, it is not modified
2527 encryption_obj = None
2528 stream: IOBase
2529 if isinstance(fileobj, (str, Path)):
2530 with FileIO(fileobj, "rb") as f:
2531 stream = BytesIO(f.read())
2532 elif isinstance(fileobj, PdfReader):
2533 if fileobj._encryption:
2534 encryption_obj = fileobj._encryption
2535 orig_tell = fileobj.stream.tell()
2536 fileobj.stream.seek(0)
2537 stream = BytesIO(fileobj.stream.read())
2539 # reset the stream to its original location
2540 fileobj.stream.seek(orig_tell)
2541 elif hasattr(fileobj, "seek") and hasattr(fileobj, "read"):
2542 fileobj.seek(0)
2543 filecontent = fileobj.read()
2544 stream = BytesIO(filecontent)
2545 else:
2546 raise NotImplementedError(
2547 "Merging requires an object that PdfReader can parse. "
2548 "Typically, that is a Path or a string representing a Path, "
2549 "a file object, or an object implementing .seek and .read. "
2550 "Passing a PdfReader directly works as well."
2551 )
2552 return stream, encryption_obj
def append(
    self,
    fileobj: Union[StrByteType, PdfReader, Path],
    outline_item: Union[
        str, None, PageRange, tuple[int, int], tuple[int, int, int], list[int]
    ] = None,
    pages: Union[
        None,
        PageRange,
        tuple[int, int],
        tuple[int, int, int],
        list[int],
        list[PageObject],
    ] = None,
    import_outline: bool = True,
    excluded_fields: Optional[Union[list[str], tuple[str, ...]]] = None,
) -> None:
    """
    Identical to the :meth:`merge()<merge>` method, but assumes you want to
    concatenate all pages onto the end of the file instead of specifying a
    position.

    Args:
        fileobj: A File Object or an object that supports the standard
            read and seek methods similar to a File Object. Could also be a
            string representing a path to a PDF file.
        outline_item: Optionally, you may specify a string to build an
            outline (aka 'bookmark') to identify the beginning of the
            included file. When a tuple, list or ``PageRange`` is given
            here, it is used as ``pages`` instead.
        pages: Can be a :class:`PageRange<pypdf.pagerange.PageRange>`
            or a ``(start, stop[, step])`` tuple
            or a list of pages to be processed
            to merge only the specified range of pages from the source
            document into the output document.
        import_outline: You may prevent the source document's
            outline (collection of outline items, previously referred to as
            'bookmarks') from being imported by specifying this as ``False``.
        excluded_fields: Provide the list of fields/keys to be ignored
            if ``/Annots`` is part of the list, the annotation will be ignored
            if ``/B`` is part of the list, the articles will be ignored

    """
    if excluded_fields is None:
        excluded_fields = ()

    outline_title = outline_item
    if isinstance(outline_item, (tuple, list, PageRange)):
        # The outline slot holds a page selection: every argument is shifted
        # one position to the left and no outline title is used.
        if isinstance(pages, bool):
            if not isinstance(import_outline, bool):
                excluded_fields = import_outline
            import_outline = pages
        pages = outline_item
        outline_title = None

    self.merge(
        None,
        fileobj,
        outline_title,
        pages,
        import_outline,
        excluded_fields,
    )
def merge(
    self,
    position: Optional[int],
    fileobj: Union[Path, StrByteType, PdfReader],
    outline_item: Optional[str] = None,
    pages: Optional[Union[PageRangeSpec, list[PageObject]]] = None,
    import_outline: bool = True,
    excluded_fields: Optional[Union[list[str], tuple[str, ...]]] = (),
) -> None:
    """
    Merge the pages from the given file into the output file at the
    specified page number.

    Besides the pages, the named destinations of the source are processed,
    and, unless disabled or excluded, its outline, annotations, form fields
    and article threads.

    Args:
        position: The *page number* to insert this file. File will
            be inserted after the given number. ``None`` adds the pages
            with ``add_page`` instead of ``insert_page``.
        fileobj: A File Object or an object that supports the standard
            read and seek methods similar to a File Object. Could also be a
            string representing a path to a PDF file.
        outline_item: Optionally, you may specify a string to build an outline
            (aka 'bookmark') to identify the
            beginning of the included file.
        pages: can be a :class:`PageRange<pypdf.pagerange.PageRange>`
            or a ``(start, stop[, step])`` tuple
            or a list of pages to be processed
            to merge only the specified range of pages from the source
            document into the output document.
        import_outline: You may prevent the source document's
            outline (collection of outline items, previously referred to as
            'bookmarks') from being imported by specifying this as ``False``.
        excluded_fields: provide the list of fields/keys to be ignored
            if ``/Annots`` is part of the list, the annotation will be ignored
            if ``/B`` is part of the list, the articles will be ignored

    Raises:
        TypeError: The pages attribute is not configured properly

    """
    # An already parsed document is used as is; anything else is buffered in
    # memory and parsed with a non-strict reader.
    if isinstance(fileobj, PdfDocCommon):
        reader = fileobj
    else:
        stream, _encryption_obj = self._create_stream(fileobj)
        # Create a new PdfReader instance using the stream
        # (either file or BytesIO or StringIO) created above
        reader = PdfReader(stream, strict=False)  # type: ignore[arg-type]

    if excluded_fields is None:
        excluded_fields = ()
    # Find the range of pages to merge.
    if pages is None:
        pages = list(range(len(reader.pages)))
    elif isinstance(pages, PageRange):
        pages = list(range(*pages.indices(len(reader.pages))))
    elif isinstance(pages, list):
        pass  # keep unchanged
    elif isinstance(pages, tuple) and len(pages) <= 3:
        pages = list(range(*pages))
    elif not isinstance(pages, tuple):
        raise TypeError(
            '"pages" must be a tuple of (start, stop[, step]) or a list'
        )
    # NOTE(review): a tuple with more than three items matches none of the
    # branches above and is iterated as is below - confirm this is intended.

    # Maps the idnum of each source page to the page added to this writer.
    srcpages = {}
    for page in pages:
        if isinstance(page, PageObject):
            pg = page
        else:
            pg = reader.pages[page]
        assert pg.indirect_reference is not None
        if position is None:
            # numbers in the exclude list identifies that the exclusion is
            # only applicable to 1st level of cloning
            srcpages[pg.indirect_reference.idnum] = self.add_page(
                pg, [*list(excluded_fields), 1, "/B", 1, "/Annots"]  # type: ignore
            )
        else:
            srcpages[pg.indirect_reference.idnum] = self.insert_page(
                pg, position, [*list(excluded_fields), 1, "/B", 1, "/Annots"]  # type: ignore
            )
            # Keep the following pages in source order.
            position += 1
        # Remember the source page; it is read again for annotations below.
        srcpages[pg.indirect_reference.idnum].original_page = pg

    reader._named_destinations = (
        reader.named_destinations
    )  # need for the outline processing below

    arr: Any

    for dest in reader._named_destinations.values():
        self._merge__process_named_dests(dest=dest, reader=reader, srcpages=srcpages)

    # Outline node under which the imported outline is inserted.
    outline_item_typ: TreeObject
    if outline_item is not None:
        # NOTE(review): next() raises StopIteration when no page was merged.
        outline_item_typ = cast(
            "TreeObject",
            self.add_outline_item(
                TextStringObject(outline_item),
                next(iter(srcpages.values())).indirect_reference,
                fit=PAGE_FIT,
            ).get_object(),
        )
    else:
        outline_item_typ = self.get_outline_root()

    _ro = reader.root_object
    if import_outline and CO.OUTLINES in _ro:
        outline = self._get_filtered_outline(
            _ro.get(CO.OUTLINES, None), srcpages, reader
        )
        self._insert_filtered_outline(
            outline, outline_item_typ, None
        )  # TODO: use before parameter

    if "/Annots" not in excluded_fields:
        for pag in srcpages.values():
            lst = self._insert_filtered_annotations(
                pag.original_page.get("/Annots", []), pag, srcpages, reader
            )
            if len(lst) > 0:
                pag[NameObject("/Annots")] = lst
            self.clean_page(pag)

    if "/AcroForm" in _ro and not is_null_or_none(_ro["/AcroForm"]):
        if "/AcroForm" not in self._root_object:
            # First form merged: clone the source form without its fields.
            self._root_object[NameObject("/AcroForm")] = self._add_object(
                cast(
                    DictionaryObject,
                    reader.root_object["/AcroForm"],
                ).clone(self, False, ("/Fields",))
            )
            arr = ArrayObject()
        else:
            arr = cast(
                ArrayObject,
                cast(DictionaryObject, self._root_object["/AcroForm"])["/Fields"],
            )
        # Source object id -> id of its copy in this writer.
        trslat = self._id_translated[id(reader)]
        try:
            for f in reader.root_object["/AcroForm"]["/Fields"]:  # type: ignore
                try:
                    ind = IndirectObject(trslat[f.idnum], 0, self)
                    if ind not in arr:
                        arr.append(ind)
                except KeyError:
                    # for trslat[] which mean the field has not be copied
                    # through the page
                    pass
        except KeyError:  # for /Acroform or /Fields are not existing
            arr = self._add_object(ArrayObject())
        cast(DictionaryObject, self._root_object["/AcroForm"])[
            NameObject("/Fields")
        ] = arr

    if "/B" not in excluded_fields:
        self.add_filtered_articles("", srcpages, reader)
def _merge__process_named_dests(self, dest: Any, reader: PdfDocCommon, srcpages: dict[int, PageObject]) -> None:
    """
    Register one named destination of a merged document, if applicable.

    Nothing is added when the title is already listed in this writer's
    ``/Names`` -> ``/Dests`` -> ``/Names`` entry, when the destination has
    no page, or when its page is not among the merged pages.

    Args:
        dest: Destination taken from the source document.
        reader: Source document; used to resolve integer page numbers.
        srcpages: Source page idnum mapped to the corresponding merged page.

    """
    arr: Any = dest.dest_array

    if "/Names" in self._root_object:
        names_dict = cast(DictionaryObject, self._root_object["/Names"])
        dests_dict = cast(
            DictionaryObject, names_dict.get("/Dests", DictionaryObject())
        )
        known_names = cast(list[Any], dests_dict.get("/Names", DictionaryObject()))
        if dest["/Title"] in known_names:
            # already exists: should not duplicate it
            return

    target = dest["/Page"]
    if target is None or isinstance(target, NullObject):
        return

    if isinstance(target, int):
        # the page reference is a page number normally not a PDF Reference
        # page numbers as int are normally accepted only in external goto
        try:
            source_page = reader.pages[target]
        except IndexError:
            return
        assert source_page.indirect_reference is not None
        try:
            arr[NumberObject(0)] = NumberObject(
                srcpages[source_page.indirect_reference.idnum].page_number
            )
            self.add_named_destination_array(dest["/Title"], arr)
        except KeyError:
            pass
        return

    source_id = target.indirect_reference.idnum
    if source_id in srcpages:
        # Point the destination at the merged copy of the page.
        arr[NumberObject(0)] = srcpages[source_id].indirect_reference
        self.add_named_destination_array(dest["/Title"], arr)
def _add_articles_thread(
    self,
    thread: DictionaryObject,  # thread entry from the reader's array of threads
    pages: dict[int, PageObject],
    reader: PdfReader,
) -> IndirectObject:
    """
    Clone the thread with only the applicable articles.

    The thread's article chain is followed through ``/N`` starting at
    ``/F``; an article is recreated only when ``_get_cloned_page`` finds its
    page among ``pages``. The new articles are linked through ``/N`` and
    ``/V`` and appended to the ``/B`` array of their page.

    Args:
        thread: Thread dictionary from the source document.
        pages: Source page idnum mapped to the corresponding merged page.
        reader: Source document.

    Returns:
        The added thread as an indirect reference

    """
    nthread = thread.clone(
        self, force_duplicate=True, ignore_fields=("/F",)
    )  # use of clone to keep link between reader and writer
    self.threads.append(nthread.indirect_reference)
    first_article = cast("DictionaryObject", thread["/F"])
    current_article: Optional[DictionaryObject] = first_article
    # Last article created so far; None until the first one exists.
    new_article: Optional[DictionaryObject] = None
    while current_article is not None:
        pag = self._get_cloned_page(
            cast("PageObject", current_article["/P"]), pages, reader
        )
        if pag is not None:
            if new_article is None:
                # First kept article: it becomes the head of the new thread.
                new_article = cast(
                    "DictionaryObject",
                    self._add_object(DictionaryObject()).get_object(),
                )
                new_first = new_article
                nthread[NameObject("/F")] = new_article.indirect_reference
            else:
                # Later article: created with /V pointing back to the previous
                # one, which in turn gets /N pointing to it.
                new_article2 = cast(
                    "DictionaryObject",
                    self._add_object(
                        DictionaryObject(
                            {NameObject("/V"): new_article.indirect_reference}
                        )
                    ).get_object(),
                )
                new_article[NameObject("/N")] = new_article2.indirect_reference
                new_article = new_article2
            new_article[NameObject("/P")] = pag
            new_article[NameObject("/T")] = nthread.indirect_reference
            new_article[NameObject("/R")] = current_article["/R"]
            pag_obj = cast("PageObject", pag.get_object())
            if "/B" not in pag_obj:
                pag_obj[NameObject("/B")] = ArrayObject()
            cast("ArrayObject", pag_obj["/B"]).append(
                new_article.indirect_reference
            )
        current_article = cast("DictionaryObject", current_article["/N"])
        if current_article == first_article:
            # Back at the start of the source chain: close the new chain into
            # a ring and stop.
            # NOTE(review): new_article/new_first are unset here when no
            # article was kept - confirm callers cannot reach that case.
            new_article[NameObject("/N")] = new_first.indirect_reference  # type: ignore
            new_first[NameObject("/V")] = new_article.indirect_reference  # type: ignore
            current_article = None
    assert nthread.indirect_reference is not None
    return nthread.indirect_reference
def add_filtered_articles(
    self,
    fltr: Union[
        Pattern[Any], str
    ],  # regular expression, compiled or given as a string
    pages: dict[int, PageObject],
    reader: PdfReader,
) -> None:
    """
    Add articles matching the defined criteria.

    Every bead listed in the ``/B`` entry of the original page of each value
    of ``pages`` is inspected. The thread it belongs to (``/T``) is added
    through ``_add_articles_thread`` when its object number is not yet in the
    translation table of ``reader`` and ``fltr`` matches its ``/I`` ``/Title``.

    Args:
        fltr: Pattern searched in the thread title. A string is compiled;
            anything that is neither a string nor a pattern is replaced by
            the empty pattern.
        pages: Pages to scan; each value must provide ``original_page``.
        reader: Reader whose translation table is consulted.

    """
    if isinstance(fltr, str):
        pattern = re.compile(fltr)
    elif isinstance(fltr, Pattern):
        pattern = fltr
    else:
        pattern = re.compile("")

    for cloned_page in pages.values():
        source_page = cloned_page.original_page
        for bead_ref in source_page.get("/B", ()):
            bead = bead_ref.get_object()
            if is_null_or_none(bead):
                continue
            thread = bead.get("/T")
            if thread is None:
                continue
            thread = thread.get_object()
            if thread.indirect_reference.idnum in self._id_translated[id(reader)]:
                # thread already known for this reader
                continue
            if pattern.search((thread.get("/I", {})).get("/Title", "")):
                self._add_articles_thread(thread, pages, reader)
2913 def _get_cloned_page(
2914 self,
2915 page: Union[None, IndirectObject, PageObject, NullObject],
2916 pages: dict[int, PageObject],
2917 reader: PdfReader,
2918 ) -> Optional[IndirectObject]:
2919 if isinstance(page, NullObject):
2920 return None
2921 if isinstance(page, DictionaryObject) and page.get("/Type", "") == "/Page":
2922 _i = page.indirect_reference
2923 elif isinstance(page, IndirectObject):
2924 _i = page
2925 try:
2926 return pages[_i.idnum].indirect_reference # type: ignore
2927 except Exception:
2928 return None
def _insert_filtered_annotations(
    self,
    annots: Union[IndirectObject, list[DictionaryObject], None],
    page: PageObject,
    pages: dict[int, PageObject],
    reader: PdfReader,
) -> list[Destination]:
    """
    Clone annotations into the writer, dropping links whose target is lost.

    Annotations without a destination are cloned as they are. Annotations
    pointing to a named destination are kept only when that name is present
    in ``self.get_named_dest_root()``. Annotations pointing to an explicit
    destination array are kept only when its first element resolves through
    ``_get_cloned_page``; the array is then rebuilt with the resolved page.

    Args:
        annots: Annotations to process; an indirect reference is resolved
            first. ``None`` or a non-list value gives an empty result.
        page: Not used by this method.
        pages: Mapping handed to ``_get_cloned_page``.
        reader: Reader handed to ``_get_cloned_page``.

    Returns:
        The ``ArrayObject`` collecting the kept annotations.
        NOTE(review): the annotation says ``list[Destination]`` but the
        value built here is an ``ArrayObject`` of cloned annotations.

    """
    outlist = ArrayObject()
    if isinstance(annots, IndirectObject):
        annots = cast("list[Any]", annots.get_object())
    if annots is None:
        return outlist
    if not isinstance(annots, list):
        logger_warning(f"Expected list of annotations, got {annots} of type {annots.__class__.__name__}.", __name__)
        return outlist
    for an in annots:
        ano = cast("DictionaryObject", an.get_object())
        # First branch: everything except a /Link with a /GoTo action and no /Dest.
        if (
            ano["/Subtype"] != "/Link"
            or "/A" not in ano
            or cast("DictionaryObject", ano["/A"])["/S"] != "/GoTo"
            or "/Dest" in ano
        ):
            if "/Dest" not in ano:
                # no destination to translate: plain clone
                outlist.append(self._add_object(ano.clone(self)))
            else:
                d = ano["/Dest"]
                if isinstance(d, str):
                    # it is a named dest
                    if str(d) in self.get_named_dest_root():
                        outlist.append(ano.clone(self).indirect_reference)
                else:
                    # explicit destination: first element is the target page
                    d = cast("ArrayObject", d)
                    p = self._get_cloned_page(d[0], pages, reader)
                    if p is not None:
                        anc = ano.clone(self, ignore_fields=("/Dest",))
                        anc[NameObject("/Dest")] = ArrayObject([p, *d[1:]])
                        outlist.append(self._add_object(anc))
        else:
            # /Link with a /GoTo action: the destination is stored in /A /D
            d = cast("DictionaryObject", ano["/A"]).get("/D", NullObject())
            if is_null_or_none(d):
                continue
            if isinstance(d, str):
                # it is a named dest
                if str(d) in self.get_named_dest_root():
                    outlist.append(ano.clone(self).indirect_reference)
            else:
                d = cast("ArrayObject", d)
                p = self._get_cloned_page(d[0], pages, reader)
                if p is not None:
                    anc = ano.clone(self, ignore_fields=("/D",))
                    cast("DictionaryObject", anc["/A"])[
                        NameObject("/D")
                    ] = ArrayObject([p, *d[1:]])
                    outlist.append(self._add_object(anc))
    return outlist
def _get_filtered_outline(
    self,
    node: Any,
    pages: dict[int, PageObject],
    reader: PdfReader,
) -> list[Destination]:
    """
    Extract outline item entries that are part of the specified page set.

    A node that is an ``/Outlines`` container, or that has no ``/Title``, is
    only descended into through its ``/First`` entry. Otherwise the node and
    its ``/Next`` siblings are converted with ``reader._build_outline_item``;
    an item is kept when its page resolves through ``_get_cloned_page`` or
    when at least one of its children is kept.

    Args:
        node: Outline node to start from; ``None`` or a null object is
            treated as an empty dictionary.
        pages: Mapping handed to ``_get_cloned_page``.
        reader: Reader used to build the outline items.

    Returns:
        A list of destination objects.

    """
    if node is None:
        node = NullObject()
    node = node.get_object()
    if is_null_or_none(node):
        node = DictionaryObject()

    if node.get("/Type", "") == "/Outlines" or "/Title" not in node:
        # container (or untitled node): only its children matter
        first_child = node.get("/First", None)
        if first_child is None:
            return []
        return list(
            self._get_filtered_outline(first_child.get_object(), pages, reader)
        )

    kept = []
    target: Union[None, IndirectObject, NullObject]
    current = node
    while current is not None:
        current = current.get_object()
        item = cast("Destination", reader._build_outline_item(current))
        target = self._get_cloned_page(
            cast("PageObject", item["/Page"]), pages, reader
        )
        if target is None:
            target = NullObject()
        item[NameObject("/Page")] = target
        if "/First" in current:
            item._filtered_children = self._get_filtered_outline(
                current["/First"], pages, reader
            )
        else:
            item._filtered_children = []
        has_page = not isinstance(item["/Page"], NullObject)
        if has_page or len(item._filtered_children) > 0:
            kept.append(item)
        current = current.get("/Next", None)
    return kept
def _clone_outline(self, dest: Destination) -> TreeObject:
    """
    Create an outline item of this writer from ``dest``.

    The new item is registered with ``_add_object`` and receives the title
    of ``dest``. Unless the page of ``dest`` is a ``NullObject``, it gets
    either a clone of the ``/A`` entry of the source node or the destination
    array of ``dest``. ``/F`` and ``/C`` are taken from the source node when
    there is one, defaulting to ``0`` and three ``0.0`` values.

    Args:
        dest: Destination to convert.

    Returns:
        The new outline item.

    """
    cloned = TreeObject()
    self._add_object(cloned)
    cloned[NameObject("/Title")] = TextStringObject(dest["/Title"])

    has_page = not isinstance(dest["/Page"], NullObject)
    if has_page:
        use_action = dest.node is not None and "/A" in dest.node
        if use_action:
            cloned[NameObject("/A")] = dest.node["/A"].clone(self)
        else:
            cloned[NameObject("/Dest")] = dest.dest_array
    # TODO: /SE
    if dest.node is None:
        return cloned

    cloned[NameObject("/F")] = NumberObject(dest.node.get("/F", 0))
    black = [FloatObject(0.0), FloatObject(0.0), FloatObject(0.0)]
    cloned[NameObject("/C")] = ArrayObject(dest.node.get("/C", black))
    return cloned
3058 def _insert_filtered_outline(
3059 self,
3060 outlines: list[Destination],
3061 parent: Union[TreeObject, IndirectObject],
3062 before: Union[None, TreeObject, IndirectObject] = None,
3063 ) -> None:
3064 for dest in outlines:
3065 # TODO: can be improved to keep A and SE entries (ignored for the moment)
3066 # with np=self.add_outline_item_destination(dest,parent,before)
3067 if dest.get("/Type", "") == "/Outlines" or "/Title" not in dest:
3068 np = parent
3069 else:
3070 np = self._clone_outline(dest)
3071 cast(TreeObject, parent.get_object()).insert_child(np, before, self)
3072 self._insert_filtered_outline(dest._filtered_children, np, None)
def close(self) -> None:
    """Do nothing; implemented for API harmonization."""
    return None
def find_outline_item(
    self,
    outline_item: dict[str, Any],
    root: Optional[OutlineType] = None,
) -> Optional[list[int]]:
    """
    Locate an outline item and return its path as a list of indexes.

    The search walks the ``/Next`` chain starting at ``root`` and descends
    into ``/First`` children. A node matches when its indirect reference or
    its ``/Title`` equals ``outline_item``.

    Args:
        outline_item: Value compared with the reference and the title of
            each node.
        root: Node to start from; ``self.get_outline_root()`` when ``None``.

    Returns:
        The index of the matching node at each level, or ``None`` when
        nothing matches. A parent without ``/Title`` adds no index.

    """
    node = self.get_outline_root() if root is None else cast("TreeObject", root)

    position = 0
    while node is not None:
        same_reference = node.indirect_reference == outline_item
        if same_reference or node.get("/Title", None) == outline_item:
            return [position]
        if "/First" in node:
            sub_path = self.find_outline_item(
                outline_item, cast(OutlineType, node["/First"])
            )
            if sub_path:
                own_index = [position] if "/Title" in node else []
                return own_index + sub_path
        if "/Next" not in node:
            return None
        position += 1
        node = cast(TreeObject, node["/Next"])
    raise PyPdfError("This line is theoretically unreachable.")  # pragma: no cover
def reset_translation(
    self, reader: Union[None, PdfReader, IndirectObject] = None
) -> None:
    """
    Reset the translation table between reader and the writer object.

    Late cloning will create new independent objects.

    Args:
        reader: PdfReader or IndirectObject referencing a PdfReader object.
            if set to None or omitted, all tables will be reset.

    Raises:
        Exception: ``reader`` is neither None, a PdfReader nor an
            IndirectObject.

    """
    if reader is None:
        self._id_translated = {}
    elif isinstance(reader, PdfReader):
        # A reader without a table is not an error: there is nothing to reset.
        self._id_translated.pop(id(reader), None)
    elif isinstance(reader, IndirectObject):
        self._id_translated.pop(id(reader.pdf), None)
    else:
        raise Exception(f"invalid parameter {reader}")
def set_page_label(
    self,
    page_index_from: int,
    page_index_to: int,
    style: Optional[PageLabelStyle] = None,
    prefix: Optional[str] = None,
    start: Optional[int] = 0,
) -> None:
    """
    Set a page label to a range of pages.

    Page indexes must be given starting from 0.
    Labels must have a style, a prefix or both.
    If a range is not assigned any page label, a decimal label starting from 1 is applied.

    Args:
        page_index_from: page index of the beginning of the range starting from 0
        page_index_to: page index of the end of the range starting from 0
        style: The numbering style to be used for the numeric portion of each page label:

                   * ``/D`` Decimal Arabic numerals
                   * ``/R`` Uppercase Roman numerals
                   * ``/r`` Lowercase Roman numerals
                   * ``/A`` Uppercase letters (A to Z for the first 26 pages,
                     AA to ZZ for the next 26, and so on)
                   * ``/a`` Lowercase letters (a to z for the first 26 pages,
                     aa to zz for the next 26, and so on)

        prefix: The label prefix for page labels in this range.
        start: The value of the numeric portion for the first page label
               in the range.
               Subsequent pages are numbered sequentially from this value,
               which must be greater than or equal to 1.
               ``0`` (the default) and ``None`` are accepted as well and
               are passed on unchanged.

    Raises:
        ValueError: Neither style nor prefix is given, the range is
            invalid or exceeds the number of pages, or ``start`` is
            negative.

    """
    # The checks run in this order; the first failing one decides the message.
    if style is None and prefix is None:
        raise ValueError("At least one of style and prefix must be given")
    if page_index_from < 0:
        raise ValueError("page_index_from must be greater or equal than 0")
    if page_index_from > page_index_to:
        raise ValueError(
            "page_index_to must be greater or equal than page_index_from"
        )
    if len(self.pages) <= page_index_to:
        raise ValueError("page_index_to exceeds number of pages")
    start_is_invalid = start is not None and start != 0 and start < 1
    if start_is_invalid:
        raise ValueError("If given, start must be greater or equal than one")

    self._set_page_label(page_index_from, page_index_to, style, prefix, start)
def _set_page_label(
    self,
    page_index_from: int,
    page_index_to: int,
    style: Optional[PageLabelStyle] = None,
    prefix: Optional[str] = None,
    start: Optional[int] = 0,
) -> None:
    """
    Set a page label to a range of pages.

    Page indexes must be given starting from 0.
    Labels must have a style, a prefix or both.
    If a range is not assigned any page label a decimal label starting from 1 is applied.

    No validation is done here.

    Args:
        page_index_from: page index of the beginning of the range starting from 0
        page_index_to: page index of the end of the range starting from 0
        style: The numbering style to be used for the numeric portion of each page label:
                    /D Decimal Arabic numerals
                    /R Uppercase Roman numerals
                    /r Lowercase Roman numerals
                    /A Uppercase letters (A to Z for the first 26 pages,
                       AA to ZZ for the next 26, and so on)
                    /a Lowercase letters (a to z for the first 26 pages,
                       aa to zz for the next 26, and so on)
        prefix: The label prefix for page labels in this range.
        start: The value of the numeric portion for the first page label
               in the range.
               Subsequent pages are numbered sequentially from this value,
               which must be greater than or equal to 1.
               With ``0`` (the default) or ``None`` no ``/St`` entry is written.

    """
    default_page_label = DictionaryObject()
    default_page_label[NameObject("/S")] = NameObject("/D")

    new_page_label = DictionaryObject()
    if style is not None:
        new_page_label[NameObject("/S")] = NameObject(style)
    if prefix is not None:
        new_page_label[NameObject("/P")] = TextStringObject(prefix)
    # ``start`` is declared Optional: None, like 0, means "no explicit /St".
    if start is not None and start != 0:
        new_page_label[NameObject("/St")] = NumberObject(start)

    if NameObject(CatalogDictionary.PAGE_LABELS) not in self._root_object:
        # No page labels yet: create the tree with a decimal label at page 0.
        nums = ArrayObject()
        nums_insert(NumberObject(0), default_page_label, nums)
        page_labels = TreeObject()
        page_labels[NameObject("/Nums")] = nums
        self._root_object[NameObject(CatalogDictionary.PAGE_LABELS)] = page_labels

    page_labels = cast(
        TreeObject, self._root_object[NameObject(CatalogDictionary.PAGE_LABELS)]
    )
    nums = cast(ArrayObject, page_labels[NameObject("/Nums")])

    nums_insert(NumberObject(page_index_from), new_page_label, nums)
    nums_clear_range(NumberObject(page_index_from), page_index_to, nums)
    next_label_pos, *_ = nums_next(NumberObject(page_index_from), nums)
    # Pages after the range get the default label unless a label already
    # starts right there or the range reaches the last page.
    if next_label_pos != page_index_to + 1 and page_index_to + 1 < len(self.pages):
        nums_insert(NumberObject(page_index_to + 1), default_page_label, nums)

    page_labels[NameObject("/Nums")] = nums
    self._root_object[NameObject(CatalogDictionary.PAGE_LABELS)] = page_labels
3252 def _repr_mimebundle_(
3253 self,
3254 include: Union[None, Iterable[str]] = None,
3255 exclude: Union[None, Iterable[str]] = None,
3256 ) -> dict[str, Any]:
3257 """
3258 Integration into Jupyter Notebooks.
3260 This method returns a dictionary that maps a mime-type to its
3261 representation.
3263 .. seealso::
3265 https://ipython.readthedocs.io/en/stable/config/integrating.html
3266 """
3267 pdf_data = BytesIO()
3268 self.write(pdf_data)
3269 data = {
3270 "application/pdf": pdf_data,
3271 }
3273 if include is not None:
3274 # Filter representations based on include list
3275 data = {k: v for k, v in data.items() if k in include}
3277 if exclude is not None:
3278 # Remove representations based on exclude list
3279 data = {k: v for k, v in data.items() if k not in exclude}
3281 return data
def _pdf_objectify(obj: Union[dict[str, Any], str, float, list[Any]]) -> PdfObject:
    """
    Convert a plain Python value into the matching PDF object.

    Args:
        obj: Value to convert. PDF objects are returned unchanged,
            dictionaries and lists are converted recursively, strings
            starting with ``/`` become names and other strings text strings,
            floats and integers become ``FloatObject``.

    Returns:
        The PDF object.

    Raises:
        NotImplementedError: ``obj`` is of any other type.

    """
    if isinstance(obj, PdfObject):
        return obj
    if isinstance(obj, dict):
        converted = DictionaryObject()
        for name, item in obj.items():
            converted[NameObject(name)] = _pdf_objectify(item)
        return converted
    if isinstance(obj, str):
        return NameObject(obj) if obj.startswith("/") else TextStringObject(obj)
    if isinstance(obj, (float, int)):
        return FloatObject(obj)
    if isinstance(obj, list):
        return ArrayObject([_pdf_objectify(item) for item in obj])
    raise NotImplementedError(
        f"{type(obj)=} could not be cast to a PdfObject"
    )
def _create_outline_item(
    action_ref: Union[None, IndirectObject],
    title: str,
    color: Union[tuple[float, float, float], str, None],
    italic: bool,
    bold: bool,
) -> TreeObject:
    """
    Build an outline item from its title, action and display options.

    Args:
        action_ref: Stored as ``/A`` unless ``None``.
        title: Title of the item.
        color: Color written to ``/C``; a string is converted with
            ``hex_to_rgb``. Nothing is written for a falsy value.
        italic: Adds ``OutlineFontFlag.italic`` to ``/F``.
        bold: Adds ``OutlineFontFlag.bold`` to ``/F``.

    Returns:
        The new outline item; ``/F`` is only set when ``italic`` or
        ``bold`` is true.

    """
    item = TreeObject()
    if action_ref is not None:
        item[NameObject("/A")] = action_ref
    item.update({NameObject("/Title"): create_string_object(title)})

    if color:
        rgb = hex_to_rgb(color) if isinstance(color, str) else color
        components = ArrayObject([FloatObject(component) for component in rgb])
        item.update({NameObject("/C"): components})

    if italic or bold:
        font_flags = 0
        for enabled, flag in ((italic, OutlineFontFlag.italic), (bold, OutlineFontFlag.bold)):
            if enabled:
                font_flags += flag
        item.update({NameObject("/F"): NumberObject(font_flags)})
    return item