Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_writer.py: 15%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) 2006, Mathieu Fenniak
2# Copyright (c) 2007, Ashish Kulkarni <kulkarni.ashish@gmail.com>
3#
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are
8# met:
9#
10# * Redistributions of source code must retain the above copyright notice,
11# this list of conditions and the following disclaimer.
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15# * The name of the author may not be used to endorse or promote products
16# derived from this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28# POSSIBILITY OF SUCH DAMAGE.
30import decimal
31import enum
32import hashlib
33import re
34import struct
35import uuid
36from io import BytesIO, FileIO, IOBase
37from itertools import compress
38from pathlib import Path
39from types import TracebackType
40from typing import (
41 IO,
42 Any,
43 Callable,
44 Dict,
45 Iterable,
46 List,
47 Optional,
48 Pattern,
49 Tuple,
50 Type,
51 Union,
52 cast,
53)
55from ._cmap import _default_fonts_space_width, build_char_map_from_dict
56from ._doc_common import DocumentInformation, PdfDocCommon
57from ._encryption import EncryptAlgorithm, Encryption
58from ._page import PageObject
59from ._page_labels import nums_clear_range, nums_insert, nums_next
60from ._reader import PdfReader
61from ._utils import (
62 StrByteType,
63 StreamType,
64 _get_max_pdf_version_header,
65 deprecate,
66 deprecate_no_replacement,
67 deprecation_with_replacement,
68 logger_warning,
69)
70from .constants import AnnotationDictionaryAttributes as AA
71from .constants import CatalogAttributes as CA
72from .constants import (
73 CatalogDictionary,
74 FileSpecificationDictionaryEntries,
75 GoToActionArguments,
76 ImageType,
77 InteractiveFormDictEntries,
78 OutlineFontFlag,
79 PageLabelStyle,
80 TypFitArguments,
81 UserAccessPermissions,
82)
83from .constants import Core as CO
84from .constants import FieldDictionaryAttributes as FA
85from .constants import PageAttributes as PG
86from .constants import PagesAttributes as PA
87from .constants import TrailerKeys as TK
88from .errors import PyPdfError
89from .generic import (
90 PAGE_FIT,
91 ArrayObject,
92 BooleanObject,
93 ByteStringObject,
94 ContentStream,
95 DecodedStreamObject,
96 Destination,
97 DictionaryObject,
98 Fit,
99 FloatObject,
100 IndirectObject,
101 NameObject,
102 NullObject,
103 NumberObject,
104 PdfObject,
105 RectangleObject,
106 StreamObject,
107 TextStringObject,
108 TreeObject,
109 ViewerPreferences,
110 create_string_object,
111 hex_to_rgb,
112 is_null_or_none,
113)
114from .pagerange import PageRange, PageRangeSpec
115from .types import (
116 AnnotationSubtype,
117 BorderArrayType,
118 LayoutType,
119 OutlineItemType,
120 OutlineType,
121 PagemodeType,
122)
123from .xmp import XmpInformation
# Value returned by ``UserAccessPermissions.all()``
# (presumably every user access permission bit set -- confirm in constants).
ALL_DOCUMENT_PERMISSIONS = UserAccessPermissions.all()
# Font height used by ``_update_field_annotation`` for multiline fields
# whose resolved font height is 0.
DEFAULT_FONT_HEIGHT_IN_MULTILINE = 12
class ObjectDeletionFlag(enum.IntFlag):
    """
    Bit flags naming categories of PDF content.

    Each single-category member occupies its own bit, so members can be
    combined with ``|``. ``IMAGES`` is the union of the three image flags.
    """

    NONE = 0
    TEXT = 1 << 0
    LINKS = 1 << 1
    ATTACHMENTS = 1 << 2
    OBJECTS_3D = 1 << 3
    ALL_ANNOTATIONS = 1 << 4
    XOBJECT_IMAGES = 1 << 5
    INLINE_IMAGES = 1 << 6
    DRAWING_IMAGES = 1 << 7
    IMAGES = XOBJECT_IMAGES | INLINE_IMAGES | DRAWING_IMAGES
142def _rolling_checksum(stream: BytesIO, blocksize: int = 65536) -> str:
143 hash = hashlib.md5()
144 for block in iter(lambda: stream.read(blocksize), b""):
145 hash.update(block)
146 return hash.hexdigest()
149class PdfWriter(PdfDocCommon):
150 """
151 Write a PDF file out, given pages produced by another class or through
152 cloning a PDF file during initialization.
154 Typically data is added from a :class:`PdfReader<pypdf.PdfReader>`.
156 Args:
157 clone_from: identical to fileobj (for compatibility)
159 incremental: If true, loads the document and set the PdfWriter in incremental mode.
161 When writing incrementally, the original document is written first and new/modified
162 content is appended. To be used for signed document/forms to keep signature valid.
164 full: If true, loads all the objects (always full if incremental = True).
165 This parameter may allow loading large PDFs.
167 """
    def __init__(
        self,
        fileobj: Union[None, PdfReader, StrByteType, Path] = "",
        clone_from: Union[None, PdfReader, StrByteType, Path] = None,
        incremental: bool = False,
        full: bool = False,
    ) -> None:
        """
        Set up an empty writer or clone an existing document.

        Args:
            fileobj: Reader, path or stream. It is kept in ``temp_fileobj``
                and, when it refers to non-empty content, may also be used as
                the document to clone (see ``_get_clone_from`` below).
            clone_from: Document to clone; wrapped in a ``PdfReader`` when it
                is not one already.
            incremental: Start the writer in incremental mode.
            full: Also routes ``fileobj`` through the reader-loading branch;
                ``self.incremental`` is reset to False at the end when
                ``incremental`` itself is False.

        Raises:
            PyPdfError: if the reader-loading branch is taken and ``fileobj``
                cannot be turned into a ``PdfReader``.
        """
        # NOTE: ``full`` alone also enables the branch below; it is undone
        # near the end of this method when ``incremental`` is False.
        self.incremental = incremental or full
        """
        Returns if the PdfWriter object has been started in incremental mode.
        """

        self._objects: List[Optional[PdfObject]] = []
        """
        The indirect objects in the PDF.
        For the incremental case, it will be filled with None
        in clone_reader_document_root.
        """

        self._original_hash: List[int] = []
        """
        List of hashes after import; used to identify changes.
        """

        self._idnum_hash: Dict[bytes, Tuple[IndirectObject, List[IndirectObject]]] = {}
        """
        Maps hash values of indirect objects to the list of IndirectObjects.
        This is used for compression.
        """

        self._id_translated: Dict[int, Dict[int, int]] = {}
        """List of already translated IDs.
        dict[id(pdf)][(idnum, generation)]
        """

        self._info_obj: Optional[PdfObject]
        """The PDF files's document information dictionary,
        the Info entry in the PDF file's trailer dictionary."""

        self._ID: Union[ArrayObject, None] = None
        """The PDF file identifier,
        defined by the ID in the PDF file's trailer dictionary."""

        if self.incremental:
            # Normalise the input step by step: path -> in-memory bytes ->
            # PdfReader. Anything that is still not a PdfReader is rejected.
            if isinstance(fileobj, (str, Path)):
                with open(fileobj, "rb") as f:
                    fileobj = BytesIO(f.read(-1))
            if isinstance(fileobj, BytesIO):
                fileobj = PdfReader(fileobj)
            if not isinstance(fileobj, PdfReader):
                raise PyPdfError("Invalid type for incremental mode")
            self._reader = fileobj  # prev content is in _reader.stream
            self._header = fileobj.pdf_header.encode()
            self._readonly = True  # TODO: to be analysed
        else:
            # Fresh document: default header and an /Info dictionary that
            # only carries the /Producer entry.
            self._header = b"%PDF-1.3"
            self._info_obj = self._add_object(
                DictionaryObject(
                    {NameObject("/Producer"): create_string_object("pypdf")}
                )
            )

        def _get_clone_from(
            fileobj: Union[None, PdfReader, str, Path, IO[Any], BytesIO],
            clone_from: Union[None, PdfReader, str, Path, IO[Any], BytesIO],
        ) -> Union[None, PdfReader, str, Path, IO[Any], BytesIO]:
            # Decide which object, if any, the writer is cloned from.
            # A path/stream ``fileobj`` gives way to ``clone_from`` when it is
            # the empty string or when ``clone_from`` was supplied.
            if isinstance(fileobj, (str, Path, IO, BytesIO)) and (
                fileobj == "" or clone_from is not None
            ):
                return clone_from
            cloning = True
            # A missing or zero-length file has nothing to clone.
            if isinstance(fileobj, (str, Path)) and (
                not Path(str(fileobj)).exists()
                or Path(str(fileobj)).stat().st_size == 0
            ):
                cloning = False
            # Same for an empty stream; the stream position is restored
            # after probing the end offset.
            if isinstance(fileobj, (IOBase, BytesIO)):
                t = fileobj.tell()
                if fileobj.seek(0, 2) == 0:
                    cloning = False
                fileobj.seek(t, 0)
            if cloning:
                clone_from = fileobj
            return clone_from

        clone_from = _get_clone_from(fileobj, clone_from)
        # To prevent overwriting
        self.temp_fileobj = fileobj
        self.fileobj = ""
        self._with_as_usage = False
        self._cloned = False
        # The root of our page tree node
        pages = DictionaryObject(
            {
                NameObject(PA.TYPE): NameObject("/Pages"),
                NameObject(PA.COUNT): NumberObject(0),
                NameObject(PA.KIDS): ArrayObject(),
            }
        )
        self.flattened_pages = []
        self._encryption: Optional[Encryption] = None
        self._encrypt_entry: Optional[DictionaryObject] = None

        if clone_from is not None:
            if not isinstance(clone_from, PdfReader):
                clone_from = PdfReader(clone_from)
            self.clone_document_from_reader(clone_from)
            self._cloned = True
        else:
            # No source document: register the empty page tree and a catalog
            # that points at it.
            self._pages = self._add_object(pages)
            self._root_object = DictionaryObject(
                {
                    NameObject(PA.TYPE): NameObject(CO.CATALOG),
                    NameObject(CO.PAGES): self._pages,
                }
            )
            self._add_object(self._root_object)
        # ``full`` without ``incremental`` only borrowed the loading branch
        # above; the writer itself is not incremental.
        if full and not incremental:
            self.incremental = False
        # Store both /ID entries as byte strings when they were read as text.
        if isinstance(self._ID, list):
            if isinstance(self._ID[0], TextStringObject):
                self._ID[0] = ByteStringObject(self._ID[0].get_original_bytes())
            if isinstance(self._ID[1], TextStringObject):
                self._ID[1] = ByteStringObject(self._ID[1].get_original_bytes())
