Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_writer.py: 21%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) 2006, Mathieu Fenniak
2# Copyright (c) 2007, Ashish Kulkarni <kulkarni.ashish@gmail.com>
3#
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are
8# met:
9#
10# * Redistributions of source code must retain the above copyright notice,
11# this list of conditions and the following disclaimer.
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15# * The name of the author may not be used to endorse or promote products
16# derived from this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28# POSSIBILITY OF SUCH DAMAGE.
30import decimal
31import enum
32import hashlib
33import re
34import struct
35import uuid
36from collections.abc import Iterable, Mapping
37from io import BytesIO, FileIO, IOBase
38from itertools import compress
39from pathlib import Path
40from re import Pattern
41from types import TracebackType
42from typing import (
43 IO,
44 Any,
45 Callable,
46 Optional,
47 Union,
48 cast,
49)
51from ._doc_common import DocumentInformation, PdfDocCommon
52from ._encryption import EncryptAlgorithm, Encryption
53from ._page import PageObject, Transformation
54from ._page_labels import nums_clear_range, nums_insert, nums_next
55from ._reader import PdfReader
56from ._utils import (
57 StrByteType,
58 StreamType,
59 _get_max_pdf_version_header,
60 deprecation_no_replacement,
61 logger_warning,
62)
63from .constants import AnnotationDictionaryAttributes as AA
64from .constants import CatalogAttributes as CA
65from .constants import (
66 CatalogDictionary,
67 GoToActionArguments,
68 ImageType,
69 InteractiveFormDictEntries,
70 OutlineFontFlag,
71 PageLabelStyle,
72 PagesAttributes,
73 TypFitArguments,
74 UserAccessPermissions,
75)
76from .constants import Core as CO
77from .constants import FieldDictionaryAttributes as FA
78from .constants import PageAttributes as PG
79from .constants import TrailerKeys as TK
80from .errors import PdfReadError, PyPdfError
81from .generic import (
82 PAGE_FIT,
83 ArrayObject,
84 BooleanObject,
85 ByteStringObject,
86 ContentStream,
87 Destination,
88 DictionaryObject,
89 EmbeddedFile,
90 Fit,
91 FloatObject,
92 IndirectObject,
93 NameObject,
94 NullObject,
95 NumberObject,
96 PdfObject,
97 RectangleObject,
98 ReferenceLink,
99 StreamObject,
100 TextStringObject,
101 TreeObject,
102 ViewerPreferences,
103 create_string_object,
104 extract_links,
105 hex_to_rgb,
106 is_null_or_none,
107)
108from .generic._appearance_stream import TextStreamAppearance
109from .pagerange import PageRange, PageRangeSpec
110from .types import (
111 AnnotationSubtype,
112 BorderArrayType,
113 LayoutType,
114 OutlineItemType,
115 OutlineType,
116 PagemodeType,
117)
118from .xmp import XmpInformation
# Module-level constant holding whatever ``UserAccessPermissions.all()``
# returns; presumably the full permission set used as an encryption default
# elsewhere in this module (not visible in this chunk).
ALL_DOCUMENT_PERMISSIONS = UserAccessPermissions.all()
class ObjectDeletionFlag(enum.IntFlag):
    """
    Integer bit flags naming categories of PDF content.

    Each category occupies one bit so that members can be combined with ``|``.
    Judging by the name, these select what object-removal code should delete;
    that code is not part of this chunk.
    """

    NONE = 0
    TEXT = 1 << 0
    LINKS = 1 << 1
    ATTACHMENTS = 1 << 2
    OBJECTS_3D = 1 << 3
    ALL_ANNOTATIONS = 1 << 4
    XOBJECT_IMAGES = 1 << 5
    INLINE_IMAGES = 1 << 6
    DRAWING_IMAGES = 1 << 7
    # Composite member: the union of the three image categories above.
    IMAGES = XOBJECT_IMAGES | INLINE_IMAGES | DRAWING_IMAGES
def _rolling_checksum(stream: BytesIO, blocksize: int = 65536) -> str:
    """
    Return the hexadecimal MD5 digest of the unread remainder of ``stream``.

    Args:
        stream: Binary stream; it is read from its current position until a
            read returns ``b""``, so it is left positioned at its end.
        blocksize: Maximum number of bytes requested per read.

    Returns:
        The digest as a hexadecimal string.

    """
    # usedforsecurity=False marks the digest as a non-cryptographic checksum.
    digest = hashlib.md5(usedforsecurity=False)
    while (chunk := stream.read(blocksize)) != b"":
        digest.update(chunk)
    return digest.hexdigest()
143class PdfWriter(PdfDocCommon):
144 """
145 Write a PDF file out, given pages produced by another class or through
146 cloning a PDF file during initialization.
148 Typically data is added from a :class:`PdfReader<pypdf.PdfReader>`.
150 Args:
151 clone_from: identical to fileobj (for compatibility)
153 incremental: If true, loads the document and set the PdfWriter in incremental mode.
155 When writing incrementally, the original document is written first and new/modified
156 content is appended. To be used for signed document/forms to keep signature valid.
158 full: If true, loads all the objects (always full if incremental = True).
159 This parameter may allow loading large PDFs.
161 strict: If true, pypdf will raise an exception if a PDF does not follow the specification.
162 If false, pypdf will try to be forgiving and do something reasonable, but it will log
163 a warning message. It is a best-effort approach.
165 """
def __init__(
    self,
    fileobj: Union[None, PdfReader, StrByteType, Path] = "",
    clone_from: Union[None, PdfReader, StrByteType, Path] = None,
    incremental: bool = False,
    full: bool = False,
    strict: bool = False,
) -> None:
    """
    Initialise the writer, either empty or as a clone of an existing document.

    Args:
        fileobj: Remembered in ``temp_fileobj``. It is also used as the clone
            source when it is a reader, an existing non-empty file path, or a
            non-empty stream, and ``clone_from`` does not take precedence.
        clone_from: Explicit document to clone.
        incremental: Load ``fileobj`` through a ``PdfReader`` and keep the
            writer in incremental mode.
        full: Take the same loading path as ``incremental``; the
            ``incremental`` attribute is reset to ``False`` at the end unless
            ``incremental`` was also requested.
        strict: Stored on ``self.strict``.

    Raises:
        PyPdfError: If the incremental loading path is taken and ``fileobj``
            cannot be turned into a ``PdfReader``.

    """
    # If true, pypdf will raise an exception if a PDF does not follow the
    # specification. If false, pypdf will try to be forgiving and do
    # something reasonable, but it will log a warning message.
    self.strict = strict

    # Tells whether the PdfWriter object has been started in incremental mode.
    self.incremental = incremental or full

    # The indirect objects in the PDF. For the incremental case, it will be
    # filled with None in clone_reader_document_root.
    self._objects: list[Optional[PdfObject]] = []

    # List of hashes after import; used to identify changes.
    self._original_hash: list[int] = []

    # Maps hash values of indirect objects to the list of IndirectObjects.
    # This is used for compression.
    self._idnum_hash: dict[bytes, tuple[IndirectObject, list[IndirectObject]]] = {}

    # Already translated IDs: dict[id(pdf)][(idnum, generation)]
    self._id_translated: dict[int, dict[int, int]] = {}

    # The document information dictionary (the Info entry of the trailer).
    # Only declared here; it is assigned below in the non-incremental case.
    self._info_obj: Optional[PdfObject]

    # The PDF file identifier, defined by the ID in the trailer dictionary.
    self._ID: Union[ArrayObject, None] = None

    # Tracks links in pages added to the writer for resolving later.
    self._unresolved_links: list[tuple[ReferenceLink, ReferenceLink]] = []
    # Tracks pages added to the writer and what page they turned into.
    self._merged_in_pages: dict[Optional[IndirectObject], Optional[IndirectObject]] = {}

    if not self.incremental:
        self._header = b"%PDF-1.3"
        producer_info = DictionaryObject(
            {NameObject("/Producer"): create_string_object("pypdf")}
        )
        self._info_obj = self._add_object(producer_info)
    else:
        # Normalise path -> in-memory stream -> reader, one step at a time.
        if isinstance(fileobj, (str, Path)):
            with open(fileobj, "rb") as source_file:
                fileobj = BytesIO(source_file.read(-1))
        if isinstance(fileobj, BytesIO):
            fileobj = PdfReader(fileobj)
        if not isinstance(fileobj, PdfReader):
            raise PyPdfError("Invalid type for incremental mode")
        self._reader = fileobj  # prev content is in _reader.stream
        self._header = fileobj.pdf_header.encode()
        self._readonly = True  # TODO: to be analysed

    def _resolve_clone_source(
        candidate: Union[None, PdfReader, str, Path, IO[Any], BytesIO],
        explicit: Union[None, PdfReader, str, Path, IO[Any], BytesIO],
    ) -> Union[None, PdfReader, str, Path, IO[Any], BytesIO]:
        """Pick the document to clone: ``explicit``, ``candidate`` or neither."""
        if isinstance(candidate, (str, Path, IO, BytesIO)) and (
            candidate == "" or explicit is not None
        ):
            return explicit
        usable = True
        if isinstance(candidate, (str, Path)):
            # A missing or empty file is an output target, not a source.
            location = Path(str(candidate))
            usable = location.exists() and location.stat().st_size != 0
        if isinstance(candidate, (IOBase, BytesIO)):
            # Measure the stream by seeking to its end, then restore the
            # original position.
            saved_position = candidate.tell()
            if candidate.seek(0, 2) == 0:
                usable = False
            candidate.seek(saved_position, 0)
        return candidate if usable else explicit

    clone_from = _resolve_clone_source(fileobj, clone_from)
    # To prevent overwriting
    self.temp_fileobj = fileobj
    self.fileobj = ""
    self._with_as_usage = False
    self._cloned = False
    # The root of our page tree node
    page_tree_root = DictionaryObject(
        {
            NameObject(PagesAttributes.TYPE): NameObject("/Pages"),
            NameObject(PagesAttributes.COUNT): NumberObject(0),
            NameObject(PagesAttributes.KIDS): ArrayObject(),
        }
    )
    self.flattened_pages = []
    self._encryption: Optional[Encryption] = None
    self._encrypt_entry: Optional[DictionaryObject] = None

    if clone_from is None:
        self._pages = self._add_object(page_tree_root)
        self._root_object = DictionaryObject(
            {
                NameObject(PagesAttributes.TYPE): NameObject(CO.CATALOG),
                NameObject(CO.PAGES): self._pages,
            }
        )
        self._add_object(self._root_object)
    else:
        if not isinstance(clone_from, PdfReader):
            clone_from = PdfReader(clone_from)
        self.clone_document_from_reader(clone_from)
        self._cloned = True

    if full and not incremental:
        # ``full`` only borrowed the incremental loading path.
        self.incremental = False
    if isinstance(self._ID, list):
        # Store both identifier entries as byte strings.
        for position in (0, 1):
            if isinstance(self._ID[position], TextStringObject):
                self._ID[position] = ByteStringObject(
                    self._ID[position].get_original_bytes()
                )
# Kept for commonality: the property exists here so that callers can query it
# on a writer as well.
@property
def is_encrypted(self) -> bool:
    """
    Read-only boolean property telling whether this PDF file is encrypted.

    This implementation always returns ``False``.
    """
    return False
@property
def root_object(self) -> DictionaryObject:
    """
    Give direct access to the PDF structure through the root dictionary.

    Note:
        Recommended only for read access.

    Returns:
        The object stored in ``self._root_object``.

    """
    catalog = self._root_object
    return catalog
@property
def _info(self) -> Optional[DictionaryObject]:
    """
    Provide access to "/Info". Standardized with PdfReader.

    Returns:
        The resolved /Info dictionary, or None when ``self._info_obj`` is None.

    """
    info_ref = self._info_obj
    if info_ref is None:
        return None
    return cast(DictionaryObject, info_ref.get_object())
@_info.setter
def _info(self, value: Optional[Union[IndirectObject, DictionaryObject]]) -> None:
    """
    Replace or remove the "/Info" dictionary.

    Args:
        value: ``None`` removes the entry: the slot in ``self._objects`` is
            set to None (best effort) and ``self._info_obj`` is cleared.
            Otherwise the existing dictionary (created on demand) is emptied
            and refilled from the resolved ``value``.

    """
    if value is not None:
        if self._info_obj is None:
            self._info_obj = self._add_object(DictionaryObject())
        # Update in place so existing references to the dictionary stay valid.
        target = cast(DictionaryObject, self._info_obj.get_object())
        target.clear()
        target.update(cast(DictionaryObject, value.get_object()))
        return

    try:
        self._objects[self._info_obj.indirect_reference.idnum - 1] = None  # type: ignore
    except (KeyError, AttributeError):
        # Nothing registered to drop (e.g. ``_info_obj`` is already None).
        pass
    self._info_obj = None
@property
def xmp_metadata(self) -> Optional[XmpInformation]:
    """XMP (Extensible Metadata Platform) data, read from the root object."""
    metadata = self.root_object.xmp_metadata
    return cast(XmpInformation, metadata)
@xmp_metadata.setter
def xmp_metadata(self, value: Union[XmpInformation, bytes, None]) -> None:
    """
    Set or remove the XMP (Extensible Metadata Platform) data.

    Args:
        value: ``None`` deletes the "/Metadata" entry of the root object if it
            exists. An ``XmpInformation`` contributes the data of its stream;
            anything else is written to the metadata stream as given.

    """
    if value is None:
        if "/Metadata" in self.root_object:
            del self.root_object["/Metadata"]
        return

    current = self.root_object.get("/Metadata", None)
    if isinstance(current, IndirectObject):
        # Reuse the stream that is already registered.
        metadata_stream = cast(StreamObject, current.get_object())
    else:
        # Missing or stored directly: replace it with a fresh indirect stream.
        if current is not None:
            del self.root_object["/Metadata"]
        metadata_stream = StreamObject()
        stream_reference = self._add_object(metadata_stream)
        self.root_object[NameObject("/Metadata")] = stream_reference

    payload = value.stream.get_data() if isinstance(value, XmpInformation) else value
    metadata_stream.set_data(payload)
@property
def with_as_usage(self) -> bool:
    """
    Deprecated accessor for ``self._with_as_usage``.

    ``deprecation_no_replacement("with_as_usage", "5.0")`` is invoked before
    the value is returned.
    """
    deprecation_no_replacement("with_as_usage", "5.0")
    flag = self._with_as_usage
    return flag
@with_as_usage.setter
def with_as_usage(self, value: bool) -> None:
    """
    Deprecated mutator for ``self._with_as_usage``.

    ``deprecation_no_replacement("with_as_usage", "5.0")`` is invoked before
    the value is stored.
    """
    deprecation_no_replacement("with_as_usage", "5.0")
    self._with_as_usage = value
def __enter__(self) -> "PdfWriter":
    """
    Store how the writer is initialized by 'with'.

    ``__init__`` is run again with its default arguments; afterwards the
    previous ``_cloned`` value is restored, ``_with_as_usage`` is set and the
    previously remembered ``temp_fileobj`` becomes ``fileobj``.

    Returns:
        The writer itself.

    """
    remembered_target = self.temp_fileobj
    was_cloned: bool = self._cloned
    self.__init__()  # type: ignore
    self._cloned = was_cloned
    self._with_as_usage = True
    self.fileobj = remembered_target  # type: ignore
    return self
def __exit__(
    self,
    exc_type: Optional[type[BaseException]],
    exc: Optional[BaseException],
    traceback: Optional[TracebackType],
) -> None:
    """
    Write data to the fileobj.

    Nothing is written when ``self.fileobj`` is falsy or the writer is marked
    as cloned. The exception arguments are not inspected.
    """
    if not self.fileobj or self._cloned:
        return
    self.write(self.fileobj)
@property
def pdf_header(self) -> str:
    """
    Read/Write property of the PDF header that is written.

    This should be something like ``'%PDF-1.5'``. It is recommended to set
    the lowest version that supports all features which are used within the
    PDF file.

    Note: `pdf_header` returns a string but accepts bytes or str for writing

    Returns:
        ``self._header`` decoded to ``str``.

    """
    raw_header = self._header
    return raw_header.decode()
@pdf_header.setter
def pdf_header(self, new_header: Union[str, bytes]) -> None:
    """
    Store the PDF header.

    Args:
        new_header: The header; a ``str`` is encoded before it is stored in
            ``self._header``, anything else is stored unchanged.

    """
    self._header = new_header.encode() if isinstance(new_header, str) else new_header
