Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_reader.py: 36%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# Copyright (c) 2006, Mathieu Fenniak
2# Copyright (c) 2007, Ashish Kulkarni <kulkarni.ashish@gmail.com>
3#
4# All rights reserved.
5#
6# Redistribution and use in source and binary forms, with or without
7# modification, are permitted provided that the following conditions are
8# met:
9#
10# * Redistributions of source code must retain the above copyright notice,
11# this list of conditions and the following disclaimer.
12# * Redistributions in binary form must reproduce the above copyright notice,
13# this list of conditions and the following disclaimer in the documentation
14# and/or other materials provided with the distribution.
15# * The name of the author may not be used to endorse or promote products
16# derived from this software without specific prior written permission.
17#
18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
21# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE
22# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
23# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
24# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
25# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
26# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
27# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
28# POSSIBILITY OF SUCH DAMAGE.
30import os
31import re
32from collections.abc import Iterable
33from io import BytesIO, UnsupportedOperation
34from pathlib import Path
35from types import TracebackType
36from typing import (
37 TYPE_CHECKING,
38 Any,
39 Callable,
40 Optional,
41 Union,
42 cast,
43)
45from ._doc_common import PdfDocCommon, convert_to_int
46from ._encryption import Encryption, PasswordType
47from ._utils import (
48 StrByteType,
49 StreamType,
50 logger_warning,
51 read_non_whitespace,
52 read_previous_line,
53 read_until_whitespace,
54 skip_over_comment,
55 skip_over_whitespace,
56)
57from .constants import TrailerKeys as TK
58from .errors import (
59 EmptyFileError,
60 FileNotDecryptedError,
61 PdfReadError,
62 PdfStreamError,
63 WrongPasswordError,
64)
65from .generic import (
66 ArrayObject,
67 ContentStream,
68 DecodedStreamObject,
69 DictionaryObject,
70 EncodedStreamObject,
71 IndirectObject,
72 NameObject,
73 NullObject,
74 NumberObject,
75 PdfObject,
76 StreamObject,
77 TextStringObject,
78 is_null_or_none,
79 read_object,
80)
81from .xmp import XmpInformation
83if TYPE_CHECKING:
84 from ._page import PageObject
87class PdfReader(PdfDocCommon):
88 """
89 Initialize a PdfReader object.
91 This operation can take some time, as the PDF stream's cross-reference
92 tables are read into memory.
94 Args:
95 stream: A File object or an object that supports the standard read
96 and seek methods similar to a File object. Could also be a
97 string representing a path to a PDF file.
98 strict: Determines whether user should be warned of all
99 problems and also causes some correctable problems to be fatal.
100 Defaults to ``False``.
101 password: Decrypt PDF file at initialization. If the
102 password is None, the file will not be decrypted.
103 Defaults to ``None``.
105 """
def __init__(
    self,
    stream: Union[StrByteType, Path],
    strict: bool = False,
    password: Union[None, str, bytes] = None,
) -> None:
    """
    Set up the reader state, parse the stream and handle encryption.

    Args:
        stream: Path or file-like object handed to ``_initialize_stream``.
        strict: Stored as ``self.strict``.
        password: Passed to ``_handle_encryption`` when the document is
            encrypted.

    Raises:
        PdfReadError: A password was supplied but the document is not
            encrypted.

    """
    self.strict = strict

    # Caches that are filled lazily.
    self.flattened_pages: Optional[list[PageObject]] = None
    # Map page indirect_reference number to page number
    self._page_id2num: Optional[dict[Any, Any]] = None
    self._validated_root: Optional[DictionaryObject] = None

    #: Storage of parsed PDF objects.
    self.resolved_objects: dict[tuple[Any, Any], Optional[PdfObject]] = {}

    # Cross-reference bookkeeping and trailer.
    self.trailer = DictionaryObject()
    self.xref: dict[int, dict[Any, Any]] = {}
    self.xref_free_entry: dict[int, dict[Any, Any]] = {}
    self.xref_objStm: dict[int, tuple[Any, Any]] = {}
    self.xref_index = 0
    self._startxref: int = 0

    self._initialize_stream(stream)
    self._known_objects: set[tuple[int, int]] = set()

    self._override_encryption = False
    self._encryption: Optional[Encryption] = None
    if not self.is_encrypted:
        if password is not None:
            raise PdfReadError("Not an encrypted file")
        return
    self._handle_encryption(password)
def _initialize_stream(self, stream: Union[StrByteType, Path]) -> None:
    """
    Read the PDF from ``stream`` and remember the stream on the reader.

    When ``stream`` is a ``str`` or ``Path`` the file is read fully into a
    ``BytesIO`` and ``self._stream_opened`` is set so that ``close`` knows
    it owns the buffer.

    Args:
        stream: Path to a PDF file or an already opened file-like object.

    """
    mode = getattr(stream, "mode", None)
    # Only a textual mode can be inspected for the "b" flag. Some file-like
    # objects expose a non-string ``mode`` (for example an integer), and
    # ``"b" not in <int>`` would raise TypeError.
    if isinstance(mode, str) and "b" not in mode:
        logger_warning(
            "PdfReader stream/file object is not in binary mode. "
            "It may not be read correctly.",
            __name__,
        )
    self._stream_opened = False
    if isinstance(stream, (str, Path)):
        with open(stream, "rb") as fh:
            stream = BytesIO(fh.read())
        self._stream_opened = True
    self.read(stream)
    self.stream = stream
def _handle_encryption(self, password: Optional[Union[str, bytes]]) -> None:
    """
    Build the ``Encryption`` helper from the trailer and verify a password.

    Args:
        password: Password to verify; when ``None`` the empty password is
            tried instead.

    Raises:
        WrongPasswordError: A password was supplied and verification
            reported ``PasswordType.NOT_DECRYPTED``.

    """
    self._override_encryption = True

    # Some documents may not have a /ID, use an empty byte string
    # instead. Solves https://github.com/py-pdf/pypdf/issues/608
    id_entry = self.trailer.get(TK.ID)
    if id_entry:
        first_id = id_entry[0].get_object().original_bytes
    else:
        first_id = b""
    encrypt_dict = cast(DictionaryObject, self.trailer[TK.ENCRYPT].get_object())
    self._encryption = Encryption.read(encrypt_dict, first_id)

    # Without an explicit password, fall back to the empty one.
    candidate = b"" if password is None else password
    outcome = self._encryption.verify(candidate)
    if password is not None and outcome == PasswordType.NOT_DECRYPTED:
        # Only an explicitly supplied password can be "wrong".
        raise WrongPasswordError("Wrong password")
    self._override_encryption = False
def __enter__(self) -> "PdfReader":
    """Enter the ``with`` block; the reader itself is the context value."""
    return self