294 # for commonality
295 @property
296 def is_encrypted(self) -> bool:
297 """
298 Read-only boolean property showing whether this PDF file is encrypted.
300 Note that this property, if true, will remain true even after the
301 :meth:`decrypt()<pypdf.PdfReader.decrypt>` method is called.
302 """
303 return False
305 @property
306 def root_object(self) -> DictionaryObject:
307 """
308 Provide direct access to PDF Structure.
310 Note:
311 Recommended only for read access.
313 """
314 return self._root_object
316 @property
317 def _info(self) -> Optional[DictionaryObject]:
318 """
319 Provide access to "/Info". Standardized with PdfReader.
321 Returns:
322 /Info Dictionary; None if the entry does not exist
324 """
325 return (
326 None
327 if self._info_obj is None
328 else cast(DictionaryObject, self._info_obj.get_object())
329 )
331 @_info.setter
332 def _info(self, value: Optional[Union[IndirectObject, DictionaryObject]]) -> None:
333 if value is None:
334 try:
335 self._objects[self._info_obj.indirect_reference.idnum - 1] = None # type: ignore
336 except (KeyError, AttributeError):
337 pass
338 self._info_obj = None
339 else:
340 if self._info_obj is None:
341 self._info_obj = self._add_object(DictionaryObject())
342 obj = cast(DictionaryObject, self._info_obj.get_object())
343 obj.clear()
344 obj.update(cast(DictionaryObject, value.get_object()))
346 @property
347 def xmp_metadata(self) -> Optional[XmpInformation]:
348 """XMP (Extensible Metadata Platform) data."""
349 return cast(XmpInformation, self.root_object.xmp_metadata)
351 @xmp_metadata.setter
352 def xmp_metadata(self, value: Optional[XmpInformation]) -> None:
353 """XMP (Extensible Metadata Platform) data."""
354 if value is None:
355 if "/Metadata" in self.root_object:
356 del self.root_object["/Metadata"]
357 else:
358 self.root_object[NameObject("/Metadata")] = value
360 return self.root_object.xmp_metadata # type: ignore
362 @property
363 def with_as_usage(self) -> bool:
364 deprecate_no_replacement("with_as_usage", "6.0")
365 return self._with_as_usage
367 @with_as_usage.setter
368 def with_as_usage(self, value: bool) -> None:
369 deprecate_no_replacement("with_as_usage", "6.0")
370 self._with_as_usage = value
372 def __enter__(self) -> "PdfWriter":
373 """Store how writer is initialized by 'with'."""
374 c: bool = self._cloned
375 t = self.temp_fileobj
376 self.__init__() # type: ignore
377 self._cloned = c
378 self._with_as_usage = True
379 self.fileobj = t # type: ignore
380 return self
382 def __exit__(
383 self,
384 exc_type: Optional[Type[BaseException]],
385 exc: Optional[BaseException],
386 traceback: Optional[TracebackType],
387 ) -> None:
388 """Write data to the fileobj."""
389 if self.fileobj and not self._cloned:
390 self.write(self.fileobj)
392 @property
393 def pdf_header(self) -> str:
394 """
395 Read/Write property of the PDF header that is written.
397 This should be something like ``'%PDF-1.5'``. It is recommended to set
398 the lowest version that supports all features which are used within the
399 PDF file.
401 Note: `pdf_header` returns a string but accepts bytes or str for writing
402 """
403 return self._header.decode()
405 @pdf_header.setter
406 def pdf_header(self, new_header: Union[str, bytes]) -> None:
407 if isinstance(new_header, str):
408 new_header = new_header.encode()
409 self._header = new_header
411 def _add_object(self, obj: PdfObject) -> IndirectObject:
412 if (
413 getattr(obj, "indirect_reference", None) is not None
414 and obj.indirect_reference.pdf == self # type: ignore
415 ):
416 return obj.indirect_reference # type: ignore
417 # check for /Contents in Pages (/Contents in annotations are strings)
418 if isinstance(obj, DictionaryObject) and isinstance(
419 obj.get(PG.CONTENTS, None), (ArrayObject, DictionaryObject)
420 ):
421 obj[NameObject(PG.CONTENTS)] = self._add_object(obj[PG.CONTENTS])
422 self._objects.append(obj)
423 obj.indirect_reference = IndirectObject(len(self._objects), 0, self)
424 return obj.indirect_reference
426 def get_object(
427 self,
428 indirect_reference: Union[int, IndirectObject],
429 ) -> PdfObject:
430 if isinstance(indirect_reference, int):
431 obj = self._objects[indirect_reference - 1]
432 elif indirect_reference.pdf != self:
433 raise ValueError("PDF must be self")
434 else:
435 obj = self._objects[indirect_reference.idnum - 1]
436 assert obj is not None, "mypy"
437 return obj
439 def _replace_object(
440 self,
441 indirect_reference: Union[int, IndirectObject],
442 obj: PdfObject,
443 ) -> PdfObject:
444 if isinstance(indirect_reference, IndirectObject):
445 if indirect_reference.pdf != self:
446 raise ValueError("PDF must be self")
447 indirect_reference = indirect_reference.idnum
448 gen = self._objects[indirect_reference - 1].indirect_reference.generation # type: ignore
449 if (
450 getattr(obj, "indirect_reference", None) is not None
451 and obj.indirect_reference.pdf != self # type: ignore
452 ):
453 obj = obj.clone(self)
454 self._objects[indirect_reference - 1] = obj
455 obj.indirect_reference = IndirectObject(indirect_reference, gen, self)
457 assert isinstance(obj, PdfObject), "mypy"
458 return obj
460 def _add_page(
461 self,
462 page: PageObject,
463 index: int,
464 excluded_keys: Iterable[str] = (),
465 ) -> PageObject:
466 if not isinstance(page, PageObject) or page.get(PA.TYPE, None) != CO.PAGE:
467 raise ValueError("Invalid page object")
468 assert self.flattened_pages is not None, "for mypy"
469 page_org = page
470 excluded_keys = list(excluded_keys)
471 excluded_keys += [PA.PARENT, "/StructParents"]
472 # Acrobat does not accept two indirect references pointing on the same
473 # page; therefore in order to add multiple copies of the same
474 # page, we need to create a new dictionary for the page, however the
475 # objects below (including content) are not duplicated:
476 try: # delete an already existing page
477 del self._id_translated[id(page_org.indirect_reference.pdf)][ # type: ignore
478 page_org.indirect_reference.idnum # type: ignore
479 ]
480 except Exception:
481 pass
482 page = cast(
483 "PageObject", page_org.clone(self, False, excluded_keys).get_object()
484 )
485 if page_org.pdf is not None:
486 other = page_org.pdf.pdf_header
487 self.pdf_header = _get_max_pdf_version_header(self.pdf_header, other)
488 node, idx = self._get_page_in_node(index)
489 page[NameObject(PA.PARENT)] = node.indirect_reference
491 if idx >= 0:
492 cast(ArrayObject, node[PA.KIDS]).insert(idx, page.indirect_reference)
493 self.flattened_pages.insert(index, page)
494 else:
495 cast(ArrayObject, node[PA.KIDS]).append(page.indirect_reference)
496 self.flattened_pages.append(page)
497 recurse = 0
498 while not is_null_or_none(node):
499 node = cast(DictionaryObject, node.get_object())
500 node[NameObject(PA.COUNT)] = NumberObject(cast(int, node[PA.COUNT]) + 1)
501 node = node.get(PA.PARENT, None) # type: ignore[assignment] # TODO: Fix.
502 recurse += 1
503 if recurse > 1000:
504 raise PyPdfError("Too many recursive calls!")
505 return page
507 def set_need_appearances_writer(self, state: bool = True) -> None:
508 """
509 Sets the "NeedAppearances" flag in the PDF writer.
511 The "NeedAppearances" flag indicates whether the appearance dictionary
512 for form fields should be automatically generated by the PDF viewer or
513 if the embedded appearance should be used.
515 Args:
516 state: The actual value of the NeedAppearances flag.
518 Returns:
519 None
521 """
522 # See §12.7.2 and §7.7.2 for more information:
523 # https://opensource.adobe.com/dc-acrobat-sdk-docs/pdfstandards/PDF32000_2008.pdf
524 try:
525 # get the AcroForm tree
526 if CatalogDictionary.ACRO_FORM not in self._root_object:
527 self._root_object[
528 NameObject(CatalogDictionary.ACRO_FORM)
529 ] = self._add_object(DictionaryObject())
531 need_appearances = NameObject(InteractiveFormDictEntries.NeedAppearances)
532 cast(DictionaryObject, self._root_object[CatalogDictionary.ACRO_FORM])[
533 need_appearances
534 ] = BooleanObject(state)
535 except Exception as exc: # pragma: no cover
536 logger_warning(
537 f"set_need_appearances_writer({state}) catch : {exc}", __name__
538 )
540 def create_viewer_preferences(self) -> ViewerPreferences:
541 o = ViewerPreferences()
542 self._root_object[
543 NameObject(CatalogDictionary.VIEWER_PREFERENCES)
544 ] = self._add_object(o)
545 return o
547 def add_page(
548 self,
549 page: PageObject,
550 excluded_keys: Iterable[str] = (),
551 ) -> PageObject:
552 """
553 Add a page to this PDF file.
555 Recommended for advanced usage including the adequate excluded_keys.
557 The page is usually acquired from a :class:`PdfReader<pypdf.PdfReader>`
558 instance.
560 Args:
561 page: The page to add to the document. Should be
562 an instance of :class:`PageObject<pypdf._page.PageObject>`
563 excluded_keys:
565 Returns:
566 The added PageObject.
568 """
569 assert self.flattened_pages is not None, "mypy"
570 return self._add_page(page, len(self.flattened_pages), excluded_keys)
572 def insert_page(
573 self,
574 page: PageObject,
575 index: int = 0,
576 excluded_keys: Iterable[str] = (),
577 ) -> PageObject:
578 """
579 Insert a page in this PDF file. The page is usually acquired from a
580 :class:`PdfReader<pypdf.PdfReader>` instance.
582 Args:
583 page: The page to add to the document.
584 index: Position at which the page will be inserted.
585 excluded_keys:
587 Returns:
588 The added PageObject.
590 """
591 assert self.flattened_pages is not None, "mypy"
592 if index < 0:
593 index = len(self.flattened_pages) + index
594 if index < 0:
595 raise ValueError("Invalid index value")
596 if index >= len(self.flattened_pages):
597 return self.add_page(page, excluded_keys)
598 return self._add_page(page, index, excluded_keys)
600 def _get_page_number_by_indirect(
601 self, indirect_reference: Union[None, int, NullObject, IndirectObject]
602 ) -> Optional[int]:
603 """
604 Generate _page_id2num.
606 Args:
607 indirect_reference:
609 Returns:
610 The page number or None
612 """
613 # To provide same function as in PdfReader
614 if is_null_or_none(indirect_reference):
615 return None
616 assert indirect_reference is not None, "mypy"
617 if isinstance(indirect_reference, int):
618 indirect_reference = IndirectObject(indirect_reference, 0, self)
619 obj = indirect_reference.get_object()
620 if isinstance(obj, PageObject):
621 return obj.page_number
622 return None
624 def add_blank_page(
625 self, width: Optional[float] = None, height: Optional[float] = None
626 ) -> PageObject:
627 """
628 Append a blank page to this PDF file and return it.
630 If no page size is specified, use the size of the last page.
632 Args:
633 width: The width of the new page expressed in default user
634 space units.
635 height: The height of the new page expressed in default
636 user space units.
638 Returns:
639 The newly appended page.
641 Raises:
642 PageSizeNotDefinedError: if width and height are not defined
643 and previous page does not exist.
645 """
646 page = PageObject.create_blank_page(self, width, height)
647 return self.add_page(page)
649 def insert_blank_page(
650 self,
651 width: Optional[Union[float, decimal.Decimal]] = None,
652 height: Optional[Union[float, decimal.Decimal]] = None,
653 index: int = 0,
654 ) -> PageObject:
655 """
656 Insert a blank page to this PDF file and return it.
658 If no page size is specified, use the size of the last page.
660 Args:
661 width: The width of the new page expressed in default user
662 space units.
663 height: The height of the new page expressed in default
664 user space units.
665 index: Position to add the page.
667 Returns:
668 The newly inserted page.
670 Raises:
671 PageSizeNotDefinedError: if width and height are not defined
672 and previous page does not exist.
674 """
675 if width is None or (height is None and index < self.get_num_pages()):
676 oldpage = self.pages[index]
677 width = oldpage.mediabox.width
678 height = oldpage.mediabox.height
679 page = PageObject.create_blank_page(self, width, height)
680 self.insert_page(page, index)
681 return page
683 @property
684 def open_destination(
685 self,
686 ) -> Union[None, Destination, TextStringObject, ByteStringObject]:
687 return super().open_destination
689 @open_destination.setter
690 def open_destination(self, dest: Union[None, str, Destination, PageObject]) -> None:
691 if dest is None:
692 try:
693 del self._root_object["/OpenAction"]
694 except KeyError:
695 pass
696 elif isinstance(dest, str):
697 self._root_object[NameObject("/OpenAction")] = TextStringObject(dest)
698 elif isinstance(dest, Destination):
699 self._root_object[NameObject("/OpenAction")] = dest.dest_array
700 elif isinstance(dest, PageObject):
701 self._root_object[NameObject("/OpenAction")] = Destination(
702 "Opening",
703 dest.indirect_reference
704 if dest.indirect_reference is not None
705 else NullObject(),
706 PAGE_FIT,
707 ).dest_array
709 def add_js(self, javascript: str) -> None:
710 """
711 Add JavaScript which will launch upon opening this PDF.
713 Args:
714 javascript: Your JavaScript.
716 >>> output.add_js("this.print({bUI:true,bSilent:false,bShrinkToFit:true});")
717 # Example: This will launch the print window when the PDF is opened.
719 """
720 # Names / JavaScript preferred to be able to add multiple scripts
721 if "/Names" not in self._root_object:
722 self._root_object[NameObject(CA.NAMES)] = DictionaryObject()
723 names = cast(DictionaryObject, self._root_object[CA.NAMES])
724 if "/JavaScript" not in names:
725 names[NameObject("/JavaScript")] = DictionaryObject(
726 {NameObject("/Names"): ArrayObject()}
727 )
728 js_list = cast(
729 ArrayObject, cast(DictionaryObject, names["/JavaScript"])["/Names"]
730 )
731 # We need a name for parameterized JavaScript in the PDF file,
732 # but it can be anything.
733 js_list.append(create_string_object(str(uuid.uuid4())))
735 js = DictionaryObject(
736 {
737 NameObject(PA.TYPE): NameObject("/Action"),
738 NameObject("/S"): NameObject("/JavaScript"),
739 NameObject("/JS"): TextStringObject(f"{javascript}"),
740 }
741 )
742 js_list.append(self._add_object(js))
744 def add_attachment(self, filename: str, data: Union[str, bytes]) -> None:
745 """
746 Embed a file inside the PDF.
748 Reference:
749 https://opensource.adobe.com/dc-acrobat-sdk-docs/pdfstandards/PDF32000_2008.pdf
750 Section 7.11.3
752 Args:
753 filename: The filename to display.
754 data: The data in the file.
756 """
757 # We need three entries:
758 # * The file's data
759 # * The /Filespec entry
760 # * The file's name, which goes in the Catalog
762 # The entry for the file
763 # Sample:
764 # 8 0 obj
765 # <<
766 # /Length 12
767 # /Type /EmbeddedFile
768 # >>
769 # stream
770 # Hello world!
771 # endstream
772 # endobj
774 if isinstance(data, str):
775 data = data.encode("latin-1")
776 file_entry = DecodedStreamObject()
777 file_entry.set_data(data)
778 file_entry.update({NameObject(PA.TYPE): NameObject("/EmbeddedFile")})
780 # The Filespec entry
781 # Sample:
782 # 7 0 obj
783 # <<
784 # /Type /Filespec
785 # /F (hello.txt)
786 # /EF << /F 8 0 R >>
787 # >>
788 # endobj
790 ef_entry = DictionaryObject()
791 ef_entry.update({NameObject("/F"): self._add_object(file_entry)})
793 filespec = DictionaryObject()
794 filespec.update(
795 {
796 NameObject(PA.TYPE): NameObject("/Filespec"),
797 NameObject(FileSpecificationDictionaryEntries.F): create_string_object(
798 filename
799 ), # Perhaps also try TextStringObject
800 NameObject(FileSpecificationDictionaryEntries.EF): ef_entry,
801 }
802 )
804 # Then create the entry for the root, as it needs
805 # a reference to the Filespec
806 # Sample:
807 # 1 0 obj
808 # <<
809 # /Type /Catalog
810 # /Outlines 2 0 R
811 # /Pages 3 0 R
812 # /Names << /EmbeddedFiles << /Names [(hello.txt) 7 0 R] >> >>
813 # >>
814 # endobj
816 if CA.NAMES not in self._root_object:
817 self._root_object[NameObject(CA.NAMES)] = self._add_object(
818 DictionaryObject()
819 )
820 if "/EmbeddedFiles" not in cast(DictionaryObject, self._root_object[CA.NAMES]):
821 embedded_files_names_dictionary = DictionaryObject(
822 {NameObject(CA.NAMES): ArrayObject()}
823 )
824 cast(DictionaryObject, self._root_object[CA.NAMES])[
825 NameObject("/EmbeddedFiles")
826 ] = self._add_object(embedded_files_names_dictionary)
827 else:
828 embedded_files_names_dictionary = cast(
829 DictionaryObject,
830 cast(DictionaryObject, self._root_object[CA.NAMES])["/EmbeddedFiles"],
831 )
832 cast(ArrayObject, embedded_files_names_dictionary[CA.NAMES]).extend(
833 [create_string_object(filename), filespec]
834 )
836 def append_pages_from_reader(
837 self,
838 reader: PdfReader,
839 after_page_append: Optional[Callable[[PageObject], None]] = None,
840 ) -> None:
841 """
842 Copy pages from reader to writer. Includes an optional callback
843 parameter which is invoked after pages are appended to the writer.
845 ``append`` should be preferred.
847 Args:
848 reader: a PdfReader object from which to copy page
849 annotations to this writer object. The writer's annots
850 will then be updated.
851 after_page_append:
852 Callback function that is invoked after each page is appended to
853 the writer. Signature includes a reference to the appended page
854 (delegates to append_pages_from_reader). The single parameter of
855 the callback is a reference to the page just appended to the
856 document.
858 """
859 reader_num_pages = len(reader.pages)
860 # Copy pages from reader to writer
861 for reader_page_number in range(reader_num_pages):
862 reader_page = reader.pages[reader_page_number]
863 writer_page = self.add_page(reader_page)
864 # Trigger callback, pass writer page as parameter
865 if callable(after_page_append):
866 after_page_append(writer_page)
    def _update_field_annotation(
        self,
        field: DictionaryObject,
        annotation: DictionaryObject,
        font_name: str = "",
        font_size: float = -1,
    ) -> None:
        """
        Rebuild the normal appearance stream (/AP /N) of a field annotation.

        Font name and size come from the default appearance string (/DA) of
        the annotation or the AcroForm, unless overridden by the arguments.
        The field text is passed to ``generate_appearance_stream`` and the
        result is stored as a form XObject under the annotation's /AP /N.

        Args:
            field: Field dictionary providing /V, /FT and /Ff.
            annotation: Annotation providing /Rect, /AP and the inherited
                /DA, /DR and /Opt entries.
            font_name: Font resource name to use; empty to keep the one
                named in /DA.
            font_size: Font size to use; a negative value keeps the one
                given in /DA.
        """
        # Calculate rectangle dimensions
        _rct = cast(RectangleObject, annotation[AA.Rect])
        # Same size as /Rect, but with its origin moved to (0, 0).
        rct = RectangleObject((0, 0, abs(_rct[2] - _rct[0]), abs(_rct[3] - _rct[1])))

        # Extract font information
        da = annotation.get_inherited(
            AA.DA,
            cast(DictionaryObject, self.root_object[CatalogDictionary.ACRO_FORM]).get(
                AA.DA, None
            ),
        )
        if da is None:
            da = TextStringObject("/Helv 0 Tf 0 g")
        else:
            da = da.get_object()
        # Tokenise /DA; the two tokens before "Tf" are font name and size.
        font_properties = da.replace("\n", " ").replace("\r", " ").split(" ")
        font_properties = [x for x in font_properties if x != ""]
        if font_name:
            font_properties[font_properties.index("Tf") - 2] = font_name
        else:
            font_name = font_properties[font_properties.index("Tf") - 2]
        font_height = (
            font_size
            if font_size >= 0
            else float(font_properties[font_properties.index("Tf") - 1])
        )
        # A height of 0 is replaced: fixed default for multiline fields,
        # otherwise derived from the rectangle height.
        if font_height == 0:
            if field.get(FA.Ff, 0) & FA.FfBits.Multiline:
                font_height = DEFAULT_FONT_HEIGHT_IN_MULTILINE
            else:
                font_height = rct.height - 2
        font_properties[font_properties.index("Tf") - 1] = str(font_height)
        da = " ".join(font_properties)
        y_offset = rct.height - 1 - font_height

        # Retrieve font information from local DR ...
        dr: Any = cast(
            DictionaryObject,
            cast(
                DictionaryObject,
                annotation.get_inherited(
                    "/DR",
                    cast(
                        DictionaryObject, self.root_object[CatalogDictionary.ACRO_FORM]
                    ).get("/DR", DictionaryObject()),
                ),
            ).get_object(),
        )
        dr = dr.get("/Font", DictionaryObject()).get_object()
        # _default_fonts_space_width keys is the list of Standard fonts
        if font_name not in dr and font_name not in _default_fonts_space_width:
            # ...or AcroForm dictionary
            dr = cast(
                Dict[Any, Any],
                cast(
                    DictionaryObject, self.root_object[CatalogDictionary.ACRO_FORM]
                ).get("/DR", {}),
            )
            dr = dr.get_object().get("/Font", DictionaryObject()).get_object()
        font_res = dr.get(font_name, None)
        if not is_null_or_none(font_res):
            font_res = cast(DictionaryObject, font_res.get_object())
            font_subtype, _, font_encoding, font_map = build_char_map_from_dict(
                200, font_res
            )
            try:  # remove width stored in -1 key
                del font_map[-1]
            except KeyError:
                pass
            # Reverse mapping (text -> bytes) handed to
            # generate_appearance_stream.
            font_full_rev: Dict[str, bytes]
            if isinstance(font_encoding, str):
                font_full_rev = {
                    v: k.encode(font_encoding) for k, v in font_map.items()
                }
            else:
                font_full_rev = {v: bytes((k,)) for k, v in font_encoding.items()}
                font_encoding_rev = {v: bytes((k,)) for k, v in font_encoding.items()}
                for key, value in font_map.items():
                    font_full_rev[value] = font_encoding_rev.get(key, key)
        else:
            logger_warning(f"Font dictionary for {font_name} not found.", __name__)
            font_full_rev = {}

        # Retrieve field text and selected values
        field_flags = field.get(FA.Ff, 0)
        # Choice field without the Combo flag: the text is the list of
        # options, one per line, and /V holds the selection.
        if field.get(FA.FT, "/Tx") == "/Ch" and field_flags & FA.FfBits.Combo == 0:
            txt = "\n".join(annotation.get_inherited(FA.Opt, []))
            sel = field.get("/V", [])
            if not isinstance(sel, list):
                sel = [sel]
        else:  # /Tx
            txt = field.get("/V", "")
            sel = []
        # Escape parentheses (PDF 1.7 reference, table 3.2, Literal Strings)
        txt = txt.replace("\\", "\\\\").replace("(", r"\(").replace(")", r"\)")
        # Generate appearance stream
        ap_stream = generate_appearance_stream(
            txt, sel, da, font_full_rev, rct, font_height, y_offset
        )

        # Create appearance dictionary
        dct = DecodedStreamObject.initialize_from_dictionary(
            {
                NameObject("/Type"): NameObject("/XObject"),
                NameObject("/Subtype"): NameObject("/Form"),
                NameObject("/BBox"): rct,
                "__streamdata__": ByteStringObject(ap_stream),
                "/Length": 0,
            }
        )
        # Carry over the other entries of an existing /N appearance.
        if AA.AP in annotation:
            for k, v in cast(DictionaryObject, annotation[AA.AP]).get("/N", {}).items():
                if k not in {"/BBox", "/Length", "/Subtype", "/Type", "/Filter"}:
                    dct[k] = v

        # Update Resources with font information if necessary
        if font_res is not None:
            dct[NameObject("/Resources")] = DictionaryObject(
                {
                    NameObject("/Font"): DictionaryObject(
                        {
                            NameObject(font_name): getattr(
                                font_res, "indirect_reference", font_res
                            )
                        }
                    )
                }
            )
        if AA.AP not in annotation:
            annotation[NameObject(AA.AP)] = DictionaryObject(
                {NameObject("/N"): self._add_object(dct)}
            )
        elif "/N" not in cast(DictionaryObject, annotation[AA.AP]):
            cast(DictionaryObject, annotation[NameObject(AA.AP)])[
                NameObject("/N")
            ] = self._add_object(dct)
        else:  # [/AP][/N] exists
            # Reuse the object number of the existing /N stream.
            n = annotation[AA.AP]["/N"].indirect_reference.idnum  # type: ignore
            self._objects[n - 1] = dct
            dct.indirect_reference = IndirectObject(n, 0, self)