def _add_object(self, obj: PdfObject) -> IndirectObject:
    """
    Register ``obj`` as an indirect object of this writer.

    Args:
        obj: The object to register. If it already carries an indirect
            reference belonging to this writer, nothing is added.

    Returns:
        The indirect reference of ``obj`` within this writer.

    """
    known_ref = getattr(obj, "indirect_reference", None)
    if known_ref is not None and known_ref.pdf == self:  # type: ignore
        return known_ref  # type: ignore

    # check for /Contents in Pages (/Contents in annotations are strings):
    # a direct array or dictionary there is registered first and replaced by
    # its reference.
    if isinstance(obj, DictionaryObject):
        contents = obj.get(PG.CONTENTS, None)
        if isinstance(contents, (ArrayObject, DictionaryObject)):
            obj[NameObject(PG.CONTENTS)] = self._add_object(obj[PG.CONTENTS])

    self._objects.append(obj)
    # Object numbers are 1-based, so the new number equals the list length.
    obj.indirect_reference = IndirectObject(len(self._objects), 0, self)
    return obj.indirect_reference
def get_object(
    self,
    indirect_reference: Union[int, IndirectObject],
) -> PdfObject:
    """
    Return the object registered under an object number or reference.

    Args:
        indirect_reference: A 1-based object number, or an indirect reference
            that belongs to this writer.

    Returns:
        The stored object.

    Raises:
        ValueError: If the reference belongs to another document.

    """
    if isinstance(indirect_reference, int):
        idnum = indirect_reference
    else:
        if indirect_reference.pdf != self:
            raise ValueError("PDF must be self")
        idnum = indirect_reference.idnum
    found = self._objects[idnum - 1]
    assert found is not None, "mypy"
    return found
def _replace_object(
    self,
    indirect_reference: Union[int, IndirectObject],
    obj: PdfObject,
) -> PdfObject:
    """
    Put ``obj`` into the slot identified by ``indirect_reference``.

    Args:
        indirect_reference: A 1-based object number, or an indirect reference
            that belongs to this writer.
        obj: The replacement. If it is attached to another document it is
            cloned into this writer first.

    Returns:
        The object that now occupies the slot (the clone, if one was made).

    Raises:
        ValueError: If the reference belongs to another document.

    """
    if isinstance(indirect_reference, IndirectObject):
        if indirect_reference.pdf != self:
            raise ValueError("PDF must be self")
        indirect_reference = indirect_reference.idnum
    slot = indirect_reference - 1
    # The generation number of the object being replaced is carried over.
    generation = self._objects[slot].indirect_reference.generation  # type: ignore

    foreign_ref = getattr(obj, "indirect_reference", None)
    if foreign_ref is not None and foreign_ref.pdf != self:  # type: ignore
        obj = obj.clone(self)
    self._objects[slot] = obj
    obj.indirect_reference = IndirectObject(indirect_reference, generation, self)

    assert isinstance(obj, PdfObject), "mypy"
    return obj
def _add_page(
    self,
    page: PageObject,
    index: int,
    excluded_keys: Iterable[str] = (),
) -> PageObject:
    """
    Clone ``page`` into this writer and link it into the page tree.

    Args:
        page: The page to add; it must be a ``PageObject`` whose type entry
            equals ``CO.PAGE``.
        index: Position handed to ``_get_page_in_node`` and used for
            ``flattened_pages``.
        excluded_keys: Keys not copied by the clone; the parent entry and
            "/StructParents" are always excluded in addition.

    Returns:
        The cloned page owned by this writer.

    Raises:
        ValueError: If ``page`` is not a valid page object.
        PyPdfError: If more than 1000 ancestors are visited while updating
            the page counts.

    """
    if not isinstance(page, PageObject):
        raise ValueError("Invalid page object")
    if page.get(PagesAttributes.TYPE, None) != CO.PAGE:
        raise ValueError("Invalid page object")
    assert self.flattened_pages is not None, "for mypy"

    source_page = page
    skipped_keys = [*excluded_keys, PagesAttributes.PARENT, "/StructParents"]

    # Acrobat does not accept two indirect references pointing on the same
    # page; therefore in order to add multiple copies of the same
    # page, we need to create a new dictionary for the page, however the
    # objects below (including content) are not duplicated:
    try:  # delete an already existing page
        source_ref = source_page.indirect_reference
        del self._id_translated[id(source_ref.pdf)][source_ref.idnum]  # type: ignore
    except Exception:
        pass

    page = cast(
        "PageObject", source_page.clone(self, False, skipped_keys).get_object()
    )
    if source_page.pdf is not None:
        self.pdf_header = _get_max_pdf_version_header(
            self.pdf_header, source_page.pdf.pdf_header
        )

    node, position = self._get_page_in_node(index)
    page[NameObject(PagesAttributes.PARENT)] = node.indirect_reference

    kids = cast(ArrayObject, node[PagesAttributes.KIDS])
    if position >= 0:
        kids.insert(position, page.indirect_reference)
        self.flattened_pages.insert(index, page)
    else:
        # A negative position means: append at the end.
        kids.append(page.indirect_reference)
        self.flattened_pages.append(page)

    # Walk up the tree and increment the page count of every ancestor; the
    # counter guards against a cyclic parent chain.
    visited = 0
    while not is_null_or_none(node):
        node = cast(DictionaryObject, node.get_object())
        previous_count = cast(int, node[PagesAttributes.COUNT])
        node[NameObject(PagesAttributes.COUNT)] = NumberObject(previous_count + 1)
        node = node.get(PagesAttributes.PARENT, None)  # type: ignore[assignment]  # TODO: Fix.
        visited += 1
        if visited > 1000:
            raise PyPdfError("Too many recursive calls!")

    if source_page.pdf is not None:
        # the page may contain links to other pages, and those other
        # pages may or may not already be added. we store the
        # information we need, so that we can resolve the references
        # later.
        self._unresolved_links.extend(extract_links(page, source_page))
        self._merged_in_pages[source_page.indirect_reference] = page.indirect_reference

    return page
def set_need_appearances_writer(self, state: bool = True) -> None:
    """
    Sets the "NeedAppearances" flag in the PDF writer.

    The "NeedAppearances" flag indicates whether the appearance dictionary
    for form fields should be automatically generated by the PDF viewer or
    if the embedded appearance should be used.

    An AcroForm dictionary is created in the root object when none exists.
    Any exception raised on the way is logged as a warning, not propagated.

    Args:
        state: The actual value of the NeedAppearances flag.

    Returns:
        None

    """
    # See §12.7.2 and §7.7.2 for more information:
    # https://opensource.adobe.com/dc-acrobat-sdk-docs/pdfstandards/PDF32000_2008.pdf
    try:
        # get the AcroForm tree
        if CatalogDictionary.ACRO_FORM not in self._root_object:
            form_ref = self._add_object(DictionaryObject())
            self._root_object[NameObject(CatalogDictionary.ACRO_FORM)] = form_ref

        flag_key = NameObject(InteractiveFormDictEntries.NeedAppearances)
        flag_value = BooleanObject(state)
        acro_form = cast(
            DictionaryObject, self._root_object[CatalogDictionary.ACRO_FORM]
        )
        acro_form[flag_key] = flag_value
    except Exception as exc:  # pragma: no cover
        logger_warning(
            f"set_need_appearances_writer({state}) catch : {exc}", __name__
        )
def create_viewer_preferences(self) -> ViewerPreferences:
    """
    Create a new ``ViewerPreferences`` object and attach it to the root object.

    The object is registered with the writer and its reference is stored
    under the viewer-preferences key, replacing any entry already there.

    Returns:
        The newly created ``ViewerPreferences`` object.

    """
    preferences = ViewerPreferences()
    preferences_ref = self._add_object(preferences)
    self._root_object[NameObject(CatalogDictionary.VIEWER_PREFERENCES)] = preferences_ref
    return preferences
def add_page(
    self,
    page: PageObject,
    excluded_keys: Iterable[str] = (),
) -> PageObject:
    """
    Add a page to the end of this PDF file.

    Recommended for advanced usage including the adequate excluded_keys.

    The page is usually acquired from a :class:`PdfReader<pypdf.PdfReader>`
    instance.

    Args:
        page: The page to add to the document. Should be
            an instance of :class:`PageObject<pypdf._page.PageObject>`
        excluded_keys: Keys passed on to ``_add_page`` unchanged.

    Returns:
        The added PageObject.

    """
    assert self.flattened_pages is not None, "mypy"
    end_position = len(self.flattened_pages)
    return self._add_page(page, end_position, excluded_keys)
def insert_page(
    self,
    page: PageObject,
    index: int = 0,
    excluded_keys: Iterable[str] = (),
) -> PageObject:
    """
    Insert a page in this PDF file. The page is usually acquired from a
    :class:`PdfReader<pypdf.PdfReader>` instance.

    Args:
        page: The page to add to the document.
        index: Position at which the page will be inserted. A negative value
            counts from the end; a value at or past the end appends the page.
        excluded_keys: Keys passed on unchanged when the page is added.

    Returns:
        The added PageObject.

    Raises:
        ValueError: If a negative ``index`` reaches before the first page.

    """
    assert self.flattened_pages is not None, "mypy"
    page_count = len(self.flattened_pages)
    if index < 0:
        index += page_count
    if index < 0:
        raise ValueError("Invalid index value")
    if index < page_count:
        return self._add_page(page, index, excluded_keys)
    return self.add_page(page, excluded_keys)
def _get_page_number_by_indirect(
    self, indirect_reference: Union[None, int, NullObject, IndirectObject]
) -> Optional[int]:
    """
    Look up the page number of the page behind a reference.

    Provides the same function as in PdfReader.

    Args:
        indirect_reference: A reference to resolve; an ``int`` is treated as
            an object number of this writer with generation 0.

    Returns:
        The page number or None (for a null/None reference, or when the
        referenced object is not a ``PageObject``).

    """
    if is_null_or_none(indirect_reference):
        return None
    assert indirect_reference is not None, "mypy"
    if isinstance(indirect_reference, int):
        indirect_reference = IndirectObject(indirect_reference, 0, self)
    target = indirect_reference.get_object()
    return target.page_number if isinstance(target, PageObject) else None
def add_blank_page(
    self, width: Optional[float] = None, height: Optional[float] = None
) -> PageObject:
    """
    Append a blank page to this PDF file and return it.

    If no page size is specified, use the size of the last page.

    Args:
        width: The width of the new page expressed in default user
            space units.
        height: The height of the new page expressed in default
            user space units.

    Returns:
        The newly appended page.

    Raises:
        PageSizeNotDefinedError: if width and height are not defined
            and previous page does not exist.

    """
    blank = PageObject.create_blank_page(self, width, height)
    appended = self.add_page(blank)
    return appended
def insert_blank_page(
    self,
    width: Optional[Union[float, decimal.Decimal]] = None,
    height: Optional[Union[float, decimal.Decimal]] = None,
    index: int = 0,
) -> PageObject:
    """
    Insert a blank page to this PDF file and return it.

    If a dimension is missing and ``index`` designates an existing page, both
    dimensions are taken from that page's mediabox. Otherwise the missing
    size is left to ``PageObject.create_blank_page``, which is documented to
    use the size of the last page.

    Args:
        width: The width of the new page expressed in default user
            space units.
        height: The height of the new page expressed in default
            user space units.
        index: Position to add the page.

    Returns:
        The newly inserted page.

    Raises:
        PageSizeNotDefinedError: if width and height are not defined
            and previous page does not exist.

    """
    size_missing = width is None or height is None
    # Only consult self.pages[index] when that page exists. The previous
    # condition indexed self.pages whenever ``width`` was None, which raised
    # IndexError on an empty writer (or an index past the end) instead of
    # reaching the fallback described above.
    if size_missing and index < self.get_num_pages():
        reference_page = self.pages[index]
        width = reference_page.mediabox.width
        height = reference_page.mediabox.height
    page = PageObject.create_blank_page(self, width, height)
    self.insert_page(page, index)
    return page
@property
def open_destination(
    self,
) -> Union[None, Destination, TextStringObject, ByteStringObject]:
    """Return the open destination as provided by the parent class property."""
    inherited = super().open_destination
    return inherited
@open_destination.setter
def open_destination(self, dest: Union[None, str, Destination, PageObject]) -> None:
    """
    Set or remove the "/OpenAction" entry of the root object.

    Args:
        dest: ``None`` removes the entry (a missing entry is ignored). A
            ``str`` is stored as a text string, a ``Destination`` contributes
            its destination array, and a ``PageObject`` is wrapped in a
            page-fit destination named "Opening". Any other value leaves the
            root object untouched.

    """
    if dest is None:
        try:
            del self._root_object["/OpenAction"]
        except KeyError:
            pass
        return

    if isinstance(dest, str):
        action = TextStringObject(dest)
    elif isinstance(dest, Destination):
        action = dest.dest_array
    elif isinstance(dest, PageObject):
        # A page without an indirect reference is represented by a null object.
        if dest.indirect_reference is not None:
            page_target = dest.indirect_reference
        else:
            page_target = NullObject()
        action = Destination("Opening", page_target, PAGE_FIT).dest_array
    else:
        return
    self._root_object[NameObject("/OpenAction")] = action
def add_js(self, javascript: str) -> None:
    """
    Add JavaScript which will launch upon opening this PDF.

    The script is stored as a JavaScript action in the "/JavaScript" name
    list of the root object, under a freshly generated UUID name.

    Args:
        javascript: Your JavaScript.

    Example:
        This will launch the print window when the PDF is opened.

        >>> from pypdf import PdfWriter
        >>> output = PdfWriter()
        >>> output.add_js("this.print({bUI:true,bSilent:false,bShrinkToFit:true});")

    """
    # Names / JavaScript preferred to be able to add multiple scripts
    root = self._root_object
    if "/Names" not in root:
        root[NameObject(CA.NAMES)] = DictionaryObject()
    names = cast(DictionaryObject, root[CA.NAMES])
    if "/JavaScript" not in names:
        empty_tree = DictionaryObject({NameObject("/Names"): ArrayObject()})
        names[NameObject("/JavaScript")] = empty_tree
    script_tree = cast(DictionaryObject, names["/JavaScript"])
    script_entries = cast(ArrayObject, script_tree["/Names"])

    # We need a name for parameterized JavaScript in the PDF file,
    # but it can be anything.
    script_name = create_string_object(str(uuid.uuid4()))
    script_entries.append(script_name)

    action = DictionaryObject(
        {
            NameObject(PagesAttributes.TYPE): NameObject("/Action"),
            NameObject("/S"): NameObject("/JavaScript"),
            NameObject("/JS"): TextStringObject(f"{javascript}"),
        }
    )
    script_entries.append(self._add_object(action))
def add_attachment(self, filename: str, data: Union[str, bytes]) -> "EmbeddedFile":
    """
    Embed a file inside the PDF.

    Reference:
    https://opensource.adobe.com/dc-acrobat-sdk-docs/pdfstandards/PDF32000_2008.pdf
    Section 7.11.3

    Args:
        filename: The filename to display.
        data: The data in the file.

    Returns:
        EmbeddedFile instance for the newly created embedded file.

    """
    embedded = EmbeddedFile._create_new(self, filename, data)
    return embedded
def append_pages_from_reader(
    self,
    reader: PdfReader,
    after_page_append: Optional[Callable[[PageObject], None]] = None,
) -> None:
    """
    Copy every page of ``reader`` to the end of this writer, in order.

    ``append`` should be preferred.

    Args:
        reader: The PdfReader object whose pages are copied.
        after_page_append:
            Optional callback invoked after each page has been added. Its
            single argument is the page as it now exists in the writer.
            Values that are not callable are ignored.

    """
    notify = after_page_append if callable(after_page_append) else None
    for position in range(len(reader.pages)):
        appended = self.add_page(reader.pages[position])
        # Hand the writer-side page (not the reader's) to the callback.
        if notify is not None:
            notify(appended)
def _merge_content_stream_to_page(
    self,
    page: PageObject,
    new_content_data: bytes,
) -> None:
    """
    Combines existing content stream(s) with new content (as bytes).

    * No "/Contents" entry: a new stream holding the new data is created.
    * Array of streams: a new stream holding the new data is appended.
    * Single stream: a new stream holding the old data, a newline and the
      new data replaces it.

    Args:
        page: The page to which the new content data will be added.
        new_content_data: A binary-encoded new content stream, for
            instance the commands to draw an XObject.

    """
    contents_key = NameObject("/Contents")
    if contents_key not in page:
        # If no existing content, then we have an empty page.
        # Create a new StreamObject in a new /Contents entry.
        only_stream = StreamObject()
        only_stream.set_data(new_content_data)
        page[contents_key] = self._add_object(only_stream)
        return

    # First resolve the existing page content. This always is an IndirectObject:
    # PDF Explained by John Whitington
    # https://www.oreilly.com/library/view/pdf-explained/9781449321581/ch04.html
    current = page[contents_key].get_object()

    if isinstance(current, ArrayObject):
        # Create a new StreamObject for the new_content_data
        extra_stream = StreamObject()
        extra_stream.set_data(new_content_data)
        current.append(self._add_object(extra_stream))
        page[contents_key] = self._add_object(current)
    if isinstance(current, StreamObject):
        # Merge new content to existing StreamObject
        combined_stream = StreamObject()
        combined_stream.set_data(current.get_data() + b"\n" + new_content_data)
        page[contents_key] = self._add_object(combined_stream)
