1from __future__ import annotations
2
3import bisect
4from functools import cached_property
5from typing import TYPE_CHECKING, cast
6
7from pypdf._utils import format_iso8824_date, parse_iso8824_date
8from pypdf.constants import CatalogAttributes as CA
9from pypdf.constants import FileSpecificationDictionaryEntries
10from pypdf.constants import PageAttributes as PG
11from pypdf.errors import PdfReadError, PyPdfError
12from pypdf.generic import (
13 ArrayObject,
14 ByteStringObject,
15 DecodedStreamObject,
16 DictionaryObject,
17 NameObject,
18 NullObject,
19 NumberObject,
20 StreamObject,
21 TextStringObject,
22 is_null_or_none,
23)
24
25if TYPE_CHECKING:
26 import datetime
27 from collections.abc import Generator
28
29 from pypdf._writer import PdfWriter
30
31
class EmbeddedFile:
    """
    Container holding the information on an embedded file.

    Attributes are evaluated lazily if possible.

    Further information on embedded files can be found in section 7.11 of the PDF 2.0 specification.
    """

    def __init__(self, name: str, pdf_object: DictionaryObject, parent: ArrayObject | None = None) -> None:
        """
        Args:
            name: The (primary) name as provided in the name tree.
            pdf_object: The corresponding PDF object to allow retrieving further data.
            parent: The parent list.
        """
        self._name = name
        self.pdf_object = pdf_object
        self._parent = parent

    @property
    def name(self) -> str:
        """The (primary) name of the embedded file as provided in the name tree."""
        return self._name

    @classmethod
    def _create_new(cls, writer: PdfWriter, name: str, content: str | bytes) -> EmbeddedFile:
        """
        Create a new embedded file and add it to the PdfWriter.

        Args:
            writer: The PdfWriter instance to add the embedded file to.
            name: The filename to display.
            content: The data in the file. Strings are encoded as latin-1.

        Returns:
            EmbeddedFile instance for the newly created embedded file.
        """
        # Convert string content to bytes if needed
        if isinstance(content, str):
            content = content.encode("latin-1")

        # Create the file entry (the actual embedded file stream)
        file_entry = DecodedStreamObject()
        file_entry.set_data(content)
        file_entry.update({NameObject(PG.TYPE): NameObject("/EmbeddedFile")})

        # Create the /EF entry
        ef_entry = DictionaryObject()
        ef_entry.update({NameObject("/F"): writer._add_object(file_entry)})

        # Create the filespec dictionary
        from pypdf.generic import create_string_object  # noqa: PLC0415
        filespec = DictionaryObject()
        filespec_reference = writer._add_object(filespec)
        name_object = cast(TextStringObject, create_string_object(name))
        filespec.update(
            {
                NameObject(PG.TYPE): NameObject("/Filespec"),
                NameObject(FileSpecificationDictionaryEntries.F): name_object,
                NameObject(FileSpecificationDictionaryEntries.EF): ef_entry,
            }
        )

        # Add the name and filespec to the names array.
        # We use the inverse order for insertion, as this allows us to re-use the
        # same index.
        names_array = cls._get_names_array(writer)
        insertion_index = cls._get_insertion_index(names_array, name_object)
        names_array.insert(insertion_index, filespec_reference)
        names_array.insert(insertion_index, name_object)

        # Return an EmbeddedFile instance
        return cls(name=name, pdf_object=filespec, parent=names_array)

    @classmethod
    def _create_names_array(cls, writer: PdfWriter, names_dict: DictionaryObject) -> ArrayObject:
        """Register a fresh, flat /EmbeddedFiles name tree in ``names_dict`` and return its (empty) names array."""
        names = ArrayObject()
        embedded_files_names_dictionary = DictionaryObject(
            {NameObject(CA.NAMES): names}
        )
        names_dict[NameObject("/EmbeddedFiles")] = writer._add_object(embedded_files_names_dictionary)
        return names

    @classmethod
    def _get_names_array(cls, writer: PdfWriter) -> ArrayObject:
        """
        Get the names array for embedded files, possibly creating and flattening it.

        Args:
            writer: The PdfWriter whose document catalog is inspected and, if needed, extended.

        Returns:
            The flat names array of the format ``[name_1, reference_1, name_2, reference_2, ...]``.

        Raises:
            PdfReadError: If an existing embedded files entry has neither ``/Names`` nor ``/Kids``.
        """
        if CA.NAMES not in writer.root_object:
            # Add the /Names entry to the catalog.
            writer.root_object[NameObject(CA.NAMES)] = writer._add_object(DictionaryObject())

        names_dict = cast(DictionaryObject, writer.root_object[CA.NAMES])
        if "/EmbeddedFiles" not in names_dict:
            # We do not yet have an entry for embedded files. Create and return it.
            return cls._create_names_array(writer, names_dict)

        # We have an existing embedded files entry.
        embedded_files_names_tree = cast(DictionaryObject, names_dict["/EmbeddedFiles"])
        if "/Names" in embedded_files_names_tree:
            # Simple case: We already have a flat list.
            return cast(ArrayObject, embedded_files_names_tree[NameObject(CA.NAMES)])
        if "/Kids" not in embedded_files_names_tree:
            # Invalid case: This is no name tree.
            raise PdfReadError("Got neither Names nor Kids in embedded files tree.")

        # Complex case: Convert a /Kids-based name tree to a /Names-based one.
        # /Name-based ones are much easier to handle and allow us to simplify the
        # actual insertion logic by only having to consider one case.
        kids = cast(ArrayObject, embedded_files_names_tree["/Kids"].get_object())
        names = cls._create_names_array(writer, names_dict)
        for kid in kids:
            # Write the flattened file entries. As we do not change the actual files,
            # this should not have any impact on references to them.
            # There might be further (nested) kids here.
            # Wait for an example before evaluating an implementation.
            kid_object = kid.get_object()
            if "/Names" in kid_object:
                names.extend(cast(ArrayObject, kid_object["/Names"]))
        return names

    @classmethod
    def _get_insertion_index(cls, names_array: ArrayObject, name: str) -> int:
        """
        Determine where a new ``(name, reference)`` pair has to be inserted to keep the names array sorted.

        Keys are compared by their UTF-8 bytes; keys which already are byte strings are compared as-is.
        A name equal to existing keys is placed after them.

        Args:
            names_array: Flat array of the format ``[name_1, reference_1, name_2, reference_2, ...]``.
            name: The name which is going to be inserted.

        Returns:
            The (even) index inside ``names_array`` at which the new pair should be inserted.
        """
        # Only every second entry is a key; the entries in between are the file references.
        keys = [
            key if isinstance(key, bytes) else key.encode("utf-8")
            for key in names_array[::2]
        ]
        # Each key occupies two slots in the flat array, thus double the key position.
        return bisect.bisect_right(keys, name.encode("utf-8")) * 2

    @property
    def alternative_name(self) -> str | None:
        """Retrieve the alternative name (file specification)."""
        for key in [FileSpecificationDictionaryEntries.UF, FileSpecificationDictionaryEntries.F]:
            # PDF 2.0 reference, table 43:
            #   > A PDF reader shall use the value of the UF key, when present, instead of the F key.
            if key in self.pdf_object:
                value = self.pdf_object[key].get_object()
                if not is_null_or_none(value):
                    return cast(str, value)
        return None

    @alternative_name.setter
    def alternative_name(self, value: TextStringObject | None) -> None:
        """
        Set the alternative name (file specification).

        ``None`` replaces the existing /UF and /F entries by null objects; any other value is written to both.
        """
        if value is None:
            if FileSpecificationDictionaryEntries.UF in self.pdf_object:
                self.pdf_object[NameObject(FileSpecificationDictionaryEntries.UF)] = NullObject()
            if FileSpecificationDictionaryEntries.F in self.pdf_object:
                self.pdf_object[NameObject(FileSpecificationDictionaryEntries.F)] = NullObject()
        else:
            self.pdf_object[NameObject(FileSpecificationDictionaryEntries.UF)] = value
            self.pdf_object[NameObject(FileSpecificationDictionaryEntries.F)] = value

    @property
    def description(self) -> str | None:
        """Retrieve the description."""
        value = self.pdf_object.get(FileSpecificationDictionaryEntries.DESC)
        if is_null_or_none(value):
            return None
        return value

    @description.setter
    def description(self, value: TextStringObject | None) -> None:
        """Set the description. ``None`` stores a null object."""
        if value is None:
            self.pdf_object[NameObject(FileSpecificationDictionaryEntries.DESC)] = NullObject()
        else:
            self.pdf_object[NameObject(FileSpecificationDictionaryEntries.DESC)] = value

    @property
    def associated_file_relationship(self) -> str:
        """
        Retrieve the relationship of the referring document to this embedded file.

        Falls back to ``/Unspecified`` if no relationship is stored.
        """
        return self.pdf_object.get("/AFRelationship", "/Unspecified")

    @associated_file_relationship.setter
    def associated_file_relationship(self, value: NameObject) -> None:
        """Set the relationship of the referring document to this embedded file."""
        self.pdf_object[NameObject("/AFRelationship")] = value

    @property
    def _embedded_file(self) -> StreamObject:
        """
        Retrieve the actual embedded file stream.

        Raises:
            PdfReadError: If there is no /EF entry or it holds neither a /UF nor a /F key.
        """
        if "/EF" not in self.pdf_object:
            raise PdfReadError(f"/EF entry not found: {self.pdf_object}")
        ef = cast(DictionaryObject, self.pdf_object["/EF"])
        # Same precedence as for the file specification itself: /UF wins over /F.
        for key in [FileSpecificationDictionaryEntries.UF, FileSpecificationDictionaryEntries.F]:
            if key in ef:
                return cast(StreamObject, ef[key].get_object())
        raise PdfReadError(f"No /(U)F key found in file dictionary: {ef}")

    @property
    def _params(self) -> DictionaryObject:
        """
        Retrieve the file-specific parameters.

        A missing or null /Params entry yields an empty dictionary which is not attached to the file.
        """
        params = self._embedded_file.get("/Params")
        if is_null_or_none(params):
            return DictionaryObject()
        return cast(DictionaryObject, params.get_object())

    @cached_property
    def _ensure_params(self) -> DictionaryObject:
        """
        Ensure the /Params dictionary exists and return it.

        A missing or null /Params entry is replaced by a new dictionary. The result is cached
        per instance; :meth:`delete` drops the cached value.
        """
        embedded_file = self._embedded_file
        if "/Params" not in embedded_file or is_null_or_none(embedded_file["/Params"]):
            embedded_file[NameObject("/Params")] = DictionaryObject()
        return cast(DictionaryObject, embedded_file["/Params"])

    @property
    def subtype(self) -> str | None:
        """Retrieve the subtype. This is a MIME media type, prefixed by a slash."""
        value = self._embedded_file.get("/Subtype")
        if is_null_or_none(value):
            return None
        return value

    @subtype.setter
    def subtype(self, value: NameObject | None) -> None:
        """Set the subtype. This should be a MIME media type, prefixed by a slash."""
        embedded_file = self._embedded_file
        if value is None:
            embedded_file[NameObject("/Subtype")] = NullObject()
        else:
            embedded_file[NameObject("/Subtype")] = value

    @property
    def content(self) -> bytes:
        """Retrieve the actual file content."""
        return self._embedded_file.get_data()

    @content.setter
    def content(self, value: str | bytes) -> None:
        """Set the file content. Strings are encoded as latin-1."""
        if isinstance(value, str):
            value = value.encode("latin-1")
        self._embedded_file.set_data(value)

    @property
    def size(self) -> int | None:
        """Retrieve the size of the uncompressed file in bytes."""
        value = self._params.get("/Size")
        if is_null_or_none(value):
            return None
        return value

    @size.setter
    def size(self, value: NumberObject | None) -> None:
        """Set the size of the uncompressed file in bytes."""
        params = self._ensure_params
        if value is None:
            params[NameObject("/Size")] = NullObject()
        else:
            params[NameObject("/Size")] = value

    @property
    def creation_date(self) -> datetime.datetime | None:
        """Retrieve the file creation datetime."""
        return parse_iso8824_date(self._params.get("/CreationDate"))

    @creation_date.setter
    def creation_date(self, value: datetime.datetime | None) -> None:
        """Set the file creation datetime."""
        params = self._ensure_params
        if value is None:
            params[NameObject("/CreationDate")] = NullObject()
        else:
            date_str = format_iso8824_date(value)
            params[NameObject("/CreationDate")] = TextStringObject(date_str)

    @property
    def modification_date(self) -> datetime.datetime | None:
        """Retrieve the datetime of the last file modification."""
        return parse_iso8824_date(self._params.get("/ModDate"))

    @modification_date.setter
    def modification_date(self, value: datetime.datetime | None) -> None:
        """Set the datetime of the last file modification."""
        params = self._ensure_params
        if value is None:
            params[NameObject("/ModDate")] = NullObject()
        else:
            date_str = format_iso8824_date(value)
            params[NameObject("/ModDate")] = TextStringObject(date_str)

    @property
    def checksum(self) -> bytes | None:
        """Retrieve the MD5 checksum of the (uncompressed) file."""
        value = self._params.get("/CheckSum")
        if is_null_or_none(value):
            return None
        return value

    @checksum.setter
    def checksum(self, value: ByteStringObject | None) -> None:
        """Set the MD5 checksum of the (uncompressed) file."""
        params = self._ensure_params
        if value is None:
            params[NameObject("/CheckSum")] = NullObject()
        else:
            params[NameObject("/CheckSum")] = value

    def delete(self) -> None:
        """
        Delete the file from the document.

        Removes the file reference and its preceding name from the parent names array and
        invalidates this instance.

        Raises:
            PyPdfError: If no parent is known or the file is not part of the parent.
        """
        # Check against `None` explicitly: an empty parent array is falsy as well, but is
        # a "file not found" situation instead of a missing parent.
        if self._parent is None:
            raise PyPdfError("Parent required to delete file from document.")
        if self.pdf_object in self._parent:
            index = self._parent.index(self.pdf_object)
        elif (
            (indirect_reference := getattr(self.pdf_object, "indirect_reference", None)) is not None
            and indirect_reference in self._parent
        ):
            index = self._parent.index(indirect_reference)
        else:
            raise PyPdfError("File not found in parent object.")
        self._parent.pop(index)  # Reference.
        if index > 0:
            # Without this guard, `pop(-1)` would drop an unrelated entry from the end of
            # a malformed array whose first entry is the file itself.
            self._parent.pop(index - 1)  # Name.
        self.pdf_object = DictionaryObject()  # Invalidate.
        # Drop the cached /Params dictionary to avoid silently modifying the removed file.
        self.__dict__.pop("_ensure_params", None)

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__} name={self.name!r}>"

    @classmethod
    def _load_from_names(cls, names: ArrayObject) -> Generator[EmbeddedFile]:
        """
        Convert the given name tree into class instances.

        Args:
            names: The name tree to load the data from.

        Returns:
            Iterable of class instances for the files found.
        """
        # This is a name tree of the format [name_1, reference_1, name_2, reference_2, ...]
        for i, entry in enumerate(names):
            if i == 0 or isinstance(entry, (str, bytes)):
                # Skip plain (text or byte) strings and retrieve them as `direct_name` by index.
                # The very first entry has no preceding name and thus cannot be a file.
                continue
            file_dictionary = entry.get_object()
            direct_name = names[i - 1].get_object()
            yield cls(name=direct_name, pdf_object=file_dictionary, parent=names)

    @classmethod
    def _load(cls, catalog: DictionaryObject) -> Generator[EmbeddedFile]:
        """
        Load the embedded files for the given document catalog.

        This method and its signature are considered internal API and thus not exposed publicly for now.

        Args:
            catalog: The document catalog to load from.

        Returns:
            Iterable of class instances for the files found.
        """
        try:
            container = cast(
                DictionaryObject,
                cast(DictionaryObject, catalog["/Names"])["/EmbeddedFiles"],
            )
        except KeyError:
            # No embedded files available.
            return

        if "/Kids" in container:
            for kid in cast(ArrayObject, container["/Kids"].get_object()):
                # There might be further (nested) kids here.
                # Wait for an example before evaluating an implementation.
                kid = kid.get_object()
                if "/Names" in kid:
                    yield from cls._load_from_names(cast(ArrayObject, kid["/Names"]))
        if "/Names" in container:
            yield from cls._load_from_names(cast(ArrayObject, container["/Names"]))