# Empty set of field flags. Used as the default for the ``flags`` parameter of
# ``update_page_form_field_values``; being falsy, it causes no /Ff entry to be
# written there.
FFBITS_NUL = FA.FfBits(0)
def update_page_form_field_values(
    self,
    page: Union[PageObject, List[PageObject], None],
    fields: Dict[str, Union[str, List[str], Tuple[str, str, float]]],
    flags: FA.FfBits = FFBITS_NUL,
    auto_regenerate: Optional[bool] = True,
) -> None:
    """
    Update the form field values for a given page from a fields dictionary.

    Copy field texts and values from fields to page.
    If the field links to a parent object, add the information to the parent.

    Args:
        page: `PageObject` - references **PDF writer's page** where the
            annotations and field data will be updated.
            `List[PageObject]` - provides list of pages to be processed.
            `None` - all pages.
        fields: a Python dictionary of:

            * field names (/T) as keys and text values (/V) as value
            * field names (/T) as keys and list of text values (/V) for multiple choice list
            * field names (/T) as keys and tuple of:
                * text values (/V)
                * font id (e.g. /F1, the font id must exist)
                * font size (0 for autosize)

        flags: A set of flags from :class:`~pypdf.constants.FieldDictionaryAttributes.FfBits`.
            Written to the widget's /Ff entry only when truthy.

        auto_regenerate: Set/unset the need_appearances flag;
            the flag is unchanged if auto_regenerate is None.

    Raises:
        PyPdfError: if the document root has no /AcroForm entry, or the
            /AcroForm dictionary has no /Fields entry.

    """
    if CatalogDictionary.ACRO_FORM not in self._root_object:
        raise PyPdfError("No /AcroForm dictionary in PDF of PdfWriter Object")
    af = cast(DictionaryObject, self._root_object[CatalogDictionary.ACRO_FORM])
    if InteractiveFormDictEntries.Fields not in af:
        raise PyPdfError("No /Fields dictionary in PDF of PdfWriter Object")
    if isinstance(auto_regenerate, bool):
        self.set_need_appearances_writer(auto_regenerate)
    # Iterate through pages, update field values
    if page is None:
        page = list(self.pages)
    if isinstance(page, list):
        for p in page:
            if PG.ANNOTS in p:  # just to prevent warnings
                # auto_regenerate=None: the flag was already handled above
                self.update_page_form_field_values(p, fields, flags, None)
        return
    if PG.ANNOTS not in page:
        logger_warning("No fields to update on this page", __name__)
        return
    for annotation in page[PG.ANNOTS]:  # type: ignore
        annotation = cast(DictionaryObject, annotation.get_object())
        # only widget annotations are considered
        if annotation.get("/Subtype", "") != "/Widget":
            continue
        # A widget carrying both /FT and /T is treated as its own field;
        # otherwise the field data is looked up on the widget's parent.
        if "/FT" in annotation and "/T" in annotation:
            parent_annotation = annotation
        else:
            parent_annotation = annotation.get(
                PG.PARENT, DictionaryObject()
            ).get_object()

        for field, value in fields.items():
            # match on either the qualified name or the plain /T name
            if not (
                self._get_qualified_field_name(parent_annotation) == field
                or parent_annotation.get("/T", None) == field
            ):
                continue
            if (
                parent_annotation.get("/FT", None) == "/Ch"
                and "/I" in parent_annotation
            ):
                del parent_annotation["/I"]
            if flags:
                annotation[NameObject(FA.Ff)] = NumberObject(flags)
            if isinstance(value, list):
                lst = ArrayObject(TextStringObject(v) for v in value)
                parent_annotation[NameObject(FA.V)] = lst
            elif isinstance(value, tuple):
                # NOTE(review): the tuple form stores /V on the widget itself,
                # while the other forms store it on the parent - confirm intent.
                annotation[NameObject(FA.V)] = TextStringObject(
                    value[0],
                )
            else:
                parent_annotation[NameObject(FA.V)] = TextStringObject(value)
            if parent_annotation.get(FA.FT) == "/Btn":
                # Checkbox button (no /FT found in Radio widgets)
                v = NameObject(value)
                ap = cast(DictionaryObject, annotation[NameObject(AA.AP)])
                # fall back to /Off when the value is not found in /AP /N
                if v not in cast(ArrayObject, ap[NameObject("/N")]):
                    v = NameObject("/Off")
                # other cases will be updated through the for loop
                annotation[NameObject(AA.AS)] = v
                annotation[NameObject(FA.V)] = v
            elif (
                parent_annotation.get(FA.FT) == "/Tx"
                or parent_annotation.get(FA.FT) == "/Ch"
            ):
                # textbox
                if isinstance(value, tuple):
                    # tuple form: value[1] is the font id, value[2] the font size
                    self._update_field_annotation(
                        parent_annotation, annotation, value[1], value[2]
                    )
                else:
                    self._update_field_annotation(parent_annotation, annotation)
            elif (
                annotation.get(FA.FT) == "/Sig"
            ):  # deprecated  # not implemented yet
                logger_warning("Signature forms not implemented yet", __name__)
def reattach_fields(
    self, page: Optional[PageObject] = None
) -> List[DictionaryObject]:
    """
    Look for orphan form fields among the page annotations and register
    them in the /Fields array of the /AcroForm dictionary.

    Args:
        page: page to analyze.
            If none is provided, all pages will be analyzed.

    Returns:
        list of reattached fields.

    """
    reattached = []
    if page is None:
        # no page given: process every page and merge the results
        for single_page in self.pages:
            reattached += self.reattach_fields(single_page)
        return reattached

    # Fetch the /AcroForm dictionary, creating it when missing
    try:
        acro_form = cast(
            DictionaryObject, self._root_object[CatalogDictionary.ACRO_FORM]
        )
    except KeyError:
        acro_form = DictionaryObject()
        self._root_object[NameObject(CatalogDictionary.ACRO_FORM)] = acro_form
    # Fetch the /Fields array, creating it when missing
    try:
        fields = cast(ArrayObject, acro_form[InteractiveFormDictEntries.Fields])
    except KeyError:
        fields = ArrayObject()
        acro_form[NameObject(InteractiveFormDictEntries.Fields)] = fields

    if "/Annots" not in page:
        return reattached
    annots = cast(ArrayObject, page["/Annots"])
    for position, entry in enumerate(annots):
        was_indirect = isinstance(entry, IndirectObject)
        widget = cast(DictionaryObject, entry.get_object())
        # only widgets that carry a field type are candidates
        if widget.get("/Subtype", "") != "/Widget" or "/FT" not in widget:
            continue
        already_registered = (
            "indirect_reference" in widget.__dict__
            and widget.indirect_reference in fields
        )
        if already_registered:
            continue
        if not was_indirect:
            # a direct annotation is registered as an object first
            annots[position] = self._add_object(widget)
        fields.append(widget.indirect_reference)
        reattached.append(widget)
    return reattached
def clone_reader_document_root(self, reader: PdfReader) -> None:
    """
    Clone the reader's document root, with all its sub-elements (pages,
    threads, outlines,...), into this writer. For partial insertion,
    ``append`` should be considered.

    Args:
        reader: PdfReader from which the document root should be copied.

    """
    self._info_obj = None
    if not self.incremental:
        self._objects.clear()
    else:
        # one slot per object number announced by the trailer (object 0 excluded)
        slot_count = cast(int, reader.trailer["/Size"]) - 1
        self._objects = [None] * slot_count
        for slot in range(len(self._objects)):
            source = reader.get_object(slot + 1)
            if source is not None:
                self._objects[slot] = source.replicate(self)
    self._root_object = reader.root_object.clone(self)
    self._pages = self._root_object.raw_get("/Pages")

    assert len(self._objects) <= cast(int, reader.trailer["/Size"])  # for pytest
    # the hashes must be recorded here, before anything gets rewritten
    if self.incremental:
        self._original_hash = [
            0 if item is None else item.hash_bin() for item in self._objects
        ]
    self._flatten()
    assert self.flattened_pages is not None
    for flat_page in self.flattened_pages:
        page_id = cast(IndirectObject, flat_page.indirect_reference).idnum
        self._replace_object(page_id, flat_page)
        if not self.incremental:
            flat_page[NameObject("/Parent")] = self._pages
    if not self.incremental:
        pages_dict = cast(DictionaryObject, self._pages.get_object())
        pages_dict[NameObject("/Kids")] = ArrayObject(
            [flat_page.indirect_reference for flat_page in self.flattened_pages]
        )
def clone_document_from_reader(
    self,
    reader: PdfReader,
    after_page_append: Optional[Callable[[PageObject], None]] = None,
) -> None:
    """
    Clone a whole document from a PDF file reader: the '/Root', '/Info'
    and '/ID' sections of the pdf are copied.

    Args:
        reader: PDF file reader instance from which the clone
            should be created.
        after_page_append:
            Callback function invoked once for every page found in the
            cloned /Kids array. Its single parameter is a reference to
            that page.

    """
    self.clone_reader_document_root(reader)
    source_info = reader._info
    if source_info is not None:
        if self.incremental:
            self._info_obj = cast(
                IndirectObject, source_info.clone(self).indirect_reference
            )
            assert isinstance(self._info, DictionaryObject), "for mypy"
            info_slot = self._info_obj.indirect_reference.idnum - 1
            self._original_hash[info_slot] = self._info.hash_bin()
        else:
            info_copy = DictionaryObject(
                cast(DictionaryObject, source_info.get_object())
            )
            self._info_obj = self._add_object(info_copy)
    # otherwise _info_obj stays None, as set by clone_reader_document_root()

    try:
        self._ID = cast(ArrayObject, reader._ID).clone(self)
    except AttributeError:
        pass

    if not callable(after_page_append):
        return
    pages_dict = cast(DictionaryObject, self._pages.get_object())
    for kid in cast(ArrayObject, pages_dict["/Kids"]):
        after_page_append(kid.get_object())
def _compute_document_identifier(self) -> ByteStringObject:
    """Return a checksum of the serialized PDF structure as a byte string object."""
    buffer = BytesIO()
    self._write_pdf_structure(buffer)
    buffer.seek(0)
    checksum = _rolling_checksum(buffer)
    return ByteStringObject(checksum.encode("utf8"))
def generate_file_identifiers(self) -> None:
    """
    Generate an identifier for the PDF that will be written.

    The only point of this is ensuring uniqueness. Reproducibility is not
    required.
    When a file is first written, both identifiers shall be set to the same value.
    If both identifiers match when a file reference is resolved, it is very
    likely that the correct and unchanged file has been found. If only the first
    identifier matches, a different version of the correct file has been found.
    see §14.4 "File Identifiers".
    """
    keep_first = bool(self._ID)
    # an existing first identifier is preserved; only the second is refreshed
    first = self._ID[0] if keep_first else None
    computed = self._compute_document_identifier()
    if not keep_first:
        first = computed
    self._ID = ArrayObject((first, computed))
def encrypt(
    self,
    user_password: str,
    owner_password: Optional[str] = None,
    use_128bit: bool = True,
    permissions_flag: UserAccessPermissions = ALL_DOCUMENT_PERMISSIONS,
    *,
    algorithm: Optional[str] = None,
) -> None:
    """
    Encrypt this PDF file with the PDF Standard encryption handler.

    Args:
        user_password: The password which allows for opening
            and reading the PDF file with the restrictions provided.
        owner_password: The password which allows for
            opening the PDF files without any restrictions. By default,
            the owner password is the same as the user password.
        use_128bit: flag as to whether to use 128bit
            encryption. When false, 40bit encryption will be used.
            By default, this flag is on.
        permissions_flag: permissions as described in
            Table 3.20 of the PDF 1.7 specification. A bit value of 1 means
            the permission is granted.
            Hence an integer value of -1 will set all flags.
            Bit position 3 is for printing, 4 is for modifying content,
            5 and 6 control annotations, 9 for form fields,
            10 for extraction of text and graphics.
        algorithm: encrypt algorithm. Values may be one of "RC4-40", "RC4-128",
            "AES-128", "AES-256-R5", "AES-256". If it is valid,
            `use_128bit` will be ignored.

    Raises:
        ValueError: if `algorithm` does not name a member of EncryptAlgorithm.

    """
    effective_owner = user_password if owner_password is None else owner_password

    if algorithm is None:
        # no explicit algorithm: RC4, key size selected by `use_128bit`
        chosen = EncryptAlgorithm.RC4_128 if use_128bit else EncryptAlgorithm.RC4_40
    else:
        try:
            chosen = getattr(EncryptAlgorithm, algorithm.replace("-", "_"))
        except AttributeError:
            raise ValueError(f"Algorithm '{algorithm}' NOT supported")
    self.generate_file_identifiers()
    assert self._ID
    self._encryption = Encryption.make(chosen, permissions_flag, self._ID[0])
    new_entry = self._encryption.write_entry(user_password, effective_owner)
    previous_entry = self._encrypt_entry
    if previous_entry:
        # `encrypt` was called before: reuse the slot of the old entry
        assert previous_entry.indirect_reference is not None
        new_entry.indirect_reference = previous_entry.indirect_reference
        self._objects[new_entry.indirect_reference.idnum - 1] = new_entry
    else:
        self._add_object(new_entry)
    self._encrypt_entry = new_entry
def write_stream(self, stream: StreamType) -> None:
    """
    Serialize the document to an already opened stream.

    In incremental mode the reader's original bytes are copied first and an
    increment is appended only if some object is new or modified; otherwise
    the full structure, cross-reference table and trailer are written.

    Args:
        stream: the stream to write to; a warning is logged when it exposes
            a ``mode`` attribute that does not contain "b".

    """
    opened_as_text = hasattr(stream, "mode") and "b" not in stream.mode
    if opened_as_text:
        logger_warning(
            f"File <{stream.name}> to write to is not in binary mode. "
            "It may not be written to correctly.",
            __name__,
        )

    if not self.incremental:
        positions, free_ids = self._write_pdf_structure(stream)
        xref_at = self._write_xref_table(stream, positions, free_ids)
        self._write_trailer(stream, xref_at)
        return

    # incremental: start from an exact copy of the original file
    self._reader.stream.seek(0)
    stream.write(self._reader.stream.read(-1))
    if len(self.list_objects_in_increment()) > 0:
        self._write_increment(stream)  # writes objs, xref stream and startxref