def _add_apstream_object(
    self,
    page: PageObject,
    appearance_stream_obj: StreamObject,
    object_name: str,
    x_offset: float,
    y_offset: float,
) -> None:
    """
    Adds an appearance stream to the page content in the form of
    an XObject.

    The stream's fonts are merged into the page's font resources, the stream
    is registered under the XObject name ``/Fm_<object_name>`` (sanitized),
    and drawing commands that translate to the offset and paint the XObject
    are merged into the page content.

    Args:
        page: The page to which to add the appearance stream.
        appearance_stream_obj: The appearance stream.
        object_name: The name of the appearance stream.
        x_offset: The horizontal offset for the appearance stream.
        y_offset: The vertical offset for the appearance stream.

    """
    # Prepare XObject resource dictionary on the page. This currently
    # only deals with font resources, but can easily be adapted to also
    # include other resources.
    page_resources = cast(DictionaryObject, page[PG.RESOURCES])
    if "/Resources" in appearance_stream_obj:
        stream_resources = cast(DictionaryObject, appearance_stream_obj["/Resources"])
        # No need to check "if "/Font" in ap_stream_res", because the only reason this
        # code runs would be if we are flattening form fields, and the associated code
        # either adds a Font resource or no resource at all. This probably needs to
        # change if we want to use this method to flatten markup annotations.
        stream_fonts = cast(DictionaryObject, stream_resources["/Font"])
        if "/Font" not in page_resources:
            page_resources[NameObject("/Font")] = DictionaryObject()
        page_fonts = cast(DictionaryObject, page_resources["/Font"])
        # Merge fonts from the appearance stream into the page's font
        # resources; fonts the page already names are kept as they are.
        for name, reference in stream_fonts.items():
            if name not in page_fonts:
                page_fonts[name] = reference

    # Always add the resolved stream object to the writer to get a new IndirectObject.
    # This ensures we have a valid IndirectObject managed by *this* writer.
    stream_ref = self._add_object(appearance_stream_obj)
    form_name = NameObject(f"/Fm_{object_name}")._sanitize()
    if "/XObject" not in page_resources:
        page_resources[NameObject("/XObject")] = DictionaryObject()
    page_xobjects = cast(DictionaryObject, page_resources["/XObject"])
    if form_name in page_xobjects:
        logger_warning(
            f"XObject {form_name!r} already added to page resources. This might be an issue.",
            __name__
        )
    else:
        page_xobjects[form_name] = stream_ref

    placement = Transformation().translate(x_offset, y_offset)
    drawing_commands = f"q\n{placement._to_cm()}\n{form_name} Do\nQ".encode()
    self._merge_content_stream_to_page(page, drawing_commands)
# Field-flag value with no bits set (``FA.FfBits(0)``); used as the default
# for the ``flags`` parameter of ``update_page_form_field_values``.
FFBITS_NUL = FA.FfBits(0)
def update_page_form_field_values(
    self,
    page: Union[PageObject, list[PageObject], None],
    fields: Mapping[str, Union[str, list[str], tuple[str, str, float]]],
    flags: FA.FfBits = FFBITS_NUL,
    auto_regenerate: Optional[bool] = True,
    flatten: bool = False,
) -> None:
    """
    Update the form field values for a given page from a fields dictionary.

    Copy field texts and values from fields to page.
    If the field links to a parent object, add the information to the parent.

    Args:
        page: `PageObject` - references **PDF writer's page** where the
            annotations and field data will be updated.
            `List[Pageobject]` - provides list of pages to be processed.
            `None` - all pages.
        fields: a Python dictionary of:

            * field names (/T) as keys and text values (/V) as value
            * field names (/T) as keys and list of text values (/V) for multiple choice list
            * field names (/T) as keys and tuple of:
                * text values (/V)
                * font id (e.g. /F1, the font id must exist)
                * font size (0 for autosize)

        flags: A set of flags from :class:`~pypdf.constants.FieldDictionaryAttributes.FfBits`.

        auto_regenerate: Set/unset the need_appearances flag;
            the flag is unchanged if auto_regenerate is None.

        flatten: Whether or not to flatten the annotation. If True, this adds the annotation's
            appearance stream to the page contents. Note that this option does not remove the
            annotation itself.

    Raises:
        PyPdfError: if the document has no /AcroForm dictionary, or the
            /AcroForm has no /Fields entry.

    """
    if CatalogDictionary.ACRO_FORM not in self._root_object:
        raise PyPdfError("No /AcroForm dictionary in PDF of PdfWriter Object")
    acro_form = cast(DictionaryObject, self._root_object[CatalogDictionary.ACRO_FORM])
    if InteractiveFormDictEntries.Fields not in acro_form:
        raise PyPdfError("No /Fields dictionary in PDF of PdfWriter Object")
    if isinstance(auto_regenerate, bool):
        self.set_need_appearances_writer(auto_regenerate)
    # Iterate through pages, update field values
    if page is None:
        page = list(self.pages)
    if isinstance(page, list):
        for p in page:
            if PG.ANNOTS in p:  # just to prevent warnings
                # auto_regenerate=None: the flag was already handled above.
                self.update_page_form_field_values(p, fields, flags, None, flatten=flatten)
        return
    if PG.ANNOTS not in page:
        logger_warning("No fields to update on this page", __name__)
        return
    for annotation in page[PG.ANNOTS]:  # type: ignore
        annotation = cast(DictionaryObject, annotation.get_object())
        if annotation.get("/Subtype", "") != "/Widget":
            continue
        if "/FT" in annotation and "/T" in annotation:
            parent_annotation = annotation
        else:
            parent_annotation = annotation.get(
                PG.PARENT, DictionaryObject()
            ).get_object()

        for field, value in fields.items():
            if not (
                self._get_qualified_field_name(parent_annotation) == field
                or parent_annotation.get("/T", None) == field
            ):
                continue
            # Reset for every matching field: field types that produce no
            # appearance stream (e.g. /Sig) must not reuse a stream left
            # over from a previously processed field, nor hit an unbound name.
            appearance_stream_obj: Any = None
            if (
                parent_annotation.get("/FT", None) == "/Ch"
                and "/I" in parent_annotation
            ):
                del parent_annotation["/I"]
            if flags:
                annotation[NameObject(FA.Ff)] = NumberObject(flags)
            # Set the field value
            if not (value is None and flatten):  # Only change values if given by user and not flattening.
                if isinstance(value, list):
                    lst = ArrayObject(TextStringObject(v) for v in value)
                    parent_annotation[NameObject(FA.V)] = lst
                elif isinstance(value, tuple):
                    annotation[NameObject(FA.V)] = TextStringObject(
                        value[0],
                    )
                else:
                    parent_annotation[NameObject(FA.V)] = TextStringObject(value)
            # Get or create the field's appearance stream object
            if parent_annotation.get(FA.FT) == "/Btn":
                # Checkbox button (no /FT found in Radio widgets);
                # We can find the associated appearance stream object
                # within the annotation.
                v = NameObject(value)
                ap = cast(DictionaryObject, annotation[NameObject(AA.AP)])
                normal_ap = cast(DictionaryObject, ap["/N"])
                if v not in normal_ap:
                    v = NameObject("/Off")
                appearance_stream_obj = normal_ap.get(v)
                # Other cases will be updated through the for loop
                annotation[NameObject(AA.AS)] = v
                annotation[NameObject(FA.V)] = v
            elif (
                parent_annotation.get(FA.FT) == "/Tx"
                or parent_annotation.get(FA.FT) == "/Ch"
            ):
                # Textbox; we need to generate the appearance stream object
                if isinstance(value, tuple):
                    appearance_stream_obj = TextStreamAppearance.from_text_annotation(
                        acro_form, parent_annotation, annotation, value[1], value[2]
                    )
                else:
                    appearance_stream_obj = TextStreamAppearance.from_text_annotation(
                        acro_form, parent_annotation, annotation
                    )
                # Add the appearance stream object
                if AA.AP not in annotation:
                    annotation[NameObject(AA.AP)] = DictionaryObject(
                        {NameObject("/N"): self._add_object(appearance_stream_obj)}
                    )
                elif "/N" not in cast(DictionaryObject, annotation[AA.AP]):
                    cast(DictionaryObject, annotation[NameObject(AA.AP)])[
                        NameObject("/N")
                    ] = self._add_object(appearance_stream_obj)
                else:  # [/AP][/N] exists
                    # Overwrite the existing object slot so that every
                    # reference to the old appearance now sees the new one.
                    n = annotation[AA.AP]["/N"].indirect_reference.idnum  # type: ignore
                    self._objects[n - 1] = appearance_stream_obj
                    appearance_stream_obj.indirect_reference = IndirectObject(n, 0, self)
            elif (
                annotation.get(FA.FT) == "/Sig"
            ):  # deprecated  # not implemented yet
                logger_warning("Signature forms not implemented yet", __name__)
            if flatten and appearance_stream_obj is not None:
                # The rectangle is only needed to position the flattened stream.
                rectangle = cast(RectangleObject, annotation[AA.Rect])
                self._add_apstream_object(page, appearance_stream_obj, field, rectangle[0], rectangle[1])
def reattach_fields(
    self, page: Optional[PageObject] = None
) -> list[DictionaryObject]:
    """
    Look for widget annotations that are missing from the /AcroForm
    /Fields array and append them to it.

    The /AcroForm dictionary and its /Fields array are created when absent.

    Args:
        page: page to analyze.
            If none is provided, all pages will be analyzed.

    Returns:
        list of reattached fields.

    """
    reattached = []
    if page is None:
        for single_page in self.pages:
            reattached += self.reattach_fields(single_page)
        return reattached

    # Fetch (or create) the /AcroForm dictionary of the catalog.
    try:
        acro_form = cast(DictionaryObject, self._root_object[CatalogDictionary.ACRO_FORM])
    except KeyError:
        acro_form = DictionaryObject()
        self._root_object[NameObject(CatalogDictionary.ACRO_FORM)] = acro_form
    # Fetch (or create) its /Fields array.
    try:
        form_fields = cast(ArrayObject, acro_form[InteractiveFormDictEntries.Fields])
    except KeyError:
        form_fields = ArrayObject()
        acro_form[NameObject(InteractiveFormDictEntries.Fields)] = form_fields

    if "/Annots" not in page:
        return reattached
    annots = cast(ArrayObject, page["/Annots"])
    for position, entry in enumerate(annots):
        widget = cast(DictionaryObject, entry.get_object())
        if widget.get("/Subtype", "") != "/Widget" or "/FT" not in widget:
            continue
        already_listed = (
            "indirect_reference" in widget.__dict__
            and widget.indirect_reference in form_fields
        )
        if already_listed:
            continue
        if not isinstance(entry, IndirectObject):
            # Direct annotation: register it so that it gets a reference.
            annots[position] = self._add_object(widget)
        form_fields.append(widget.indirect_reference)
        reattached.append(widget)
    return reattached
def clone_reader_document_root(self, reader: PdfReader) -> None:
    """
    Copy the reader document root to the writer and all sub-elements,
    including pages, threads, outlines,... For partial insertion, ``append``
    should be considered.

    Args:
        reader: PdfReader from which the document root should be copied.

    Raises:
        PdfReadError: in strict mode when more objects exist than the
            trailer /Size declares, or when flattening the page tree
            fails with an IndexError.

    """
    self._info_obj = None
    if self.incremental:
        # Keep the reader's object numbering: one slot per declared object.
        self._objects = [None] * (cast(int, reader.trailer["/Size"]) - 1)
        for idnum in range(1, len(self._objects) + 1):
            source_obj = reader.get_object(idnum)
            if source_obj is not None:
                self._objects[idnum - 1] = source_obj.replicate(self)
    else:
        self._objects.clear()
    self._root_object = reader.root_object.clone(self)
    self._pages = self._root_object.raw_get("/Pages")

    if len(self._objects) > cast(int, reader.trailer["/Size"]):
        message = f"Object count {len(self._objects)} exceeds defined trailer size {reader.trailer['/Size']}"
        if self.strict:
            raise PdfReadError(message)
        logger_warning(message, __name__)

    # must be done here before rewriting
    if self.incremental:
        self._original_hash = [
            0 if obj is None else obj.hash_bin() for obj in self._objects
        ]

    try:
        self._flatten()
    except IndexError:
        raise PdfReadError("Got index error while flattening.")

    assert self.flattened_pages is not None
    rebuild_tree = not self.incremental
    for flat_page in self.flattened_pages:
        self._replace_object(
            cast(IndirectObject, flat_page.indirect_reference).idnum, flat_page
        )
        if rebuild_tree:
            flat_page[NameObject("/Parent")] = self._pages
    if rebuild_tree:
        # All pages become direct children of the single /Pages node.
        kids = ArrayObject([flat_page.indirect_reference for flat_page in self.flattened_pages])
        cast(DictionaryObject, self._pages.get_object())[NameObject("/Kids")] = kids
def clone_document_from_reader(
    self,
    reader: PdfReader,
    after_page_append: Optional[Callable[[PageObject], None]] = None,
) -> None:
    """
    Clone a document from a PDF reader: its '/Root', '/Info' and '/ID'.

    Args:
        reader: PDF file reader instance from which the clone
            should be created.
        after_page_append:
            Callback invoked once for every entry of the cloned page
            tree's /Kids array; its single parameter is the page object.

    """
    self.clone_reader_document_root(reader)
    reader_info = reader._info
    if reader_info is not None:
        if self.incremental:
            self._info_obj = cast(
                IndirectObject, reader_info.clone(self).indirect_reference
            )
            assert isinstance(self._info, DictionaryObject), "for mypy"
            # Record the hash of the cloned /Info as its original state.
            self._original_hash[
                self._info_obj.indirect_reference.idnum - 1
            ] = self._info.hash_bin()
        else:
            self._info_obj = self._add_object(
                DictionaryObject(cast(DictionaryObject, reader_info.get_object()))
            )
    # otherwise _info_obj stays None, as set by clone_reader_document_root()

    try:
        self._ID = cast(ArrayObject, reader._ID).clone(self)
    except AttributeError:
        pass

    if callable(after_page_append):
        pages_node = cast(DictionaryObject, self._pages.get_object())
        for kid in cast(ArrayObject, pages_node["/Kids"]):
            after_page_append(kid.get_object())
def _compute_document_identifier(self) -> ByteStringObject:
    """Serialize the PDF structure in memory and return its rolling checksum as a byte string object."""
    buffer = BytesIO()
    self._write_pdf_structure(buffer)
    buffer.seek(0)
    checksum = _rolling_checksum(buffer)
    return ByteStringObject(checksum.encode("utf8"))
def generate_file_identifiers(self) -> None:
    """
    Generate an identifier for the PDF that will be written.

    Uniqueness is the only goal; reproducibility is not required.
    On a first write both identifiers are set to the same value. When an
    identifier already exists, the first one is kept and only the second
    one is recomputed, so that a matching first identifier with a
    different second one denotes another version of the same file.
    see §14.4 "File Identifiers".
    """
    has_identifier = bool(self._ID)
    permanent_id = self._ID[0] if has_identifier else None
    changing_id = self._compute_document_identifier()
    if not has_identifier:
        permanent_id = changing_id
    self._ID = ArrayObject((permanent_id, changing_id))
