Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_reader.py: 35%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) 2006, Mathieu Fenniak
2# Copyright (c) 2007, Ashish Kulkarni <kulkarni.ashish@gmail.com>
3#
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are
8# met:
9#
10# * Redistributions of source code must retain the above copyright notice,
11# this list of conditions and the following disclaimer.
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15# * The name of the author may not be used to endorse or promote products
16# derived from this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28# POSSIBILITY OF SUCH DAMAGE.
30import os
31import re
32import sys
33from collections.abc import Iterable
34from io import BytesIO, UnsupportedOperation
35from pathlib import Path
36from types import TracebackType
37from typing import (
38 TYPE_CHECKING,
39 Any,
40 Callable,
41 Optional,
42 Union,
43 cast,
44)
46if sys.version_info >= (3, 11):
47 from typing import Self
48else:
49 from typing_extensions import Self
51from ._doc_common import PdfDocCommon, convert_to_int
52from ._encryption import Encryption, PasswordType
53from ._utils import (
54 WHITESPACES_AS_BYTES,
55 StrByteType,
56 StreamType,
57 logger_warning,
58 read_non_whitespace,
59 read_previous_line,
60 read_until_whitespace,
61 skip_over_comment,
62 skip_over_whitespace,
63)
64from .constants import TrailerKeys as TK
65from .errors import (
66 EmptyFileError,
67 FileNotDecryptedError,
68 LimitReachedError,
69 PdfReadError,
70 PdfStreamError,
71 WrongPasswordError,
72)
73from .generic import (
74 ArrayObject,
75 ContentStream,
76 DecodedStreamObject,
77 DictionaryObject,
78 EncodedStreamObject,
79 IndirectObject,
80 NameObject,
81 NullObject,
82 NumberObject,
83 PdfObject,
84 StreamObject,
85 TextStringObject,
86 is_null_or_none,
87 read_object,
88)
89from .xmp import XmpInformation
91if TYPE_CHECKING:
92 from ._page import PageObject
95class PdfReader(PdfDocCommon):
96 """
97 Initialize a PdfReader object.
99 This operation can take some time, as the PDF stream's cross-reference
100 tables are read into memory.
102 Args:
103 stream: A File object or an object that supports the standard read
104 and seek methods similar to a File object. Could also be a
105 string representing a path to a PDF file.
106 strict: Determines whether user should be warned of all
107 problems and also causes some correctable problems to be fatal.
108 Defaults to ``False``.
109 password: Decrypt PDF file at initialization. If the
110 password is None, the file will not be decrypted.
111 Defaults to ``None``.
112 root_object_recovery_limit: The maximum number of objects to query
113 for recovering the Root object in non-strict mode. To disable
114 this security measure, pass ``None``.
116 """
118 def __init__(
119 self,
120 stream: Union[StrByteType, Path],
121 strict: bool = False,
122 password: Union[None, str, bytes] = None,
123 *,
124 root_object_recovery_limit: Optional[int] = 10_000,
125 ) -> None:
126 self.strict = strict
127 self.flattened_pages: Optional[list[PageObject]] = None
129 #: Storage of parsed PDF objects.
130 self.resolved_objects: dict[tuple[Any, Any], Optional[PdfObject]] = {}
132 self._startxref: int = 0
133 self.xref_index = 0
134 self.xref: dict[int, dict[Any, Any]] = {}
135 self.xref_free_entry: dict[int, dict[Any, Any]] = {}
136 self.xref_objStm: dict[int, tuple[Any, Any]] = {}
137 self.trailer = DictionaryObject()
139 # Security parameters.
140 self._root_object_recovery_limit = (
141 root_object_recovery_limit if isinstance(root_object_recovery_limit, int) else sys.maxsize
142 )
144 # Map page indirect_reference number to page number
145 self._page_id2num: Optional[dict[Any, Any]] = None
147 self._validated_root: Optional[DictionaryObject] = None
149 self._initialize_stream(stream)
150 self._known_objects: set[tuple[int, int]] = set()
152 self._override_encryption = False
153 self._encryption: Optional[Encryption] = None
154 if self.is_encrypted:
155 self._handle_encryption(password)
156 elif password is not None:
157 raise PdfReadError("Not an encrypted file")
159 def _initialize_stream(self, stream: Union[StrByteType, Path]) -> None:
160 if hasattr(stream, "mode") and "b" not in stream.mode:
161 logger_warning(
162 "PdfReader stream/file object is not in binary mode. "
163 "It may not be read correctly.",
164 __name__,
165 )
166 self._stream_opened = False
167 if isinstance(stream, (str, Path)):
168 with open(stream, "rb") as fh:
169 stream = BytesIO(fh.read())
170 self._stream_opened = True
171 self.read(stream)
172 self.stream = stream
174 def _handle_encryption(self, password: Optional[Union[str, bytes]]) -> None:
175 self._override_encryption = True
176 # Some documents may not have a /ID, use two empty
177 # byte strings instead. Solves
178 # https://github.com/py-pdf/pypdf/issues/608
179 id_entry = self.trailer.get(TK.ID)
180 id1_entry = id_entry[0].get_object().original_bytes if id_entry else b""
181 encrypt_entry = cast(DictionaryObject, self.trailer[TK.ENCRYPT].get_object())
182 self._encryption = Encryption.read(encrypt_entry, id1_entry)
184 # try empty password if no password provided
185 pwd = password if password is not None else b""
186 if (
187 self._encryption.verify(pwd) == PasswordType.NOT_DECRYPTED
188 and password is not None
189 ):
190 # raise if password provided
191 raise WrongPasswordError("Wrong password")
192 self._override_encryption = False
194 def __enter__(self) -> Self:
195 return self
197 def __exit__(
198 self,
199 exc_type: Optional[type[BaseException]],
200 exc_val: Optional[BaseException],
201 exc_tb: Optional[TracebackType],
202 ) -> None:
203 self.close()
205 def close(self) -> None:
206 """Close the stream if opened in __init__ and clear memory."""
207 if self._stream_opened:
208 self.stream.close()
209 self.flattened_pages = []
210 self.resolved_objects = {}
211 self.trailer = DictionaryObject()
212 self.xref = {}
213 self.xref_free_entry = {}
214 self.xref_objStm = {}
216 @property
217 def root_object(self) -> DictionaryObject:
218 """Provide access to "/Root". Standardized with PdfWriter."""
219 if self._validated_root:
220 return self._validated_root
221 root = self.trailer.get(TK.ROOT)
222 if is_null_or_none(root):
223 logger_warning('Cannot find "/Root" key in trailer', __name__)
224 elif (
225 cast(DictionaryObject, cast(PdfObject, root).get_object()).get("/Type")
226 == "/Catalog"
227 ):
228 self._validated_root = cast(
229 DictionaryObject, cast(PdfObject, root).get_object()
230 )
231 else:
232 logger_warning("Invalid Root object in trailer", __name__)
233 if self._validated_root is None:
234 logger_warning('Searching object with "/Catalog" key', __name__)
235 number_of_objects = cast(int, self.trailer.get("/Size", 0))
236 for i in range(number_of_objects):
237 if i >= self._root_object_recovery_limit:
238 raise LimitReachedError("Maximum Root object recovery limit reached.")
239 try:
240 obj = self.get_object(i + 1)
241 except Exception: # to be sure to capture all errors
242 obj = None
243 if isinstance(obj, DictionaryObject) and obj.get("/Type") == "/Catalog":
244 self._validated_root = obj
245 logger_warning(f"Root found at {obj.indirect_reference!r}", __name__)
246 break
247 if self._validated_root is None:
248 if not is_null_or_none(root) and "/Pages" in cast(DictionaryObject, cast(PdfObject, root).get_object()):
249 logger_warning(
250 f"Possible root found at {cast(PdfObject, root).indirect_reference!r}, but missing /Catalog key",
251 __name__
252 )
253 self._validated_root = cast(
254 DictionaryObject, cast(PdfObject, root).get_object()
255 )
256 else:
257 raise PdfReadError("Cannot find Root object in pdf")
258 return self._validated_root
260 @property
261 def _info(self) -> Optional[DictionaryObject]:
262 """
263 Provide access to "/Info". Standardized with PdfWriter.
265 Returns:
266 /Info Dictionary; None if the entry does not exist
268 """
269 info = self.trailer.get(TK.INFO, None)
270 if is_null_or_none(info):
271 return None
272 assert info is not None, "mypy"
273 info = info.get_object()
274 if not isinstance(info, DictionaryObject):
275 raise PdfReadError(
276 "Trailer not found or does not point to a document information dictionary"
277 )
278 return info
280 @property
281 def _ID(self) -> Optional[ArrayObject]:
282 """
283 Provide access to "/ID". Standardized with PdfWriter.
285 Returns:
286 /ID array; None if the entry does not exist
288 """
289 id = self.trailer.get(TK.ID, None)
290 if is_null_or_none(id):
291 return None
292 assert id is not None, "mypy"
293 return cast(ArrayObject, id.get_object())
295 @property
296 def pdf_header(self) -> str:
297 """
298 The first 8 bytes of the file.
300 This is typically something like ``'%PDF-1.6'`` and can be used to
301 detect if the file is actually a PDF file and which version it is.
302 """
303 # TODO: Make this return a bytes object for consistency
304 # but that needs a deprecation
305 loc = self.stream.tell()
306 self.stream.seek(0, 0)
307 pdf_file_version = self.stream.read(8).decode("utf-8", "backslashreplace")
308 self.stream.seek(loc, 0) # return to where it was
309 return pdf_file_version
311 @property
312 def xmp_metadata(self) -> Optional[XmpInformation]:
313 """XMP (Extensible Metadata Platform) data."""
314 try:
315 self._override_encryption = True
316 return cast(XmpInformation, self.root_object.xmp_metadata)
317 finally:
318 self._override_encryption = False
320 def _get_page_number_by_indirect(
321 self, indirect_reference: Union[None, int, NullObject, IndirectObject]
322 ) -> Optional[int]:
323 """
324 Retrieve the page number from an indirect reference.
326 Args:
327 indirect_reference: The indirect reference to locate.
329 Returns:
330 Page number or None.
332 """
333 if self._page_id2num is None:
334 self._page_id2num = {
335 x.indirect_reference.idnum: i for i, x in enumerate(self.pages) # type: ignore
336 }
338 if is_null_or_none(indirect_reference):
339 return None
340 assert isinstance(indirect_reference, (int, IndirectObject)), "mypy"
341 if isinstance(indirect_reference, int):
342 idnum = indirect_reference
343 else:
344 idnum = indirect_reference.idnum
345 assert self._page_id2num is not None, "hint for mypy"
346 return self._page_id2num.get(idnum, None)
348 def _get_object_from_stream(
349 self, indirect_reference: IndirectObject
350 ) -> Union[int, PdfObject, str]:
351 # indirect reference to object in object stream
352 # read the entire object stream into memory
353 stmnum, idx = self.xref_objStm[indirect_reference.idnum]
354 obj_stm: EncodedStreamObject = IndirectObject(stmnum, 0, self).get_object() # type: ignore
355 # This is an xref to a stream, so its type better be a stream
356 assert cast(str, obj_stm["/Type"]) == "/ObjStm"
357 stream_data = BytesIO(obj_stm.get_data())
358 for i in range(obj_stm["/N"]): # type: ignore
359 read_non_whitespace(stream_data)
360 stream_data.seek(-1, 1)
361 objnum = NumberObject.read_from_stream(stream_data)
362 read_non_whitespace(stream_data)
363 stream_data.seek(-1, 1)
364 offset = NumberObject.read_from_stream(stream_data)
365 read_non_whitespace(stream_data)
366 stream_data.seek(-1, 1)
367 if objnum != indirect_reference.idnum:
368 # We're only interested in one object
369 continue
370 if self.strict and idx != i:
371 raise PdfReadError("Object is in wrong index.")
372 stream_data.seek(int(obj_stm["/First"] + offset), 0) # type: ignore
374 # To cope with case where the 'pointer' is on a white space
375 read_non_whitespace(stream_data)
376 stream_data.seek(-1, 1)
378 try:
379 obj = read_object(stream_data, self)
380 except PdfStreamError as exc:
381 # Stream object cannot be read. Normally, a critical error, but
382 # Adobe Reader doesn't complain, so continue (in strict mode?)
383 logger_warning(
384 f"Invalid stream (index {i}) within object "
385 f"{indirect_reference.idnum} {indirect_reference.generation}: "
386 f"{exc}",
387 __name__,
388 )
390 if self.strict: # pragma: no cover
391 raise PdfReadError(
392 f"Cannot read object stream: {exc}"
393 ) # pragma: no cover
394 # Replace with null. Hopefully it's nothing important.
395 obj = NullObject() # pragma: no cover
396 return obj
398 if self.strict: # pragma: no cover
399 raise PdfReadError(
400 "This is a fatal error in strict mode."
401 ) # pragma: no cover
402 return NullObject() # pragma: no cover
404 def get_object(
405 self, indirect_reference: Union[int, IndirectObject]
406 ) -> Optional[PdfObject]:
407 if isinstance(indirect_reference, int):
408 indirect_reference = IndirectObject(indirect_reference, 0, self)
409 retval = self.cache_get_indirect_object(
410 indirect_reference.generation, indirect_reference.idnum
411 )
412 if retval is not None:
413 return retval
414 if (
415 indirect_reference.generation == 0
416 and indirect_reference.idnum in self.xref_objStm
417 ):
418 retval = self._get_object_from_stream(indirect_reference) # type: ignore
419 elif (
420 indirect_reference.generation in self.xref
421 and indirect_reference.idnum in self.xref[indirect_reference.generation]
422 ):
423 if self.xref_free_entry.get(indirect_reference.generation, {}).get(
424 indirect_reference.idnum, False
425 ):
426 return NullObject()
427 start = self.xref[indirect_reference.generation][indirect_reference.idnum]
428 self.stream.seek(start, 0)
429 try:
430 idnum, generation = self.read_object_header(self.stream)
431 if (
432 idnum != indirect_reference.idnum
433 or generation != indirect_reference.generation
434 ):
435 raise PdfReadError("Not matching, we parse the file for it")
436 except Exception:
437 if hasattr(self.stream, "getbuffer"):
438 buf = bytes(self.stream.getbuffer())
439 else:
440 p = self.stream.tell()
441 self.stream.seek(0, 0)
442 buf = self.stream.read(-1)
443 self.stream.seek(p, 0)
444 m = re.search(
445 rf"\s{indirect_reference.idnum}\s+{indirect_reference.generation}\s+obj".encode(),
446 buf,
447 )
448 if m is not None:
449 logger_warning(
450 f"Object ID {indirect_reference.idnum},{indirect_reference.generation} ref repaired",
451 __name__,
452 )
453 self.xref[indirect_reference.generation][
454 indirect_reference.idnum
455 ] = (m.start(0) + 1)
456 self.stream.seek(m.start(0) + 1)
457 idnum, generation = self.read_object_header(self.stream)
458 else:
459 idnum = -1
460 generation = -1 # exception will be raised below
461 if idnum != indirect_reference.idnum and self.xref_index:
462 # xref table probably had bad indexes due to not being zero-indexed
463 if self.strict:
464 raise PdfReadError(
465 f"Expected object ID ({indirect_reference.idnum} {indirect_reference.generation}) "
466 f"does not match actual ({idnum} {generation}); "
467 "xref table not zero-indexed."
468 )
469 # xref table is corrected in non-strict mode
470 elif idnum != indirect_reference.idnum and self.strict:
471 # some other problem
472 raise PdfReadError(
473 f"Expected object ID ({indirect_reference.idnum} {indirect_reference.generation}) "
474 f"does not match actual ({idnum} {generation})."
475 )
476 if self.strict:
477 assert generation == indirect_reference.generation
479 current_object = (indirect_reference.idnum, indirect_reference.generation)
480 if current_object in self._known_objects:
481 raise PdfReadError(f"Detected loop with self reference for {indirect_reference!r}.")
482 self._known_objects.add(current_object)
483 retval = read_object(self.stream, self) # type: ignore
484 self._known_objects.remove(current_object)
486 # override encryption is used for the /Encrypt dictionary
487 if not self._override_encryption and self._encryption is not None:
488 # if we don't have the encryption key:
489 if not self._encryption.is_decrypted():
490 raise FileNotDecryptedError("File has not been decrypted")
491 # otherwise, decrypt here...
492 retval = cast(PdfObject, retval)
493 retval = self._encryption.decrypt_object(
494 retval, indirect_reference.idnum, indirect_reference.generation
495 )
496 else:
497 if hasattr(self.stream, "getbuffer"):
498 buf = bytes(self.stream.getbuffer())
499 else:
500 p = self.stream.tell()
501 self.stream.seek(0, 0)
502 buf = self.stream.read(-1)
503 self.stream.seek(p, 0)
504 m = re.search(
505 rf"\s{indirect_reference.idnum}\s+{indirect_reference.generation}\s+obj".encode(),
506 buf,
507 )
508 if m is not None:
509 logger_warning(
510 f"Object {indirect_reference.idnum} {indirect_reference.generation} found",
511 __name__,
512 )
513 if indirect_reference.generation not in self.xref:
514 self.xref[indirect_reference.generation] = {}
515 self.xref[indirect_reference.generation][indirect_reference.idnum] = (
516 m.start(0) + 1
517 )
518 self.stream.seek(m.end(0) + 1)
519 skip_over_whitespace(self.stream)
520 self.stream.seek(-1, 1)
521 retval = read_object(self.stream, self) # type: ignore
523 # override encryption is used for the /Encrypt dictionary
524 if not self._override_encryption and self._encryption is not None:
525 # if we don't have the encryption key:
526 if not self._encryption.is_decrypted():
527 raise FileNotDecryptedError("File has not been decrypted")
528 # otherwise, decrypt here...
529 retval = cast(PdfObject, retval)
530 retval = self._encryption.decrypt_object(
531 retval, indirect_reference.idnum, indirect_reference.generation
532 )
533 else:
534 logger_warning(
535 f"Object {indirect_reference.idnum} {indirect_reference.generation} not defined.",
536 __name__,
537 )
538 if self.strict:
539 raise PdfReadError("Could not find object.")
540 self.cache_indirect_object(
541 indirect_reference.generation, indirect_reference.idnum, retval
542 )
543 return retval
545 def read_object_header(self, stream: StreamType) -> tuple[int, int]:
546 # Should never be necessary to read out whitespace, since the
547 # cross-reference table should put us in the right spot to read the
548 # object header. In reality some files have stupid cross-reference
549 # tables that are off by whitespace bytes.
550 skip_over_comment(stream)
551 extra = skip_over_whitespace(stream)
552 stream.seek(-1, 1)
553 idnum = read_until_whitespace(stream)
554 extra |= skip_over_whitespace(stream)
555 stream.seek(-1, 1)
556 generation = read_until_whitespace(stream)
557 extra |= skip_over_whitespace(stream)
558 stream.seek(-1, 1)
560 # although it's not used, it might still be necessary to read
561 _obj = stream.read(3)
563 read_non_whitespace(stream)
564 stream.seek(-1, 1)
565 if extra and self.strict:
566 logger_warning(
567 f"Superfluous whitespace found in object header {idnum} {generation}", # type: ignore
568 __name__,
569 )
570 return int(idnum), int(generation)
572 def cache_get_indirect_object(
573 self, generation: int, idnum: int
574 ) -> Optional[PdfObject]:
575 try:
576 return self.resolved_objects.get((generation, idnum))
577 except RecursionError:
578 raise PdfReadError("Maximum recursion depth reached.")
580 def cache_indirect_object(
581 self, generation: int, idnum: int, obj: Optional[PdfObject]
582 ) -> Optional[PdfObject]:
583 if (generation, idnum) in self.resolved_objects:
584 msg = f"Overwriting cache for {generation} {idnum}"
585 if self.strict:
586 raise PdfReadError(msg)
587 logger_warning(msg, __name__)
588 self.resolved_objects[(generation, idnum)] = obj
589 if obj is not None:
590 obj.indirect_reference = IndirectObject(idnum, generation, self)
591 return obj
593 def _replace_object(self, indirect: IndirectObject, obj: PdfObject) -> PdfObject:
594 # function reserved for future development
595 if indirect.pdf != self:
596 raise ValueError("Cannot update PdfReader with external object")
597 if (indirect.generation, indirect.idnum) not in self.resolved_objects:
598 raise ValueError("Cannot find referenced object")
599 self.resolved_objects[(indirect.generation, indirect.idnum)] = obj
600 obj.indirect_reference = indirect
601 return obj
603 def read(self, stream: StreamType) -> None:
604 """
605 Read and process the PDF stream, extracting necessary data.
607 Args:
608 stream: The PDF file stream.
610 """
611 self._basic_validation(stream)
612 self._find_eof_marker(stream)
613 startxref = self._find_startxref_pos(stream)
614 self._startxref = startxref
616 # check and eventually correct the startxref only if not strict
617 xref_issue_nr = self._get_xref_issues(stream, startxref)
618 if xref_issue_nr != 0:
619 if self.strict and xref_issue_nr:
620 raise PdfReadError("Broken xref table")
621 logger_warning(f"incorrect startxref pointer({xref_issue_nr})", __name__)
623 # read all cross-reference tables and their trailers
624 self._read_xref_tables_and_trailers(stream, startxref, xref_issue_nr)
626 # if not zero-indexed, verify that the table is correct; change it if necessary
627 if self.xref_index and not self.strict:
628 loc = stream.tell()
629 for gen, xref_entry in self.xref.items():
630 if gen == 65535:
631 continue
632 xref_k = sorted(
633 xref_entry.keys()
634 ) # ensure ascending to prevent damage
635 for id in xref_k:
636 stream.seek(xref_entry[id], 0)
637 try:
638 pid, _pgen = self.read_object_header(stream)
639 except ValueError:
640 self._rebuild_xref_table(stream)
641 break
642 if pid == id - self.xref_index:
643 # fixing index item per item is required for revised PDF.
644 self.xref[gen][pid] = self.xref[gen][id]
645 del self.xref[gen][id]
646 # if not, then either it's just plain wrong, or the
647 # non-zero-index is actually correct
648 stream.seek(loc, 0) # return to where it was
650 # remove wrong objects (not pointing to correct structures) - cf #2326
651 if not self.strict:
652 loc = stream.tell()
653 for gen, xref_entry in self.xref.items():
654 if gen == 65535:
655 continue
656 ids = list(xref_entry.keys())
657 for id in ids:
658 stream.seek(xref_entry[id], 0)
659 try:
660 self.read_object_header(stream)
661 except ValueError:
662 logger_warning(
663 f"Ignoring wrong pointing object {id} {gen} (offset {xref_entry[id]})",
664 __name__,
665 )
666 del xref_entry[id] # we can delete the id, we are parsing ids
667 stream.seek(loc, 0) # return to where it was
669 def _basic_validation(self, stream: StreamType) -> None:
670 """Ensure the stream is valid and not empty."""
671 stream.seek(0, os.SEEK_SET)
672 try:
673 header_byte = stream.read(5)
674 except UnicodeDecodeError:
675 raise UnsupportedOperation("cannot read header")
676 if header_byte == b"":
677 raise EmptyFileError("Cannot read an empty file")
678 if header_byte != b"%PDF-":
679 if self.strict:
680 raise PdfReadError(
681 f"PDF starts with '{header_byte.decode('utf8')}', "
682 "but '%PDF-' expected"
683 )
684 logger_warning(f"invalid pdf header: {header_byte}", __name__)
685 stream.seek(0, os.SEEK_END)
687 def _find_eof_marker(self, stream: StreamType) -> None:
688 """
689 Jump to the %%EOF marker.
691 According to the specs, the %%EOF marker should be at the very end of
692 the file. Hence for standard-compliant PDF documents this function will
693 read only the last part (DEFAULT_BUFFER_SIZE).
694 """
695 HEADER_SIZE = 8 # to parse whole file, Header is e.g. '%PDF-1.6'
696 line = b""
697 first = True
698 while not line.startswith(b"%%EOF"):
699 if line != b"" and first:
700 if any(
701 line.strip().endswith(tr) for tr in (b"%%EO", b"%%E", b"%%", b"%")
702 ):
703 # Consider the file as truncated while
704 # having enough confidence to carry on.
705 logger_warning("EOF marker seems truncated", __name__)
706 break
707 first = False
708 if b"startxref" in line:
709 logger_warning(
710 "CAUTION: startxref found while searching for %%EOF. "
711 "The file might be truncated and some data might not be read.",
712 __name__,
713 )
714 if stream.tell() < HEADER_SIZE:
715 if self.strict:
716 raise PdfReadError("EOF marker not found")
717 logger_warning("EOF marker not found", __name__)
718 line = read_previous_line(stream)
720 def _find_startxref_pos(self, stream: StreamType) -> int:
721 """
722 Find startxref entry - the location of the xref table.
724 Args:
725 stream:
727 Returns:
728 The bytes offset
730 """
731 line = read_previous_line(stream)
732 try:
733 startxref = int(line)
734 except ValueError:
735 # 'startxref' may be on the same line as the location
736 if not line.startswith(b"startxref"):
737 raise PdfReadError("startxref not found")
738 startxref = int(line[9:].strip())
739 logger_warning("startxref on same line as offset", __name__)
740 else:
741 line = read_previous_line(stream)
742 if not line.startswith(b"startxref"):
743 raise PdfReadError("startxref not found")
744 return startxref
746 def _read_standard_xref_table(self, stream: StreamType) -> None:
747 # standard cross-reference table
748 ref = stream.read(3)
749 if ref != b"ref":
750 raise PdfReadError("xref table read error")
751 read_non_whitespace(stream)
752 stream.seek(-1, 1)
753 first_time = True # check if the first time looking at the xref table
754 while True:
755 num = cast(int, read_object(stream, self))
756 if first_time and num != 0:
757 self.xref_index = num
758 if self.strict:
759 logger_warning(
760 "Xref table not zero-indexed. ID numbers for objects will be corrected.",
761 __name__,
762 )
763 # if table not zero indexed, could be due to error from when PDF was created
764 # which will lead to mismatched indices later on, only warned and corrected if self.strict==True
765 first_time = False
766 read_non_whitespace(stream)
767 stream.seek(-1, 1)
768 size = cast(int, read_object(stream, self))
769 if not isinstance(size, int):
770 logger_warning(
771 "Invalid/Truncated xref table. Rebuilding it.",
772 __name__,
773 )
774 self._rebuild_xref_table(stream)
775 stream.read()
776 return
777 read_non_whitespace(stream)
778 stream.seek(-1, 1)
779 cnt = 0
780 while cnt < size:
781 line = stream.read(20)
782 if not line:
783 raise PdfReadError("Unexpected empty line in Xref table.")
785 # It's very clear in section 3.4.3 of the PDF spec
786 # that all cross-reference table lines are a fixed
787 # 20 bytes (as of PDF 1.7). However, some files have
788 # 21-byte entries (or more) due to the use of \r\n
789 # (CRLF) EOL's. Detect that case, and adjust the line
790 # until it does not begin with a \r (CR) or \n (LF).
791 while line[0] in b"\x0D\x0A":
792 stream.seek(-20 + 1, 1)
793 line = stream.read(20)
795 # On the other hand, some malformed PDF files
796 # use a single character EOL without a preceding
797 # space. Detect that case, and seek the stream
798 # back one character (0-9 means we've bled into
799 # the next xref entry, t means we've bled into the
800 # text "trailer"):
801 if line[-1] in b"0123456789t":
802 stream.seek(-1, 1)
804 try:
805 offset_b, generation_b = line[:16].split(b" ")
806 entry_type_b = line[17:18]
808 offset, generation = int(offset_b), int(generation_b)
809 except Exception:
810 if hasattr(stream, "getbuffer"):
811 buf = bytes(stream.getbuffer())
812 else:
813 p = stream.tell()
814 stream.seek(0, 0)
815 buf = stream.read(-1)
816 stream.seek(p)
818 f = re.search(rf"{num}\s+(\d+)\s+obj".encode(), buf)
819 if f is None:
820 logger_warning(
821 f"entry {num} in Xref table invalid; object not found",
822 __name__,
823 )
824 generation = 65535
825 offset = -1
826 entry_type_b = b"f"
827 else:
828 logger_warning(
829 f"entry {num} in Xref table invalid but object found",
830 __name__,
831 )
832 generation = int(f.group(1))
833 offset = f.start()
835 if generation not in self.xref:
836 self.xref[generation] = {}
837 self.xref_free_entry[generation] = {}
838 if num in self.xref[generation]:
839 # It really seems like we should allow the last
840 # xref table in the file to override previous
841 # ones. Since we read the file backwards, assume
842 # any existing key is already set correctly.
843 pass
844 else:
845 if entry_type_b == b"n":
846 self.xref[generation][num] = offset
847 try:
848 self.xref_free_entry[generation][num] = entry_type_b == b"f"
849 except Exception:
850 pass
851 try:
852 self.xref_free_entry[65535][num] = entry_type_b == b"f"
853 except Exception:
854 pass
855 cnt += 1
856 num += 1
857 read_non_whitespace(stream)
858 stream.seek(-1, 1)
859 trailer_tag = stream.read(7)
860 if trailer_tag != b"trailer":
861 # more xrefs!
862 stream.seek(-7, 1)
863 else:
864 break
866 def _read_xref_tables_and_trailers(
867 self, stream: StreamType, startxref: Optional[int], xref_issue_nr: int
868 ) -> None:
869 """Read the cross-reference tables and trailers in the PDF stream."""
870 self.xref = {}
871 self.xref_free_entry = {}
872 self.xref_objStm = {}
873 self.trailer = DictionaryObject()
874 while startxref is not None:
875 # load the xref table
876 stream.seek(startxref, 0)
877 x = stream.read(1)
878 if x in b"\r\n":
879 x = stream.read(1)
880 if x == b"x":
881 startxref = self._read_xref(stream)
882 elif xref_issue_nr:
883 try:
884 self._rebuild_xref_table(stream)
885 break
886 except Exception:
887 xref_issue_nr = 0
888 elif x.isdigit():
889 try:
890 xrefstream = self._read_pdf15_xref_stream(stream)
891 except Exception as e:
892 if TK.ROOT in self.trailer:
893 logger_warning(
894 f"Previous trailer cannot be read: {e.args}", __name__
895 )
896 break
897 raise PdfReadError(f"Trailer cannot be read: {e!s}")
898 self._process_xref_stream(xrefstream)
899 if "/Prev" in xrefstream:
900 startxref = cast(int, xrefstream["/Prev"])
901 else:
902 break
903 else:
904 startxref = self._read_xref_other_error(stream, startxref)
906 def _process_xref_stream(self, xrefstream: DictionaryObject) -> None:
907 """Process and handle the xref stream."""
908 trailer_keys = TK.ROOT, TK.ENCRYPT, TK.INFO, TK.ID, TK.SIZE
909 for key in trailer_keys:
910 if key in xrefstream and key not in self.trailer:
911 self.trailer[NameObject(key)] = xrefstream.raw_get(key)
912 if "/XRefStm" in xrefstream:
913 p = self.stream.tell()
914 self.stream.seek(cast(int, xrefstream["/XRefStm"]) + 1, 0)
915 self._read_pdf15_xref_stream(self.stream)
916 self.stream.seek(p, 0)
918 def _read_xref(self, stream: StreamType) -> Optional[int]:
919 self._read_standard_xref_table(stream)
920 if stream.read(1) == b"":
921 return None
922 stream.seek(-1, 1)
923 read_non_whitespace(stream)
924 stream.seek(-1, 1)
925 new_trailer = cast(dict[str, Any], read_object(stream, self))
926 for key, value in new_trailer.items():
927 if key not in self.trailer:
928 self.trailer[key] = value
929 if "/XRefStm" in new_trailer:
930 p = stream.tell()
931 stream.seek(cast(int, new_trailer["/XRefStm"]) + 1, 0)
932 try:
933 self._read_pdf15_xref_stream(stream)
934 except Exception:
935 logger_warning(
936 f"XRef object at {new_trailer['/XRefStm']} can not be read, some object may be missing",
937 __name__,
938 )
939 stream.seek(p, 0)
940 if "/Prev" in new_trailer:
941 return new_trailer["/Prev"]
942 return None
944 def _read_xref_other_error(
945 self, stream: StreamType, startxref: int
946 ) -> Optional[int]:
947 # some PDFs have /Prev=0 in the trailer, instead of no /Prev
948 if startxref == 0:
949 if self.strict:
950 raise PdfReadError(
951 "/Prev=0 in the trailer (try opening with strict=False)"
952 )
953 logger_warning(
954 "/Prev=0 in the trailer - assuming there is no previous xref table",
955 __name__,
956 )
957 return None
958 # bad xref character at startxref. Let's see if we can find
959 # the xref table nearby, as we've observed this error with an
960 # off-by-one before.
961 stream.seek(-11, 1)
962 tmp = stream.read(20)
963 xref_loc = tmp.find(b"xref")
964 if xref_loc != -1:
965 startxref -= 10 - xref_loc
966 return startxref
967 # No explicit xref table, try finding a cross-reference stream.
968 stream.seek(startxref, 0)
969 for look in range(25): # value extended to cope with more linearized files
970 if stream.read(1).isdigit():
971 # This is not a standard PDF, consider adding a warning
972 startxref += look
973 return startxref
974 # no xref table found at specified location
975 if "/Root" in self.trailer and not self.strict:
976 # if Root has been already found, just raise warning
977 logger_warning("Invalid parent xref., rebuild xref", __name__)
978 try:
979 self._rebuild_xref_table(stream)
980 return None
981 except Exception:
982 raise PdfReadError("Cannot rebuild xref")
983 raise PdfReadError("Could not find xref table at specified location")
985 def _read_pdf15_xref_stream(
986 self, stream: StreamType
987 ) -> Union[ContentStream, EncodedStreamObject, DecodedStreamObject]:
988 """Read the cross-reference stream for PDF 1.5+."""
989 stream.seek(-1, 1)
990 idnum, generation = self.read_object_header(stream)
991 xrefstream = cast(ContentStream, read_object(stream, self))
992 if cast(str, xrefstream["/Type"]) != "/XRef":
993 raise PdfReadError(f"Unexpected type {xrefstream['/Type']!r}")
994 self.cache_indirect_object(generation, idnum, xrefstream)
996 # Index pairs specify the subsections in the dictionary.
997 # If none, create one subsection that spans everything.
998 if "/Size" not in xrefstream:
999 # According to table 17 of the PDF 2.0 specification, this key is required.
1000 raise PdfReadError(f"Size missing from XRef stream {xrefstream!r}!")
1001 idx_pairs = xrefstream.get("/Index", [0, xrefstream["/Size"]])
1003 entry_sizes = cast(dict[Any, Any], xrefstream.get("/W"))
1004 assert len(entry_sizes) >= 3
1005 if self.strict and len(entry_sizes) > 3:
1006 raise PdfReadError(f"Too many entry sizes: {entry_sizes}")
1008 stream_data = BytesIO(xrefstream.get_data())
1010 def get_entry(i: int) -> Union[int, tuple[int, ...]]:
1011 # Reads the correct number of bytes for each entry. See the
1012 # discussion of the W parameter in PDF spec table 17.
1013 if entry_sizes[i] > 0:
1014 d = stream_data.read(entry_sizes[i])
1015 return convert_to_int(d, entry_sizes[i])
1017 # PDF Spec Table 17: A value of zero for an element in the
1018 # W array indicates...the default value shall be used
1019 if i == 0:
1020 return 1 # First value defaults to 1
1021 return 0
1023 def used_before(num: int, generation: Union[int, tuple[int, ...]]) -> bool:
1024 # We move backwards through the xrefs, don't replace any.
1025 return num in self.xref.get(generation, []) or num in self.xref_objStm # type: ignore
1027 # Iterate through each subsection
1028 self._read_xref_subsections(idx_pairs, get_entry, used_before)
1029 return xrefstream
1031 @staticmethod
1032 def _get_xref_issues(stream: StreamType, startxref: int) -> int:
1033 """
1034 Return an int which indicates an issue. 0 means there is no issue.
1036 Args:
1037 stream:
1038 startxref:
1040 Returns:
1041 0 means no issue, other values represent specific issues.
1043 """
1044 if startxref == 0:
1045 return 4
1047 stream.seek(startxref - 1, 0) # -1 to check character before
1048 line = stream.read(1)
1049 if line == b"j":
1050 line = stream.read(1)
1051 if line not in b"\r\n \t":
1052 return 1
1053 line = stream.read(4)
1054 if line != b"xref":
1055 # not a xref so check if it is an XREF object
1056 line = b""
1057 while line in b"0123456789 \t":
1058 line = stream.read(1)
1059 if line == b"":
1060 return 2
1061 line += stream.read(2) # 1 char already read, +2 to check "obj"
1062 if line.lower() != b"obj":
1063 return 3
1064 return 0
1066 @classmethod
1067 def _find_pdf_objects(cls, data: bytes) -> Iterable[tuple[int, int, int]]:
1068 index = 0
1069 ord_0 = ord("0")
1070 ord_9 = ord("9")
1071 while True:
1072 index = data.find(b" obj", index)
1073 if index == -1:
1074 return
1076 index_before_space = index - 1
1078 # Skip whitespace backwards
1079 while index_before_space >= 0 and data[index_before_space] in WHITESPACES_AS_BYTES:
1080 index_before_space -= 1
1082 # Read generation number
1083 generation_end = index_before_space + 1
1084 while index_before_space >= 0 and ord_0 <= data[index_before_space] <= ord_9:
1085 index_before_space -= 1
1086 generation_start = index_before_space + 1
1088 # Skip whitespace
1089 while index_before_space >= 0 and data[index_before_space] in WHITESPACES_AS_BYTES:
1090 index_before_space -= 1
1092 # Read object number
1093 object_end = index_before_space + 1
1094 while index_before_space >= 0 and ord_0 <= data[index_before_space] <= ord_9:
1095 index_before_space -= 1
1096 object_start = index_before_space + 1
1098 # Validate
1099 if object_start < object_end and generation_start < generation_end:
1100 object_number = int(data[object_start:object_end])
1101 generation_number = int(data[generation_start:generation_end])
1103 yield object_number, generation_number, object_start
1105 index += 4 # len(b" obj")
1107 @classmethod
1108 def _find_pdf_trailers(cls, data: bytes) -> Iterable[int]:
1109 index = 0
1110 data_length = len(data)
1111 while True:
1112 index = data.find(b"trailer", index)
1113 if index == -1:
1114 return
1116 index_after_trailer = index + 7 # len(b"trailer")
1118 # Skip whitespace
1119 while index_after_trailer < data_length and data[index_after_trailer] in WHITESPACES_AS_BYTES:
1120 index_after_trailer += 1
1122 # Must be dictionary start
1123 if index_after_trailer + 1 < data_length and data[index_after_trailer:index_after_trailer+2] == b"<<":
1124 yield index_after_trailer # offset of '<<'
1126 index += 7 # len(b"trailer")
1128 def _rebuild_xref_table(self, stream: StreamType) -> None:
1129 self.xref = {}
1130 stream.seek(0, 0)
1131 stream_data = stream.read(-1)
1133 for object_number, generation_number, object_start in self._find_pdf_objects(stream_data):
1134 if generation_number not in self.xref:
1135 self.xref[generation_number] = {}
1136 self.xref[generation_number][object_number] = object_start
1138 logger_warning("parsing for Object Streams", __name__)
1139 for generation_number in self.xref:
1140 for object_number in self.xref[generation_number]:
1141 # get_object in manual
1142 stream.seek(self.xref[generation_number][object_number], 0)
1143 try:
1144 _ = self.read_object_header(stream)
1145 obj = cast(StreamObject, read_object(stream, self))
1146 if obj.get("/Type", "") != "/ObjStm":
1147 continue
1148 object_stream = BytesIO(obj.get_data())
1149 actual_count = 0
1150 while True:
1151 current = read_until_whitespace(object_stream)
1152 if not current.isdigit():
1153 break
1154 inner_object_number = int(current)
1155 skip_over_whitespace(object_stream)
1156 object_stream.seek(-1, 1)
1157 current = read_until_whitespace(object_stream)
1158 if not current.isdigit(): # pragma: no cover
1159 break # pragma: no cover
1160 inner_generation_number = int(current)
1161 self.xref_objStm[inner_object_number] = (object_number, inner_generation_number)
1162 actual_count += 1
1163 if actual_count != obj.get("/N"): # pragma: no cover
1164 logger_warning( # pragma: no cover
1165 f"found {actual_count} objects within Object({object_number},{generation_number})"
1166 f" whereas {obj.get('/N')} expected",
1167 __name__,
1168 )
1169 except Exception: # could be multiple causes
1170 pass
1172 stream.seek(0, 0)
1173 for position in self._find_pdf_trailers(stream_data):
1174 stream.seek(position, 0)
1175 new_trailer = cast(dict[Any, Any], read_object(stream, self))
1176 # Here, we are parsing the file from start to end, the new data have to erase the existing.
1177 for key, value in new_trailer.items():
1178 self.trailer[key] = value
1180 def _read_xref_subsections(
1181 self,
1182 idx_pairs: list[int],
1183 get_entry: Callable[[int], Union[int, tuple[int, ...]]],
1184 used_before: Callable[[int, Union[int, tuple[int, ...]]], bool],
1185 ) -> None:
1186 """Read and process the subsections of the xref."""
1187 for start, size in self._pairs(idx_pairs):
1188 # The subsections must increase
1189 for num in range(start, start + size):
1190 # The first entry is the type
1191 xref_type = get_entry(0)
1192 # The rest of the elements depend on the xref_type
1193 if xref_type == 0:
1194 # linked list of free objects
1195 next_free_object = get_entry(1) # noqa: F841
1196 next_generation = get_entry(2) # noqa: F841
1197 elif xref_type == 1:
1198 # objects that are in use but are not compressed
1199 byte_offset = get_entry(1)
1200 generation = get_entry(2)
1201 if generation not in self.xref:
1202 self.xref[generation] = {} # type: ignore
1203 if not used_before(num, generation):
1204 self.xref[generation][num] = byte_offset # type: ignore
1205 elif xref_type == 2:
1206 # compressed objects
1207 objstr_num = get_entry(1)
1208 obstr_idx = get_entry(2)
1209 generation = 0 # PDF spec table 18, generation is 0
1210 if not used_before(num, generation):
1211 self.xref_objStm[num] = (objstr_num, obstr_idx)
1212 elif self.strict:
1213 raise PdfReadError(f"Unknown xref type: {xref_type}")
1215 def _pairs(self, array: list[int]) -> Iterable[tuple[int, int]]:
1216 """Iterate over pairs in the array."""
1217 i = 0
1218 while i + 1 < len(array):
1219 yield array[i], array[i + 1]
1220 i += 2
1222 def decrypt(self, password: Union[str, bytes]) -> PasswordType:
1223 """
1224 When using an encrypted / secured PDF file with the PDF Standard
1225 encryption handler, this function will allow the file to be decrypted.
1226 It checks the given password against the document's user password and
1227 owner password, and then stores the resulting decryption key if either
1228 password is correct.
1230 It does not matter which password was matched. Both passwords provide
1231 the correct decryption key that will allow the document to be used with
1232 this library.
1234 Args:
1235 password: The password to match.
1237 Returns:
1238 An indicator if the document was decrypted and whether it was the
1239 owner password or the user password.
1241 """
1242 if not self._encryption:
1243 raise PdfReadError("Not encrypted file")
1244 # TODO: raise Exception for wrong password
1245 return self._encryption.verify(password)
1247 @property
1248 def is_encrypted(self) -> bool:
1249 """
1250 Read-only boolean property showing whether this PDF file is encrypted.
1252 Note that this property, if true, will remain true even after the
1253 :meth:`decrypt()<pypdf.PdfReader.decrypt>` method is called.
1254 """
1255 return TK.ENCRYPT in self.trailer
1257 def add_form_topname(self, name: str) -> Optional[DictionaryObject]:
1258 """
1259 Add a top level form that groups all form fields below it.
1261 Args:
1262 name: text string of the "/T" Attribute of the created object
1264 Returns:
1265 The created object. ``None`` means no object was created.
1267 """
1268 catalog = self.root_object
1270 if "/AcroForm" not in catalog or not isinstance(
1271 catalog["/AcroForm"], DictionaryObject
1272 ):
1273 return None
1274 acroform = cast(DictionaryObject, catalog[NameObject("/AcroForm")])
1275 if "/Fields" not in acroform:
1276 # TODO: No error but this may be extended for XFA Forms
1277 return None
1279 interim = DictionaryObject()
1280 interim[NameObject("/T")] = TextStringObject(name)
1281 interim[NameObject("/Kids")] = acroform[NameObject("/Fields")]
1282 self.cache_indirect_object(
1283 0,
1284 max(i for (g, i) in self.resolved_objects if g == 0) + 1,
1285 interim,
1286 )
1287 arr = ArrayObject()
1288 arr.append(interim.indirect_reference)
1289 acroform[NameObject("/Fields")] = arr
1290 for o in cast(ArrayObject, interim["/Kids"]):
1291 obj = o.get_object()
1292 if "/Parent" in obj:
1293 logger_warning(
1294 f"Top Level Form Field {obj.indirect_reference} have a non-expected parent",
1295 __name__,
1296 )
1297 obj[NameObject("/Parent")] = interim.indirect_reference
1298 return interim
1300 def rename_form_topname(self, name: str) -> Optional[DictionaryObject]:
1301 """
1302 Rename top level form field that all form fields below it.
1304 Args:
1305 name: text string of the "/T" field of the created object
1307 Returns:
1308 The modified object. ``None`` means no object was modified.
1310 """
1311 catalog = self.root_object
1313 if "/AcroForm" not in catalog or not isinstance(
1314 catalog["/AcroForm"], DictionaryObject
1315 ):
1316 return None
1317 acroform = cast(DictionaryObject, catalog[NameObject("/AcroForm")])
1318 if "/Fields" not in acroform:
1319 return None
1321 interim = cast(
1322 DictionaryObject,
1323 cast(ArrayObject, acroform[NameObject("/Fields")])[0].get_object(),
1324 )
1325 interim[NameObject("/T")] = TextStringObject(name)
1326 return interim
1328 def _repr_mimebundle_(
1329 self,
1330 include: Union[None, Iterable[str]] = None,
1331 exclude: Union[None, Iterable[str]] = None,
1332 ) -> dict[str, Any]:
1333 """
1334 Integration into Jupyter Notebooks.
1336 This method returns a dictionary that maps a mime-type to its
1337 representation.
1339 .. seealso::
1341 https://ipython.readthedocs.io/en/stable/config/integrating.html
1342 """
1343 self.stream.seek(0)
1344 pdf_data = self.stream.read()
1345 data = {
1346 "application/pdf": pdf_data,
1347 }
1349 if include is not None:
1350 # Filter representations based on include list
1351 data = {k: v for k, v in data.items() if k in include}
1353 if exclude is not None:
1354 # Remove representations based on exclude list
1355 data = {k: v for k, v in data.items() if k not in exclude}
1357 return data