def write(self, stream: Union[Path, StrByteType]) -> Tuple[bool, IO[Any]]:
    """
    Write the collection of pages added to this object out as a PDF file.

    Args:
        stream: An object to write the file to. The object can support
            the write method and the tell method, similar to a file object, or
            be a file path, just like the fileobj, just named it stream to keep
            existing workflow.

    Returns:
        A tuple (bool, IO): whether the file was opened (and closed) by this
        method, and the stream that was written to.

    Raises:
        ValueError: if `stream` is the empty string.

    """
    my_file = False

    if stream == "":
        raise ValueError(f"Output({stream=}) is empty.")

    if isinstance(stream, (str, Path)):
        stream = FileIO(stream, "wb")
        my_file = True

    try:
        self.write_stream(stream)
    finally:
        # A file opened here must not leak its descriptor when writing fails;
        # a caller-provided stream is left for the caller to manage.
        if my_file:
            stream.close()

    if not my_file:
        stream.flush()

    return my_file, stream
1405 def list_objects_in_increment(self) -> List[IndirectObject]:
1406 """
1407 For analysis or debugging.
1408 Provides the list of new or modified objects that will be written
1409 in the increment.
1410 Deleted objects will not be freed but will become orphans.
1412 Returns:
1413 List of new or modified IndirectObjects
1415 """
1416 original_hash_count = len(self._original_hash)
1417 return [
1418 cast(IndirectObject, obj).indirect_reference
1419 for i, obj in enumerate(self._objects)
1420 if (
1421 obj is not None
1422 and (
1423 i >= original_hash_count
1424 or obj.hash_bin() != self._original_hash[i]
1425 )
1426 )
1427 ]
def _write_increment(self, stream: StreamType) -> None:
    """
    Append an incremental update to `stream`.

    Every object that is new (index beyond the recorded original hashes) or
    whose hash differs from the recorded one is written, followed by a
    cross-reference stream object describing these objects, then the
    ``startxref`` / ``%%EOF`` footer.

    Args:
        stream: stream positioned at the end of the original file content.

    """
    object_positions = {}
    object_blocks = []  # [first object number, count] pairs for /Index
    current_start = -1
    current_stop = -2
    original_hash_count = len(self._original_hash)
    for i, obj in enumerate(self._objects):
        if obj is not None and (
            i >= original_hash_count
            or obj.hash_bin() != self._original_hash[i]
        ):
            idnum = i + 1
            assert isinstance(obj, PdfObject), "mypy"
            # first write new/modified object
            object_positions[idnum] = stream.tell()
            stream.write(f"{idnum} 0 obj\n".encode())
            # encryption is not operational here:
            # if self._encryption and obj != self._encrypt_entry:
            #     obj = self._encryption.encrypt_object(obj, idnum, 0)
            obj.write_to_stream(stream)
            stream.write(b"\nendobj\n")

            # prepare xref: a gap in the numbering closes the current block
            if idnum != current_stop:
                if current_start > 0:
                    object_blocks.append(
                        [current_start, current_stop - current_start]
                    )
                current_start = idnum
            current_stop = idnum + 1
    assert current_start > 0, "for pytest only"
    object_blocks.append([current_start, current_stop - current_start])
    # write incremented xref
    xref_location = stream.tell()
    xr_id = len(self._objects) + 1
    stream.write(f"{xr_id} 0 obj".encode())
    init_data = {
        NameObject("/Type"): NameObject("/XRef"),
        NameObject("/Size"): NumberObject(xr_id + 1),
        NameObject("/Root"): self.root_object.indirect_reference,
        NameObject("/Filter"): NameObject("/FlateDecode"),
        NameObject("/Index"): ArrayObject(
            [NumberObject(_it) for _su in object_blocks for _it in _su]
        ),
        NameObject("/W"): ArrayObject(
            [NumberObject(1), NumberObject(4), NumberObject(1)]
        ),
        "__streamdata__": b"",
    }
    # /Info is only referenced when it is new or has been modified
    if self._info is not None and (
        self._info.indirect_reference.idnum - 1  # type: ignore
        >= len(self._original_hash)
        or cast(IndirectObject, self._info).hash_bin()  # kept for future
        != self._original_hash[
            self._info.indirect_reference.idnum - 1  # type: ignore
        ]
    ):
        init_data[NameObject(TK.INFO)] = self._info.indirect_reference
    init_data[NameObject(TK.PREV)] = NumberObject(self._reader._startxref)
    if self._ID:
        init_data[NameObject(TK.ID)] = self._ID
    xr = StreamObject.initialize_from_dictionary(init_data)
    # one in-use entry (type 1, offset, generation 0) per written object
    xr.set_data(
        b"".join(
            [struct.pack(b">BIB", 1, _pos, 0) for _pos in object_positions.values()]
        )
    )
    xr.write_to_stream(stream)
    # The cross-reference stream is an indirect object and therefore has to
    # be terminated by "endobj" before the footer.
    stream.write(f"\nendobj\nstartxref\n{xref_location}\n%%EOF\n".encode())  # eof
def _write_pdf_structure(self, stream: StreamType) -> Tuple[List[int], List[int]]:
    """
    Write the header and every object of the document to `stream`.

    Returns:
        A tuple of two lists: the stream offset of each object (-1 for an
        empty slot), and the numbers of the empty slots terminated by 0.

    """
    offsets = []
    unused_ids = []
    stream.write(self.pdf_header.encode() + b"\n")
    stream.write(b"%\xE2\xE3\xCF\xD3\n")

    for number, item in enumerate(self._objects, start=1):
        if item is None:
            offsets.append(-1)
            unused_ids.append(number)
            continue
        offsets.append(stream.tell())
        stream.write(f"{number} 0 obj\n".encode())
        # the encryption dictionary itself is never encrypted
        if self._encryption and item != self._encrypt_entry:
            item = self._encryption.encrypt_object(item, number, 0)
        item.write_to_stream(stream)
        stream.write(b"\nendobj\n")
    unused_ids.append(0)  # add 0 to loop in accordance with specification
    return offsets, unused_ids
def _write_xref_table(
    self, stream: StreamType, object_positions: List[int], free_objects: List[int]
) -> int:
    """
    Write the classic cross-reference table and return its stream offset.

    Args:
        stream: the stream to write to.
        object_positions: offset of each object; non-positive means free.
        free_objects: numbers of the free objects, chained entry to entry.

    """
    table_start = stream.tell()
    stream.write(b"xref\n")
    stream.write(f"0 {len(self._objects) + 1}\n".encode())
    # entry for object 0 points to the first free object
    stream.write(f"{free_objects[0]:0>10} {65535:0>5} f \n".encode())
    next_free = 1
    for position in object_positions:
        if position <= 0:
            entry = f"{free_objects[next_free]:0>10} {1:0>5} f \n"
            next_free += 1
        else:
            entry = f"{position:0>10} {0:0>5} n \n"
        stream.write(entry.encode())
    return table_start
def _write_trailer(self, stream: StreamType, xref_location: int) -> None:
    """
    Write the PDF trailer to the stream.

    The trailer dictionary always holds the size and the root reference;
    the info, ID and encryption entries are added only when available.
    It is followed by ``startxref``, the given cross-reference location
    and the end-of-file marker.

    Args:
        stream: the stream to write to.
        xref_location: offset of the cross-reference table in the stream.

    """
    stream.write(b"trailer\n")
    object_count = len(self._objects) + 1
    trailer_dict = DictionaryObject(
        {
            NameObject(TK.SIZE): NumberObject(object_count),
            NameObject(TK.ROOT): self.root_object.indirect_reference,
        }
    )
    if self._info is not None:
        trailer_dict[NameObject(TK.INFO)] = self._info.indirect_reference
    if self._ID is not None:
        trailer_dict[NameObject(TK.ID)] = self._ID
    if self._encrypt_entry:
        trailer_dict[NameObject(TK.ENCRYPT)] = self._encrypt_entry.indirect_reference
    trailer_dict.write_to_stream(stream)
    footer = f"\nstartxref\n{xref_location}\n%%EOF\n"
    stream.write(footer.encode())  # eof
@property
def metadata(self) -> Optional[DocumentInformation]:
    """
    Retrieve/set the PDF file's document information dictionary, if it exists.

    When set to a dictionary, the current entries are cleared and replaced
    by the given ones; when set to None, the /Info entry is removed from
    the pdf.

    Note that some PDF files use (XMP) metadata streams instead of document
    information dictionaries, and these metadata streams will not be
    accessed by this function, but by :meth:`~xmp_metadata`.

    """
    return super().metadata

@metadata.setter
def metadata(
    self,
    value: Optional[Union[DocumentInformation, DictionaryObject, Dict[Any, Any]]],
) -> None:
    if value is None:
        self._info = None
        return
    # drop the current entries first, so only the new ones remain
    if self._info is not None:
        self._info.clear()
    self.add_metadata(value)
def add_metadata(self, infos: Dict[str, Any]) -> None:
    """
    Add custom metadata to the output.

    Every value is converted to its string form before being stored; the
    information dictionary is created when it does not exist yet.

    Args:
        infos: a Python dictionary where each key is a field
            and each value is your new metadata.

    """
    if isinstance(infos, PdfObject):
        infos = cast(DictionaryObject, infos.get_object())
    converted = {}
    for name, raw in list(infos.items()):
        resolved = raw.get_object() if isinstance(raw, PdfObject) else raw
        converted[NameObject(name)] = create_string_object(str(resolved))
    if self._info is None:
        self._info = DictionaryObject()
    self._info.update(converted)
def compress_identical_objects(
    self,
    remove_identicals: bool = True,
    remove_orphans: bool = True,
) -> None:
    """
    Parse the PDF file and merge objects that have the same hash.
    This will make objects common to multiple pages.
    Recommended to be used just before writing output.

    Args:
        remove_identicals: Remove identical objects.
        remove_orphans: Remove unreferenced objects. When False, no object
            is dropped for being unreferenced.

    """

    def replace_in_obj(
        obj: "PdfObject", crossref: "Dict[IndirectObject, IndirectObject]"
    ) -> None:
        # Walk a dictionary/array recursively: flag every referenced object
        # as "not an orphan" and redirect references to merged duplicates.
        if isinstance(obj, DictionaryObject):
            key_val = obj.items()
        elif isinstance(obj, ArrayObject):
            key_val = enumerate(obj)  # type: ignore
        else:
            return
        assert isinstance(obj, (DictionaryObject, ArrayObject))
        for k, v in key_val:
            if isinstance(v, IndirectObject):
                orphans[v.idnum - 1] = False
                if v in crossref:
                    obj[k] = crossref[v]
            else:
                # the filtering on DictionaryObject and ArrayObject only
                # will be performed within replace_in_obj
                replace_in_obj(v, crossref)

    # _idnum_hash :dict[hash]=(1st_ind_obj,[other_indir_objs,...])
    self._idnum_hash = {}
    orphans = [True] * len(self._objects)
    # look for similar objects
    for idx, obj in enumerate(self._objects):
        if is_null_or_none(obj):
            continue
        assert obj is not None, "mypy"  # mypy: TypeGuard of `is_null_or_none` does not help here.
        assert isinstance(obj.indirect_reference, IndirectObject)
        h = obj.hash_value()
        if remove_identicals and h in self._idnum_hash:
            self._idnum_hash[h][1].append(obj.indirect_reference)
            self._objects[idx] = None
        else:
            self._idnum_hash[h] = (obj.indirect_reference, [])

    # map every merged duplicate to the first object sharing its hash
    cnv_rev = {
        duplicate: first
        for first, duplicates in self._idnum_hash.values()
        for duplicate in duplicates
    }

    # replace reference to merged objects
    for obj in self._objects:
        if isinstance(obj, (DictionaryObject, ArrayObject)):
            replace_in_obj(obj, cnv_rev)

    # remove orphans (if applicable)
    if not remove_orphans:
        return

    # objects reachable only from the trailer must be kept explicitly
    orphans[self.root_object.indirect_reference.idnum - 1] = False  # type: ignore

    # a document without an /Info dictionary has nothing to protect here
    if not is_null_or_none(self._info):
        orphans[self._info.indirect_reference.idnum - 1] = False  # type: ignore

    try:
        orphans[self._ID.indirect_reference.idnum - 1] = False  # type: ignore
    except AttributeError:
        pass
    for i in compress(range(len(self._objects)), orphans):
        self._objects[i] = None
def _sweep_indirect_references(
    self,
    root: Union[
        ArrayObject,
        BooleanObject,
        DictionaryObject,
        FloatObject,
        IndirectObject,
        NameObject,
        PdfObject,
        NumberObject,
        TextStringObject,
        NullObject,
    ],
) -> None:  # deprecated
    """
    Deprecated stub kept for backward compatibility.

    The former behavior (resolving circular references to Page objects so
    that self-referencing trees do not produce multiple copies of the same
    Page object) has been removed: the body now only reports the
    deprecation through ``deprecate``.

    Args:
        root: The root of the PDF object tree to sweep (unused).

    """
    deprecate(
        "_sweep_indirect_references has been removed, please report to dev team if this warning is observed",
    )
def _resolve_indirect_object(
    self, data: IndirectObject
) -> IndirectObject:  # deprecated
    """
    Deprecated stub kept for backward compatibility.

    The former behavior (resolving an indirect object of another PDF file
    into an indirect object of this PDF file) has been removed: the body
    now only reports the deprecation through ``deprecate``.

    Args:
        data: The `IndirectObject` to resolve (unused).

    Returns:
        Always ``IndirectObject(0, 0, self)``.

    """
    deprecate(
        "_resolve_indirect_object has been removed, please report to dev team if this warning is observed",
    )
    return IndirectObject(0, 0, self)
def get_reference(self, obj: PdfObject) -> IndirectObject:
    """
    Return the indirect reference of `obj`, looked up by its position in
    the writer's object list (``list.index`` raises ValueError if absent).
    """
    position = self._objects.index(obj)
    reference = IndirectObject(position + 1, 0, self)
    assert reference.get_object() == obj
    return reference
def get_outline_root(self) -> TreeObject:
    """
    Return the outline root of the document as a `TreeObject`.

    A missing outline is created and registered in the catalog; an existing
    one that is not a `TreeObject` is converted and replaced in place.
    """
    catalog = self._root_object
    if CO.OUTLINES not in catalog:
        created = TreeObject()
        created.update({})
        created_ref = self._add_object(created)
        catalog[NameObject(CO.OUTLINES)] = created_ref
        return created

    # Entries in the catalog dictionary
    root = cast(TreeObject, catalog[CO.OUTLINES])
    if not isinstance(root, TreeObject):
        converted = TreeObject(root)
        self._replace_object(root.indirect_reference.idnum, converted)
        root = converted
    position = self._objects.index(root)
    root_ref = IndirectObject(position + 1, 0, self)
    assert root_ref.get_object() == root
    return root
def get_threads_root(self) -> ArrayObject:
    """
    The list of threads.

    See §12.4.3 of the PDF 1.7 or PDF 2.0 specification.

    An empty array is created and stored in the catalog when the document
    has no threads entry yet.

    Returns:
        An array (possibly empty) of Dictionaries with an ``/F`` key,
        and optionally information about the thread in ``/I`` or ``/Metadata`` keys.

    """
    if CO.THREADS not in self._root_object:
        created = ArrayObject()
        self._root_object[NameObject(CO.THREADS)] = created
        return created
    # Entries in the catalog dictionary
    return cast(ArrayObject, self._root_object[CO.THREADS])
@property
def threads(self) -> ArrayObject:
    """
    Read-only access to the list of threads (see :meth:`get_threads_root`).

    See §12.4.3 of the PDF 1.7 or PDF 2.0 specification.

    Each element is a dictionary with an ``/F`` key, and optionally
    information about the thread in ``/I`` or ``/Metadata`` keys.
    """
    thread_list = self.get_threads_root()
    return thread_list