def encrypt(
    self,
    user_password: str,
    owner_password: Optional[str] = None,
    use_128bit: bool = True,
    permissions_flag: UserAccessPermissions = ALL_DOCUMENT_PERMISSIONS,
    *,
    algorithm: Optional[str] = None,
) -> None:
    """
    Encrypt this PDF file with the PDF Standard encryption handler.

    Args:
        user_password: The password which allows for opening
            and reading the PDF file with the restrictions provided.
        owner_password: The password which allows for
            opening the PDF files without any restrictions. By default,
            the owner password is the same as the user password.
        use_128bit: flag as to whether to use 128bit
            encryption. When false, 40bit encryption will be used.
            By default, this flag is on.
        permissions_flag: permissions as described in
            Table 3.20 of the PDF 1.7 specification. A bit value of 1 means
            the permission is granted.
            Hence an integer value of -1 will set all flags.
            Bit position 3 is for printing, 4 is for modifying content,
            5 and 6 control annotations, 9 for form fields,
            10 for extraction of text and graphics.
        algorithm: encrypt algorithm. Values may be one of "RC4-40", "RC4-128",
            "AES-128", "AES-256-R5", "AES-256". If it is valid,
            `use_128bit` will be ignored.

    Raises:
        ValueError: if ``algorithm`` does not name a supported algorithm.

    """
    if owner_password is None:
        owner_password = user_password

    if algorithm is None:
        alg = EncryptAlgorithm.RC4_128 if use_128bit else EncryptAlgorithm.RC4_40
    else:
        # "AES-256-R5" -> attribute name "AES_256_R5"
        try:
            alg = getattr(EncryptAlgorithm, algorithm.replace("-", "_"))
        except AttributeError:
            raise ValueError(f"Algorithm '{algorithm}' NOT supported")

    self.generate_file_identifiers()
    assert self._ID
    self._encryption = Encryption.make(alg, permissions_flag, self._ID[0])
    entry = self._encryption.write_entry(user_password, owner_password)
    previous_entry = self._encrypt_entry
    if previous_entry:
        # `encrypt` was called before: reuse the object slot of the old entry.
        assert previous_entry.indirect_reference is not None
        entry.indirect_reference = previous_entry.indirect_reference
        self._objects[entry.indirect_reference.idnum - 1] = entry
    else:
        self._add_object(entry)
    self._encrypt_entry = entry
def _resolve_links(self) -> None:
    """
    Re-target links recorded in ``_unresolved_links`` so that they point
    to the merged-in copy of the page they originally referenced.

    Links whose original page cannot be found, or whose page has no
    entry in ``_merged_in_pages``, are left untouched.
    """
    for new_link, old_link in self._unresolved_links:
        old_page = old_link.find_referenced_page()
        new_page = self._merged_in_pages.get(old_page) if old_page else None
        if new_page is not None:
            new_link.patch_reference(self, new_page)
def write_stream(self, stream: StreamType) -> None:
    """
    Write the document to an already opened stream.

    In incremental mode the reader's original bytes are copied first and
    an increment is appended only when objects are new or modified;
    otherwise the complete structure, xref table and trailer are written.

    Args:
        stream: Writable binary stream. A warning is logged when its
            ``mode`` attribute is a string that does not contain "b".

    """
    mode = getattr(stream, "mode", None)
    # Only a textual mode can reveal a non-binary stream; some file-like
    # objects expose a non-string ``mode`` (e.g. an int) or no ``name``,
    # which must not make writing fail.
    if isinstance(mode, str) and "b" not in mode:
        logger_warning(
            f"File <{getattr(stream, 'name', stream)}> to write to is not in binary mode. "
            "It may not be written to correctly.",
            __name__,
        )
    self._resolve_links()

    if self.incremental:
        self._reader.stream.seek(0)
        stream.write(self._reader.stream.read(-1))
        if len(self.list_objects_in_increment()) > 0:
            self._write_increment(stream)  # writes objs, xref stream and startxref
    else:
        object_positions, free_objects = self._write_pdf_structure(stream)
        xref_location = self._write_xref_table(
            stream, object_positions, free_objects
        )
        self._write_trailer(stream, xref_location)
def write(self, stream: Union[Path, StrByteType]) -> tuple[bool, IO[Any]]:
    """
    Write the collection of pages added to this object out as a PDF file.

    Args:
        stream: An object to write the file to. The object can support
            the write method and the tell method, similar to a file object, or
            be a file path, just like the fileobj, just named it stream to keep
            existing workflow.

    Returns:
        A tuple (bool, IO): whether the file was opened (and therefore
        closed) by this method, and the stream that was written to.

    Raises:
        ValueError: if ``stream`` is an empty string.

    """
    if stream == "":
        raise ValueError(f"Output({stream=}) is empty.")

    my_file = isinstance(stream, (str, Path))
    if my_file:
        stream = FileIO(stream, "wb")

    try:
        self.write_stream(stream)
    finally:
        # A file opened here must be closed even when writing fails,
        # otherwise the handle leaks.
        if my_file:
            stream.close()
    if not my_file:
        # Caller-owned streams are only flushed, never closed.
        stream.flush()

    return my_file, stream
def list_objects_in_increment(self) -> list[IndirectObject]:
    """
    For analysis or debugging.
    List the objects that would be written in the increment: those added
    after the original hashes were recorded, or whose binary hash differs
    from the recorded one.
    Deleted objects will not be freed but will become orphans.

    Returns:
        List of new or modified IndirectObjects

    """
    known_count = len(self._original_hash)
    changed = []
    for index, obj in enumerate(self._objects):
        if obj is None:
            continue
        if index >= known_count or obj.hash_bin() != self._original_hash[index]:
            changed.append(cast("IndirectObject", obj).indirect_reference)
    return changed
def _write_increment(self, stream: StreamType) -> None:
    """
    Append an incremental update to ``stream``: every new or modified
    object, followed by a cross-reference stream object, ``startxref``
    and the end-of-file marker.

    Encryption is not applied to the objects written here.
    """
    object_positions = {}
    object_blocks = []
    block_start = -1
    block_stop = -2
    known_count = len(self._original_hash)
    for index, obj in enumerate(self._objects):
        if obj is None:
            continue
        if not (index >= known_count or obj.hash_bin() != self._original_hash[index]):
            continue  # unchanged since the original hashes were taken
        idnum = index + 1
        assert isinstance(obj, PdfObject), "mypy"
        # first write new/modified object
        object_positions[idnum] = stream.tell()
        stream.write(f"{idnum} 0 obj\n".encode())
        obj.write_to_stream(stream)
        stream.write(b"\nendobj\n")

        # prepare xref: group consecutive object numbers as [first, count]
        if idnum != block_stop:
            if block_start > 0:
                object_blocks.append([block_start, block_stop - block_start])
            block_start = idnum
        block_stop = idnum + 1
    assert block_start > 0, "for pytest only"
    object_blocks.append([block_start, block_stop - block_start])

    # write incremented xref
    xref_location = stream.tell()
    xr_id = len(self._objects) + 1
    stream.write(f"{xr_id} 0 obj".encode())
    index_entries = [NumberObject(_it) for _su in object_blocks for _it in _su]
    init_data = {
        NameObject("/Type"): NameObject("/XRef"),
        NameObject("/Size"): NumberObject(xr_id + 1),
        NameObject("/Root"): self.root_object.indirect_reference,
        NameObject("/Filter"): NameObject("/FlateDecode"),
        NameObject("/Index"): ArrayObject(index_entries),
        NameObject("/W"): ArrayObject(
            [NumberObject(1), NumberObject(4), NumberObject(1)]
        ),
        "__streamdata__": b"",
    }
    # Reference /Info only when it is new or differs from its original hash.
    if self._info is not None and (
        self._info.indirect_reference.idnum - 1  # type: ignore
        >= len(self._original_hash)
        or cast(IndirectObject, self._info).hash_bin()  # kept for future
        != self._original_hash[
            self._info.indirect_reference.idnum - 1  # type: ignore
        ]
    ):
        init_data[NameObject(TK.INFO)] = self._info.indirect_reference
    init_data[NameObject(TK.PREV)] = NumberObject(self._reader._startxref)
    if self._ID:
        init_data[NameObject(TK.ID)] = self._ID
    xr = StreamObject.initialize_from_dictionary(init_data)
    # One record per written object, matching /W [1 4 1]:
    # type 1, 4-byte offset, generation 0.
    xref_rows = [struct.pack(b">BIB", 1, _pos, 0) for _pos in object_positions.values()]
    xr.set_data(b"".join(xref_rows))
    xr.write_to_stream(stream)
    stream.write(f"\nendobj\nstartxref\n{xref_location}\n%%EOF\n".encode())  # eof
def _write_pdf_structure(self, stream: StreamType) -> tuple[list[int], list[int]]:
    """
    Write the header, the binary marker comment and every object.

    Objects are encrypted first when encryption is configured, except
    the encryption entry itself.

    Returns:
        A tuple ``(object_positions, free_objects)``: the stream offset of
        each object (-1 for an empty slot), and the numbers of the empty
        slots followed by a terminating 0.

    """
    positions = []
    free_ids = []
    stream.write(self.pdf_header.encode() + b"\n")
    stream.write(b"%\xE2\xE3\xCF\xD3\n")

    for idnum, obj in enumerate(self._objects, start=1):
        if obj is None:
            positions.append(-1)
            free_ids.append(idnum)
            continue
        positions.append(stream.tell())
        stream.write(f"{idnum} 0 obj\n".encode())
        if self._encryption and obj != self._encrypt_entry:
            obj = self._encryption.encrypt_object(obj, idnum, 0)
        obj.write_to_stream(stream)
        stream.write(b"\nendobj\n")
    free_ids.append(0)  # add 0 to loop in accordance with specification
    return positions, free_ids
def _write_xref_table(
    self, stream: StreamType, object_positions: list[int], free_objects: list[int]
) -> int:
    """
    Write the classic cross-reference table.

    Args:
        stream: stream to write to.
        object_positions: offset of every object; a value that is not
            positive marks a free entry.
        free_objects: free object numbers chaining the free entries; the
            first one goes into entry 0.

    Returns:
        The stream position at which the table starts.

    """
    table_start = stream.tell()
    stream.write(b"xref\n")
    stream.write(f"0 {len(self._objects) + 1}\n".encode())
    stream.write(f"{free_objects[0]:0>10} {65535:0>5} f \n".encode())
    next_free = 1
    for offset in object_positions:
        if offset > 0:
            row = f"{offset:0>10} {0:0>5} n \n"
        else:
            # Free entry: points at the next free object number.
            row = f"{free_objects[next_free]:0>10} {1:0>5} f \n"
            next_free += 1
        stream.write(row.encode())
    return table_start
def _write_trailer(self, stream: StreamType, xref_location: int) -> None:
    """
    Write the PDF trailer to the stream.

    To quote the PDF specification:
        [The] trailer [gives] the location of the cross-reference table and
        of certain special objects within the body of the file.

    The trailer always holds the size and root entries; the info, ID and
    encrypt entries are added only when the writer has them.

    Args:
        stream: stream to write to.
        xref_location: offset written after ``startxref``.

    """
    stream.write(b"trailer\n")
    trailer = DictionaryObject(
        {
            NameObject(TK.SIZE): NumberObject(len(self._objects) + 1),
            NameObject(TK.ROOT): self.root_object.indirect_reference,
        }
    )
    if self._info is not None:
        trailer[NameObject(TK.INFO)] = self._info.indirect_reference
    if self._ID is not None:
        trailer[NameObject(TK.ID)] = self._ID
    if self._encrypt_entry:
        trailer[NameObject(TK.ENCRYPT)] = self._encrypt_entry.indirect_reference
    trailer.write_to_stream(stream)
    footer = f"\nstartxref\n{xref_location}\n%%EOF\n"
    stream.write(footer.encode())  # eof
@property
def metadata(self) -> Optional[DocumentInformation]:
    """
    Retrieve/set the PDF file's document information dictionary, if it exists.

    Args:
        value: dict with the entries to be set. if None : remove the /Info entry from the pdf.

    Note that some PDF files use (XMP) metadata streams instead of document
    information dictionaries, and these metadata streams will not be
    accessed by this function, but by :meth:`~xmp_metadata`.

    """
    info = super().metadata
    return info

@metadata.setter
def metadata(
    self,
    value: Optional[Union[DocumentInformation, DictionaryObject, dict[Any, Any]]],
) -> None:
    if value is not None:
        # Replace rather than merge: drop the current entries first.
        if self._info is not None:
            self._info.clear()
        self.add_metadata(value)
        return
    self._info = None
def add_metadata(self, infos: dict[str, Any]) -> None:
    """
    Add custom metadata to the output.

    Every value is resolved (when it is a PDF object), converted with
    ``str`` and stored as a string object; the info dictionary is created
    when the writer has none.

    Args:
        infos: a Python dictionary where each key is a field
            and each value is your new metadata.

    """
    if isinstance(infos, PdfObject):
        infos = cast(DictionaryObject, infos.get_object())
    converted = {}
    for key, raw_value in list(infos.items()):
        resolved = raw_value.get_object() if isinstance(raw_value, PdfObject) else raw_value
        converted[NameObject(key)] = create_string_object(str(resolved))
    if self._info is None:
        self._info = DictionaryObject()
    self._info.update(converted)
def compress_identical_objects(
    self,
    remove_identicals: bool = True,
    remove_orphans: bool = True,
) -> None:
    """
    Parse the PDF file and merge objects that have the same hash.
    This will make objects common to multiple pages.
    Recommended to be used just before writing output.

    Args:
        remove_identicals: Remove identical objects.
        remove_orphans: Remove unreferenced objects. The catalog, the
            /Info dictionary, the /ID array (when it has a reference) and
            the encryption entry are always kept.

    """

    def replace_in_obj(
        obj: PdfObject, crossref: dict[IndirectObject, IndirectObject]
    ) -> None:
        # Walk a dictionary/array: mark every referenced object as used
        # and redirect references to merged duplicates.
        if isinstance(obj, DictionaryObject):
            key_val = obj.items()
        elif isinstance(obj, ArrayObject):
            key_val = enumerate(obj)  # type: ignore
        else:
            return
        assert isinstance(obj, (DictionaryObject, ArrayObject))
        for k, v in key_val:
            if isinstance(v, IndirectObject):
                orphans[v.idnum - 1] = False
                if v in crossref:
                    obj[k] = crossref[v]
            else:
                # the filtering on DictionaryObject and ArrayObject only
                # will be performed within replace_in_obj
                replace_in_obj(v, crossref)

    # _idnum_hash :dict[hash]=(1st_ind_obj,[other_indir_objs,...])
    self._idnum_hash = {}
    orphans = [True] * len(self._objects)
    # look for similar objects
    for idx, obj in enumerate(self._objects):
        if is_null_or_none(obj):
            continue
        assert obj is not None, "mypy"  # mypy: TypeGuard of `is_null_or_none` does not help here.
        assert isinstance(obj.indirect_reference, IndirectObject)
        h = obj.hash_value()
        if remove_identicals and h in self._idnum_hash:
            self._idnum_hash[h][1].append(obj.indirect_reference)
            self._objects[idx] = None
        else:
            self._idnum_hash[h] = (obj.indirect_reference, [])

    # generate the dict converting others to 1st
    cnv = {v[0]: v[1] for v in self._idnum_hash.values() if len(v[1]) > 0}
    cnv_rev: dict[IndirectObject, IndirectObject] = {}
    for k, v in cnv.items():
        cnv_rev.update(zip(v, (k,) * len(v)))

    # replace reference to merged objects
    for obj in self._objects:
        if isinstance(obj, (DictionaryObject, ArrayObject)):
            replace_in_obj(obj, cnv_rev)

    if not remove_orphans:
        return

    # Objects referenced only from the trailer are never orphans.
    orphans[self.root_object.indirect_reference.idnum - 1] = False  # type: ignore

    # The document may have no /Info dictionary at all.
    if not is_null_or_none(self._info):
        orphans[self._info.indirect_reference.idnum - 1] = False  # type: ignore

    try:
        orphans[self._ID.indirect_reference.idnum - 1] = False  # type: ignore
    except AttributeError:
        pass

    # The trailer's /Encrypt entry is not referenced by any other object.
    encrypt_ref = getattr(
        getattr(self, "_encrypt_entry", None), "indirect_reference", None
    )
    if encrypt_ref is not None:
        orphans[encrypt_ref.idnum - 1] = False

    for i in compress(range(len(self._objects)), orphans):
        self._objects[i] = None
def get_reference(self, obj: PdfObject) -> IndirectObject:
    """Return an indirect reference (generation 0) to ``obj``, located by its position in the writer's object list."""
    position = self._objects.index(obj)
    reference = IndirectObject(position + 1, 0, self)
    assert reference.get_object() == obj
    return reference
def get_outline_root(self) -> TreeObject:
    """
    Return the outline tree root of the catalog.

    A missing outline entry is created and registered in the catalog; an
    existing entry that is not a ``TreeObject`` is replaced by a
    ``TreeObject`` built from it.
    """
    if CO.OUTLINES not in self._root_object:
        outline = TreeObject()
        outline.update({})
        self._root_object[NameObject(CO.OUTLINES)] = self._add_object(outline)
        return outline

    # Entries in the catalog dictionary
    outline = cast(TreeObject, self._root_object[CO.OUTLINES])
    if not isinstance(outline, TreeObject):
        converted = TreeObject(outline)
        self._replace_object(outline.indirect_reference.idnum, converted)
        outline = converted
    outline_ref = IndirectObject(self._objects.index(outline) + 1, 0, self)
    assert outline_ref.get_object() == outline
    return outline