def __exit__(
    self,
    exc_type: Optional[type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> None:
    """Leave the ``with`` block by calling ``close``; exceptions are not suppressed."""
    self.close()
def close(self) -> None:
    """Close the stream if opened in __init__ and clear memory."""
    if self._stream_opened:
        self.stream.close()
    # Drop the parsed structures so they can be garbage collected.
    self.xref, self.xref_free_entry, self.xref_objStm = {}, {}, {}
    self.trailer = DictionaryObject()
    self.resolved_objects = {}
    self.flattened_pages = []
@property
def root_object(self) -> DictionaryObject:
    """
    Provide access to "/Root". Standardized with PdfWriter.

    The trailer entry is used when its "/Type" is "/Catalog". Otherwise the
    objects 1.."/Size" are scanned for a catalog, and as a last resort a
    trailer root that at least has "/Pages" is accepted.

    Raises:
        PdfReadError: No usable root object could be found.

    """
    if self._validated_root:
        return self._validated_root

    trailer_root = self.trailer.get(TK.ROOT)
    if is_null_or_none(trailer_root):
        logger_warning('Cannot find "/Root" key in trailer', __name__)
    else:
        candidate = cast(DictionaryObject, cast(PdfObject, trailer_root).get_object())
        if candidate.get("/Type") == "/Catalog":
            self._validated_root = cast(
                DictionaryObject, cast(PdfObject, trailer_root).get_object()
            )
        else:
            logger_warning("Invalid Root object in trailer", __name__)

    if self._validated_root is not None:
        return self._validated_root

    # Fallback 1: scan every object number announced by "/Size".
    logger_warning('Searching object with "/Catalog" key', __name__)
    object_count = cast(int, self.trailer.get("/Size", 0))
    for object_number in range(1, object_count + 1):
        try:
            found = self.get_object(object_number)
        except Exception:  # to be sure to capture all errors
            found = None
        if isinstance(found, DictionaryObject) and found.get("/Type") == "/Catalog":
            self._validated_root = found
            logger_warning(f"Root found at {found.indirect_reference!r}", __name__)
            break

    # Fallback 2: accept the trailer root if it looks like a catalog.
    if self._validated_root is None:
        usable = not is_null_or_none(trailer_root) and "/Pages" in cast(
            DictionaryObject, cast(PdfObject, trailer_root).get_object()
        )
        if not usable:
            raise PdfReadError("Cannot find Root object in pdf")
        logger_warning(
            f"Possible root found at {cast(PdfObject, trailer_root).indirect_reference!r}, but missing /Catalog key",
            __name__
        )
        self._validated_root = cast(
            DictionaryObject, cast(PdfObject, trailer_root).get_object()
        )
    return self._validated_root
@property
def _info(self) -> Optional[DictionaryObject]:
    """
    Provide access to "/Info". Standardized with PdfWriter.

    Returns:
        /Info Dictionary; None if the entry does not exist

    Raises:
        PdfReadError: The trailer entry does not resolve to a dictionary.

    """
    entry = self.trailer.get(TK.INFO, None)
    if is_null_or_none(entry):
        return None
    assert entry is not None, "mypy"
    resolved = entry.get_object()
    if isinstance(resolved, DictionaryObject):
        return resolved
    raise PdfReadError(
        "Trailer not found or does not point to a document information dictionary"
    )
@property
def _ID(self) -> Optional[ArrayObject]:
    """
    Provide access to "/ID". Standardized with PdfWriter.

    Returns:
        /ID array; None if the entry does not exist

    """
    id_entry = self.trailer.get(TK.ID, None)
    if is_null_or_none(id_entry):
        return None
    assert id_entry is not None, "mypy"
    resolved = id_entry.get_object()
    return cast(ArrayObject, resolved)
@property
def pdf_header(self) -> str:
    """
    The first 8 bytes of the file.

    This is typically something like ``'%PDF-1.6'`` and can be used to
    detect if the file is actually a PDF file and which version it is.
    The stream position is restored afterwards.
    """
    # TODO: Make this return a bytes object for consistency
    #       but that needs a deprecation
    stream = self.stream
    saved_position = stream.tell()
    stream.seek(0, os.SEEK_SET)
    raw_header = stream.read(8)
    header_text = raw_header.decode("utf-8", "backslashreplace")
    stream.seek(saved_position, os.SEEK_SET)  # return to where it was
    return header_text
@property
def xmp_metadata(self) -> Optional[XmpInformation]:
    """XMP (Extensible Metadata Platform) data."""
    # The metadata is fetched with the encryption override switched on;
    # the flag is always reset to False afterwards.
    self._override_encryption = True
    try:
        metadata = self.root_object.xmp_metadata
        return cast(XmpInformation, metadata)
    finally:
        self._override_encryption = False
def _get_page_number_by_indirect(
    self, indirect_reference: Union[None, int, NullObject, IndirectObject]
) -> Optional[int]:
    """
    Retrieve the page number from an indirect reference.

    The id-to-page-number map is built from ``self.pages`` on first use
    and cached in ``self._page_id2num``.

    Args:
        indirect_reference: The indirect reference (or plain object id)
            to locate.

    Returns:
        Page number or None.

    """
    if self._page_id2num is None:
        id_to_number = {}
        for page_number, page in enumerate(self.pages):
            id_to_number[page.indirect_reference.idnum] = page_number  # type: ignore
        self._page_id2num = id_to_number

    if is_null_or_none(indirect_reference):
        return None
    assert isinstance(indirect_reference, (int, IndirectObject)), "mypy"
    idnum = (
        indirect_reference
        if isinstance(indirect_reference, int)
        else indirect_reference.idnum
    )
    assert self._page_id2num is not None, "hint for mypy"
    return self._page_id2num.get(idnum, None)
def _get_object_from_stream(
    self, indirect_reference: IndirectObject
) -> Union[int, PdfObject, str]:
    """
    Read an object that lives inside an object stream.

    The whole object stream is decoded into memory, its (object number,
    offset) header pairs are walked, and the object matching
    ``indirect_reference.idnum`` is parsed.

    Args:
        indirect_reference: Reference whose id is a key of
            ``self.xref_objStm``.

    Returns:
        The parsed object, or ``NullObject`` in non-strict mode when the
        object is unreadable or not listed in the stream.

    Raises:
        PdfReadError: In strict mode, when the object is at the wrong
            index, cannot be read, or is not listed.

    """
    wanted_id = indirect_reference.idnum
    container_num, expected_index = self.xref_objStm[wanted_id]
    container: EncodedStreamObject = IndirectObject(container_num, 0, self).get_object()  # type: ignore
    # This is an xref to a stream, so its type better be a stream
    assert cast(str, container["/Type"]) == "/ObjStm"
    data = BytesIO(container.get_data())

    def align_on_token() -> None:
        # Skip whitespace, then step back onto the first significant byte.
        read_non_whitespace(data)
        data.seek(-1, 1)

    for position in range(container["/N"]):  # type: ignore
        align_on_token()
        number = NumberObject.read_from_stream(data)
        align_on_token()
        relative_offset = NumberObject.read_from_stream(data)
        align_on_token()
        if number != wanted_id:
            # We're only interested in one object
            continue
        if self.strict and expected_index != position:
            raise PdfReadError("Object is in wrong index.")
        data.seek(int(container["/First"] + relative_offset), 0)  # type: ignore

        # To cope with case where the 'pointer' is on a white space
        align_on_token()

        try:
            return read_object(data, self)
        except PdfStreamError as exc:
            # Stream object cannot be read. Normally, a critical error, but
            # Adobe Reader doesn't complain, so continue (in strict mode?)
            logger_warning(
                f"Invalid stream (index {position}) within object "
                f"{indirect_reference.idnum} {indirect_reference.generation}: "
                f"{exc}",
                __name__,
            )

            if self.strict:  # pragma: no cover
                raise PdfReadError(
                    f"Cannot read object stream: {exc}"
                )  # pragma: no cover
            # Replace with null. Hopefully it's nothing important.
            return NullObject()  # pragma: no cover

    if self.strict:  # pragma: no cover
        raise PdfReadError(
            "This is a fatal error in strict mode."
        )  # pragma: no cover
    return NullObject()  # pragma: no cover
def get_object(
    self, indirect_reference: Union[int, IndirectObject]
) -> Optional[PdfObject]:
    """
    Resolve an indirect reference to the object it points to.

    Lookup order: the cache of resolved objects, then object streams, then
    the cross-reference table (repairing the offset by scanning the file
    when the header found there does not match), and finally a scan of the
    whole file for ``<id> <gen> obj``.

    Args:
        indirect_reference: The reference to resolve; a plain ``int`` is
            treated as object id with generation 0.

    Returns:
        The resolved object. ``NullObject`` for free xref entries. The
        value left by the cache lookup (``None``) when the object is not
        defined and the reader is not strict.

    Raises:
        PdfReadError: Object id mismatch in strict mode, a self-referencing
            loop, or an undefined object in strict mode.
        FileNotDecryptedError: The file is encrypted and has not been
            decrypted yet.

    """
    if isinstance(indirect_reference, int):
        indirect_reference = IndirectObject(indirect_reference, 0, self)
    retval = self.cache_get_indirect_object(
        indirect_reference.generation, indirect_reference.idnum
    )
    if retval is not None:
        return retval
    if (
        indirect_reference.generation == 0
        and indirect_reference.idnum in self.xref_objStm
    ):
        retval = self._get_object_from_stream(indirect_reference)  # type: ignore
    elif (
        indirect_reference.generation in self.xref
        and indirect_reference.idnum in self.xref[indirect_reference.generation]
    ):
        if self.xref_free_entry.get(indirect_reference.generation, {}).get(
            indirect_reference.idnum, False
        ):
            return NullObject()
        start = self.xref[indirect_reference.generation][indirect_reference.idnum]
        self.stream.seek(start, 0)
        try:
            idnum, generation = self.read_object_header(self.stream)
            if (
                idnum != indirect_reference.idnum
                or generation != indirect_reference.generation
            ):
                raise PdfReadError("Not matching, we parse the file for it")
        except Exception:
            # The xref offset is unusable: look for the object header
            # anywhere in the file and repair the xref entry if found.
            if hasattr(self.stream, "getbuffer"):
                buf = bytes(self.stream.getbuffer())
            else:
                p = self.stream.tell()
                self.stream.seek(0, 0)
                buf = self.stream.read(-1)
                self.stream.seek(p, 0)
            m = re.search(
                rf"\s{indirect_reference.idnum}\s+{indirect_reference.generation}\s+obj".encode(),
                buf,
            )
            if m is not None:
                logger_warning(
                    f"Object ID {indirect_reference.idnum},{indirect_reference.generation} ref repaired",
                    __name__,
                )
                # +1 skips the leading whitespace matched by the pattern
                self.xref[indirect_reference.generation][
                    indirect_reference.idnum
                ] = (m.start(0) + 1)
                self.stream.seek(m.start(0) + 1)
                idnum, generation = self.read_object_header(self.stream)
            else:
                idnum = -1
                generation = -1  # exception will be raised below
        if idnum != indirect_reference.idnum and self.xref_index:
            # xref table probably had bad indexes due to not being zero-indexed
            if self.strict:
                raise PdfReadError(
                    f"Expected object ID ({indirect_reference.idnum} {indirect_reference.generation}) "
                    f"does not match actual ({idnum} {generation}); "
                    "xref table not zero-indexed."
                )
            # xref table is corrected in non-strict mode
        elif idnum != indirect_reference.idnum and self.strict:
            # some other problem
            raise PdfReadError(
                f"Expected object ID ({indirect_reference.idnum} {indirect_reference.generation}) "
                f"does not match actual ({idnum} {generation})."
            )
        if self.strict:
            assert generation == indirect_reference.generation

        current_object = (indirect_reference.idnum, indirect_reference.generation)
        if current_object in self._known_objects:
            raise PdfReadError(f"Detected loop with self reference for {indirect_reference!r}.")
        self._known_objects.add(current_object)
        try:
            retval = read_object(self.stream, self)  # type: ignore
        finally:
            # Always clear the in-progress marker: if parsing fails, a later
            # lookup of the same object must not be reported as a loop.
            self._known_objects.discard(current_object)

        # override encryption is used for the /Encrypt dictionary
        if not self._override_encryption and self._encryption is not None:
            # if we don't have the encryption key:
            if not self._encryption.is_decrypted():
                raise FileNotDecryptedError("File has not been decrypted")
            # otherwise, decrypt here...
            retval = cast(PdfObject, retval)
            retval = self._encryption.decrypt_object(
                retval, indirect_reference.idnum, indirect_reference.generation
            )
    else:
        # Not referenced by any xref: scan the whole file for the header.
        if hasattr(self.stream, "getbuffer"):
            buf = bytes(self.stream.getbuffer())
        else:
            p = self.stream.tell()
            self.stream.seek(0, 0)
            buf = self.stream.read(-1)
            self.stream.seek(p, 0)
        m = re.search(
            rf"\s{indirect_reference.idnum}\s+{indirect_reference.generation}\s+obj".encode(),
            buf,
        )
        if m is not None:
            logger_warning(
                f"Object {indirect_reference.idnum} {indirect_reference.generation} found",
                __name__,
            )
            if indirect_reference.generation not in self.xref:
                self.xref[indirect_reference.generation] = {}
            self.xref[indirect_reference.generation][indirect_reference.idnum] = (
                m.start(0) + 1
            )
            self.stream.seek(m.end(0) + 1)
            skip_over_whitespace(self.stream)
            self.stream.seek(-1, 1)
            retval = read_object(self.stream, self)  # type: ignore

            # override encryption is used for the /Encrypt dictionary
            if not self._override_encryption and self._encryption is not None:
                # if we don't have the encryption key:
                if not self._encryption.is_decrypted():
                    raise FileNotDecryptedError("File has not been decrypted")
                # otherwise, decrypt here...
                retval = cast(PdfObject, retval)
                retval = self._encryption.decrypt_object(
                    retval, indirect_reference.idnum, indirect_reference.generation
                )
        else:
            logger_warning(
                f"Object {indirect_reference.idnum} {indirect_reference.generation} not defined.",
                __name__,
            )
            if self.strict:
                raise PdfReadError("Could not find object.")
    self.cache_indirect_object(
        indirect_reference.generation, indirect_reference.idnum, retval
    )
    return retval
def read_object_header(self, stream: StreamType) -> tuple[int, int]:
    """
    Parse an ``<id> <generation> obj`` header at the current position.

    Args:
        stream: Stream positioned at (or shortly before) the header.

    Returns:
        ``(idnum, generation)`` as integers.

    """

    def consume_whitespace() -> bool:
        # Skip whitespace and step back onto the first significant byte;
        # the helper's return value is passed through unchanged.
        had_extra = skip_over_whitespace(stream)
        stream.seek(-1, 1)
        return had_extra

    # Should never be necessary to read out whitespace, since the
    # cross-reference table should put us in the right spot to read the
    # object header. In reality some files have cross-reference tables
    # that are off by whitespace bytes.
    skip_over_comment(stream)
    extra = consume_whitespace()
    idnum = read_until_whitespace(stream)
    extra |= consume_whitespace()
    generation = read_until_whitespace(stream)
    extra |= consume_whitespace()

    # The three bytes of the "obj" keyword are consumed but not checked.
    stream.read(3)

    read_non_whitespace(stream)
    stream.seek(-1, 1)
    if extra and self.strict:
        logger_warning(
            f"Superfluous whitespace found in object header {idnum} {generation}",  # type: ignore
            __name__,
        )
    return int(idnum), int(generation)
def cache_get_indirect_object(
    self, generation: int, idnum: int
) -> Optional[PdfObject]:
    """
    Look up an already resolved object.

    Returns:
        The cached object, or ``None`` when nothing is cached for
        ``(generation, idnum)``.

    Raises:
        PdfReadError: A ``RecursionError`` occurred during the lookup.

    """
    cache_key = (generation, idnum)
    try:
        cached = self.resolved_objects.get(cache_key)
    except RecursionError:
        raise PdfReadError("Maximum recursion depth reached.")
    return cached
def cache_indirect_object(
    self, generation: int, idnum: int, obj: Optional[PdfObject]
) -> Optional[PdfObject]:
    """
    Store ``obj`` in the cache under ``(generation, idnum)``.

    An existing entry is overwritten with a warning, or rejected in strict
    mode. A non-``None`` object gets its ``indirect_reference`` set.

    Returns:
        ``obj`` itself.

    Raises:
        PdfReadError: In strict mode, when the cache already has an entry.

    """
    cache_key = (generation, idnum)
    if cache_key in self.resolved_objects:
        msg = f"Overwriting cache for {generation} {idnum}"
        if self.strict:
            raise PdfReadError(msg)
        logger_warning(msg, __name__)
    self.resolved_objects[cache_key] = obj
    if obj is None:
        return obj
    obj.indirect_reference = IndirectObject(idnum, generation, self)
    return obj
def _replace_object(self, indirect: IndirectObject, obj: PdfObject) -> PdfObject:
    """
    Replace an already cached object (reserved for future development).

    Raises:
        ValueError: ``indirect`` belongs to another document, or nothing is
            cached for it yet.

    """
    if indirect.pdf != self:
        raise ValueError("Cannot update PdfReader with external object")
    cache_key = (indirect.generation, indirect.idnum)
    if cache_key not in self.resolved_objects:
        raise ValueError("Cannot find referenced object")
    self.resolved_objects[cache_key] = obj
    obj.indirect_reference = indirect
    return obj
def read(self, stream: StreamType) -> None:
    """
    Parse the structural parts of the PDF: EOF marker, startxref,
    cross-reference tables and trailers.

    In non-strict mode the xref table is additionally re-indexed when it is
    not zero-indexed, and entries that do not point at an object header are
    dropped.

    Args:
        stream: The PDF file stream.

    Raises:
        PdfReadError: In strict mode, when the xref table is broken.

    """
    self._basic_validation(stream)
    self._find_eof_marker(stream)
    startxref = self._find_startxref_pos(stream)
    self._startxref = startxref

    # check and eventually correct the startxref only if not strict
    xref_issue_nr = self._get_xref_issues(stream, startxref)
    if xref_issue_nr != 0:
        if self.strict:
            raise PdfReadError("Broken xref table")
        logger_warning(f"incorrect startxref pointer({xref_issue_nr})", __name__)

    # read all cross-reference tables and their trailers
    self._read_xref_tables_and_trailers(stream, startxref, xref_issue_nr)

    # if not zero-indexed, verify that the table is correct; change it if necessary
    if self.xref_index and not self.strict:
        saved_position = stream.tell()
        for gen, xref_entry in self.xref.items():
            if gen == 65535:
                continue
            # ensure ascending to prevent damage
            for obj_id in sorted(xref_entry):
                stream.seek(xref_entry[obj_id], 0)
                try:
                    found_id, _ = self.read_object_header(stream)
                except ValueError:
                    self._rebuild_xref_table(stream)
                    break
                if found_id == obj_id - self.xref_index:
                    # fixing index item per item is required for revised PDF.
                    self.xref[gen][found_id] = self.xref[gen][obj_id]
                    del self.xref[gen][obj_id]
                # if not, then either it's just plain wrong, or the
                # non-zero-index is actually correct
        stream.seek(saved_position, 0)  # return to where it was

    # remove wrong objects (not pointing to correct structures) - cf #2326
    if self.strict:
        return
    saved_position = stream.tell()
    for gen, xref_entry in self.xref.items():
        if gen == 65535:
            continue
        # iterate over a snapshot, entries may be deleted below
        for obj_id in tuple(xref_entry):
            stream.seek(xref_entry[obj_id], 0)
            try:
                self.read_object_header(stream)
            except ValueError:
                logger_warning(
                    f"Ignoring wrong pointing object {obj_id} {gen} (offset {xref_entry[obj_id]})",
                    __name__,
                )
                del xref_entry[obj_id]
    stream.seek(saved_position, 0)  # return to where it was
def _basic_validation(self, stream: StreamType) -> None:
    """
    Ensure the stream is valid and not empty.

    The first five bytes are compared with ``%PDF-``; afterwards the stream
    is positioned at its end.

    Args:
        stream: The PDF file stream.

    Raises:
        UnsupportedOperation: Reading the header raised a
            ``UnicodeDecodeError``.
        EmptyFileError: The stream is empty.
        PdfReadError: In strict mode, when the header is not ``%PDF-``.

    """
    stream.seek(0, os.SEEK_SET)
    try:
        header_byte = stream.read(5)
    except UnicodeDecodeError as exc:
        raise UnsupportedOperation("cannot read header") from exc
    if header_byte == b"":
        raise EmptyFileError("Cannot read an empty file")
    if header_byte != b"%PDF-":
        if self.strict:
            # A non-PDF file may start with bytes that are not valid UTF-8;
            # escape them so the intended PdfReadError is raised rather
            # than a UnicodeDecodeError.
            shown = header_byte.decode("utf8", "backslashreplace")
            raise PdfReadError(
                f"PDF starts with '{shown}', "
                "but '%PDF-' expected"
            )
        logger_warning(f"invalid pdf header: {header_byte}", __name__)
    stream.seek(0, os.SEEK_END)
def _find_eof_marker(self, stream: StreamType) -> None:
    """
    Walk backwards line by line until a line starting with %%EOF is found.

    According to the specs, the %%EOF marker should be at the very end of
    the file, so for compliant documents only the tail is read.

    Raises:
        PdfReadError: In strict mode, when the beginning of the file is
            reached without finding the marker.

    """
    HEADER_SIZE = 8  # to parse whole file, Header is e.g. '%PDF-1.6'
    truncated_markers = (b"%%EO", b"%%E", b"%%", b"%")
    current = b""
    on_first_line = True
    while True:
        if current.startswith(b"%%EOF"):
            break
        if on_first_line and current != b"":
            if current.strip().endswith(truncated_markers):
                # Consider the file as truncated while
                # having enough confidence to carry on.
                logger_warning("EOF marker seems truncated", __name__)
                break
            on_first_line = False
        if b"startxref" in current:
            logger_warning(
                "CAUTION: startxref found while searching for %%EOF. "
                "The file might be truncated and some data might not be read.",
                __name__,
            )
        if stream.tell() < HEADER_SIZE:
            if self.strict:
                raise PdfReadError("EOF marker not found")
            logger_warning("EOF marker not found", __name__)
        current = read_previous_line(stream)
def _find_startxref_pos(self, stream: StreamType) -> int:
    """
    Find startxref entry - the location of the xref table.

    Args:
        stream: Stream positioned just after the line holding the offset.

    Returns:
        The bytes offset

    Raises:
        PdfReadError: The ``startxref`` keyword is not where expected.

    """
    keyword = b"startxref"
    offset_line = read_previous_line(stream)
    try:
        position = int(offset_line)
    except ValueError:
        # 'startxref' may be on the same line as the location
        if not offset_line.startswith(keyword):
            raise PdfReadError("startxref not found")
        position = int(offset_line[len(keyword):].strip())
        logger_warning("startxref on same line as offset", __name__)
        return position

    # Regular layout: the keyword sits on the line before the offset.
    keyword_line = read_previous_line(stream)
    if not keyword_line.startswith(keyword):
        raise PdfReadError("startxref not found")
    return position
def _read_standard_xref_table(self, stream: StreamType) -> None:
    """
    Read a classic (non-stream) cross-reference table.

    The stream is expected right after the leading ``x`` of ``xref``. All
    subsections are read until the ``trailer`` keyword. Entries already
    present in ``self.xref`` are left untouched.

    Args:
        stream: The PDF file stream.

    Raises:
        PdfReadError: The ``xref`` keyword is malformed or an entry line is
            missing.

    """
    # standard cross-reference table
    ref = stream.read(3)
    if ref != b"ref":
        raise PdfReadError("xref table read error")
    read_non_whitespace(stream)
    stream.seek(-1, 1)
    first_time = True  # check if the first time looking at the xref table
    while True:
        num = cast(int, read_object(stream, self))
        if first_time and num != 0:
            self.xref_index = num
            if self.strict:
                logger_warning(
                    "Xref table not zero-indexed. ID numbers for objects will be corrected.",
                    __name__,
                )
                # if table not zero indexed, could be due to error from when PDF was created
                # which will lead to mismatched indices later on, only warned and corrected if self.strict==True
        first_time = False
        read_non_whitespace(stream)
        stream.seek(-1, 1)
        size = cast(int, read_object(stream, self))
        if not isinstance(size, int):
            logger_warning(
                "Invalid/Truncated xref table. Rebuilding it.",
                __name__,
            )
            self._rebuild_xref_table(stream)
            stream.read()
            return
        read_non_whitespace(stream)
        stream.seek(-1, 1)
        cnt = 0
        while cnt < size:
            line = stream.read(20)
            if not line:
                raise PdfReadError("Unexpected empty line in Xref table.")

            # It's very clear in section 3.4.3 of the PDF spec
            # that all cross-reference table lines are a fixed
            # 20 bytes (as of PDF 1.7). However, some files have
            # 21-byte entries (or more) due to the use of \r\n
            # (CRLF) EOL's. Detect that case, and adjust the line
            # until it does not begin with a \r (CR) or \n (LF).
            while line[0] in b"\x0D\x0A":
                stream.seek(-20 + 1, 1)
                line = stream.read(20)

            # On the other hand, some malformed PDF files
            # use a single character EOL without a preceding
            # space. Detect that case, and seek the stream
            # back one character (0-9 means we've bled into
            # the next xref entry, t means we've bled into the
            # text "trailer"):
            if line[-1] in b"0123456789t":
                stream.seek(-1, 1)

            # Bind the entry type before parsing: slicing cannot fail, so
            # the name is always defined for this entry even when the
            # offset/generation split below raises.
            entry_type_b = line[17:18]
            try:
                offset_b, generation_b = line[:16].split(b" ")

                offset, generation = int(offset_b), int(generation_b)
            except Exception:
                if hasattr(stream, "getbuffer"):
                    buf = bytes(stream.getbuffer())
                else:
                    p = stream.tell()
                    stream.seek(0, 0)
                    buf = stream.read(-1)
                    stream.seek(p)

                f = re.search(rf"{num}\s+(\d+)\s+obj".encode(), buf)
                if f is None:
                    logger_warning(
                        f"entry {num} in Xref table invalid; object not found",
                        __name__,
                    )
                    generation = 65535
                    offset = -1
                    entry_type_b = b"f"
                else:
                    logger_warning(
                        f"entry {num} in Xref table invalid but object found",
                        __name__,
                    )
                    generation = int(f.group(1))
                    offset = f.start()
                    if entry_type_b not in (b"n", b"f"):
                        # The entry line is unreadable but the object
                        # exists in the file: register it as in use.
                        entry_type_b = b"n"

            if generation not in self.xref:
                self.xref[generation] = {}
                self.xref_free_entry[generation] = {}
            if num in self.xref[generation]:
                # It really seems like we should allow the last
                # xref table in the file to override previous
                # ones. Since we read the file backwards, assume
                # any existing key is already set correctly.
                pass
            else:
                if entry_type_b == b"n":
                    self.xref[generation][num] = offset
                # Best effort: the free-entry maps may lack these
                # generations, in which case the flag is simply not stored.
                try:
                    self.xref_free_entry[generation][num] = entry_type_b == b"f"
                except Exception:
                    pass
                try:
                    self.xref_free_entry[65535][num] = entry_type_b == b"f"
                except Exception:
                    pass
            cnt += 1
            num += 1
        read_non_whitespace(stream)
        stream.seek(-1, 1)
        trailer_tag = stream.read(7)
        if trailer_tag != b"trailer":
            # more xrefs!
            stream.seek(-7, 1)
        else:
            break
def _read_xref_tables_and_trailers(
    self, stream: StreamType, startxref: Optional[int], xref_issue_nr: int
) -> None:
    """
    Read the cross-reference tables and trailers in the PDF stream.

    Starting at ``startxref``, each section is read and its predecessor is
    followed until none is left.

    Args:
        stream: The PDF file stream.
        startxref: Offset of the first xref section to read.
        xref_issue_nr: Non-zero when the startxref pointer was found to be
            wrong; triggers a rebuild of the xref table.

    Raises:
        PdfReadError: An xref stream cannot be read and no "/Root" has
            been collected yet.

    """
    self.xref = {}
    self.xref_free_entry = {}
    self.xref_objStm = {}
    self.trailer = DictionaryObject()
    while startxref is not None:
        # load the xref table
        stream.seek(startxref, 0)
        marker = stream.read(1)
        if marker in b"\r\n":
            marker = stream.read(1)

        if marker == b"x":
            # classic "xref" table
            startxref = self._read_xref(stream)
            continue

        if xref_issue_nr:
            try:
                self._rebuild_xref_table(stream)
            except Exception:
                # rebuilding failed: retry this offset the regular way
                xref_issue_nr = 0
                continue
            break

        if not marker.isdigit():
            startxref = self._read_xref_other_error(stream, startxref)
            continue

        # A digit: the offset points at an xref stream object.
        try:
            xrefstream = self._read_pdf15_xref_stream(stream)
        except Exception as e:
            if TK.ROOT in self.trailer:
                logger_warning(
                    f"Previous trailer cannot be read: {e.args}", __name__
                )
                break
            raise PdfReadError(f"Trailer cannot be read: {e!s}")
        self._process_xref_stream(xrefstream)
        if "/Prev" not in xrefstream:
            break
        startxref = cast(int, xrefstream["/Prev"])
def _process_xref_stream(self, xrefstream: DictionaryObject) -> None:
    """
    Copy trailer keys from an xref stream and follow its "/XRefStm".

    Only keys that are not yet in ``self.trailer`` are copied.
    """
    for key in (TK.ROOT, TK.ENCRYPT, TK.INFO, TK.ID, TK.SIZE):
        if key not in xrefstream or key in self.trailer:
            continue
        self.trailer[NameObject(key)] = xrefstream.raw_get(key)

    if "/XRefStm" not in xrefstream:
        return
    saved_position = self.stream.tell()
    self.stream.seek(cast(int, xrefstream["/XRefStm"]) + 1, 0)
    self._read_pdf15_xref_stream(self.stream)
    self.stream.seek(saved_position, 0)
def _read_xref(self, stream: StreamType) -> Optional[int]:
    """
    Read a classic xref table together with the trailer that follows it.

    Trailer keys not yet present in ``self.trailer`` are copied, and a
    referenced "/XRefStm" is read on a best-effort basis.

    Returns:
        The "/Prev" value of the trailer, or ``None`` when there is no
        trailer or no "/Prev" entry.

    """
    self._read_standard_xref_table(stream)
    if stream.read(1) == b"":
        # end of data right after the table: no trailer to read
        return None
    stream.seek(-1, 1)
    read_non_whitespace(stream)
    stream.seek(-1, 1)
    trailer_dict = cast(dict[str, Any], read_object(stream, self))
    for key, value in trailer_dict.items():
        if key in self.trailer:
            continue
        self.trailer[key] = value

    if "/XRefStm" in trailer_dict:
        saved_position = stream.tell()
        stream.seek(cast(int, trailer_dict["/XRefStm"]) + 1, 0)
        try:
            self._read_pdf15_xref_stream(stream)
        except Exception:
            logger_warning(
                f"XRef object at {trailer_dict['/XRefStm']} can not be read, some object may be missing",
                __name__,
            )
        stream.seek(saved_position, 0)

    return trailer_dict["/Prev"] if "/Prev" in trailer_dict else None
def _read_xref_other_error(
    self, stream: StreamType, startxref: int
) -> Optional[int]:
    """
    Recover when ``startxref`` points at neither an xref table nor a digit.

    Args:
        stream: The PDF file stream, positioned just after the byte that
            was read at ``startxref``.
        startxref: The offset that turned out to be wrong.

    Returns:
        A corrected offset to retry, or ``None`` when there is nothing
        further to read.

    Raises:
        PdfReadError: In strict mode for "/Prev=0", or when no xref can be
            located or rebuilt.

    """
    # some PDFs have /Prev=0 in the trailer, instead of no /Prev
    if startxref == 0:
        if self.strict:
            raise PdfReadError(
                "/Prev=0 in the trailer (try opening with strict=False)"
            )
        logger_warning(
            "/Prev=0 in the trailer - assuming there is no previous xref table",
            __name__,
        )
        return None

    # bad xref character at startxref. Let's see if we can find
    # the xref table nearby, as we've observed this error with an
    # off-by-one before.
    stream.seek(-11, 1)
    neighbourhood = stream.read(20)
    keyword_at = neighbourhood.find(b"xref")
    if keyword_at != -1:
        return startxref - (10 - keyword_at)

    # No explicit xref table, try finding a cross-reference stream.
    stream.seek(startxref, 0)
    for look in range(25):  # value extended to cope with more linearized files
        if stream.read(1).isdigit():
            # This is not a standard PDF, consider adding a warning
            return startxref + look

    # no xref table found at specified location
    if self.strict or "/Root" not in self.trailer:
        raise PdfReadError("Could not find xref table at specified location")

    # if Root has been already found, just raise warning
    logger_warning("Invalid parent xref., rebuild xref", __name__)
    try:
        self._rebuild_xref_table(stream)
    except Exception:
        raise PdfReadError("Cannot rebuild xref")
    else:
        return None
def _read_pdf15_xref_stream(
    self, stream: StreamType
) -> Union[ContentStream, EncodedStreamObject, DecodedStreamObject]:
    """
    Read the cross-reference stream for PDF 1.5+.

    The object header and the stream object are parsed, the object is
    cached, and its entries are decoded according to ``/W`` and ``/Index``
    into ``self.xref`` and ``self.xref_objStm``.

    Args:
        stream: The stream positioned one byte past the start of the
            object header; the position is moved back by one before
            parsing.

    Returns:
        The parsed cross-reference stream object.

    Raises:
        PdfReadError: If the object is not of type ``/XRef``, if ``/Size``
            is missing, if ``/W`` is missing or has fewer than three
            entries, or (strict mode only) if ``/W`` has more than three
            entries.

    """
    stream.seek(-1, 1)
    idnum, generation = self.read_object_header(stream)
    xrefstream = cast(ContentStream, read_object(stream, self))
    if cast(str, xrefstream["/Type"]) != "/XRef":
        raise PdfReadError(f"Unexpected type {xrefstream['/Type']!r}")
    self.cache_indirect_object(generation, idnum, xrefstream)

    # Index pairs specify the subsections in the dictionary.
    # If none, create one subsection that spans everything.
    if "/Size" not in xrefstream:
        # According to table 17 of the PDF 2.0 specification, this key is required.
        raise PdfReadError(f"Size missing from XRef stream {xrefstream!r}!")
    idx_pairs = xrefstream.get("/Index", [0, xrefstream["/Size"]])

    entry_sizes = cast(list[int], xrefstream.get("/W"))
    # The field widths come from the (untrusted) file, so validate them
    # explicitly: an ``assert`` would vanish under ``python -O`` and a
    # missing ``/W`` would otherwise surface as a bare ``TypeError``.
    if entry_sizes is None or len(entry_sizes) < 3:
        raise PdfReadError(
            f"Invalid /W entry in XRef stream: {entry_sizes!r}"
        )
    if self.strict and len(entry_sizes) > 3:
        raise PdfReadError(f"Too many entry sizes: {entry_sizes}")

    stream_data = BytesIO(xrefstream.get_data())

    def get_entry(i: int) -> Union[int, tuple[int, ...]]:
        # Reads the correct number of bytes for each entry. See the
        # discussion of the W parameter in PDF spec table 17.
        if entry_sizes[i] > 0:
            d = stream_data.read(entry_sizes[i])
            return convert_to_int(d, entry_sizes[i])

        # PDF Spec Table 17: A value of zero for an element in the
        # W array indicates...the default value shall be used
        if i == 0:
            return 1  # First value defaults to 1
        return 0

    def used_before(num: int, generation: Union[int, tuple[int, ...]]) -> bool:
        # We move backwards through the xrefs, don't replace any.
        return num in self.xref.get(generation, []) or num in self.xref_objStm  # type: ignore

    # Iterate through each subsection
    self._read_xref_subsections(idx_pairs, get_entry, used_before)
    return xrefstream
1011 @staticmethod
1012 def _get_xref_issues(stream: StreamType, startxref: int) -> int:
1013 """
1014 Return an int which indicates an issue. 0 means there is no issue.
1016 Args:
1017 stream:
1018 startxref:
1020 Returns:
1021 0 means no issue, other values represent specific issues.
1023 """
1024 if startxref == 0:
1025 return 4
1027 stream.seek(startxref - 1, 0) # -1 to check character before
1028 line = stream.read(1)
1029 if line == b"j":
1030 line = stream.read(1)
1031 if line not in b"\r\n \t":
1032 return 1
1033 line = stream.read(4)
1034 if line != b"xref":
1035 # not a xref so check if it is an XREF object
1036 line = b""
1037 while line in b"0123456789 \t":
1038 line = stream.read(1)
1039 if line == b"":
1040 return 2
1041 line += stream.read(2) # 1 char already read, +2 to check "obj"
1042 if line.lower() != b"obj":
1043 return 3
1044 return 0
def _rebuild_xref_table(self, stream: StreamType) -> None:
    """
    Rebuild the cross-reference data by scanning the whole file.

    ``self.xref`` is reset and refilled from every ``<id> <gen> obj``
    header found in the raw bytes. Each object found this way is then
    parsed; for objects of type ``/ObjStm`` the leading
    ``<id> <offset>`` pairs of the stream data are recorded in
    ``self.xref_objStm``. Finally every ``trailer`` dictionary found in the
    file is merged into ``self.trailer``, later ones overriding earlier
    ones.

    Args:
        stream: The stream holding the complete file.

    """
    self.xref = {}
    stream.seek(0, 0)
    raw = stream.read(-1)

    for header in re.finditer(rb"[\r\n \t][ \t]*(\d+)[ \t]+(\d+)[ \t]+obj", raw):
        obj_id = int(header.group(1))
        gen = int(header.group(2))
        self.xref.setdefault(gen, {})[obj_id] = header.start(1)

    logger_warning("parsing for Object Streams", __name__)
    for gen, offsets in self.xref.items():
        for obj_id, offset in offsets.items():
            # get_object in manual
            stream.seek(offset, 0)
            try:
                self.read_object_header(stream)
                candidate = cast(StreamObject, read_object(stream, self))
                if candidate.get("/Type", "") != "/ObjStm":
                    continue
                index_data = BytesIO(candidate.get_data())
                found = 0
                while True:
                    token = read_until_whitespace(index_data)
                    if not token.isdigit():
                        break
                    member_id = int(token)
                    skip_over_whitespace(index_data)
                    index_data.seek(-1, 1)
                    token = read_until_whitespace(index_data)
                    if not token.isdigit():  # pragma: no cover
                        break  # pragma: no cover
                    member_offset = int(token)
                    self.xref_objStm[member_id] = (obj_id, member_offset)
                    found += 1
                if found != candidate.get("/N"):  # pragma: no cover
                    logger_warning(  # pragma: no cover
                        f"found {found} objects within Object({obj_id},{gen})"
                        f" whereas {candidate.get('/N')} expected",
                        __name__,
                    )
            except Exception:  # could be multiple causes
                # Deliberately best effort: an object that cannot be
                # parsed is simply not inspected for an object stream.
                pass

    stream.seek(0, 0)
    for found_trailer in re.finditer(rb"[\r\n \t][ \t]*trailer[\r\n \t]*(<<)", raw):
        stream.seek(found_trailer.start(1), 0)
        recovered = cast(dict[Any, Any], read_object(stream, self))
        # Here, we are parsing the file from start to end, the new data have to erase the existing.
        for key, value in list(recovered.items()):
            self.trailer[key] = value
def _read_xref_subsections(
    self,
    idx_pairs: list[int],
    get_entry: Callable[[int], Union[int, tuple[int, ...]]],
    used_before: Callable[[int, Union[int, tuple[int, ...]]], bool],
) -> None:
    """
    Read and process the subsections of the xref.

    Args:
        idx_pairs: Flat list of ``start, size`` values, one pair per
            subsection.
        get_entry: Callback returning the value of field ``i`` (0, 1 or 2)
            of the entry currently being read.
        used_before: Callback telling whether an object number/generation
            is already registered; such entries are not overwritten.

    Raises:
        PdfReadError: In strict mode, if an entry has an unknown type.

    """
    for first_num, count in self._pairs(idx_pairs):
        # The subsections must increase
        for obj_num in range(first_num, first_num + count):
            # The first entry is the type; the meaning of the other two
            # fields depends on it.
            entry_type = get_entry(0)

            if entry_type == 0:
                # linked list of free objects: both fields are consumed
                # and discarded.
                get_entry(1)
                get_entry(2)
                continue

            if entry_type == 1:
                # objects that are in use but are not compressed
                offset = get_entry(1)
                gen = get_entry(2)
                if gen not in self.xref:
                    self.xref[gen] = {}  # type: ignore
                if not used_before(obj_num, gen):
                    self.xref[gen][obj_num] = offset  # type: ignore
                continue

            if entry_type == 2:
                # compressed objects
                container_num = get_entry(1)
                index_in_container = get_entry(2)
                # PDF spec table 18, generation is 0
                if not used_before(obj_num, 0):
                    self.xref_objStm[obj_num] = (container_num, index_in_container)
                continue

            if self.strict:
                raise PdfReadError(f"Unknown xref type: {entry_type}")
def _pairs(self, array: list[int]) -> Iterable[tuple[int, int]]:
    """
    Iterate over consecutive, non-overlapping pairs in the array.

    A trailing element without a partner is ignored.
    """
    for left in range(0, len(array) - 1, 2):
        yield array[left], array[left + 1]
def decrypt(self, password: Union[str, bytes]) -> PasswordType:
    """
    Decrypt a file that is secured with the PDF Standard encryption handler.

    The given password is checked against both the user password and the
    owner password of the document. If either of them matches, the
    resulting decryption key is stored.

    Which of the two passwords matched is irrelevant for further use: both
    yield the decryption key that makes the document usable with this
    library.

    Args:
        password: The password to match.

    Returns:
        An indicator if the document was decrypted and whether it was the
        owner password or the user password.

    Raises:
        PdfReadError: If the file is not encrypted.

    """
    handler = self._encryption
    if handler:
        # TODO: raise Exception for wrong password
        return handler.verify(password)
    raise PdfReadError("Not encrypted file")
@property
def is_encrypted(self) -> bool:
    """
    Read-only boolean property telling whether this PDF file is encrypted.

    The value is derived from the presence of the encryption entry in the
    trailer, so once true it stays true even after the
    :meth:`decrypt()<pypdf.PdfReader.decrypt>` method is called.
    """
    encrypt_key = TK.ENCRYPT
    return encrypt_key in self.trailer
def add_form_topname(self, name: str) -> Optional[DictionaryObject]:
    """
    Add a top level form that groups all form fields below it.

    A new field dictionary is created whose ``/Kids`` are the current
    AcroForm ``/Fields``; it becomes the only entry of ``/Fields`` and the
    ``/Parent`` of every former top level field.

    Args:
        name: text string of the "/T" Attribute of the created object

    Returns:
        The created object. ``None`` means no object was created.

    """
    root = self.root_object

    if "/AcroForm" not in root:
        return None
    if not isinstance(root["/AcroForm"], DictionaryObject):
        return None
    form = cast(DictionaryObject, root[NameObject("/AcroForm")])
    if "/Fields" not in form:
        # TODO: No error but this may be extended for XFA Forms
        return None

    top_field = DictionaryObject()
    top_field[NameObject("/T")] = TextStringObject(name)
    top_field[NameObject("/Kids")] = form[NameObject("/Fields")]

    # Register the new field under the object number following the highest
    # generation-0 number among the objects resolved so far.
    next_id = max(num for (gen, num) in self.resolved_objects if gen == 0) + 1
    self.cache_indirect_object(0, next_id, top_field)

    new_fields = ArrayObject()
    new_fields.append(top_field.indirect_reference)
    form[NameObject("/Fields")] = new_fields

    for kid_ref in cast(ArrayObject, top_field["/Kids"]):
        kid = kid_ref.get_object()
        if "/Parent" in kid:
            logger_warning(
                f"Top Level Form Field {kid.indirect_reference} have a non-expected parent",
                __name__,
            )
        kid[NameObject("/Parent")] = top_field.indirect_reference
    return top_field
def rename_form_topname(self, name: str) -> Optional[DictionaryObject]:
    """
    Rename the first top level form field.

    The first entry of the AcroForm ``/Fields`` array is resolved and its
    ``/T`` attribute is replaced by ``name``.

    Args:
        name: text string of the "/T" field of the modified object

    Returns:
        The modified object. ``None`` means no object was modified, which
        is the case when there is no AcroForm dictionary, no ``/Fields``
        entry, or an empty ``/Fields`` array.

    """
    catalog = self.root_object

    if "/AcroForm" not in catalog or not isinstance(
        catalog["/AcroForm"], DictionaryObject
    ):
        return None
    acroform = cast(DictionaryObject, catalog[NameObject("/AcroForm")])
    if "/Fields" not in acroform:
        return None

    fields = cast(ArrayObject, acroform[NameObject("/Fields")])
    if not fields:
        # An empty /Fields array has no top level field to rename; report
        # "nothing modified" instead of failing with an IndexError.
        return None

    interim = cast(DictionaryObject, fields[0].get_object())
    interim[NameObject("/T")] = TextStringObject(name)
    return interim
def _repr_mimebundle_(
    self,
    include: Union[None, Iterable[str]] = None,
    exclude: Union[None, Iterable[str]] = None,
) -> dict[str, Any]:
    """
    Integration into Jupyter Notebooks.

    The whole underlying stream is read from its start and offered as
    ``application/pdf``. The result maps each mime-type to its
    representation.

    Args:
        include: If given, only mime-types contained in it are returned.
        exclude: If given, mime-types contained in it are removed.

    Returns:
        The mapping of mime-type to representation after filtering.

    .. seealso::

        https://ipython.readthedocs.io/en/stable/config/integrating.html
    """
    self.stream.seek(0)
    bundle = {"application/pdf": self.stream.read()}

    if include is not None:
        # Keep only the representations that were asked for.
        bundle = {
            mime: payload for mime, payload in bundle.items() if mime in include
        }

    if exclude is not None:
        # Drop the representations that were explicitly rejected.
        bundle = {
            mime: payload for mime, payload in bundle.items() if mime not in exclude
        }

    return bundle