def add_outline_item_destination(
    self,
    page_destination: Union[IndirectObject, PageObject, TreeObject],
    parent: Union[None, TreeObject, IndirectObject] = None,
    before: Union[None, TreeObject, IndirectObject] = None,
    is_open: bool = True,
) -> IndirectObject:
    """
    Insert an outline item into the outline tree.

    Args:
        page_destination: the outline item to insert. A `PageObject` is
            wrapped into a `Destination` titled ``page #<page number>``
            first.
        parent: parent outline item; the outline root when None.
        before: item handed to ``insert_child`` of the parent as the
            insertion position (its indirect reference is used).
        is_open: stored in the item under ``/%is_open%``; it also selects
            whether the parent counters are incremented on insertion.

    Returns:
        The indirect reference of the inserted outline item.

    """
    page_destination = cast(PageObject, page_destination.get_object())
    if isinstance(page_destination, PageObject):
        # Forward the placement arguments as well: they used to be silently
        # dropped, so the item always ended up open at the end of the root.
        return self.add_outline_item_destination(
            Destination(
                f"page #{page_destination.page_number}",
                cast(IndirectObject, page_destination.indirect_reference),
                Fit.fit(),
            ),
            parent,
            before,
            is_open,
        )

    if parent is None:
        parent = self.get_outline_root()

    page_destination[NameObject("/%is_open%")] = BooleanObject(is_open)
    parent = cast(TreeObject, parent.get_object())
    page_destination_ref = self._add_object(page_destination)
    if before is not None:
        before = before.indirect_reference
    parent.insert_child(
        page_destination_ref,
        before,
        self,
        # a closed item must not increase the counters of its parents
        page_destination.inc_parent_counter_outline
        if is_open
        else (lambda x, y: 0),  # noqa: ARG005
    )
    if "/Count" not in page_destination:
        page_destination[NameObject("/Count")] = NumberObject(0)

    return page_destination_ref
def add_outline_item_dict(
    self,
    outline_item: OutlineItemType,
    parent: Union[None, TreeObject, IndirectObject] = None,
    before: Union[None, TreeObject, IndirectObject] = None,
    is_open: bool = True,
) -> IndirectObject:
    """
    Copy the entries of `outline_item` into a new `TreeObject` and insert
    it through :meth:`add_outline_item_destination`.

    Returns:
        The indirect reference returned by the insertion.

    """
    item = TreeObject()
    item.update(outline_item)
    # NOTE: a dedicated handling of an "/A" (action) entry used to be
    # sketched here but was never active.
    return self.add_outline_item_destination(item, parent, before, is_open)
def add_outline_item(
    self,
    title: str,
    page_number: Union[None, PageObject, IndirectObject, int],
    parent: Union[None, TreeObject, IndirectObject] = None,
    before: Union[None, TreeObject, IndirectObject] = None,
    color: Optional[Union[Tuple[float, float, float], str]] = None,
    bold: bool = False,
    italic: bool = False,
    fit: Fit = PAGE_FIT,
    is_open: bool = True,
) -> IndirectObject:
    """
    Add an outline item (commonly referred to as a "Bookmark") to the PDF file.

    Args:
        title: Title to use for this outline item.
        page_number: Page number this outline item will point to.
            A page object or an indirect reference is accepted as well;
            with None, the item is created without a GoTo action.
        parent: A reference to a parent outline item to create nested
            outline items. The outline root is used when None.
        before: forwarded to :meth:`add_outline_item_destination`
            (presumably the item before which the new one is inserted -
            TODO confirm).
        color: Color of the outline item's font as a red, green, blue tuple
            from 0.0 to 1.0 or as a Hex String (#RRGGBB)
        bold: Outline item font is bold
        italic: Outline item font is italic
        fit: The fit of the destination page.
        is_open: forwarded to :meth:`add_outline_item_destination`.

    Returns:
        The added outline item as an indirect object.

    """
    page_ref: Union[None, NullObject, IndirectObject, NumberObject]
    if isinstance(italic, Fit):  # it means that we are on the old params
        # legacy positional call: shift the arguments to the current order
        if fit is not None and page_number is None:
            page_number = fit
        return self.add_outline_item(
            title, page_number, parent, None, before, color, bold, italic, is_open=is_open
        )
    if page_number is None:
        action_ref = None
    else:
        if isinstance(page_number, IndirectObject):
            page_ref = page_number
        elif isinstance(page_number, PageObject):
            page_ref = page_number.indirect_reference
        elif isinstance(page_number, int):
            try:
                page_ref = self.pages[page_number].indirect_reference
            except IndexError:
                # no such page: keep the raw page number as the target
                page_ref = NumberObject(page_number)
        # NOTE(review): page_ref stays unbound if page_number is of any
        # other type - confirm callers never pass one.
        if page_ref is None:
            logger_warning(
                f"can not find reference of page {page_number}",
                __name__,
            )
            page_ref = NullObject()
        dest = Destination(
            NameObject("/" + title + " outline item"),
            page_ref,
            fit,
        )

        # GoTo action jumping to the destination built above
        action_ref = self._add_object(
            DictionaryObject(
                {
                    NameObject(GoToActionArguments.D): dest.dest_array,
                    NameObject(GoToActionArguments.S): NameObject("/GoTo"),
                }
            )
        )
    outline_item = self._add_object(
        _create_outline_item(action_ref, title, color, italic, bold)
    )

    if parent is None:
        parent = self.get_outline_root()
    return self.add_outline_item_destination(outline_item, parent, before, is_open)
def add_outline(self) -> None:
    """
    Placeholder that is not implemented.

    Raises:
        NotImplementedError: Always; the message points callers to
            :meth:`add_outline_item`.
    """
    message = "This method is not yet implemented. Use :meth:`add_outline_item` instead."
    raise NotImplementedError(message)
def add_named_destination_array(
    self, title: TextStringObject, destination: Union[IndirectObject, ArrayObject]
) -> None:
    """
    Insert a ``title``/``destination`` pair into the named-destination list.

    The list returned by ``get_named_dest_root()`` is walked two entries at
    a time (title at the even index, its destination right after it). The
    new pair is placed in front of the first title that compares greater
    than ``title``; when there is none, the pair is appended at the end.

    Args:
        title: Name under which the destination is registered.
        destination: The destination array, or a reference to it.
    """
    names = self.get_named_dest_root()
    for pos in range(0, len(names), 2):
        if title < names[pos]:
            names.insert(pos, TextStringObject(title))
            names.insert(pos + 1, destination)
            return
    names.extend([TextStringObject(title), destination])
def add_named_destination_object(
    self,
    page_destination: PdfObject,
) -> IndirectObject:
    """
    Register an existing destination object as a named destination.

    The object's ``dest_array`` is added to the writer as a new object and
    then filed in the named-destination list under the object's ``/Title``.

    Args:
        page_destination: Object exposing ``dest_array`` and a ``/Title`` entry.

    Returns:
        The reference returned by ``_add_object`` for the destination array.
    """
    dest_ref = self._add_object(page_destination.dest_array)  # type: ignore
    dest_title = cast("TextStringObject", page_destination["/Title"])  # type: ignore
    self.add_named_destination_array(dest_title, dest_ref)
    return dest_ref
def add_named_destination(
    self,
    title: str,
    page_number: int,
) -> IndirectObject:
    """
    Create a ``/GoTo`` action for a page and register it under ``title``.

    The action's ``/D`` array is ``[page, FIT_H, 826]``, where ``page`` is
    the ``page_number``-th entry of the page tree's kids array.

    Args:
        title: Name of the destination; converted to ``TextStringObject``
            when it is not one already.
        page_number: Index into the kids array of the writer's page tree.

    Returns:
        The reference of the newly added action dictionary.
    """
    target_page = self.get_object(self._pages)[PA.KIDS][page_number]  # type: ignore
    view = ArrayObject(
        [target_page, NameObject(TypFitArguments.FIT_H), NumberObject(826)]
    )
    action = DictionaryObject()
    action.update(
        {
            NameObject(GoToActionArguments.D): view,
            NameObject(GoToActionArguments.S): NameObject("/GoTo"),
        }
    )
    action_ref = self._add_object(action)

    if not isinstance(title, TextStringObject):
        title = TextStringObject(str(title))
    self.add_named_destination_array(title, action_ref)
    return action_ref
def remove_links(self) -> None:
    """
    Remove links and annotations from this output.

    For every page of the writer, :meth:`remove_objects_from_page` is called
    with ``ObjectDeletionFlag.ALL_ANNOTATIONS``.
    """
    for page in self.pages:
        self.remove_objects_from_page(page, ObjectDeletionFlag.ALL_ANNOTATIONS)
def remove_annotations(
    self, subtypes: Optional[Union[AnnotationSubtype, Iterable[AnnotationSubtype]]]
) -> None:
    """
    Remove annotations by annotation subtype.

    The work is delegated, page by page, to ``_remove_annots_from_page``.

    Args:
        subtypes: subtype or list of subtypes to be removed.
            Examples are: "/Link", "/FileAttachment", "/Sound",
            "/Movie", "/Screen", ...
            If you want to remove all annotations, use subtypes=None.

    """
    for page in self.pages:
        self._remove_annots_from_page(page, subtypes)
def _remove_annots_from_page(
    self,
    page: Union[IndirectObject, PageObject, DictionaryObject],
    subtypes: Optional[Iterable[str]],
) -> None:
    """
    Delete the annotations of one page whose ``/Subtype`` is in ``subtypes``.

    Args:
        page: The page (or a reference to it) to clean.
        subtypes: Subtype names to remove; ``None`` removes every annotation.
    """
    page = cast(DictionaryObject, page.get_object())
    if PG.ANNOTS not in page:
        return

    idx = 0
    # The array shrinks while it is scanned, so the index only advances
    # past entries that are kept.
    while idx < len(cast(ArrayObject, page[PG.ANNOTS])):
        entry = cast(ArrayObject, page[PG.ANNOTS])[idx]
        annot = cast(DictionaryObject, entry.get_object())
        if subtypes is not None and cast(str, annot["/Subtype"]) not in subtypes:
            idx += 1
            continue
        if isinstance(entry, IndirectObject):
            self._objects[entry.idnum - 1] = NullObject()  # to reduce PDF size
        del page[PG.ANNOTS][idx]  # type:ignore
def remove_objects_from_page(
    self,
    page: Union[PageObject, DictionaryObject],
    to_delete: Union[ObjectDeletionFlag, Iterable[ObjectDeletionFlag]],
    text_filters: Optional[Dict[str, Any]] = None
) -> None:
    """
    Remove objects specified by ``to_delete`` from the given page.

    Annotation-related flags (links, attachments, 3D objects, all
    annotations) are handled by ``_remove_annots_from_page`` and return
    immediately. The remaining flags are handled by rewriting the page
    content stream and the content streams of its form XObjects.

    Args:
        page: Page object to clean up.
        to_delete: Objects to be deleted; can be a ``ObjectDeletionFlag``
            or a list/tuple of ObjectDeletionFlag. Each element of a
            list/tuple is processed by a separate call, with the same
            ``text_filters``.
        text_filters: Properties of text to be deleted, if applicable. Optional.
            This is a Python dictionary with the following properties:

            * font_ids: List of font resource IDs (such as /F1 or /T1_0) to be deleted.

    """
    if isinstance(to_delete, (list, tuple)):
        for to_d in to_delete:
            # Forward the filters so that a TEXT flag inside the sequence is
            # still restricted to the requested fonts.
            self.remove_objects_from_page(page, to_d, text_filters)
        return None
    assert isinstance(to_delete, ObjectDeletionFlag)

    if to_delete & ObjectDeletionFlag.LINKS:
        return self._remove_annots_from_page(page, ("/Link",))
    if to_delete & ObjectDeletionFlag.ATTACHMENTS:
        return self._remove_annots_from_page(
            page, ("/FileAttachment", "/Sound", "/Movie", "/Screen")
        )
    if to_delete & ObjectDeletionFlag.OBJECTS_3D:
        return self._remove_annots_from_page(page, ("/3D",))
    if to_delete & ObjectDeletionFlag.ALL_ANNOTATIONS:
        return self._remove_annots_from_page(page, None)

    # Content-stream operators whose operations are dropped by ``clean``.
    jump_operators = []
    if to_delete & ObjectDeletionFlag.DRAWING_IMAGES:
        jump_operators = (
            [
                b"w", b"J", b"j", b"M", b"d", b"i",
                b"W", b"W*",
                b"b", b"b*", b"B", b"B*", b"S", b"s", b"f", b"f*", b"F", b"n",
                b"m", b"l", b"c", b"v", b"y", b"h", b"re",
                b"sh"
            ]
        )
    # NOTE(review): when both DRAWING_IMAGES and TEXT are set, this
    # assignment replaces the drawing operators above - confirm intent.
    if to_delete & ObjectDeletionFlag.TEXT:
        jump_operators = [b"Tj", b"TJ", b"'", b'"']

    def clean(
        content: ContentStream,
        images: List[str],
        forms: List[str],
        text_filters: Optional[Dict[str, Any]] = None
    ) -> None:
        """Drop the targeted operations from ``content`` in place."""
        nonlocal jump_operators, to_delete

        # Font selected by the most recent ``Tf`` operator seen so far.
        font_id = None
        font_ids_to_delete = []
        if text_filters and to_delete & ObjectDeletionFlag.TEXT:
            font_ids_to_delete = text_filters.get("font_ids", [])

        i = 0
        while i < len(content.operations):
            operands, operator = content.operations[i]
            if operator == b"Tf":
                font_id = operands[0]
            if (
                (
                    operator == b"INLINE IMAGE"
                    and (to_delete & ObjectDeletionFlag.INLINE_IMAGES)
                )
                or (operator in jump_operators)
                or (
                    operator == b"Do"
                    and (to_delete & ObjectDeletionFlag.XOBJECT_IMAGES)
                    and (operands[0] in images)
                )
            ):
                # With TEXT and a non-empty filter, only operations issued
                # while a listed font is selected are removed.
                if (
                    not to_delete & ObjectDeletionFlag.TEXT
                    or (to_delete & ObjectDeletionFlag.TEXT and not text_filters)
                    or (to_delete & ObjectDeletionFlag.TEXT and font_id in font_ids_to_delete)
                ):
                    del content.operations[i]
                else:
                    i += 1
            else:
                i += 1
        content.get_data()  # this ensures ._data is rebuilt from the .operations

    def clean_forms(
        elt: DictionaryObject, stack: List[DictionaryObject]
    ) -> Tuple[List[str], List[str]]:
        """
        Clean the XObjects listed in ``elt``'s resources, recursing into forms.

        Returns the resource names of the images and forms found in ``elt``.
        """
        nonlocal to_delete
        # elt in recursive call is a new ContentStream object, so we have to check the indirect_reference
        if (elt in stack) or (
            hasattr(elt, "indirect_reference")
            and any(
                elt.indirect_reference == getattr(x, "indirect_reference", -1)
                for x in stack
            )
        ):
            # to prevent infinite looping
            return [], []  # pragma: no cover
        try:
            d = cast(
                Dict[Any, Any],
                cast(DictionaryObject, elt["/Resources"])["/XObject"],
            )
        except KeyError:
            d = {}
        images = []
        forms = []
        for k, v in d.items():
            o = v.get_object()
            try:
                content: Any = None
                if (
                    to_delete & ObjectDeletionFlag.XOBJECT_IMAGES
                    and o["/Subtype"] == "/Image"
                ):
                    content = NullObject()  # to delete the image keeping the entry
                    images.append(k)
                if o["/Subtype"] == "/Form":
                    forms.append(k)
                    if isinstance(o, ContentStream):
                        content = o
                    else:
                        content = ContentStream(o, self)
                        content.update(
                            {
                                k1: v1
                                for k1, v1 in o.items()
                                if k1 not in ["/Length", "/Filter", "/DecodeParms"]
                            }
                        )
                        try:
                            content.indirect_reference = o.indirect_reference
                        except AttributeError:  # pragma: no cover
                            pass
                    stack.append(elt)
                    clean_forms(content, stack)  # clean subforms
                if content is not None:
                    if isinstance(v, IndirectObject):
                        self._objects[v.idnum - 1] = content
                    else:
                        # should only occur in a PDF not respecting PDF spec
                        # where streams must be indirected.
                        d[k] = self._add_object(content)  # pragma: no cover
            except (TypeError, KeyError):
                pass
        for im in images:
            del d[im]  # for clean-up
        if isinstance(elt, StreamObject):  # for /Form
            if not isinstance(elt, ContentStream):  # pragma: no cover
                e = ContentStream(elt, self)
                e.update(elt.items())
                elt = e
            clean(elt, images, forms, text_filters)  # clean the content
        return images, forms

    if not isinstance(page, PageObject):
        page = PageObject(self, page.indirect_reference)  # pragma: no cover
    if "/Contents" in page:
        content = cast(ContentStream, page.get_contents())

        images, forms = clean_forms(page, [])

        clean(content, images, forms, text_filters)
        page.replace_contents(content)