def get_threads_root(self) -> ArrayObject:
    """
    The list of threads.

    See §12.4.3 of the PDF 1.7 or PDF 2.0 specification.

    An empty array is stored in the catalog when it has no threads entry.

    Returns:
        An array (possibly empty) of Dictionaries with an ``/F`` key,
        and optionally information about the thread in ``/I`` or ``/Metadata`` keys.

    """
    if CO.THREADS not in self._root_object:
        threads = ArrayObject()
        self._root_object[NameObject(CO.THREADS)] = threads
        return threads
    # Entries in the catalog dictionary
    return cast(ArrayObject, self._root_object[CO.THREADS])
@property
def threads(self) -> ArrayObject:
    """
    Read-only access to the list of threads, as returned by
    :meth:`get_threads_root`.

    See §12.4.3 of the PDF 1.7 or PDF 2.0 specification.

    Each element is a dictionary with an ``/F`` key, and optionally
    information about the thread in ``/I`` or ``/Metadata`` keys.
    """
    thread_list = self.get_threads_root()
    return thread_list
def add_outline_item_destination(
    self,
    page_destination: Union[IndirectObject, PageObject, TreeObject],
    parent: Union[None, TreeObject, IndirectObject] = None,
    before: Union[None, TreeObject, IndirectObject] = None,
    is_open: bool = True,
) -> IndirectObject:
    """
    Insert an outline item into the outline tree.

    Args:
        page_destination: The item to insert. When it resolves to a
            ``PageObject``, a destination titled ``page #<page number>``
            fitting that page is created and inserted instead.
        parent: Outline item that receives the new child; the outline
            root is used when None.
        before: Sibling before which the item is inserted; it is passed
            on to the parent's ``insert_child``.
        is_open: Stored on the item under ``/%is_open%``. When False, the
            parent counters are not incremented.

    Returns:
        The reference of the inserted outline item.

    """
    page_destination = cast(PageObject, page_destination.get_object())
    if isinstance(page_destination, PageObject):
        # Forward parent/before/is_open so the caller's placement is honored.
        return self.add_outline_item_destination(
            Destination(
                f"page #{page_destination.page_number}",
                cast(IndirectObject, page_destination.indirect_reference),
                Fit.fit(),
            ),
            parent,
            before,
            is_open,
        )

    if parent is None:
        parent = self.get_outline_root()

    page_destination[NameObject("/%is_open%")] = BooleanObject(is_open)
    parent = cast(TreeObject, parent.get_object())
    page_destination_ref = self._add_object(page_destination)
    if before is not None:
        before = before.indirect_reference
    parent.insert_child(
        page_destination_ref,
        before,
        self,
        page_destination.inc_parent_counter_outline
        if is_open
        else (lambda x, y: 0),  # noqa: ARG005
    )
    if "/Count" not in page_destination:
        page_destination[NameObject("/Count")] = NumberObject(0)

    return page_destination_ref
def add_outline_item_dict(
    self,
    outline_item: OutlineItemType,
    parent: Union[None, TreeObject, IndirectObject] = None,
    before: Union[None, TreeObject, IndirectObject] = None,
    is_open: bool = True,
) -> IndirectObject:
    """
    Copy ``outline_item`` into a new ``TreeObject`` and insert it through
    :meth:`add_outline_item_destination`.

    Args:
        outline_item: entries of the outline item.
        parent: passed on unchanged.
        before: passed on unchanged.
        is_open: passed on unchanged.

    Returns:
        The reference returned by :meth:`add_outline_item_destination`.

    """
    tree_item = TreeObject()
    tree_item.update(outline_item)
    # NOTE: an "/A" (action) entry gets no dedicated indirect action object;
    # the code that did so was marked as unreachable and is disabled.
    return self.add_outline_item_destination(tree_item, parent, before, is_open)
def add_outline_item(
    self,
    title: str,
    page_number: Union[None, PageObject, IndirectObject, int],
    parent: Union[None, TreeObject, IndirectObject] = None,
    before: Union[None, TreeObject, IndirectObject] = None,
    color: Optional[Union[tuple[float, float, float], str]] = None,
    bold: bool = False,
    italic: bool = False,
    fit: Fit = PAGE_FIT,
    is_open: bool = True,
) -> IndirectObject:
    """
    Add an outline item (commonly referred to as a "Bookmark") to the PDF file.

    Args:
        title: Title to use for this outline item.
        page_number: Page number this outline item will point to.
            May also be a page object or a reference; with None the item
            gets no action.
        parent: A reference to a parent outline item to create nested
            outline items.
        before: Outline item before which the new item is inserted
            (forwarded to :meth:`add_outline_item_destination`).
        color: Color of the outline item's font as a red, green, blue tuple
            from 0.0 to 1.0 or as a Hex String (#RRGGBB)
        bold: Outline item font is bold
        italic: Outline item font is italic
        fit: The fit of the destination page.
        is_open: Forwarded to :meth:`add_outline_item_destination`.

    Returns:
        The added outline item as an indirect object.

    """
    # Defaults to None so that an unsupported ``page_number`` type ends in
    # the warning below instead of an UnboundLocalError.
    page_ref: Union[None, NullObject, IndirectObject, NumberObject] = None
    if isinstance(italic, Fit):  # it means that we are on the old params
        if fit is not None and page_number is None:
            page_number = fit
        return self.add_outline_item(
            title, page_number, parent, None, before, color, bold, italic, is_open=is_open
        )
    if page_number is None:
        action_ref = None
    else:
        if isinstance(page_number, IndirectObject):
            page_ref = page_number
        elif isinstance(page_number, PageObject):
            page_ref = page_number.indirect_reference
        elif isinstance(page_number, int):
            try:
                page_ref = self.pages[page_number].indirect_reference
            except IndexError:
                # Unknown page index: keep the raw number in the destination.
                page_ref = NumberObject(page_number)
        if page_ref is None:
            logger_warning(
                f"can not find reference of page {page_number}",
                __name__,
            )
            page_ref = NullObject()
        dest = Destination(
            NameObject("/" + title + " outline item"),
            page_ref,
            fit,
        )

        action_ref = self._add_object(
            DictionaryObject(
                {
                    NameObject(GoToActionArguments.D): dest.dest_array,
                    NameObject(GoToActionArguments.S): NameObject("/GoTo"),
                }
            )
        )
    outline_item = self._add_object(
        _create_outline_item(action_ref, title, color, italic, bold)
    )

    if parent is None:
        parent = self.get_outline_root()
    return self.add_outline_item_destination(outline_item, parent, before, is_open)
def add_outline(self) -> None:
    """Placeholder that always raises ``NotImplementedError``; use ``add_outline_item``."""
    message = "This method is not yet implemented. Use :meth:`add_outline_item` instead."
    raise NotImplementedError(message)
def add_named_destination_array(
    self, title: TextStringObject, destination: Union[IndirectObject, ArrayObject]
) -> None:
    """
    Insert a (title, destination) pair into the named destination array.

    The array holds alternating names and destinations; the pair goes
    before the first name that compares greater than ``title``, or at
    the end when there is none.

    Args:
        title: name of the destination.
        destination: destination array, or a reference to it.

    """
    name_tree = self.get_named_dest_root()
    for position in range(0, len(name_tree), 2):
        if title < name_tree[position]:
            # Insert the value first, then the name at the same index,
            # so the name ends up right before its value.
            name_tree.insert(position, destination)
            name_tree.insert(position, TextStringObject(title))
            return
    name_tree.extend([TextStringObject(title), destination])
def add_named_destination_object(
    self,
    page_destination: PdfObject,
) -> IndirectObject:
    """
    Register the ``dest_array`` of ``page_destination`` as a new object
    and add it to the named destinations under its ``/Title``.

    Returns:
        The reference to the registered destination array.

    """
    dest_array_ref = self._add_object(page_destination.dest_array)  # type: ignore
    dest_title = cast("TextStringObject", page_destination["/Title"])  # type: ignore
    self.add_named_destination_array(dest_title, dest_array_ref)

    return dest_array_ref
def add_named_destination(
    self,
    title: str,
    page_number: int,
) -> IndirectObject:
    """
    Create a named ``/GoTo`` action targeting a page and register it.

    Args:
        title: Name of the destination; converted to a ``TextStringObject``
            when it is not one already.
        page_number: Index into the kids array of the writer's pages object.

    Returns:
        The reference of the action dictionary that was added.

    """
    kids = self.get_object(self._pages)[PagesAttributes.KIDS]  # type: ignore
    target_page = kids[page_number]

    # Destination array: page, horizontal fit, fixed top coordinate 826.
    view = ArrayObject(
        [target_page, NameObject(TypFitArguments.FIT_H), NumberObject(826)]
    )
    action = DictionaryObject(
        {
            NameObject(GoToActionArguments.D): view,
            NameObject(GoToActionArguments.S): NameObject("/GoTo"),
        }
    )
    action_ref = self._add_object(action)

    name = title if isinstance(title, TextStringObject) else TextStringObject(str(title))
    self.add_named_destination_array(name, action_ref)
    return action_ref
def remove_links(self) -> None:
    """
    Remove links and annotations from this output.

    Every page is passed to ``remove_objects_from_page`` with the
    ``ALL_ANNOTATIONS`` flag.
    """
    for current_page in self.pages:
        self.remove_objects_from_page(
            current_page, ObjectDeletionFlag.ALL_ANNOTATIONS
        )
def remove_annotations(
    self, subtypes: Optional[Union[AnnotationSubtype, Iterable[AnnotationSubtype]]]
) -> None:
    """
    Delete annotations of the given subtype(s) from every page.

    Args:
        subtypes: Subtype or collection of subtypes to remove, for example
            "/Link", "/FileAttachment", "/Sound", "/Movie", "/Screen", ...
            Pass ``None`` to remove all annotations.

    """
    for current_page in self.pages:
        self._remove_annots_from_page(current_page, subtypes)
def _remove_annots_from_page(
    self,
    page: Union[IndirectObject, PageObject, DictionaryObject],
    subtypes: Optional[Iterable[str]],
) -> None:
    """
    Drop matching entries from a page's annotation array, in place.

    Args:
        page: The page, or a reference to it.
        subtypes: Annotation subtypes to remove; ``None`` removes every
            annotation.

    """
    page_dict = cast(DictionaryObject, page.get_object())
    if PG.ANNOTS not in page_dict:
        return

    def current_annots() -> Any:
        # Looked up again on every use, exactly like the loop it serves.
        return cast(ArrayObject, page_dict[PG.ANNOTS])

    index = 0
    while index < len(current_annots()):
        entry = current_annots()[index]
        annotation = cast(DictionaryObject, entry.get_object())
        if subtypes is not None and cast(str, annotation["/Subtype"]) not in subtypes:
            # Not selected for removal: step over it.
            index += 1
            continue
        if isinstance(entry, IndirectObject):
            self._objects[entry.idnum - 1] = NullObject()  # to reduce PDF size
        # The index is not advanced: the next entry slides into this slot.
        del page_dict[PG.ANNOTS][index]  # type:ignore
def remove_objects_from_page(
    self,
    page: Union[PageObject, DictionaryObject],
    to_delete: Union[ObjectDeletionFlag, Iterable[ObjectDeletionFlag]],
    text_filters: Optional[dict[str, Any]] = None
) -> None:
    """
    Remove objects specified by ``to_delete`` from the given page.

    Annotation flags (``LINKS``, ``ATTACHMENTS``, ``OBJECTS_3D``,
    ``ALL_ANNOTATIONS``) are delegated to ``_remove_annots_from_page``; the
    first one found set wins and the method returns right after it. All
    other flags are handled by rewriting the operations of the page
    content stream and of the form XObjects reachable from its resources.

    Args:
        page: Page object to clean up.
        to_delete: Objects to be deleted; can be a ``ObjectDeletionFlag``
            or a list/tuple of ObjectDeletionFlag, each of which is
            processed in turn.
        text_filters: Properties of text to be deleted, if applicable. Optional.
            This is a Python dictionary with the following properties:

            * font_ids: List of font resource IDs (such as /F1 or /T1_0) to be deleted.

    """
    if isinstance(to_delete, (list, tuple)):
        for to_d in to_delete:
            # Forward the filters so that e.g. ``[ObjectDeletionFlag.TEXT]``
            # behaves the same as the bare flag instead of deleting all text.
            self.remove_objects_from_page(page, to_d, text_filters)
        return None
    assert isinstance(to_delete, ObjectDeletionFlag)

    if to_delete & ObjectDeletionFlag.LINKS:
        return self._remove_annots_from_page(page, ("/Link",))
    if to_delete & ObjectDeletionFlag.ATTACHMENTS:
        return self._remove_annots_from_page(
            page, ("/FileAttachment", "/Sound", "/Movie", "/Screen")
        )
    if to_delete & ObjectDeletionFlag.OBJECTS_3D:
        return self._remove_annots_from_page(page, ("/3D",))
    if to_delete & ObjectDeletionFlag.ALL_ANNOTATIONS:
        return self._remove_annots_from_page(page, None)

    # Content stream operators that are dropped outright.
    jump_operators = []
    if to_delete & ObjectDeletionFlag.DRAWING_IMAGES:
        jump_operators = (
            [
                b"w", b"J", b"j", b"M", b"d", b"i",
                b"W", b"W*",
                b"b", b"b*", b"B", b"B*", b"S", b"s", b"f", b"f*", b"F", b"n",
                b"m", b"l", b"c", b"v", b"y", b"h", b"re",
                b"sh"
            ]
        )
    # NOTE(review): this replaces, rather than extends, the drawing
    # operators when both flags are set; kept as found.
    if to_delete & ObjectDeletionFlag.TEXT:
        jump_operators = [b"Tj", b"TJ", b"'", b'"']

    def clean(
        content: ContentStream,
        images: list[str],
        forms: list[str],
        text_filters: Optional[dict[str, Any]] = None
    ) -> None:
        # Delete the selected operations from ``content`` in place.
        nonlocal jump_operators, to_delete

        font_id = None  # operand of the most recent ``Tf`` operator seen
        font_ids_to_delete = []
        if text_filters and to_delete & ObjectDeletionFlag.TEXT:
            font_ids_to_delete = text_filters.get("font_ids", [])

        i = 0
        while i < len(content.operations):
            operands, operator = content.operations[i]
            if operator == b"Tf":
                font_id = operands[0]
            if (
                (
                    operator == b"INLINE IMAGE"
                    and (to_delete & ObjectDeletionFlag.INLINE_IMAGES)
                )
                or (operator in jump_operators)
                or (
                    operator == b"Do"
                    and (to_delete & ObjectDeletionFlag.XOBJECT_IMAGES)
                    and (operands[0] in images)
                )
            ):
                # When text is deleted with filters, only operations issued
                # while one of the listed fonts is selected are removed.
                if (
                    not to_delete & ObjectDeletionFlag.TEXT
                    or (to_delete & ObjectDeletionFlag.TEXT and not text_filters)
                    or (to_delete & ObjectDeletionFlag.TEXT and font_id in font_ids_to_delete)
                ):
                    # index stays put: the next operation moves into slot i
                    del content.operations[i]
                else:
                    i += 1
            else:
                i += 1
        content.get_data()  # this ensures ._data is rebuilt from the .operations

    def clean_forms(
        elt: DictionaryObject, stack: list[DictionaryObject]
    ) -> tuple[list[str], list[str]]:
        # Walk the /XObject resources of ``elt``, recursing into forms, and
        # return the names of the image and form XObjects found.
        nonlocal to_delete
        # elt in recursive call is a new ContentStream object, so we have to check the indirect_reference
        if (elt in stack) or (
            hasattr(elt, "indirect_reference")
            and any(
                elt.indirect_reference == getattr(x, "indirect_reference", -1)
                for x in stack
            )
        ):
            # to prevent infinite looping
            return [], []  # pragma: no cover
        try:
            d = cast(
                dict[Any, Any],
                cast(DictionaryObject, elt["/Resources"])["/XObject"],
            )
        except KeyError:
            d = {}
        images = []
        forms = []
        for k, v in d.items():
            o = v.get_object()
            try:
                content: Any = None
                if (
                    to_delete & ObjectDeletionFlag.XOBJECT_IMAGES
                    and o["/Subtype"] == "/Image"
                ):
                    content = NullObject()  # to delete the image keeping the entry
                    images.append(k)
                if o["/Subtype"] == "/Form":
                    forms.append(k)
                    if isinstance(o, ContentStream):
                        content = o
                    else:
                        content = ContentStream(o, self)
                        content.update(
                            {
                                k1: v1
                                for k1, v1 in o.items()
                                if k1 not in ["/Length", "/Filter", "/DecodeParms"]
                            }
                        )
                        try:
                            content.indirect_reference = o.indirect_reference
                        except AttributeError:  # pragma: no cover
                            pass
                    stack.append(elt)
                    clean_forms(content, stack)  # clean subforms
                if content is not None:
                    if isinstance(v, IndirectObject):
                        self._objects[v.idnum - 1] = content
                    else:
                        # should only occur in a PDF not respecting PDF spec
                        # where streams must be indirected.
                        d[k] = self._add_object(content)  # pragma: no cover
            except (TypeError, KeyError):
                # entries that cannot be inspected are left untouched
                pass
        for im in images:
            del d[im]  # for clean-up
        if isinstance(elt, StreamObject):  # for /Form
            if not isinstance(elt, ContentStream):  # pragma: no cover
                e = ContentStream(elt, self)
                e.update(elt.items())
                elt = e
            clean(elt, images, forms, text_filters)  # clean the content
        return images, forms

    if not isinstance(page, PageObject):
        page = PageObject(self, page.indirect_reference)  # pragma: no cover
    if "/Contents" in page:
        content = cast(ContentStream, page.get_contents())

        images, forms = clean_forms(page, [])

        clean(content, images, forms, text_filters)
        page.replace_contents(content)
    # NOTE(review): a tuple is returned although the signature says None;
    # left unchanged here so the observable return value stays the same.
    return [], []  # type: ignore[return-value]
def remove_images(
    self,
    to_delete: ImageType = ImageType.ALL,
) -> None:
    """
    Remove images from every page of this output.

    Args:
        to_delete: The kinds of images to delete (default: all kinds).
            A plain ``bool`` is treated as ``ImageType.ALL``.

    """
    selection = ImageType.ALL if isinstance(to_delete, bool) else to_delete

    # Translate each selected image kind into the deletion flag of the same name.
    deletion_flags = ObjectDeletionFlag.NONE
    for kind in ("XOBJECT_IMAGES", "INLINE_IMAGES", "DRAWING_IMAGES"):
        if selection & ImageType[kind]:
            deletion_flags |= ObjectDeletionFlag[kind]

    for current_page in self.pages:
        self.remove_objects_from_page(current_page, deletion_flags)
def remove_text(self, font_names: Optional[list[str]] = None) -> None:
    """
    Remove text from the PDF.

    Args:
        font_names: List of font names to remove, such as "Helvetica-Bold".
            Optional. If not specified, all text will be removed.
    """
    wanted_fonts = font_names if font_names else []

    def collect_fonts(
        node: Any,
        found: Optional[dict[str, Any]] = None,
        key: Optional[str] = None
    ) -> dict[str, Any]:
        # Recursively walk dictionaries and arrays, recording for every font
        # dictionary the key(s) under which it was reached.
        if found is None:
            found = {}
        if isinstance(node, IndirectObject):
            node = node.get_object()
        if isinstance(node, dict):
            if node.get("/Type") == "/Font":
                base_font = node.get("/BaseFont", "")
                # Normalize font names like "/RRXFFV+Palatino-Bold" to "Palatino-Bold"
                short_name = base_font.lstrip("/").split("+")[-1]
                entry = found.setdefault(
                    short_name,
                    {
                        "normalized_font_name": short_name,
                        "resource_ids": [],
                    },
                )
                if key not in entry["resource_ids"]:
                    entry["resource_ids"].append(key)
            for child_key in node:
                found = collect_fonts(node[child_key], found, child_key)
        elif isinstance(node, (list, ArrayObject)):
            for child in node:
                found = collect_fonts(child, found)
        return found

    for page in self.pages:
        text_filters = {}
        if wanted_fonts:
            # Content streams reference fonts by resource name ("/F1", "/T1_0", ...),
            # so the requested font names are mapped to those names first.
            fonts_on_page = collect_fonts(page.get("/Resources"))
            resource_ids = []
            for wanted in wanted_fonts:
                if wanted in fonts_on_page:
                    resource_ids.extend(fonts_on_page[wanted]["resource_ids"])
            text_filters["font_ids"] = resource_ids
        self.remove_objects_from_page(page, ObjectDeletionFlag.TEXT, text_filters=text_filters)
def add_uri(
    self,
    page_number: int,
    uri: str,
    rect: RectangleObject,
    border: Optional[ArrayObject] = None,
) -> None:
    """
    Add an URI from a rectangular area to the specified page.

    Args:
        page_number: index of the page on which to place the URI action.
        uri: URI of resource to link to.
        rect: :class:`RectangleObject<pypdf.generic.RectangleObject>` or
            array of four integers specifying the clickable rectangular area
            ``[xLL, yLL, xUR, yUR]``, or string in the form
            ``"[ xLL yLL xUR yUR ]"``.
        border: if provided, an array describing border-drawing
            properties. See the PDF spec for details. When omitted, the
            border array ``[2, 2, 2]`` is written.

    Raises:
        ValueError: If ``rect`` is a string containing a token that is not
            a number.

    """
    page_link = self.get_object(self._pages)[PagesAttributes.KIDS][page_number]  # type: ignore
    page_ref = cast(dict[str, Any], self.get_object(page_link))

    border_arr: BorderArrayType
    if border is not None:
        border_arr = [NumberObject(n) for n in border[:3]]
        if len(border) == 4:
            # the optional 4th element is a dash pattern array
            dash_pattern = ArrayObject([NumberObject(n) for n in border[3]])
            border_arr.append(dash_pattern)
    else:
        border_arr = [NumberObject(2), NumberObject(2), NumberObject(2)]

    if isinstance(rect, str):
        # Parse the documented "[ xLL yLL xUR yUR ]" form. Feeding the whole
        # string to NumberObject cannot yield a rectangle.
        coordinates = [float(part) for part in rect.strip().strip("[]").split()]
        rect = RectangleObject(coordinates)  # type: ignore[arg-type]
    elif not isinstance(rect, RectangleObject):
        rect = RectangleObject(rect)

    # The URI action triggered by the link
    lnk2 = DictionaryObject()
    lnk2.update(
        {
            NameObject("/S"): NameObject("/URI"),
            NameObject("/URI"): TextStringObject(uri),
        }
    )
    # The link annotation itself
    lnk = DictionaryObject()
    lnk.update(
        {
            NameObject(AA.Type): NameObject("/Annot"),
            NameObject(AA.Subtype): NameObject("/Link"),
            NameObject(AA.P): page_link,
            NameObject(AA.Rect): rect,
            NameObject("/H"): NameObject("/I"),
            NameObject(AA.Border): ArrayObject(border_arr),
            NameObject("/A"): lnk2,
        }
    )
    lnk_ref = self._add_object(lnk)

    # Attach the annotation to the page, creating the array when needed
    if PG.ANNOTS in page_ref:
        page_ref[PG.ANNOTS].append(lnk_ref)
    else:
        page_ref[NameObject(PG.ANNOTS)] = ArrayObject([lnk_ref])
# Layout names that ``_set_page_layout`` accepts without logging a warning.
_valid_layouts = (
    "/NoLayout",
    "/SinglePage",
    "/OneColumn",
    "/TwoColumnLeft",
    "/TwoColumnRight",
    "/TwoPageLeft",
    "/TwoPageRight",
)
def _get_page_layout(self) -> Optional[LayoutType]:
    """Return the ``/PageLayout`` entry of the root object, or ``None`` if absent."""
    try:
        layout = self._root_object["/PageLayout"]
    except KeyError:
        return None
    return cast(LayoutType, layout)
def _set_page_layout(self, layout: Union[NameObject, LayoutType]) -> None:
    """
    Set the page layout.

    A value that is not already a ``NameObject`` is checked against
    ``_valid_layouts``; an unknown value only triggers a warning and is
    still written to the root object.

    Args:
        layout: The page layout to be used.

    .. list-table:: Valid ``layout`` arguments
       :widths: 50 200

       * - /NoLayout
         - Layout explicitly not specified
       * - /SinglePage
         - Show one page at a time
       * - /OneColumn
         - Show one column at a time
       * - /TwoColumnLeft
         - Show pages in two columns, odd-numbered pages on the left
       * - /TwoColumnRight
         - Show pages in two columns, odd-numbered pages on the right
       * - /TwoPageLeft
         - Show two pages at a time, odd-numbered pages on the left
       * - /TwoPageRight
         - Show two pages at a time, odd-numbered pages on the right

    """
    if not isinstance(layout, NameObject):
        if layout not in self._valid_layouts:
            # The former expression ``'', ''.join(...)`` formatted a tuple
            # (an empty string plus the names glued together) instead of a
            # comma-separated list.
            logger_warning(
                f"Layout should be one of: {', '.join(self._valid_layouts)}",
                __name__,
            )
        layout = NameObject(layout)
    self._root_object.update({NameObject("/PageLayout"): layout})
def set_page_layout(self, layout: LayoutType) -> None:
    """
    Set the page layout.

    Public wrapper that forwards ``layout`` unchanged to
    ``_set_page_layout``.

    Args:
        layout: The page layout to be used

    .. list-table:: Valid ``layout`` arguments
       :widths: 50 200

       * - /NoLayout
         - Layout explicitly not specified
       * - /SinglePage
         - Show one page at a time
       * - /OneColumn
         - Show one column at a time
       * - /TwoColumnLeft
         - Show pages in two columns, odd-numbered pages on the left
       * - /TwoColumnRight
         - Show pages in two columns, odd-numbered pages on the right
       * - /TwoPageLeft
         - Show two pages at a time, odd-numbered pages on the left
       * - /TwoPageRight
         - Show two pages at a time, odd-numbered pages on the right

    """
    self._set_page_layout(layout)
@property
def page_layout(self) -> Optional[LayoutType]:
    """
    Page layout property.

    Reading it returns the result of ``_get_page_layout()``.

    .. list-table:: Valid ``layout`` values
       :widths: 50 200

       * - /NoLayout
         - Layout explicitly not specified
       * - /SinglePage
         - Show one page at a time
       * - /OneColumn
         - Show one column at a time
       * - /TwoColumnLeft
         - Show pages in two columns, odd-numbered pages on the left
       * - /TwoColumnRight
         - Show pages in two columns, odd-numbered pages on the right
       * - /TwoPageLeft
         - Show two pages at a time, odd-numbered pages on the left
       * - /TwoPageRight
         - Show two pages at a time, odd-numbered pages on the right
    """
    return self._get_page_layout()
@page_layout.setter
def page_layout(self, layout: LayoutType) -> None:
    # Assignment forwards the value unchanged to ``_set_page_layout``.
    self._set_page_layout(layout)
# Page mode names that the ``page_mode`` setter accepts without logging a warning.
_valid_modes = (
    "/UseNone",
    "/UseOutlines",
    "/UseThumbs",
    "/FullScreen",
    "/UseOC",
    "/UseAttachments",
)
def _get_page_mode(self) -> Optional[PagemodeType]:
    """Return the ``/PageMode`` entry of the root object, or ``None`` if absent."""
    try:
        mode = self._root_object["/PageMode"]
    except KeyError:
        return None
    return cast(PagemodeType, mode)
@property
def page_mode(self) -> Optional[PagemodeType]:
    """
    Page mode property.

    Reading it returns the result of ``_get_page_mode()``.

    .. list-table:: Valid ``mode`` values
       :widths: 50 200

       * - /UseNone
         - Do not show outline or thumbnails panels
       * - /UseOutlines
         - Show outline (aka bookmarks) panel
       * - /UseThumbs
         - Show page thumbnails panel
       * - /FullScreen
         - Fullscreen view
       * - /UseOC
         - Show Optional Content Group (OCG) panel
       * - /UseAttachments
         - Show attachments panel
    """
    return self._get_page_mode()
@page_mode.setter
def page_mode(self, mode: PagemodeType) -> None:
    # A plain value is checked against ``_valid_modes`` (warning only) and
    # wrapped in a NameObject; a NameObject is stored as given.
    if not isinstance(mode, NameObject):
        if mode not in self._valid_modes:
            logger_warning(
                f"Mode should be one of: {', '.join(self._valid_modes)}", __name__
            )
        mode = NameObject(mode)
    self._root_object.update({NameObject("/PageMode"): mode})
def add_annotation(
    self,
    page_number: Union[int, PageObject],
    annotation: dict[str, Any],
) -> DictionaryObject:
    """
    Attach one new annotation to a page.

    The annotation must be freshly created; an annotation that has
    already been added cannot be reused.

    Args:
        page_number: PageObject or page index.
        annotation: Annotation to be added (created with annotation).

    Returns:
        The inserted object.
        This can be used for popup creation, for example.

    Raises:
        TypeError: ``page_number`` is neither an ``int`` nor a ``PageObject``.

    """
    if isinstance(page_number, int):
        page = self.pages[page_number]
    elif isinstance(page_number, PageObject):
        page = page_number
    else:
        raise TypeError("page: invalid type")

    to_add = cast(DictionaryObject, _pdf_objectify(annotation))
    to_add[NameObject("/P")] = page.indirect_reference

    # Make sure the page owns an annotation array before appending to it.
    if page.annotations is None:
        page[NameObject("/Annots")] = ArrayObject()
    assert page.annotations is not None

    subtype = to_add.get("/Subtype")

    # Internal link annotations need the correct object type for the
    # destination
    if subtype == "/Link" and "/Dest" in to_add:
        raw_dest = cast(dict[Any, Any], to_add[NameObject("/Dest")])
        # fit_args is read through a plain dict copy, as before
        fit = Fit(fit_type=raw_dest["fit"], fit_args=dict(raw_dest)["fit_args"])
        link_dest = Destination(
            NameObject("/LinkName"),
            raw_dest["target_page_index"],
            fit,
        )
        to_add[NameObject("/Dest")] = link_dest.dest_array

    page.annotations.append(self._add_object(to_add))

    # A popup is also registered on its parent annotation.
    if subtype == "/Popup" and NameObject("/Parent") in to_add:
        parent = cast(DictionaryObject, to_add["/Parent"].get_object())
        parent[NameObject("/Popup")] = to_add.indirect_reference

    return to_add
def clean_page(self, page: Union[PageObject, IndirectObject]) -> PageObject:
    """
    Perform some clean up in the page.

    Currently: named destinations stored as ``NameObject`` are converted
    to ``TextStringObject`` (required for names/dests list). This is done
    for the ``/Dest`` entry of each annotation or, failing that, for the
    ``/D`` entry of its ``/A`` action.

    Args:
        page: The page, or a reference to it.

    Returns:
        The cleaned PageObject

    """
    page = cast("PageObject", page.get_object())
    for annot_ref in page.get("/Annots", []):
        annot = annot_ref.get_object()
        direct_dest = annot.get("/Dest", None)
        action = annot.get("/A", None)
        if isinstance(direct_dest, NameObject):
            annot[NameObject("/Dest")] = TextStringObject(direct_dest)
            continue
        if action is None:
            continue
        action = action.get_object()
        action_dest = action.get("/D", None)
        if isinstance(action_dest, NameObject):
            action[NameObject("/D")] = TextStringObject(action_dest)
    return page
def _create_stream(
    self, fileobj: Union[Path, StrByteType, PdfReader]
) -> tuple[IOBase, Optional[Encryption]]:
    """
    Copy the content behind ``fileobj`` into an in-memory ``BytesIO``.

    Args:
        fileobj: A path (``str`` or ``Path``), a ``PdfReader``, or any
            object providing ``seek`` and ``read``.

    Returns:
        The in-memory stream and, for a ``PdfReader`` with a truthy
        ``_encryption`` attribute, that attribute (otherwise ``None``).

    Raises:
        NotImplementedError: ``fileobj`` is none of the supported kinds.

    """
    # A string or Path is taken to be the location of a file.
    if isinstance(fileobj, (str, Path)):
        with FileIO(fileobj, "rb") as handle:
            return BytesIO(handle.read()), None

    if isinstance(fileobj, PdfReader):
        encryption = fileobj._encryption if fileobj._encryption else None
        source = fileobj.stream
        saved_position = source.tell()
        source.seek(0)
        copied: IOBase = BytesIO(source.read())
        # reset the stream to its original location
        source.seek(saved_position)
        return copied, encryption

    if hasattr(fileobj, "seek") and hasattr(fileobj, "read"):
        fileobj.seek(0)
        return BytesIO(fileobj.read()), None

    raise NotImplementedError(
        "Merging requires an object that PdfReader can parse. "
        "Typically, that is a Path or a string representing a Path, "
        "a file object, or an object implementing .seek and .read. "
        "Passing a PdfReader directly works as well."
    )