def remove_images(
    self,
    to_delete: ImageType = ImageType.ALL,
) -> None:
    """
    Remove images from this output.

    The requested ``ImageType`` bits are translated into the
    ``ObjectDeletionFlag`` members of the same name, and every page is then
    passed to :meth:`remove_objects_from_page` with the combined flag.

    Args:
        to_delete: The type of images to be deleted
            (default = all images types). A ``bool`` value is treated as
            ``ImageType.ALL``.

    """
    if isinstance(to_delete, bool):
        to_delete = ImageType.ALL

    flags = ObjectDeletionFlag.NONE
    for kind in ("XOBJECT_IMAGES", "INLINE_IMAGES", "DRAWING_IMAGES"):
        if to_delete & ImageType[kind]:
            flags = flags | ObjectDeletionFlag[kind]

    for page in self.pages:
        self.remove_objects_from_page(page, flags)
def remove_text(self, font_names: Optional[List[str]] = None) -> None:
    """
    Remove text from the PDF.

    Args:
        font_names: List of font names to remove, such as "Helvetica-Bold".
            Optional. If not specified, all text will be removed.
    """

    def collect_fonts(
        node: Any,
        found: Optional[Dict[str, Any]] = None,
        key: Optional[str] = None
    ) -> Dict[str, Any]:
        """Walk ``node`` recursively and record the resource key of every font."""
        if found is None:
            found = {}
        if isinstance(node, IndirectObject):
            node = node.get_object()
        if isinstance(node, dict):
            if node.get("/Type") == "/Font":
                base_font = node.get("/BaseFont", "")
                # Normalize font names like "/RRXFFV+Palatino-Bold" to "Palatino-Bold"
                short_name = base_font.lstrip("/").split("+")[-1]
                entry = found.setdefault(
                    short_name,
                    {"normalized_font_name": short_name, "resource_ids": []},
                )
                if key not in entry["resource_ids"]:
                    entry["resource_ids"].append(key)
            for child_key in node:
                found = collect_fonts(node[child_key], found, child_key)
        elif isinstance(node, (list, ArrayObject)):
            for child in node:
                found = collect_fonts(child, found)
        return found

    wanted = font_names if font_names else []

    for page in self.pages:
        # Content streams reference fonts and other resources with names like "/F1" or "/T1_0"
        # Font names need to be converted to resource names/IDs for easier removal
        resource_ids = []
        filters = {}
        if wanted:
            fonts = collect_fonts(page.get("/Resources"))
            for name in wanted:
                if name in fonts:
                    resource_ids.extend(fonts[name]["resource_ids"])
            filters["font_ids"] = resource_ids
        self.remove_objects_from_page(page, ObjectDeletionFlag.TEXT, text_filters=filters)
def add_uri(
    self,
    page_number: int,
    uri: str,
    rect: RectangleObject,
    border: Optional[ArrayObject] = None,
) -> None:
    """
    Add an URI from a rectangular area to the specified page.

    Args:
        page_number: index of the page on which to place the URI action.
        uri: URI of resource to link to.
        rect: :class:`RectangleObject<pypdf.generic.RectangleObject>` or
            array of four integers specifying the clickable rectangular area
            ``[xLL, yLL, xUR, yUR]``, or string in the form
            ``"[ xLL yLL xUR yUR ]"``.
        border: if provided, an array describing border-drawing
            properties. See the PDF spec for details. The first three
            entries are used as numbers; an optional fourth entry is
            taken as the dash pattern. If omitted, ``[2, 2, 2]`` is
            written as the border array.

    Raises:
        ValueError: ``rect`` is a string that does not hold exactly four
            numbers.

    """
    page_link = self.get_object(self._pages)[PA.KIDS][page_number]  # type: ignore
    page_ref = cast(Dict[str, Any], self.get_object(page_link))

    border_arr: BorderArrayType
    if border is not None:
        border_arr = [NumberObject(n) for n in border[:3]]
        if len(border) == 4:
            dash_pattern = ArrayObject([NumberObject(n) for n in border[3]])
            border_arr.append(dash_pattern)
    else:
        border_arr = [NumberObject(2), NumberObject(2), NumberObject(2)]

    if isinstance(rect, str):
        # Parse "[ xLL yLL xUR yUR ]" into four coordinates; a single
        # number object cannot describe a rectangle.
        coords = rect.replace("[", " ").replace("]", " ").split()
        if len(coords) != 4:
            raise ValueError(
                f"rect must contain exactly four numbers, got {rect!r}"
            )
        rect = RectangleObject(tuple(float(c) for c in coords))  # type: ignore
    elif not isinstance(rect, RectangleObject):
        rect = RectangleObject(rect)

    lnk2 = DictionaryObject()
    lnk2.update(
        {
            NameObject("/S"): NameObject("/URI"),
            NameObject("/URI"): TextStringObject(uri),
        }
    )
    lnk = DictionaryObject()
    lnk.update(
        {
            NameObject(AA.Type): NameObject("/Annot"),
            NameObject(AA.Subtype): NameObject("/Link"),
            NameObject(AA.P): page_link,
            NameObject(AA.Rect): rect,
            NameObject("/H"): NameObject("/I"),
            NameObject(AA.Border): ArrayObject(border_arr),
            NameObject("/A"): lnk2,
        }
    )
    lnk_ref = self._add_object(lnk)

    # Attach the link to the page, creating the annotation array on demand.
    if PG.ANNOTS in page_ref:
        page_ref[PG.ANNOTS].append(lnk_ref)
    else:
        page_ref[NameObject(PG.ANNOTS)] = ArrayObject([lnk_ref])
# Layout names accepted without a warning by ``_set_page_layout``.
_valid_layouts = (
    "/NoLayout",
    "/SinglePage",
    "/OneColumn",
    "/TwoColumnLeft",
    "/TwoColumnRight",
    "/TwoPageLeft",
    "/TwoPageRight",
)
def _get_page_layout(self) -> Optional[LayoutType]:
    """Return the root object's ``/PageLayout`` entry, or ``None`` if it is missing."""
    try:
        layout = self._root_object["/PageLayout"]
    except KeyError:
        return None
    return cast(LayoutType, layout)
def _set_page_layout(self, layout: Union[NameObject, LayoutType]) -> None:
    """
    Set the page layout.

    A value that is not already a ``NameObject`` is checked against
    ``_valid_layouts``; an unknown value only triggers a warning and is
    still written to the root object's ``/PageLayout`` entry.

    Args:
        layout: The page layout to be used.

    .. list-table:: Valid ``layout`` arguments
       :widths: 50 200

       * - /NoLayout
         - Layout explicitly not specified
       * - /SinglePage
         - Show one page at a time
       * - /OneColumn
         - Show one column at a time
       * - /TwoColumnLeft
         - Show pages in two columns, odd-numbered pages on the left
       * - /TwoColumnRight
         - Show pages in two columns, odd-numbered pages on the right
       * - /TwoPageLeft
         - Show two pages at a time, odd-numbered pages on the left
       * - /TwoPageRight
         - Show two pages at a time, odd-numbered pages on the right

    """
    if not isinstance(layout, NameObject):
        if layout not in self._valid_layouts:
            # List the accepted names separated by ", ".
            logger_warning(
                f"Layout should be one of: {', '.join(self._valid_layouts)}",
                __name__,
            )
        layout = NameObject(layout)
    self._root_object.update({NameObject("/PageLayout"): layout})
def set_page_layout(self, layout: LayoutType) -> None:
    """
    Set the page layout.

    Public wrapper that forwards ``layout`` to ``_set_page_layout``.

    Args:
        layout: The page layout to be used

    .. list-table:: Valid ``layout`` arguments
       :widths: 50 200

       * - /NoLayout
         - Layout explicitly not specified
       * - /SinglePage
         - Show one page at a time
       * - /OneColumn
         - Show one column at a time
       * - /TwoColumnLeft
         - Show pages in two columns, odd-numbered pages on the left
       * - /TwoColumnRight
         - Show pages in two columns, odd-numbered pages on the right
       * - /TwoPageLeft
         - Show two pages at a time, odd-numbered pages on the left
       * - /TwoPageRight
         - Show two pages at a time, odd-numbered pages on the right

    """
    self._set_page_layout(layout)
@property
def page_layout(self) -> Optional[LayoutType]:
    """
    Page layout property.

    Reading returns the value of ``_get_page_layout()``; assigning
    forwards the value to ``_set_page_layout()``.

    .. list-table:: Valid ``layout`` values
       :widths: 50 200

       * - /NoLayout
         - Layout explicitly not specified
       * - /SinglePage
         - Show one page at a time
       * - /OneColumn
         - Show one column at a time
       * - /TwoColumnLeft
         - Show pages in two columns, odd-numbered pages on the left
       * - /TwoColumnRight
         - Show pages in two columns, odd-numbered pages on the right
       * - /TwoPageLeft
         - Show two pages at a time, odd-numbered pages on the left
       * - /TwoPageRight
         - Show two pages at a time, odd-numbered pages on the right
    """
    return self._get_page_layout()

@page_layout.setter
def page_layout(self, layout: LayoutType) -> None:
    # Validation and storage are handled by ``_set_page_layout``.
    self._set_page_layout(layout)
# Mode names accepted without a warning by the ``page_mode`` setter.
_valid_modes = (
    "/UseNone",
    "/UseOutlines",
    "/UseThumbs",
    "/FullScreen",
    "/UseOC",
    "/UseAttachments",
)
def _get_page_mode(self) -> Optional[PagemodeType]:
    """Return the root object's ``/PageMode`` entry, or ``None`` if it is missing."""
    try:
        mode = self._root_object["/PageMode"]
    except KeyError:
        return None
    return cast(PagemodeType, mode)
@property
def page_mode(self) -> Optional[PagemodeType]:
    """
    Page mode property.

    Reading returns the value of ``_get_page_mode()``. Assigning stores the
    value under ``/PageMode`` in the root object; a value that is not a
    ``NameObject`` and not listed in ``_valid_modes`` triggers a warning
    but is stored anyway.

    .. list-table:: Valid ``mode`` values
       :widths: 50 200

       * - /UseNone
         - Do not show outline or thumbnails panels
       * - /UseOutlines
         - Show outline (aka bookmarks) panel
       * - /UseThumbs
         - Show page thumbnails panel
       * - /FullScreen
         - Fullscreen view
       * - /UseOC
         - Show Optional Content Group (OCG) panel
       * - /UseAttachments
         - Show attachments panel
    """
    return self._get_page_mode()

@page_mode.setter
def page_mode(self, mode: PagemodeType) -> None:
    needs_wrapping = not isinstance(mode, NameObject)
    if needs_wrapping and mode not in self._valid_modes:
        logger_warning(
            f"Mode should be one of: {', '.join(self._valid_modes)}", __name__
        )
    stored: NameObject = NameObject(mode) if needs_wrapping else mode
    self._root_object.update({NameObject("/PageMode"): stored})
def add_annotation(
    self,
    page_number: Union[int, PageObject],
    annotation: Dict[str, Any],
) -> DictionaryObject:
    """
    Add a single annotation to the page.
    The added annotation must be a new annotation.
    It cannot be recycled.

    Args:
        page_number: PageObject or page index.
        annotation: Annotation to be added (created with annotation).

    Returns:
        The inserted object.
        This can be used for popup creation, for example.

    Raises:
        TypeError: ``page_number`` is neither an ``int`` nor a ``PageObject``.

    """
    if isinstance(page_number, int):
        page = self.pages[page_number]
    elif isinstance(page_number, PageObject):
        page = page_number
    else:
        raise TypeError("page: invalid type")

    to_add = cast(DictionaryObject, _pdf_objectify(annotation))
    to_add[NameObject("/P")] = page.indirect_reference

    if page.annotations is None:
        page[NameObject("/Annots")] = ArrayObject()
    assert page.annotations is not None

    subtype = to_add.get("/Subtype")

    # Internal link annotations need the correct object type for the
    # destination
    if subtype == "/Link" and "/Dest" in to_add:
        spec = cast(Dict[Any, Any], to_add[NameObject("/Dest")])
        link_name = NameObject("/LinkName")
        target = spec["target_page_index"]
        # I have no clue why this dict-hack is necessary
        fit = Fit(fit_type=spec["fit"], fit_args=dict(spec)["fit_args"])
        to_add[NameObject("/Dest")] = Destination(link_name, target, fit).dest_array

    page.annotations.append(self._add_object(to_add))

    # A popup is also recorded on its parent annotation.
    if subtype == "/Popup" and NameObject("/Parent") in to_add:
        parent = cast(DictionaryObject, to_add["/Parent"].get_object())
        parent[NameObject("/Popup")] = to_add.indirect_reference

    return to_add
def clean_page(self, page: Union[PageObject, IndirectObject]) -> PageObject:
    """
    Perform some clean up in the page.
    Currently: convert NameObject named destination to TextStringObject
    (required for names/dests list)

    For each annotation, a ``/Dest`` given as ``NameObject`` is converted;
    otherwise, if the annotation has an ``/A`` action, the action's ``/D``
    entry is converted in the same way.

    Args:
        page: The page, or a reference to it, whose annotations are cleaned.

    Returns:
        The cleaned PageObject

    """
    page = cast("PageObject", page.get_object())
    for annot_ref in page.get("/Annots", []):
        annot = annot_ref.get_object()
        dest = annot.get("/Dest", None)
        action = annot.get("/A", None)
        if isinstance(dest, NameObject):
            annot[NameObject("/Dest")] = TextStringObject(dest)
            continue
        if action is None:
            continue
        action = action.get_object()
        action_dest = action.get("/D", None)
        if isinstance(action_dest, NameObject):
            action[NameObject("/D")] = TextStringObject(action_dest)
    return page
def _create_stream(
    self, fileobj: Union[Path, StrByteType, PdfReader]
) -> Tuple[IOBase, Optional[Encryption]]:
    """
    Copy the content behind ``fileobj`` into an in-memory ``BytesIO``.

    Args:
        fileobj: A path (``str`` or ``Path``), a ``PdfReader``, or any
            object providing ``seek`` and ``read``.

    Returns:
        The ``BytesIO`` copy and, for a ``PdfReader`` whose ``_encryption``
        is set, that encryption object (``None`` otherwise).

    Raises:
        NotImplementedError: ``fileobj`` is none of the supported kinds.
    """
    # A string or Path is taken as the location of the file to read.
    if isinstance(fileobj, (str, Path)):
        with FileIO(fileobj, "rb") as source:
            return BytesIO(source.read()), None

    # A reader's underlying stream is copied from its start; the stream
    # position is restored afterwards.
    if isinstance(fileobj, PdfReader):
        encryption = fileobj._encryption if fileobj._encryption else None
        saved_pos = fileobj.stream.tell()
        fileobj.stream.seek(0)
        buffered = BytesIO(fileobj.stream.read())
        fileobj.stream.seek(saved_pos)
        return buffered, encryption

    # Any other file-like object is read in full from its start.
    if hasattr(fileobj, "seek") and hasattr(fileobj, "read"):
        fileobj.seek(0)
        return BytesIO(fileobj.read()), None

    raise NotImplementedError(
        "Merging requires an object that PdfReader can parse. "
        "Typically, that is a Path or a string representing a Path, "
        "a file object, or an object implementing .seek and .read. "
        "Passing a PdfReader directly works as well."
    )
def append(
    self,
    fileobj: Union[StrByteType, PdfReader, Path],
    outline_item: Union[
        str, None, PageRange, Tuple[int, int], Tuple[int, int, int], List[int]
    ] = None,
    pages: Union[
        None,
        PageRange,
        Tuple[int, int],
        Tuple[int, int, int],
        List[int],
        List[PageObject],
    ] = None,
    import_outline: bool = True,
    excluded_fields: Optional[Union[List[str], Tuple[str, ...]]] = None,
) -> None:
    """
    Identical to the :meth:`merge()<merge>` method, but assumes you want to
    concatenate all pages onto the end of the file instead of specifying a
    position.

    Args:
        fileobj: A File Object or an object that supports the standard
            read and seek methods similar to a File Object. Could also be a
            string representing a path to a PDF file.
        outline_item: Optionally, you may specify a string to build an
            outline (aka 'bookmark') to identify the beginning of the
            included file. When a tuple, list or ``PageRange`` is passed
            here, it is used as ``pages`` and the following arguments
            are shifted by one position.
        pages: Can be a :class:`PageRange<pypdf.pagerange.PageRange>`
            or a ``(start, stop[, step])`` tuple
            or a list of pages to be processed
            to merge only the specified range of pages from the source
            document into the output document.
        import_outline: You may prevent the source document's
            outline (collection of outline items, previously referred to as
            'bookmarks') from being imported by specifying this as ``False``.
        excluded_fields: Provide the list of fields/keys to be ignored
            if ``/Annots`` is part of the list, the annotation will be ignored
            if ``/B`` is part of the list, the articles will be ignored

    """
    if excluded_fields is None:
        excluded_fields = ()

    title = outline_item
    if isinstance(outline_item, (tuple, list, PageRange)):
        # The page selection was given in the ``outline_item`` slot, so the
        # remaining positional arguments are shifted by one.
        if isinstance(pages, bool):
            if not isinstance(import_outline, bool):
                excluded_fields = import_outline
            import_outline = pages
        pages = outline_item
        title = None

    self.merge(
        None,
        fileobj,
        title,
        pages,
        import_outline,
        excluded_fields,
    )
def merge(
    self,
    position: Optional[int],
    fileobj: Union[Path, StrByteType, PdfReader],
    outline_item: Optional[str] = None,
    pages: Optional[Union[PageRangeSpec, List[PageObject]]] = None,
    import_outline: bool = True,
    excluded_fields: Optional[Union[List[str], Tuple[str, ...]]] = (),
) -> None:
    """
    Merge the pages from the given file into the output file at the
    specified page number.

    After the pages are copied, the source's named destinations, outline,
    annotations, ``/AcroForm`` fields and articles are carried over for the
    copied pages (subject to ``import_outline`` and ``excluded_fields``).

    Args:
        position: The *page number* to insert this file. File will
            be inserted after the given number. With ``None`` the pages
            are added with ``add_page`` instead of ``insert_page``.
        fileobj: A File Object or an object that supports the standard
            read and seek methods similar to a File Object. Could also be a
            string representing a path to a PDF file.
        outline_item: Optionally, you may specify a string to build an outline
            (aka 'bookmark') to identify the
            beginning of the included file.
        pages: can be a :class:`PageRange<pypdf.pagerange.PageRange>`
            or a ``(start, stop[, step])`` tuple
            or a list of pages to be processed
            to merge only the specified range of pages from the source
            document into the output document.
        import_outline: You may prevent the source document's
            outline (collection of outline items, previously referred to as
            'bookmarks') from being imported by specifying this as ``False``.
        excluded_fields: provide the list of fields/keys to be ignored
            if ``/Annots`` is part of the list, the annotation will be ignored
            if ``/B`` is part of the list, the articles will be ignored

    Raises:
        TypeError: The pages attribute is not configured properly

    """
    if isinstance(fileobj, PdfDocCommon):
        reader = fileobj
    else:
        # NOTE(review): encryption_obj is not used further in this method.
        stream, encryption_obj = self._create_stream(fileobj)
        # Create a new PdfReader instance using the stream
        # (either file or BytesIO or StringIO) created above
        reader = PdfReader(stream, strict=False)  # type: ignore[arg-type]

    if excluded_fields is None:
        excluded_fields = ()
    # Find the range of pages to merge.
    if pages is None:
        pages = list(range(len(reader.pages)))
    elif isinstance(pages, PageRange):
        pages = list(range(*pages.indices(len(reader.pages))))
    elif isinstance(pages, list):
        pass  # keep unchanged
    elif isinstance(pages, tuple) and len(pages) <= 3:
        pages = list(range(*pages))
    elif not isinstance(pages, tuple):
        raise TypeError(
            '"pages" must be a tuple of (start, stop[, step]) or a list'
        )
    # NOTE(review): a tuple with more than three items reaches this point
    # unchanged and is iterated below like a list - confirm this is intended.

    # Maps the idnum of each source page to its copy in this writer.
    srcpages = {}
    for page in pages:
        if isinstance(page, PageObject):
            pg = page
        else:
            pg = reader.pages[page]
        assert pg.indirect_reference is not None
        if position is None:
            # numbers in the exclude list identifies that the exclusion is
            # only applicable to 1st level of cloning
            srcpages[pg.indirect_reference.idnum] = self.add_page(
                pg, [*list(excluded_fields), 1, "/B", 1, "/Annots"]  # type: ignore
            )
        else:
            srcpages[pg.indirect_reference.idnum] = self.insert_page(
                pg, position, [*list(excluded_fields), 1, "/B", 1, "/Annots"]  # type: ignore
            )
            position += 1
        # Keep a link back to the source page for the annotation and
        # article steps below.
        srcpages[pg.indirect_reference.idnum].original_page = pg

    reader._named_destinations = (
        reader.named_destinations
    )  # need for the outline processing below

    arr: Any

    def _process_named_dests(dest: Any) -> None:
        # Register one named destination of the source, provided its
        # title is not already present and its page was copied.
        arr = dest.dest_array
        if "/Names" in self._root_object and dest["/Title"] in cast(
            List[Any],
            cast(
                DictionaryObject,
                cast(DictionaryObject, self._root_object["/Names"]).get("/Dests", DictionaryObject()),
            ).get("/Names", DictionaryObject()),
        ):
            # already exists: should not duplicate it
            pass
        elif dest["/Page"] is None or isinstance(dest["/Page"], NullObject):
            pass
        elif isinstance(dest["/Page"], int):
            # the page reference is a page number normally not a PDF Reference
            # page numbers as int are normally accepted only in external goto
            try:
                p = reader.pages[dest["/Page"]]
            except IndexError:
                return
            assert p.indirect_reference is not None
            try:
                arr[NumberObject(0)] = NumberObject(
                    srcpages[p.indirect_reference.idnum].page_number
                )
                self.add_named_destination_array(dest["/Title"], arr)
            except KeyError:
                # the target page is not among the copied pages
                pass
        elif dest["/Page"].indirect_reference.idnum in srcpages:
            arr[NumberObject(0)] = srcpages[
                dest["/Page"].indirect_reference.idnum
            ].indirect_reference
            self.add_named_destination_array(dest["/Title"], arr)

    for dest in reader._named_destinations.values():
        _process_named_dests(dest)

    # Parent under which the imported outline is inserted.
    outline_item_typ: TreeObject
    if outline_item is not None:
        # NOTE(review): next(iter(...)) needs at least one copied page;
        # with an empty page selection it raises StopIteration - confirm.
        outline_item_typ = cast(
            "TreeObject",
            self.add_outline_item(
                TextStringObject(outline_item),
                next(iter(srcpages.values())).indirect_reference,
                fit=PAGE_FIT,
            ).get_object(),
        )
    else:
        outline_item_typ = self.get_outline_root()

    _ro = reader.root_object
    if import_outline and CO.OUTLINES in _ro:
        outline = self._get_filtered_outline(
            _ro.get(CO.OUTLINES, None), srcpages, reader
        )
        self._insert_filtered_outline(
            outline, outline_item_typ, None
        )  # TODO: use before parameter

    if "/Annots" not in excluded_fields:
        for pag in srcpages.values():
            lst = self._insert_filtered_annotations(
                pag.original_page.get("/Annots", []), pag, srcpages, reader
            )
            if len(lst) > 0:
                pag[NameObject("/Annots")] = lst
            self.clean_page(pag)

    if "/AcroForm" in _ro and _ro["/AcroForm"] is not None:
        if "/AcroForm" not in self._root_object:
            # Clone the source form dictionary without its /Fields; the
            # field list is rebuilt below.
            self._root_object[NameObject("/AcroForm")] = self._add_object(
                cast(
                    DictionaryObject,
                    reader.root_object["/AcroForm"],
                ).clone(self, False, ("/Fields",))
            )
            arr = ArrayObject()
        else:
            arr = cast(
                ArrayObject,
                cast(DictionaryObject, self._root_object["/AcroForm"])["/Fields"],
            )
        # Source object number -> object number in this writer.
        trslat = self._id_translated[id(reader)]
        try:
            for f in reader.root_object["/AcroForm"]["/Fields"]:  # type: ignore
                try:
                    ind = IndirectObject(trslat[f.idnum], 0, self)
                    if ind not in arr:
                        arr.append(ind)
                except KeyError:
                    # for trslat[] which mean the field has not be copied
                    # through the page
                    pass
        except KeyError:  # for /Acroform or /Fields are not existing
            arr = self._add_object(ArrayObject())
        cast(DictionaryObject, self._root_object["/AcroForm"])[
            NameObject("/Fields")
        ] = arr

    if "/B" not in excluded_fields:
        self.add_filtered_articles("", srcpages, reader)
def _add_articles_thread(
    self,
    thread: DictionaryObject,  # thread entry from the reader's array of threads
    pages: Dict[int, PageObject],
    reader: PdfReader,
) -> IndirectObject:
    """
    Clone the thread with only the applicable articles.

    The source articles are followed through their ``/N`` links, starting
    at the thread's ``/F`` entry. For each article whose page has a copy in
    ``pages``, a new article is created, chained to the previous one
    through ``/N`` and ``/V``, and appended to the ``/B`` array of the
    copied page.

    Args:
        thread: Thread dictionary of the source document.
        pages: Copied pages, keyed as expected by ``_get_cloned_page``.
        reader: The source reader, passed on to ``_get_cloned_page``.

    Returns:
        The added thread as an indirect reference

    """
    nthread = thread.clone(
        self, force_duplicate=True, ignore_fields=("/F",)
    )  # use of clone to keep link between reader and writer
    self.threads.append(nthread.indirect_reference)
    first_article = cast("DictionaryObject", thread["/F"])
    current_article: Optional[DictionaryObject] = first_article
    new_article: Optional[DictionaryObject] = None
    while current_article is not None:
        pag = self._get_cloned_page(
            cast("PageObject", current_article["/P"]), pages, reader
        )
        if pag is not None:
            if new_article is None:
                # First kept article: it becomes the thread's /F entry.
                new_article = cast(
                    "DictionaryObject",
                    self._add_object(DictionaryObject()).get_object(),
                )
                new_first = new_article
                nthread[NameObject("/F")] = new_article.indirect_reference
            else:
                # Later kept article: link it with the previous one.
                new_article2 = cast(
                    "DictionaryObject",
                    self._add_object(
                        DictionaryObject(
                            {NameObject("/V"): new_article.indirect_reference}
                        )
                    ).get_object(),
                )
                new_article[NameObject("/N")] = new_article2.indirect_reference
                new_article = new_article2
            new_article[NameObject("/P")] = pag
            new_article[NameObject("/T")] = nthread.indirect_reference
            new_article[NameObject("/R")] = current_article["/R"]
            pag_obj = cast("PageObject", pag.get_object())
            if "/B" not in pag_obj:
                pag_obj[NameObject("/B")] = ArrayObject()
            cast("ArrayObject", pag_obj["/B"]).append(
                new_article.indirect_reference
            )
        current_article = cast("DictionaryObject", current_article["/N"])
        if current_article == first_article:
            # Back at the start of the source chain: close the new chain
            # into a ring and stop.
            # NOTE(review): assumes at least one article was kept; otherwise
            # new_article is None and new_first is unbound here - confirm.
            new_article[NameObject("/N")] = new_first.indirect_reference  # type: ignore
            new_first[NameObject("/V")] = new_article.indirect_reference  # type: ignore
            current_article = None
    assert nthread.indirect_reference is not None
    return nthread.indirect_reference
def add_filtered_articles(
    self,
    fltr: Union[
        Pattern[Any], str
    ],  # pattern (compiled or as text) searched in the thread titles
    pages: Dict[int, PageObject],
    reader: PdfReader,
) -> None:
    """
    Add articles matching the defined criteria.

    Every thread referenced from the ``/B`` entries of the original pages
    is considered. A thread is added when its id is not yet present in
    the translation table of ``reader`` and its title (``/I`` ->
    ``/Title``, empty if missing) matches the filter.

    Args:
        fltr: compiled pattern or pattern text searched in the title of
            each thread; any other value is replaced by the empty pattern.
        pages: cloned pages; their ``original_page`` is inspected.
        reader: reader the pages are coming from.

    """
    if isinstance(fltr, str):
        title_filter = re.compile(fltr)
    elif isinstance(fltr, Pattern):
        title_filter = fltr
    else:
        title_filter = re.compile("")
    for cloned_page in pages.values():
        source_page = cloned_page.original_page
        for bead in source_page.get("/B", ()):
            thread = bead.get_object().get("/T")
            if thread is None:
                continue
            thread = thread.get_object()
            translated = self._id_translated[id(reader)]
            if thread.indirect_reference.idnum in translated:
                # thread already handled for this reader
                continue
            title = (thread.get("/I", {})).get("/Title", "")
            if title_filter.search(title):
                self._add_articles_thread(thread, pages, reader)
def _get_cloned_page(
    self,
    page: Union[None, IndirectObject, PageObject, NullObject],
    pages: Dict[int, PageObject],
    reader: PdfReader,
) -> Optional[IndirectObject]:
    """
    Return the reference of the cloned page matching a source page.

    Args:
        page: source page, given as a ``/Page`` dictionary or as an
            indirect reference.
        pages: mapping indexed by the id number of the source page.
        reader: not used by this method.

    Returns:
        ``pages[<id>].indirect_reference``, or None when ``page`` is a
        NullObject, is of any other unsupported kind, or when the lookup
        fails for any reason.

    """
    if isinstance(page, NullObject):
        return None
    if isinstance(page, DictionaryObject) and page.get("/Type", "") == "/Page":
        source_ref = page.indirect_reference
    elif isinstance(page, IndirectObject):
        source_ref = page
    else:
        # nothing usable to look up
        return None
    try:
        return pages[source_ref.idnum].indirect_reference  # type: ignore
    except Exception:
        return None
def _insert_filtered_annotations(
    self,
    annots: Union[IndirectObject, List[DictionaryObject], None],
    page: PageObject,
    pages: Dict[int, PageObject],
    reader: PdfReader,
) -> List[Destination]:
    """
    Clone the annotations that remain meaningful in the writer.

    Annotations without any destination are always cloned. Annotations
    pointing to a named destination are cloned only if that name exists in
    ``self.get_named_dest_root()``. Annotations pointing to an explicit
    destination array are cloned only if the target page resolves through
    ``_get_cloned_page``; the page entry of the array is then replaced by
    the cloned page.

    Args:
        annots: annotations of the source page (list or reference to it).
        page: not used by this method.
        pages: mapping handed to ``_get_cloned_page``.
        reader: reader the annotations are coming from.

    Returns:
        An ArrayObject with the kept annotations; empty when ``annots`` is
        None or is not a list.

    """
    kept = ArrayObject()
    if isinstance(annots, IndirectObject):
        annots = cast("List[Any]", annots.get_object())
    if annots is None:
        return kept
    if not isinstance(annots, list):
        logger_warning(f"Expected list of annotations, got {annots} of type {annots.__class__.__name__}.", __name__)
        return kept
    for entry in annots:
        annot = cast("DictionaryObject", entry.get_object())
        is_goto_link_without_dest = (
            annot["/Subtype"] == "/Link"
            and "/A" in annot
            and cast("DictionaryObject", annot["/A"])["/S"] == "/GoTo"
            and "/Dest" not in annot
        )

        if is_goto_link_without_dest:
            # destination is stored in the action: /A -> /D
            target = cast("DictionaryObject", annot["/A"]).get("/D", NullObject())
            if target is None or isinstance(target, NullObject):
                continue
            if isinstance(target, str):
                # it is a named dest
                if str(target) in self.get_named_dest_root():
                    kept.append(annot.clone(self).indirect_reference)
                continue
            target = cast("ArrayObject", target)
            page_ref = self._get_cloned_page(target[0], pages, reader)
            if page_ref is not None:
                copy = annot.clone(self, ignore_fields=("/D",))
                cast("DictionaryObject", copy["/A"])[
                    NameObject("/D")
                ] = ArrayObject([page_ref, *target[1:]])
                kept.append(self._add_object(copy))
            continue

        if "/Dest" not in annot:
            # no destination involved: plain copy
            kept.append(self._add_object(annot.clone(self)))
            continue

        # destination is stored directly in the annotation: /Dest
        target = annot["/Dest"]
        if isinstance(target, str):
            # it is a named dest
            if str(target) in self.get_named_dest_root():
                kept.append(annot.clone(self).indirect_reference)
            continue
        target = cast("ArrayObject", target)
        page_ref = self._get_cloned_page(target[0], pages, reader)
        if page_ref is not None:
            copy = annot.clone(self, ignore_fields=("/Dest",))
            copy[NameObject("/Dest")] = ArrayObject([page_ref, *target[1:]])
            kept.append(self._add_object(copy))
    return kept