def append(
    self,
    fileobj: Union[StrByteType, PdfReader, Path],
    outline_item: Union[
        str, None, PageRange, tuple[int, int], tuple[int, int, int], list[int]
    ] = None,
    pages: Union[
        None,
        PageRange,
        tuple[int, int],
        tuple[int, int, int],
        list[int],
        list[PageObject],
    ] = None,
    import_outline: bool = True,
    excluded_fields: Optional[Union[list[str], tuple[str, ...]]] = None,
) -> None:
    """
    Concatenate pages at the end of the document.

    Same as :meth:`merge()<merge>` called with ``position=None``.

    Args:
        fileobj: A File Object or an object that supports the standard
            read and seek methods similar to a File Object. Could also be a
            string representing a path to a PDF file.
        outline_item: Optionally, you may specify a string to build an
            outline (aka 'bookmark') to identify the beginning of the
            included file. When a tuple, list or ``PageRange`` is given
            here it is used as ``pages`` and the following arguments are
            shifted accordingly.
        pages: Can be a :class:`PageRange<pypdf.pagerange.PageRange>`
            or a ``(start, stop[, step])`` tuple
            or a list of pages to be processed
            to merge only the specified range of pages from the source
            document into the output document.
        import_outline: You may prevent the source document's
            outline (collection of outline items, previously referred to as
            'bookmarks') from being imported by specifying this as ``False``.
        excluded_fields: Provide the list of fields/keys to be ignored
            if ``/Annots`` is part of the list, the annotation will be ignored
            if ``/B`` is part of the list, the articles will be ignored

    """
    if excluded_fields is None:
        excluded_fields = ()

    title = outline_item
    if isinstance(outline_item, (tuple, list, PageRange)):
        # ``outline_item`` actually holds the page selection: every
        # following positional argument moves one slot to the right.
        if isinstance(pages, bool):
            if not isinstance(import_outline, bool):
                excluded_fields = import_outline
            import_outline = pages
        pages = outline_item
        title = None

    self.merge(
        None,
        fileobj,
        title,
        pages,
        import_outline,
        excluded_fields,
    )
def merge(
    self,
    position: Optional[int],
    fileobj: Union[Path, StrByteType, PdfReader],
    outline_item: Optional[str] = None,
    pages: Optional[Union[PageRangeSpec, list[PageObject]]] = None,
    import_outline: bool = True,
    excluded_fields: Optional[Union[list[str], tuple[str, ...]]] = (),
) -> None:
    """
    Merge the pages from the given file into the output file at the
    specified page number.

    Besides the pages themselves, named destinations, the outline,
    annotations, AcroForm fields and article threads of the source are
    carried over, restricted to what refers to the merged pages.

    Args:
        position: The *page number* to insert this file. File will
            be inserted after the given number. With ``None`` the pages
            are added through ``add_page`` instead of ``insert_page``.
        fileobj: A File Object or an object that supports the standard
            read and seek methods similar to a File Object. Could also be a
            string representing a path to a PDF file.
        outline_item: Optionally, you may specify a string to build an outline
            (aka 'bookmark') to identify the
            beginning of the included file.
        pages: can be a :class:`PageRange<pypdf.pagerange.PageRange>`
            or a ``(start, stop[, step])`` tuple
            or a list of pages to be processed
            to merge only the specified range of pages from the source
            document into the output document.
        import_outline: You may prevent the source document's
            outline (collection of outline items, previously referred to as
            'bookmarks') from being imported by specifying this as ``False``.
        excluded_fields: provide the list of fields/keys to be ignored
            if ``/Annots`` is part of the list, the annotation will be ignored
            if ``/B`` is part of the list, the articles will be ignored

    Raises:
        TypeError: The pages attribute is not configured properly

    """
    if isinstance(fileobj, PdfDocCommon):
        reader = fileobj
    else:
        stream, _encryption_obj = self._create_stream(fileobj)
        # Create a new PdfReader instance using the stream
        # (either file or BytesIO or StringIO) created above
        reader = PdfReader(stream, strict=False)  # type: ignore[arg-type]

    if excluded_fields is None:
        excluded_fields = ()
    # Find the range of pages to merge.
    if pages is None:
        pages = list(range(len(reader.pages)))
    elif isinstance(pages, PageRange):
        pages = list(range(*pages.indices(len(reader.pages))))
    elif isinstance(pages, list):
        pass  # keep unchanged
    elif isinstance(pages, tuple) and len(pages) <= 3:
        pages = list(range(*pages))
    elif not isinstance(pages, tuple):
        # NOTE(review): a tuple with more than 3 items reaches neither this
        # branch nor the previous one and is iterated as-is below.
        raise TypeError(
            '"pages" must be a tuple of (start, stop[, step]) or a list'
        )

    # Maps the object number of each source page to its copy in this writer.
    srcpages = {}
    for page in pages:
        if isinstance(page, PageObject):
            pg = page
        else:
            pg = reader.pages[page]
        assert pg.indirect_reference is not None
        if position is None:
            # numbers in the exclude list identifies that the exclusion is
            # only applicable to 1st level of cloning
            srcpages[pg.indirect_reference.idnum] = self.add_page(
                pg, [*list(excluded_fields), 1, "/B", 1, "/Annots"]  # type: ignore
            )
        else:
            srcpages[pg.indirect_reference.idnum] = self.insert_page(
                pg, position, [*list(excluded_fields), 1, "/B", 1, "/Annots"]  # type: ignore
            )
            # keep the following pages in source order
            position += 1
        # remember the source page; it is read again below for /Annots
        # and in add_filtered_articles for /B
        srcpages[pg.indirect_reference.idnum].original_page = pg

    reader._named_destinations = (
        reader.named_destinations
    )  # need for the outline processing below

    arr: Any

    def _process_named_dests(dest: Any) -> None:
        # Copy one named destination of the source, provided its name is
        # not already present and it targets one of the merged pages.
        arr = dest.dest_array
        if "/Names" in self._root_object and dest["/Title"] in cast(
            list[Any],
            cast(
                DictionaryObject,
                cast(DictionaryObject, self._root_object["/Names"]).get("/Dests", DictionaryObject()),
            ).get("/Names", DictionaryObject()),
        ):
            # already exists: should not duplicate it
            pass
        elif dest["/Page"] is None or isinstance(dest["/Page"], NullObject):
            # no target page: nothing to copy
            pass
        elif isinstance(dest["/Page"], int):
            # the page reference is a page number normally not a PDF Reference
            # page numbers as int are normally accepted only in external goto
            try:
                p = reader.pages[dest["/Page"]]
            except IndexError:
                return
            assert p.indirect_reference is not None
            try:
                arr[NumberObject(0)] = NumberObject(
                    srcpages[p.indirect_reference.idnum].page_number
                )
                self.add_named_destination_array(dest["/Title"], arr)
            except KeyError:
                # the target page is not among the merged pages
                pass
        elif dest["/Page"].indirect_reference.idnum in srcpages:
            # retarget the destination to the copy of the page
            arr[NumberObject(0)] = srcpages[
                dest["/Page"].indirect_reference.idnum
            ].indirect_reference
            self.add_named_destination_array(dest["/Title"], arr)

    for dest in reader._named_destinations.values():
        _process_named_dests(dest)

    # Outline node under which the source outline is inserted.
    outline_item_typ: TreeObject
    if outline_item is not None:
        # NOTE(review): next() raises StopIteration when srcpages is empty
        # (no page was merged) - confirm whether callers can hit this.
        outline_item_typ = cast(
            "TreeObject",
            self.add_outline_item(
                TextStringObject(outline_item),
                next(iter(srcpages.values())).indirect_reference,
                fit=PAGE_FIT,
            ).get_object(),
        )
    else:
        outline_item_typ = self.get_outline_root()

    _ro = reader.root_object
    if import_outline and CO.OUTLINES in _ro:
        outline = self._get_filtered_outline(
            _ro.get(CO.OUTLINES, None), srcpages, reader
        )
        self._insert_filtered_outline(
            outline, outline_item_typ, None
        )  # TODO: use before parameter

    if "/Annots" not in excluded_fields:
        for pag in srcpages.values():
            lst = self._insert_filtered_annotations(
                pag.original_page.get("/Annots", []), pag, srcpages, reader
            )
            if len(lst) > 0:
                pag[NameObject("/Annots")] = lst
            self.clean_page(pag)

    if "/AcroForm" in _ro and _ro["/AcroForm"] is not None:
        if "/AcroForm" not in self._root_object:
            # first form merged: clone the form dictionary without its fields
            self._root_object[NameObject("/AcroForm")] = self._add_object(
                cast(
                    DictionaryObject,
                    reader.root_object["/AcroForm"],
                ).clone(self, False, ("/Fields",))
            )
            arr = ArrayObject()
        else:
            # NOTE(review): raises KeyError if the existing /AcroForm has no
            # /Fields entry; this lookup is outside the try block below.
            arr = cast(
                ArrayObject,
                cast(DictionaryObject, self._root_object["/AcroForm"])["/Fields"],
            )
        # source object number -> object number in this writer
        trslat = self._id_translated[id(reader)]
        try:
            for f in reader.root_object["/AcroForm"]["/Fields"]:  # type: ignore
                try:
                    ind = IndirectObject(trslat[f.idnum], 0, self)
                    if ind not in arr:
                        arr.append(ind)
                except KeyError:
                    # for trslat[] which mean the field has not be copied
                    # through the page
                    pass
        except KeyError:  # for /Acroform or /Fields are not existing
            arr = self._add_object(ArrayObject())
        cast(DictionaryObject, self._root_object["/AcroForm"])[
            NameObject("/Fields")
        ] = arr

    if "/B" not in excluded_fields:
        # the empty pattern matches every thread title
        self.add_filtered_articles("", srcpages, reader)
def _add_articles_thread(
    self,
    thread: DictionaryObject,  # thread entry from the reader's array of threads
    pages: dict[int, PageObject],
    reader: PdfReader,
) -> IndirectObject:
    """
    Clone the thread with only the applicable articles.

    The chain of articles starting at ``thread["/F"]`` is followed through
    the ``/N`` entries. For each article whose ``/P`` page has a copy in
    ``pages`` a new article dictionary is created in this writer and
    linked to its neighbours through ``/N`` and ``/V``.

    Args:
        thread: Thread dictionary taken from the source document.
        pages: Maps source page object numbers to the copied pages.
        reader: The source document.

    Returns:
        The added thread as an indirect reference

    """
    nthread = thread.clone(
        self, force_duplicate=True, ignore_fields=("/F",)
    )  # use of clone to keep link between reader and writer
    self.threads.append(nthread.indirect_reference)
    first_article = cast("DictionaryObject", thread["/F"])
    current_article: Optional[DictionaryObject] = first_article
    new_article: Optional[DictionaryObject] = None
    while current_article is not None:
        pag = self._get_cloned_page(
            cast("PageObject", current_article["/P"]), pages, reader
        )
        if pag is not None:
            if new_article is None:
                # first article kept: it becomes the /F entry of the new thread
                new_article = cast(
                    "DictionaryObject",
                    self._add_object(DictionaryObject()).get_object(),
                )
                new_first = new_article
                nthread[NameObject("/F")] = new_article.indirect_reference
            else:
                # following article: /V points back to the previous one,
                # whose /N is set to point forward to this one
                new_article2 = cast(
                    "DictionaryObject",
                    self._add_object(
                        DictionaryObject(
                            {NameObject("/V"): new_article.indirect_reference}
                        )
                    ).get_object(),
                )
                new_article[NameObject("/N")] = new_article2.indirect_reference
                new_article = new_article2
            new_article[NameObject("/P")] = pag
            new_article[NameObject("/T")] = nthread.indirect_reference
            new_article[NameObject("/R")] = current_article["/R"]
            # register the new article in the /B array of the copied page
            pag_obj = cast("PageObject", pag.get_object())
            if "/B" not in pag_obj:
                pag_obj[NameObject("/B")] = ArrayObject()
            cast("ArrayObject", pag_obj["/B"]).append(
                new_article.indirect_reference
            )
        current_article = cast("DictionaryObject", current_article["/N"])
        if current_article == first_article:
            # back at the start: close the ring between last and first copy
            # NOTE(review): new_article / new_first are only set once an
            # article page was found in ``pages`` - confirm this always
            # holds for the callers.
            new_article[NameObject("/N")] = new_first.indirect_reference  # type: ignore
            new_first[NameObject("/V")] = new_article.indirect_reference  # type: ignore
            current_article = None
    assert nthread.indirect_reference is not None
    return nthread.indirect_reference
def add_filtered_articles(
    self,
    fltr: Union[
        Pattern[Any], str
    ],  # pattern (or pattern source) searched in the thread titles
    pages: dict[int, PageObject],
    reader: PdfReader,
) -> None:
    """
    Add articles matching the defined criteria.

    For every article listed in ``/B`` of the original of each page in
    ``pages``, its thread (``/T``) is added through
    ``_add_articles_thread`` when the thread has not been translated for
    ``reader`` yet and its ``/I`` ``/Title`` matches ``fltr``.

    Args:
        fltr: Compiled pattern or string to compile; anything else is
            replaced by the empty pattern.
        pages: Maps source page object numbers to the copied pages.
        reader: The source document.

    """
    if isinstance(fltr, str):
        pattern = re.compile(fltr)
    elif isinstance(fltr, Pattern):
        pattern = fltr
    else:
        pattern = re.compile("")

    for copied_page in pages.values():
        source_page = copied_page.original_page
        for article_ref in source_page.get("/B", ()):
            article = article_ref.get_object()
            if is_null_or_none(article):
                continue
            thread = article.get("/T")
            if thread is None:
                continue
            thread = thread.get_object()
            if thread.indirect_reference.idnum in self._id_translated[id(reader)]:
                # this thread was already handled
                continue
            title = (thread.get("/I", {})).get("/Title", "")
            if pattern.search(title):
                self._add_articles_thread(thread, pages, reader)
def _get_cloned_page(
    self,
    page: Union[None, IndirectObject, PageObject, NullObject],
    pages: dict[int, PageObject],
    reader: PdfReader,
) -> Optional[IndirectObject]:
    """
    Return the indirect reference of the page in ``pages`` matching ``page``.

    Args:
        page: Page from the source document, given either as a dictionary
            whose ``/Type`` is ``/Page`` or as an indirect reference.
        pages: Mapping looked up with the ``idnum`` of the source page's
            indirect reference; the matching value's ``indirect_reference``
            is returned.
        reader: Not used by this method.

    Returns:
        The ``indirect_reference`` of the matching entry of ``pages``, or
        ``None`` when ``page`` is null/``None``, is not a page, has no
        indirect reference, or has no entry in ``pages``.

    """
    if isinstance(page, NullObject):
        return None
    if isinstance(page, DictionaryObject) and page.get("/Type", "") == "/Page":
        source_ref = page.indirect_reference
    elif isinstance(page, IndirectObject):
        source_ref = page
    else:
        # Neither a page dictionary nor a reference (e.g. ``None``): the
        # previous code reached the same result only through a swallowed
        # UnboundLocalError.
        return None
    try:
        return pages[source_ref.idnum].indirect_reference  # type: ignore
    except (AttributeError, KeyError):
        # AttributeError: the page dictionary has no indirect reference.
        # KeyError: the page is not part of ``pages``.
        return None
def _insert_filtered_annotations(
    self,
    annots: Union[IndirectObject, list[DictionaryObject], None],
    page: PageObject,
    pages: dict[int, PageObject],
    reader: PdfReader,
) -> list[Destination]:
    """
    Clone the annotations that remain valid for the given set of pages.

    Annotations without an explicit target are cloned unchanged. Annotations
    pointing to a named destination are kept only when that name is in
    ``self.get_named_dest_root()``. Annotations pointing to an explicit
    destination array are kept only when ``_get_cloned_page`` finds the
    target page; the first array element is then replaced by that page.

    Args:
        annots: List of annotations, or an indirect reference resolving to
            one. ``None`` yields an empty result; any other non-list value
            is reported through ``logger_warning`` and yields an empty result.
        page: Not used by this method.
        pages: Forwarded to ``_get_cloned_page``.
        reader: Forwarded to ``_get_cloned_page``.

    Returns:
        An ``ArrayObject`` with the retained annotation clones.

    """
    kept = ArrayObject()
    if isinstance(annots, IndirectObject):
        annots = cast("list[Any]", annots.get_object())
    if annots is None:
        return kept
    if not isinstance(annots, list):
        logger_warning(f"Expected list of annotations, got {annots} of type {annots.__class__.__name__}.", __name__)
        return kept

    for entry in annots:
        annot = cast("DictionaryObject", entry.get_object())
        not_a_plain_goto_link = (
            annot["/Subtype"] != "/Link"
            or "/A" not in annot
            or cast("DictionaryObject", annot["/A"])["/S"] != "/GoTo"
            or "/Dest" in annot
        )

        if not not_a_plain_goto_link:
            # Link whose target lives in the /D entry of its /GoTo action.
            target = cast("DictionaryObject", annot["/A"]).get("/D", NullObject())
            if target is None or isinstance(target, NullObject):
                continue
            if isinstance(target, str):
                # it is a named dest
                if str(target) in self.get_named_dest_root():
                    kept.append(annot.clone(self).indirect_reference)
                continue
            target = cast("ArrayObject", target)
            new_page = self._get_cloned_page(target[0], pages, reader)
            if new_page is None:
                continue
            copy = annot.clone(self, ignore_fields=("/D",))
            cast("DictionaryObject", copy["/A"])[NameObject("/D")] = ArrayObject(
                [new_page, *target[1:]]
            )
            kept.append(self._add_object(copy))
            continue

        if "/Dest" not in annot:
            # No destination to translate: keep a straight clone.
            kept.append(self._add_object(annot.clone(self)))
            continue

        target = annot["/Dest"]
        if isinstance(target, str):
            # it is a named dest
            if str(target) in self.get_named_dest_root():
                kept.append(annot.clone(self).indirect_reference)
            continue
        target = cast("ArrayObject", target)
        new_page = self._get_cloned_page(target[0], pages, reader)
        if new_page is None:
            continue
        copy = annot.clone(self, ignore_fields=("/Dest",))
        copy[NameObject("/Dest")] = ArrayObject([new_page, *target[1:]])
        kept.append(self._add_object(copy))
    return kept
def _get_filtered_outline(
    self,
    node: Any,
    pages: dict[int, PageObject],
    reader: PdfReader,
) -> list[Destination]:
    """
    Collect the outline items that belong to the given set of pages.

    A node that is an ``/Outlines`` container (or has no ``/Title``) is
    replaced by its ``/First`` child. Otherwise the chain of ``/Next``
    siblings is walked; an item is kept when its page is found by
    ``_get_cloned_page`` or when at least one of its children is kept.

    Args:
        node: Outline node (or reference to it) to start from; ``None`` and
            null objects are treated as an empty dictionary.
        pages: Forwarded to ``_get_cloned_page``.
        reader: Reader used to build each item via ``_build_outline_item``.

    Returns:
        A list of destination objects; each one carries its retained
        children in ``_filtered_children``.

    """
    if node is None:
        node = NullObject()
    node = node.get_object()
    if is_null_or_none(node):
        node = DictionaryObject()

    if node.get("/Type", "") == "/Outlines" or "/Title" not in node:
        # Container node: descend into its first child, if any.
        first_child = node.get("/First", None)
        if first_child is None:
            return []
        first_child = first_child.get_object()
        return list(self._get_filtered_outline(first_child, pages, reader))

    retained = []
    while node is not None:
        node = node.get_object()
        item = cast("Destination", reader._build_outline_item(node))
        cloned_page = self._get_cloned_page(
            cast("PageObject", item["/Page"]), pages, reader
        )
        item[NameObject("/Page")] = NullObject() if cloned_page is None else cloned_page
        if "/First" in node:
            item._filtered_children = self._get_filtered_outline(
                node["/First"], pages, reader
            )
        else:
            item._filtered_children = []
        page_is_kept = not isinstance(item["/Page"], NullObject)
        if page_is_kept or len(item._filtered_children) > 0:
            retained.append(item)
        node = node.get("/Next", None)
    return retained
def _clone_outline(self, dest: Destination) -> TreeObject:
    """
    Build a new outline item registered in this writer from ``dest``.

    The title is always copied. When the destination has a page, either the
    ``/A`` entry of the source node is cloned or, failing that, the
    destination array is stored under ``/Dest``. ``/F`` and ``/C`` are taken
    from the source node, defaulting to ``0`` and three ``0.0`` values.

    Args:
        dest: Destination to copy.

    Returns:
        The new outline item.

    """
    clone = TreeObject()
    self._add_object(clone)
    clone[NameObject("/Title")] = TextStringObject(dest["/Title"])

    has_page = not isinstance(dest["/Page"], NullObject)
    if has_page:
        if dest.node is not None and "/A" in dest.node:
            clone[NameObject("/A")] = dest.node["/A"].clone(self)
        else:
            clone[NameObject("/Dest")] = dest.dest_array
    # TODO: /SE
    if dest.node is not None:
        flags = dest.node.get("/F", 0)
        clone[NameObject("/F")] = NumberObject(flags)
        black = [FloatObject(0.0), FloatObject(0.0), FloatObject(0.0)]
        clone[NameObject("/C")] = ArrayObject(dest.node.get("/C", black))
    return clone
def _insert_filtered_outline(
    self,
    outlines: list[Destination],
    parent: Union[TreeObject, IndirectObject],
    before: Union[None, TreeObject, IndirectObject] = None,
) -> None:
    """
    Insert filtered outline items, and recursively their children.

    Args:
        outlines: Items to insert; each must carry ``_filtered_children``.
        parent: Node under which the items are inserted.
        before: Passed to ``insert_child`` for the items of this level;
            children are always inserted with ``None``.

    """
    for dest in outlines:
        # TODO: can be improved to keep A and SE entries (ignored for the moment)
        # with np=self.add_outline_item_destination(dest,parent,before)
        is_container = dest.get("/Type", "") == "/Outlines" or "/Title" not in dest
        if is_container:
            # Nothing to clone: the children attach directly to ``parent``.
            attach_to = parent
        else:
            attach_to = self._clone_outline(dest)
            cast(TreeObject, parent.get_object()).insert_child(attach_to, before, self)
        self._insert_filtered_outline(dest._filtered_children, attach_to, None)
def close(self) -> None:
    """Do nothing; the method only exists for API harmonization."""
    return None
def find_outline_item(
    self,
    outline_item: dict[str, Any],
    root: Optional[OutlineType] = None,
) -> Optional[list[int]]:
    """
    Locate an outline item and return its position as a list of indices.

    Args:
        outline_item: Value compared for equality with each node's indirect
            reference and with its ``/Title``.
        root: Node to start from; the writer's outline root when omitted.

    Returns:
        The sibling index at each nesting level leading to the match (nodes
        without ``/Title`` contribute no index), or ``None`` if nothing
        matches.

    """
    current = self.get_outline_root() if root is None else cast("TreeObject", root)

    position = 0
    while current is not None:
        matches = (
            current.indirect_reference == outline_item
            or current.get("/Title", None) == outline_item
        )
        if matches:
            return [position]
        if "/First" in current:
            nested = self.find_outline_item(
                outline_item, cast(OutlineType, current["/First"])
            )
            if nested:
                own_index = [position] if "/Title" in current else []
                return own_index + nested
        if "/Next" not in current:
            return None
        position += 1
        current = cast(TreeObject, current["/Next"])
    raise PyPdfError("This line is theoretically unreachable.")  # pragma: no cover
def reset_translation(
    self, reader: Union[None, PdfReader, IndirectObject] = None
) -> None:
    """
    Reset the translation table between reader and the writer object.

    Late cloning will create new independent objects.

    Args:
        reader: PdfReader or IndirectObject referencing a PdfReader object.
            if set to None or omitted, all tables will be reset.

    Raises:
        Exception: If ``reader`` is of any other type.

    """
    if reader is None:
        self._id_translated = {}
    elif isinstance(reader, PdfReader):
        # pop() with a default: a reader without a table is silently ignored.
        self._id_translated.pop(id(reader), None)
    elif isinstance(reader, IndirectObject):
        self._id_translated.pop(id(reader.pdf), None)
    else:
        # The message was previously a plain string, so the offending value
        # was never interpolated.
        raise Exception(f"invalid parameter {reader!r}")
def set_page_label(
    self,
    page_index_from: int,
    page_index_to: int,
    style: Optional[PageLabelStyle] = None,
    prefix: Optional[str] = None,
    start: Optional[int] = 0,
) -> None:
    """
    Set a page label to a range of pages.

    Page indexes must be given starting from 0.
    Labels must have a style, a prefix or both.
    If a range is not assigned any page label, a decimal label starting from 1 is applied.

    Args:
        page_index_from: page index of the beginning of the range starting from 0
        page_index_to: page index of the end of the range (inclusive) starting from 0
        style: The numbering style to be used for the numeric portion of each page label:

                   * ``/D`` Decimal Arabic numerals
                   * ``/R`` Uppercase Roman numerals
                   * ``/r`` Lowercase Roman numerals
                   * ``/A`` Uppercase letters (A to Z for the first 26 pages,
                     AA to ZZ for the next 26, and so on)
                   * ``/a`` Lowercase letters (a to z for the first 26 pages,
                     aa to zz for the next 26, and so on)

        prefix: The label prefix for page labels in this range.
        start: The value of the numeric portion for the first page label
               in the range.
               Subsequent pages are numbered sequentially from this value,
               which must be greater than or equal to 1.
               ``0`` (the default) leaves the start value unset.

    Raises:
        ValueError: If neither style nor prefix is given, if the range is
            invalid or exceeds the number of pages, or if ``start`` is
            given, non-zero and lower than 1.

    """
    if prefix is None and style is None:
        raise ValueError("At least one of style and prefix must be given")
    if page_index_from < 0:
        raise ValueError("page_index_from must be greater or equal than 0")
    if page_index_from > page_index_to:
        raise ValueError(
            "page_index_to must be greater or equal than page_index_from"
        )
    page_count = len(self.pages)
    if page_index_to >= page_count:
        raise ValueError("page_index_to exceeds number of pages")
    start_is_set = start is not None and start != 0
    if start_is_set and start < 1:
        raise ValueError("If given, start must be greater or equal than one")

    self._set_page_label(page_index_from, page_index_to, style, prefix, start)
def _set_page_label(
    self,
    page_index_from: int,
    page_index_to: int,
    style: Optional[PageLabelStyle] = None,
    prefix: Optional[str] = None,
    start: Optional[int] = 0,
) -> None:
    """
    Set a page label to a range of pages.

    Page indexes must be given starting from 0.
    Labels must have a style, a prefix or both.
    If a range is not assigned any page label a decimal label starting from 1 is applied.

    Unlike ``set_page_label``, no argument validation is performed here.

    Args:
        page_index_from: page index of the beginning of the range starting from 0
        page_index_to: page index of the end of the range (inclusive) starting from 0
        style: The numbering style to be used for the numeric portion of each page label:
                    /D Decimal Arabic numerals
                    /R Uppercase Roman numerals
                    /r Lowercase Roman numerals
                    /A Uppercase letters (A to Z for the first 26 pages,
                       AA to ZZ for the next 26, and so on)
                    /a Lowercase letters (a to z for the first 26 pages,
                       aa to zz for the next 26, and so on)
        prefix: The label prefix for page labels in this range.
        start: The value of the numeric portion for the first page label
               in the range.
               Subsequent pages are numbered sequentially from this value,
               which must be greater than or equal to 1.
               ``0`` (the default) or ``None``: no ``/St`` entry is written.

    """
    # Label used for pages outside any explicit range: plain decimal numbers.
    default_page_label = DictionaryObject()
    default_page_label[NameObject("/S")] = NameObject("/D")

    new_page_label = DictionaryObject()
    if style is not None:
        new_page_label[NameObject("/S")] = NameObject(style)
    if prefix is not None:
        new_page_label[NameObject("/P")] = TextStringObject(prefix)
    # ``None`` is an accepted value (see the signature); it must not reach
    # NumberObject, which cannot convert it.
    if start is not None and start != 0:
        new_page_label[NameObject("/St")] = NumberObject(start)

    # Create the page-label tree on first use, seeded with the default label
    # at page index 0.
    if NameObject(CatalogDictionary.PAGE_LABELS) not in self._root_object:
        nums = ArrayObject()
        nums_insert(NumberObject(0), default_page_label, nums)
        page_labels = TreeObject()
        page_labels[NameObject("/Nums")] = nums
        self._root_object[NameObject(CatalogDictionary.PAGE_LABELS)] = page_labels

    page_labels = cast(
        TreeObject, self._root_object[NameObject(CatalogDictionary.PAGE_LABELS)]
    )
    nums = cast(ArrayObject, page_labels[NameObject("/Nums")])

    nums_insert(NumberObject(page_index_from), new_page_label, nums)
    nums_clear_range(NumberObject(page_index_from), page_index_to, nums)
    next_label_pos, *_ = nums_next(NumberObject(page_index_from), nums)
    # Unless a label already starts right after the range (or the range ends
    # on the last page), restart the default numbering there.
    if next_label_pos != page_index_to + 1 and page_index_to + 1 < len(self.pages):
        nums_insert(NumberObject(page_index_to + 1), default_page_label, nums)

    page_labels[NameObject("/Nums")] = nums
    self._root_object[NameObject(CatalogDictionary.PAGE_LABELS)] = page_labels
3230 def _repr_mimebundle_(
3231 self,
3232 include: Union[None, Iterable[str]] = None,
3233 exclude: Union[None, Iterable[str]] = None,
3234 ) -> dict[str, Any]:
3235 """
3236 Integration into Jupyter Notebooks.
3238 This method returns a dictionary that maps a mime-type to its
3239 representation.
3241 .. seealso::
3243 https://ipython.readthedocs.io/en/stable/config/integrating.html
3244 """
3245 pdf_data = BytesIO()
3246 self.write(pdf_data)
3247 data = {
3248 "application/pdf": pdf_data,
3249 }
3251 if include is not None:
3252 # Filter representations based on include list
3253 data = {k: v for k, v in data.items() if k in include}
3255 if exclude is not None:
3256 # Remove representations based on exclude list
3257 data = {k: v for k, v in data.items() if k not in exclude}
3259 return data
def _pdf_objectify(obj: Union[dict[str, Any], str, float, list[Any]]) -> PdfObject:
    """
    Convert plain Python data into the corresponding PdfObject, recursively.

    Existing PdfObject instances are returned unchanged. Dictionaries become
    a DictionaryObject with NameObject keys, strings starting with ``/``
    become a NameObject and other strings a TextStringObject, floats and
    ints become a FloatObject, and lists become an ArrayObject.

    Raises:
        NotImplementedError: If ``obj`` is of any other type.

    """
    if isinstance(obj, PdfObject):
        return obj
    if isinstance(obj, dict):
        converted = DictionaryObject()
        for name, item in obj.items():
            converted[NameObject(name)] = _pdf_objectify(item)
        return converted
    if isinstance(obj, str):
        # A leading slash marks a PDF name; anything else is plain text.
        return NameObject(obj) if obj.startswith("/") else TextStringObject(obj)
    if isinstance(obj, (float, int)):
        return FloatObject(obj)
    if isinstance(obj, list):
        return ArrayObject([_pdf_objectify(element) for element in obj])
    raise NotImplementedError(
        f"{type(obj)=} could not be cast to a PdfObject"
    )
def _create_outline_item(
    action_ref: Union[None, IndirectObject],
    title: str,
    color: Union[tuple[float, float, float], str, None],
    italic: bool,
    bold: bool,
) -> TreeObject:
    """
    Build an outline item with the given title and optional attributes.

    Args:
        action_ref: Stored under ``/A`` when not ``None``.
        title: Text stored under ``/Title``.
        color: Stored under ``/C`` when truthy; a string is first converted
            with ``hex_to_rgb``.
        italic: Adds ``OutlineFontFlag.italic`` to the ``/F`` flags.
        bold: Adds ``OutlineFontFlag.bold`` to the ``/F`` flags.

    Returns:
        The new outline item; ``/F`` is only set when italic or bold is
        requested.

    """
    item = TreeObject()
    if action_ref is not None:
        item[NameObject("/A")] = action_ref
    item.update({NameObject("/Title"): create_string_object(title)})

    if color:
        rgb = hex_to_rgb(color) if isinstance(color, str) else color
        components = ArrayObject([FloatObject(component) for component in rgb])
        item.update({NameObject("/C"): components})

    if italic or bold:
        italic_bits = OutlineFontFlag.italic if italic else 0
        bold_bits = OutlineFontFlag.bold if bold else 0
        item.update({NameObject("/F"): NumberObject(italic_bits + bold_bits)})
    return item