def _get_filtered_outline(
    self,
    node: Any,
    pages: Dict[int, PageObject],
    reader: PdfReader,
) -> List[Destination]:
    """
    Extract outline item entries that are part of the specified page set.

    A node that is an ``/Outlines`` root (or has no ``/Title``) is only a
    container: the result is the filtered content starting at its
    ``/First`` entry. Otherwise the node and its ``/Next`` siblings are
    processed; an item is kept when its page resolves to a cloned page or
    when at least one of its children is kept. Kept children are stored in
    the ``_filtered_children`` attribute of each item.

    Args:
        node: outline node to start from (None and null objects are
            handled as an empty container).
        pages: mapping handed to ``_get_cloned_page``.
        reader: reader used to build the outline items.

    Returns:
        A list of destination objects.

    """
    kept: List[Any] = []
    if node is None:
        node = NullObject()
    node = node.get_object()
    if is_null_or_none(node):
        node = DictionaryObject()

    if node.get("/Type", "") == "/Outlines" or "/Title" not in node:
        # container: descend into the first child, if any
        first_child = node.get("/First", None)
        if first_child is not None:
            kept.extend(
                self._get_filtered_outline(first_child.get_object(), pages, reader)
            )
        return kept

    cloned_ref: Union[None, IndirectObject, NullObject]
    while node is not None:
        node = node.get_object()
        item = cast("Destination", reader._build_outline_item(node))
        cloned_ref = self._get_cloned_page(
            cast("PageObject", item["/Page"]), pages, reader
        )
        if cloned_ref is None:
            cloned_ref = NullObject()
        item[NameObject("/Page")] = cloned_ref
        if "/First" in node:
            item._filtered_children = self._get_filtered_outline(
                node["/First"], pages, reader
            )
        else:
            item._filtered_children = []
        has_page = not isinstance(item["/Page"], NullObject)
        if has_page or len(item._filtered_children) > 0:
            kept.append(item)
        node = node.get("/Next", None)
    return kept
def _clone_outline(self, dest: Destination) -> TreeObject:
    """
    Create, in the writer, the outline item matching a destination.

    The new item receives the title of ``dest``. If the page of ``dest``
    is not null, it receives either a clone of the ``/A`` entry of the
    source node (when present) or the destination array as ``/Dest``.
    When a source node is available, ``/F`` (default 0) and ``/C``
    (default three 0.0 values) are copied from it.

    Args:
        dest: destination describing the outline item to create.

    Returns:
        The new outline item, already added to the writer.

    """
    item = TreeObject()
    self._add_object(item)
    item[NameObject("/Title")] = TextStringObject(dest["/Title"])

    source_node = dest.node
    if not isinstance(dest["/Page"], NullObject):
        if source_node is not None and "/A" in source_node:
            item[NameObject("/A")] = source_node["/A"].clone(self)
        else:
            item[NameObject("/Dest")] = dest.dest_array
    # TODO: /SE
    if source_node is not None:
        flags = source_node.get("/F", 0)
        item[NameObject("/F")] = NumberObject(flags)
        color = source_node.get("/C", [FloatObject(0.0) for _ in range(3)])
        item[NameObject("/C")] = ArrayObject(color)
    return item
def _insert_filtered_outline(
    self,
    outlines: List[Destination],
    parent: Union[TreeObject, IndirectObject],
    before: Union[None, TreeObject, IndirectObject] = None,
) -> None:
    """
    Insert filtered outline items, with their kept children, under a parent.

    Args:
        outlines: items to insert; the ``_filtered_children`` attribute of
            each one is inserted recursively.
        parent: outline item receiving the new items.
        before: item before which the new items are inserted; it is only
            applied at this level, children are inserted with None.

    """
    for dest in outlines:
        # TODO: can be improved to keep A and SE entries (ignored for the moment)
        # with np=self.add_outline_item_destination(dest,parent,before)
        is_real_item = dest.get("/Type", "") != "/Outlines" and "/Title" in dest
        if is_real_item:
            child_parent = self._clone_outline(dest)
            cast(TreeObject, parent.get_object()).insert_child(
                child_parent, before, self
            )
        else:
            # container entry: its children are attached to the current parent
            child_parent = parent
        self._insert_filtered_outline(dest._filtered_children, child_parent, None)
def close(self) -> None:
    """No-op, implemented for API harmonization."""
    return None
def find_outline_item(
    self,
    outline_item: Dict[str, Any],
    root: Optional[OutlineType] = None,
) -> Optional[List[int]]:
    """
    Locate an outline item and return its position path.

    Args:
        outline_item: value compared with the indirect reference and with
            the ``/Title`` of each visited item.
        root: item to start from; the outline root of the writer is used
            when omitted.

    Returns:
        The list of sibling indexes leading to the item (levels without
        ``/Title`` do not contribute an index), or None if it is not found.

    """
    node = self.get_outline_root() if root is None else cast("TreeObject", root)

    position = 0
    while node is not None:
        if (
            node.indirect_reference == outline_item
            or node.get("/Title", None) == outline_item
        ):
            return [position]
        if "/First" in node:
            sub_path = self.find_outline_item(
                outline_item, cast(OutlineType, node["/First"])
            )
            if sub_path:
                own_index = [position] if "/Title" in node else []
                return own_index + sub_path
        if "/Next" not in node:
            return None
        position += 1
        node = cast(TreeObject, node["/Next"])
    return None
def find_bookmark(
    self,
    outline_item: Dict[str, Any],
    root: Optional[OutlineType] = None,
) -> None:  # deprecated
    """
    Deprecated alias; the arguments are ignored.

    .. deprecated:: 2.9.0
        Use :meth:`find_outline_item` instead.
    """
    deprecation_with_replacement(
        "find_bookmark",
        "find_outline_item",
        "5.0.0",
    )
def reset_translation(
    self, reader: Union[None, PdfReader, IndirectObject] = None
) -> None:
    """
    Reset the translation table between reader and the writer object.

    Late cloning will create new independent objects.

    Args:
        reader: PdfReader or IndirectObject referencing a PdfReader object.
            if set to None or omitted, all tables will be reset.

    Raises:
        Exception: if ``reader`` is of any other type.

    """
    if reader is None:
        self._id_translated = {}
    elif isinstance(reader, PdfReader):
        # a reader without a table is silently accepted
        self._id_translated.pop(id(reader), None)
    elif isinstance(reader, IndirectObject):
        self._id_translated.pop(id(reader.pdf), None)
    else:
        raise Exception(f"invalid parameter {reader}")
def set_page_label(
    self,
    page_index_from: int,
    page_index_to: int,
    style: Optional[PageLabelStyle] = None,
    prefix: Optional[str] = None,
    start: Optional[int] = 0,
) -> None:
    """
    Set a page label to a range of pages.

    Page indexes must be given starting from 0.
    Labels must have a style, a prefix or both.
    If a range is not assigned any page label, a decimal label starting from 1 is applied.

    Args:
        page_index_from: page index of the beginning of the range starting from 0
        page_index_to: page index of the end of the range starting from 0
        style: The numbering style to be used for the numeric portion of each page label:

                   * ``/D`` Decimal Arabic numerals
                   * ``/R`` Uppercase Roman numerals
                   * ``/r`` Lowercase Roman numerals
                   * ``/A`` Uppercase letters (A to Z for the first 26 pages,
                     AA to ZZ for the next 26, and so on)
                   * ``/a`` Lowercase letters (a to z for the first 26 pages,
                     aa to zz for the next 26, and so on)

        prefix: The label prefix for page labels in this range.
        start: The value of the numeric portion for the first page label
               in the range.
               Subsequent pages are numbered sequentially from this value,
               which must be greater than or equal to 1.
               0 (the default) is accepted as well and leaves the start
               value unset.

    Raises:
        ValueError: if neither style nor prefix is given, if the range is
            not within the pages of the document, or if start is invalid.

    """
    if prefix is None and style is None:
        raise ValueError("At least one of style and prefix must be given")
    if page_index_from < 0:
        raise ValueError("page_index_from must be greater or equal than 0")
    if page_index_to < page_index_from:
        raise ValueError(
            "page_index_to must be greater or equal than page_index_from"
        )
    page_count = len(self.pages)
    if page_index_to >= page_count:
        raise ValueError("page_index_to exceeds number of pages")
    # 0 is let through on purpose: it is the "not set" default
    start_is_invalid = start is not None and start != 0 and start < 1
    if start_is_invalid:
        raise ValueError("If given, start must be greater or equal than one")

    self._set_page_label(page_index_from, page_index_to, style, prefix, start)
def _set_page_label(
    self,
    page_index_from: int,
    page_index_to: int,
    style: Optional[PageLabelStyle] = None,
    prefix: Optional[str] = None,
    start: Optional[int] = 0,
) -> None:
    """
    Set a page label to a range of pages.

    Page indexes must be given starting from 0.
    Labels must have a style, a prefix or both.
    If a range is not assigned any page label a decimal label starting from 1 is applied.

    No validation is done here; see ``set_page_label`` for the checked
    entry point.

    Args:
        page_index_from: page index of the beginning of the range starting from 0
        page_index_to: page index of the end of the range starting from 0
        style: The numbering style to be used for the numeric portion of each page label:
                    /D Decimal Arabic numerals
                    /R Uppercase Roman numerals
                    /r Lowercase Roman numerals
                    /A Uppercase letters (A to Z for the first 26 pages,
                       AA to ZZ for the next 26, and so on)
                    /a Lowercase letters (a to z for the first 26 pages,
                       aa to zz for the next 26, and so on)
        prefix: The label prefix for page labels in this range.
        start: The value of the numeric portion for the first page label
               in the range.
               Subsequent pages are numbered sequentially from this value,
               which must be greater than or equal to 1.
               With 0 (the default) or None no ``/St`` entry is written.

    """
    default_page_label = DictionaryObject()
    default_page_label[NameObject("/S")] = NameObject("/D")

    new_page_label = DictionaryObject()
    if style is not None:
        new_page_label[NameObject("/S")] = NameObject(style)
    if prefix is not None:
        new_page_label[NameObject("/P")] = TextStringObject(prefix)
    # None is allowed by the signature and must not reach NumberObject
    if start is not None and start != 0:
        new_page_label[NameObject("/St")] = NumberObject(start)

    if NameObject(CatalogDictionary.PAGE_LABELS) not in self._root_object:
        # no label tree yet: create one starting with the default label
        nums = ArrayObject()
        nums_insert(NumberObject(0), default_page_label, nums)
        page_labels = TreeObject()
        page_labels[NameObject("/Nums")] = nums
        self._root_object[NameObject(CatalogDictionary.PAGE_LABELS)] = page_labels

    page_labels = cast(
        TreeObject, self._root_object[NameObject(CatalogDictionary.PAGE_LABELS)]
    )
    nums = cast(ArrayObject, page_labels[NameObject("/Nums")])

    nums_insert(NumberObject(page_index_from), new_page_label, nums)
    nums_clear_range(NumberObject(page_index_from), page_index_to, nums)
    next_label_pos, *_ = nums_next(NumberObject(page_index_from), nums)
    # restore the default label right after the range, unless a label is
    # already starting there or the range reaches the last page
    if next_label_pos != page_index_to + 1 and page_index_to + 1 < len(self.pages):
        nums_insert(NumberObject(page_index_to + 1), default_page_label, nums)

    page_labels[NameObject("/Nums")] = nums
    self._root_object[NameObject(CatalogDictionary.PAGE_LABELS)] = page_labels
def _repr_mimebundle_(
    self,
    include: Union[None, Iterable[str]] = None,
    exclude: Union[None, Iterable[str]] = None,
) -> Dict[str, Any]:
    """
    Integration into Jupyter Notebooks.

    The document is written into an in-memory stream which is returned
    under the ``application/pdf`` mime-type.

    Args:
        include: if given, only the mime-types listed are returned.
        exclude: if given, the mime-types listed are removed.

    Returns:
        A dictionary that maps a mime-type to its representation.

    .. seealso::

        https://ipython.readthedocs.io/en/stable/config/integrating.html
    """
    stream = BytesIO()
    self.write(stream)
    bundle: Dict[str, Any] = {"application/pdf": stream}

    if include is not None:
        # Filter representations based on include list
        bundle = {
            mime: payload for mime, payload in bundle.items() if mime in include
        }

    if exclude is not None:
        # Remove representations based on exclude list
        bundle = {
            mime: payload for mime, payload in bundle.items() if mime not in exclude
        }

    return bundle
def _pdf_objectify(obj: Union[Dict[str, Any], str, float, List[Any]]) -> PdfObject:
    """
    Convert a plain Python value into the matching PdfObject.

    PdfObject instances are returned unchanged. Dictionaries and lists are
    converted recursively, strings starting with ``/`` become names, other
    strings become text strings, and numbers become FloatObject.

    Raises:
        NotImplementedError: if the type of ``obj`` is not supported.

    """
    if isinstance(obj, PdfObject):
        return obj
    if isinstance(obj, dict):
        converted = DictionaryObject()
        for key, value in obj.items():
            converted[NameObject(key)] = _pdf_objectify(value)
        return converted
    if isinstance(obj, str):
        return NameObject(obj) if obj.startswith("/") else TextStringObject(obj)
    if isinstance(obj, (float, int)):
        return FloatObject(obj)
    if isinstance(obj, list):
        return ArrayObject(map(_pdf_objectify, obj))
    raise NotImplementedError(
        f"{type(obj)=} could not be cast to a PdfObject"
    )
def _create_outline_item(
    action_ref: Union[None, IndirectObject],
    title: str,
    color: Union[Tuple[float, float, float], str, None],
    italic: bool,
    bold: bool,
) -> TreeObject:
    """
    Build an outline item dictionary.

    Args:
        action_ref: stored as ``/A`` when not None.
        title: stored as ``/Title``.
        color: stored as ``/C`` when truthy; a string is first converted
            with ``hex_to_rgb``.
        italic: adds ``OutlineFontFlag.italic`` to ``/F``.
        bold: adds ``OutlineFontFlag.bold`` to ``/F``.

    Returns:
        The new outline item; ``/F`` is only present if italic or bold is set.

    """
    item = TreeObject()
    if action_ref is not None:
        item[NameObject("/A")] = action_ref
    item.update({NameObject("/Title"): create_string_object(title)})

    if color:
        rgb = hex_to_rgb(color) if isinstance(color, str) else color
        components = ArrayObject([FloatObject(c) for c in rgb])
        item.update({NameObject("/C"): components})

    if italic or bold:
        requested = ((italic, OutlineFontFlag.italic), (bold, OutlineFontFlag.bold))
        format_flag = sum(flag for enabled, flag in requested if enabled)
        item.update({NameObject("/F"): NumberObject(format_flag)})
    return item
def generate_appearance_stream(
    txt: str,
    sel: List[str],
    da: str,
    font_full_rev: Dict[str, bytes],
    rct: RectangleObject,
    font_height: float,
    y_offset: float,
) -> bytes:
    """
    Build the content stream displaying the text of a field.

    The text is split into lines on ``\\n`` / ``\\r``. The first line is
    positioned at ``2 y_offset``; each following line is moved down by
    ``font_height * 1.4``. Lines listed in ``sel`` get a box drawn around
    them.

    Args:
        txt: text to display.
        sel: lines to be shown as selected.
        da: default appearance string, written before the text.
        font_full_rev: mapping from character to its encoded bytes;
            characters not in the mapping are encoded as UTF-16-BE.
        rct: rectangle of the field; its width and height are used for
            the clipping area and the selection box.
        font_height: font size used for the line spacing.
        y_offset: vertical position of the first line.

    Returns:
        The content stream as bytes.

    """
    chunks: List[bytes] = [
        f"q\n/Tx BMC \nq\n1 1 {rct.width - 1} {rct.height - 1} re\nW\nBT\n{da}\n".encode()
    ]
    for line_number, line in enumerate(txt.replace("\n", "\r").split("\r")):
        if line in sel:
            # may be improved but cannot find how to get fill working => replaced with lined box
            chunks.append(
                (
                    f"1 {y_offset - (line_number * font_height * 1.4) - 1} {rct.width - 2} {font_height + 2} re\n"
                    f"0.5 0.5 0.5 rg s\n{da}\n"
                ).encode()
            )
        if line_number == 0:
            chunks.append(f"2 {y_offset} Td\n".encode())
        else:
            # Td is a relative translation
            chunks.append(f"0 {-font_height * 1.4} Td\n".encode())
        enc_line: List[bytes] = [
            font_full_rev.get(c, c.encode("utf-16-be")) for c in line
        ]
        raw = b"".join(enc_line)
        if any(len(c) >= 2 for c in enc_line):
            # multi-byte codes: hexadecimal string, no escaping needed
            chunks.append(b"<" + raw.hex().encode() + b"> Tj\n")
        else:
            # literal string: backslash and parentheses must be escaped,
            # otherwise they terminate or corrupt the string
            escaped = (
                raw.replace(b"\\", b"\\\\")
                .replace(b"(", b"\\(")
                .replace(b")", b"\\)")
            )
            chunks.append(b"(" + escaped + b") Tj\n")
    chunks.append(b"ET\nQ\nEMC\nQ\n")
    return b"".join(chunks)