Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/pdfdocument.py: 87%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import itertools
2import logging
3import re
4import struct
5from collections.abc import Callable, Iterable, Iterator, KeysView, Sequence
6from hashlib import md5, sha256, sha384, sha512
7from typing import (
8 Any,
9 ClassVar,
10 cast,
11)
13from cryptography.hazmat.backends import default_backend
14from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
16from pdfminer import settings
17from pdfminer.arcfour import Arcfour
18from pdfminer.casting import safe_int
19from pdfminer.data_structures import NumberTree
20from pdfminer.pdfexceptions import (
21 PDFException,
22 PDFKeyError,
23 PDFObjectNotFound,
24 PDFTypeError,
25)
26from pdfminer.pdfparser import PDFParser, PDFStreamParser, PDFSyntaxError
27from pdfminer.pdftypes import (
28 DecipherCallable,
29 PDFStream,
30 decipher_all,
31 dict_value,
32 int_value,
33 list_value,
34 str_value,
35 stream_value,
36 uint_value,
37)
38from pdfminer.psexceptions import PSEOF
39from pdfminer.psparser import KWD, LIT, literal_name
40from pdfminer.utils import (
41 choplist,
42 decode_text,
43 format_int_alpha,
44 format_int_roman,
45 nunpack,
46 unpad_aes,
47)
49log = logging.getLogger(__name__)
class PDFNoValidXRef(PDFSyntaxError):
    """Raised in this module when a cross-reference section cannot be found or parsed."""
class PDFNoValidXRefWarning(SyntaxWarning):
    """Warning category formerly associated with a missing xref.

    Kept only as a legacy name: warnings.warn was replaced by
    logger.Logger.warn, so nothing in this module emits it.
    """
class PDFNoOutlines(PDFException):
    """Raised by PDFDocument.get_outlines when the catalog has no /Outlines entry."""
class PDFNoPageLabels(PDFException):
    """Raised by PDFDocument.get_page_labels when no usable /PageLabels entry exists."""
class PDFDestinationNotFound(PDFException):
    """Raised by PDFDocument.get_dest when a named destination cannot be resolved."""
class PDFEncryptionError(PDFException):
    """Base error for unsupported or malformed encryption parameters."""
class PDFPasswordIncorrect(PDFEncryptionError):
    """Raised when the supplied password authenticates as neither user nor owner."""
class PDFEncryptionWarning(UserWarning):
    """Warning category formerly associated with failed decryption.

    Kept only as a legacy name: warnings.warn was replaced by
    logger.Logger.warn, so nothing in this module emits it.
    """
class PDFTextExtractionNotAllowedWarning(UserWarning):
    """Warning category formerly associated with PDFs that forbid extraction.

    Kept only as a legacy name: warnings.warn was replaced by
    logger.Logger.warn, so nothing in this module emits it.
    """
class PDFTextExtractionNotAllowed(PDFEncryptionError):
    """Error type for documents that do not permit text extraction.

    NOTE(review): not raised anywhere in this module; presumably raised by
    callers that inspect PDFDocument.is_extractable -- confirm.
    """
# Predefined name literals. They are compared by identity (``is``) against
# the /Type entry of object streams, xref streams and the document catalog.
LITERAL_OBJSTM = LIT("ObjStm")
LITERAL_XREF = LIT("XRef")
LITERAL_CATALOG = LIT("Catalog")
class PDFBaseXRef:
    """Common interface of the cross-reference implementations in this module."""

    def get_trailer(self) -> dict[str, Any]:
        """Return the trailer dictionary; subclasses must override this."""
        raise NotImplementedError

    def get_objids(self) -> Iterable[int]:
        """Return the object ids this xref knows about (none in the base class)."""
        return list()

    def get_pos(self, objid: int) -> tuple[int | None, int, int]:
        """Locate ``objid``.

        Implementations must return either ``(strmid, index, genno)`` or
        ``(None, pos, genno)``. The base class knows no objects and always
        raises PDFKeyError.
        """
        raise PDFKeyError(objid)

    def load(self, parser: PDFParser) -> None:
        """Populate the xref from ``parser``; subclasses must override this."""
        raise NotImplementedError
class PDFXRef(PDFBaseXRef):
    """Cross-reference table read from a textual ``xref`` section and its trailer."""

    _EOF_MESSAGE = "Unexpected EOF - file corrupted?"

    def __init__(self) -> None:
        # objid -> (None, position, generation number)
        self.offsets: dict[int, tuple[int | None, int, int]] = {}
        self.trailer: dict[str, Any] = {}

    def __repr__(self) -> str:
        known_ids = self.offsets.keys()
        return f"<PDFXRef: offsets={known_ids!r}>"

    def load(self, parser: PDFParser) -> None:
        """Read subsections until a ``trailer`` line, then load the trailer.

        Each subsection starts with a "<first objid> <count>" header line
        followed by ``count`` entry lines.

        :raises PDFNoValidXRef: on EOF or on a header/entry line that does
            not have the expected number of space-separated fields.
        """
        while True:
            try:
                (line_pos, raw) = parser.nextline()
            except PSEOF as err:
                raise PDFNoValidXRef(self._EOF_MESSAGE) from err
            line = raw.strip()
            if not line:
                continue
            if line.startswith(b"trailer"):
                # Rewind so load_trailer() sees the keyword itself.
                parser.seek(line_pos)
                break
            header = line.split(b" ")
            if len(header) != 2:
                error_msg = f"Trailer not found: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg)
            try:
                first_id = int(header[0])
                count = int(header[1])
            except ValueError as err:
                error_msg = f"Invalid line: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg) from err
            self._load_subsection(parser, first_id, count)

        log.debug("xref objects: %r", self.offsets)
        self.load_trailer(parser)

    def _load_subsection(self, parser: PDFParser, first_id: int, count: int) -> None:
        """Read ``count`` entry lines for object ids starting at ``first_id``."""
        for objid in range(first_id, first_id + count):
            try:
                (_, raw) = parser.nextline()
            except PSEOF as err:
                raise PDFNoValidXRef(self._EOF_MESSAGE) from err
            line = raw.strip()
            fields = line.split(b" ")
            if len(fields) != 3:
                error_msg = f"Invalid XRef format: {parser!r}, line={line!r}"
                raise PDFNoValidXRef(error_msg)
            (pos_b, genno_b, use_b) = fields
            if use_b != b"n":
                # Only entries flagged "n" are recorded.
                continue

            pos_i = safe_int(pos_b)
            genno_i = safe_int(genno_b)
            if pos_i is None or genno_i is None:
                log.warning(
                    "Not adding object %s to xref because position %r "
                    "or generation number %r cannot be parsed as an int",
                    objid,
                    pos_b,
                    genno_b,
                )
                continue
            self.offsets[objid] = (None, pos_i, genno_i)

    def load_trailer(self, parser: PDFParser) -> None:
        """Parse the ``trailer`` keyword and merge its dictionary into ``self.trailer``.

        If the parser hits EOF, the last object it has on its stack (if any)
        is used as the trailer dictionary instead.
        """
        try:
            (_, kwd) = parser.nexttoken()
            assert kwd is KWD(b"trailer"), str(kwd)
            (_, dic) = parser.nextobject()
        except PSEOF:
            leftovers = parser.pop(1)
            if not leftovers:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted") from None
            (_, dic) = leftovers[0]
        self.trailer.update(dict_value(dic))
        log.debug("trailer=%r", self.trailer)

    def get_trailer(self) -> dict[str, Any]:
        return self.trailer

    def get_objids(self) -> KeysView[int]:
        return self.offsets.keys()

    def get_pos(self, objid: int) -> tuple[int | None, int, int]:
        """Return the recorded location of ``objid`` (KeyError if unknown)."""
        return self.offsets[objid]
class PDFXRefFallback(PDFXRef):
    """Xref rebuilt by scanning the whole file for ``<objid> <genno> obj`` lines."""

    PDFOBJ_CUE = re.compile(r"^(\d+)\s+(\d+)\s+obj\b")

    def __repr__(self) -> str:
        known_ids = self.offsets.keys()
        return f"<PDFXRefFallback: offsets={known_ids!r}>"

    def load(self, parser: PDFParser) -> None:
        """Scan from offset 0, recording every object header that is found.

        Scanning stops at EOF or at the first line starting with ``trailer``
        (whose dictionary is then loaded). Objects that turn out to be
        object streams are expanded so their members are registered too.
        """
        parser.seek(0)
        while True:
            try:
                (pos, line_bytes) = parser.nextline()
            except PSEOF:
                break
            if line_bytes.startswith(b"trailer"):
                parser.seek(pos)
                self.load_trailer(parser)
                log.debug("trailer: %r", self.trailer)
                break
            # default pdf encoding
            found = self.PDFOBJ_CUE.match(line_bytes.decode("latin-1"))
            if not found:
                continue
            objid = int(found.group(1))
            genno = int(found.group(2))
            self.offsets[objid] = (None, pos, genno)

            # expand ObjStm.
            parser.seek(pos)
            (_, obj) = parser.nextobject()
            if not (isinstance(obj, PDFStream) and obj.get("Type") is LITERAL_OBJSTM):
                continue
            stream = stream_value(obj)
            try:
                n = stream["N"]
            except KeyError:
                if settings.STRICT:
                    raise PDFSyntaxError(f"N is not defined: {stream!r}") from None
                n = 0
            stream_parser = PDFStreamParser(stream.get_data())
            objs: list[int] = []
            try:
                while True:
                    (_, item) = stream_parser.nextobject()
                    objs.append(cast(int, item))
            except PSEOF:
                pass
            # Never trust N beyond the number of pairs actually parsed.
            n = min(n, len(objs) // 2)
            for index in range(n):
                member_id = objs[index * 2]
                self.offsets[member_id] = (objid, index, 0)
class PDFXRefStream(PDFBaseXRef):
    """Cross-reference information stored in an xref stream (PDF 1.5).

    The decoded stream data is a flat sequence of fixed-width entries. The
    /Index array splits the object-id space into ``(start, count)``
    subsections; entries for all subsections follow each other in the data,
    so an entry's position is the running total of the preceding subsection
    sizes plus the offset inside its own subsection.
    """

    def __init__(self) -> None:
        self.data: bytes | None = None
        self.entlen: int | None = None
        self.fl1: int | None = None
        self.fl2: int | None = None
        self.fl3: int | None = None
        self.ranges: list[tuple[int, int]] = []
        # Defined up front so get_trailer() is usable before load().
        self.trailer: dict[str, Any] = {}

    def __repr__(self) -> str:
        return f"<PDFXRefStream: ranges={self.ranges!r}>"

    def load(self, parser: PDFParser) -> None:
        """Read the xref stream object at the parser's current position.

        :raises PDFNoValidXRef: if the object is not a stream whose /Type
            is XRef.
        :raises PDFSyntaxError: if /Index has an odd number of elements.
        """
        (_, _objid) = parser.nexttoken()  # ignored
        (_, _genno) = parser.nexttoken()  # ignored
        (_, _kwd) = parser.nexttoken()
        (_, stream) = parser.nextobject()
        if not isinstance(stream, PDFStream) or stream.get("Type") is not LITERAL_XREF:
            raise PDFNoValidXRef("Invalid PDF stream spec.")
        size = stream["Size"]
        # Without /Index there is a single subsection covering 0..Size-1.
        index_array = stream.get("Index", (0, size))
        if len(index_array) % 2 != 0:
            raise PDFSyntaxError("Invalid index number")
        self.ranges.extend(cast(Iterator[tuple[int, int]], choplist(2, index_array)))
        (self.fl1, self.fl2, self.fl3) = stream["W"]
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        self.data = stream.get_data()
        self.entlen = self.fl1 + self.fl2 + self.fl3
        self.trailer = stream.attrs
        log.debug(
            "xref stream: objid=%s, fields=%d,%d,%d",
            ", ".join(map(repr, self.ranges)),
            self.fl1,
            self.fl2,
            self.fl3,
        )

    def get_trailer(self) -> dict[str, Any]:
        return self.trailer

    def get_objids(self) -> Iterator[int]:
        """Yield the ids of all entries whose type field is 1 or 2.

        The entry index accumulates across subsections, exactly as in
        get_pos(); restarting it at zero for each subsection would read the
        first subsection's entries again.
        """
        base = 0  # number of entries belonging to earlier subsections
        for start, nobjs in self.ranges:
            assert self.entlen is not None
            assert self.data is not None
            for i in range(nobjs):
                offset = self.entlen * (base + i)
                ent = self.data[offset : offset + self.entlen]
                f1 = nunpack(ent[: self.fl1], 1)
                if f1 == 1 or f1 == 2:
                    yield start + i
            base += nobjs

    def get_pos(self, objid: int) -> tuple[int | None, int, int]:
        """Return the location of ``objid``.

        :returns: ``(None, f2, f3)`` for a type-1 entry and ``(f2, f3, 0)``
            for a type-2 entry, where f2/f3 are the entry's second and third
            fields.
        :raises PDFKeyError: if ``objid`` is in no subsection or its entry
            has any other type (a free object).
        """
        index = 0
        for start, nobjs in self.ranges:
            if start <= objid < start + nobjs:
                index += objid - start
                break
            index += nobjs
        else:
            raise PDFKeyError(objid)
        assert self.entlen is not None
        assert self.data is not None
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        offset = self.entlen * index
        ent = self.data[offset : offset + self.entlen]
        f1 = nunpack(ent[: self.fl1], 1)
        f2 = nunpack(ent[self.fl1 : self.fl1 + self.fl2])
        f3 = nunpack(ent[self.fl1 + self.fl2 :])
        if f1 == 1:
            return (None, f2, f3)
        elif f1 == 2:
            return (f2, f3, 0)
        else:
            # this is a free object
            raise PDFKeyError(objid)
335class PDFStandardSecurityHandler:
336 PASSWORD_PADDING = (
337 b"(\xbfN^Nu\x8aAd\x00NV\xff\xfa\x01\x08..\x00\xb6\xd0h>\x80/\x0c\xa9\xfedSiz"
338 )
339 supported_revisions: tuple[int, ...] = (2, 3)
341 def __init__(
342 self,
343 docid: Sequence[bytes],
344 param: dict[str, Any],
345 password: str = "",
346 ) -> None:
347 self.docid = docid
348 self.param = param
349 self.password = password
350 self.init()
352 def init(self) -> None:
353 self.init_params()
354 if self.r not in self.supported_revisions:
355 error_msg = f"Unsupported revision: param={self.param!r}"
356 raise PDFEncryptionError(error_msg)
357 self.init_key()
359 def init_params(self) -> None:
360 self.v = int_value(self.param.get("V", 0))
361 self.r = int_value(self.param["R"])
362 self.p = uint_value(self.param["P"], 32)
363 self.o = str_value(self.param["O"])
364 self.u = str_value(self.param["U"])
365 self.length = int_value(self.param.get("Length", 40))
367 def init_key(self) -> None:
368 self.key = self.authenticate(self.password)
369 if self.key is None:
370 raise PDFPasswordIncorrect
372 def is_printable(self) -> bool:
373 return bool(self.p & 4)
375 def is_modifiable(self) -> bool:
376 return bool(self.p & 8)
378 def is_extractable(self) -> bool:
379 return bool(self.p & 16)
381 def compute_u(self, key: bytes) -> bytes:
382 if self.r == 2:
383 # Algorithm 3.4
384 return Arcfour(key).encrypt(self.PASSWORD_PADDING) # 2
385 else:
386 # Algorithm 3.5
387 hash = md5(self.PASSWORD_PADDING) # 2
388 hash.update(self.docid[0]) # 3
389 result = Arcfour(key).encrypt(hash.digest()) # 4
390 for i in range(1, 20): # 5
391 k = b"".join(bytes((c ^ i,)) for c in iter(key))
392 result = Arcfour(k).encrypt(result)
393 result += result # 6
394 return result
396 def compute_encryption_key(self, password: bytes) -> bytes:
397 # Algorithm 3.2
398 password = (password + self.PASSWORD_PADDING)[:32] # 1
399 hash = md5(password) # 2
400 hash.update(self.o) # 3
401 # See https://github.com/pdfminer/pdfminer.six/issues/186
402 hash.update(struct.pack("<L", self.p)) # 4
403 hash.update(self.docid[0]) # 5
404 if (
405 self.r >= 4
406 and not cast(PDFStandardSecurityHandlerV4, self).encrypt_metadata
407 ):
408 hash.update(b"\xff\xff\xff\xff")
409 result = hash.digest()
410 n = 5
411 if self.r >= 3:
412 n = self.length // 8
413 for _ in range(50):
414 result = md5(result[:n]).digest()
415 return result[:n]
417 def authenticate(self, password: str) -> bytes | None:
418 password_bytes = password.encode("latin1")
419 key = self.authenticate_user_password(password_bytes)
420 if key is None:
421 key = self.authenticate_owner_password(password_bytes)
422 return key
424 def authenticate_user_password(self, password: bytes) -> bytes | None:
425 key = self.compute_encryption_key(password)
426 if self.verify_encryption_key(key):
427 return key
428 else:
429 return None
431 def verify_encryption_key(self, key: bytes) -> bool:
432 # Algorithm 3.6
433 u = self.compute_u(key)
434 if self.r == 2:
435 return u == self.u
436 return u[:16] == self.u[:16]
438 def authenticate_owner_password(self, password: bytes) -> bytes | None:
439 # Algorithm 3.7
440 password = (password + self.PASSWORD_PADDING)[:32]
441 hash = md5(password)
442 if self.r >= 3:
443 for _ in range(50):
444 hash = md5(hash.digest())
445 n = 5
446 if self.r >= 3:
447 n = self.length // 8
448 key = hash.digest()[:n]
449 if self.r == 2:
450 user_password = Arcfour(key).decrypt(self.o)
451 else:
452 user_password = self.o
453 for i in range(19, -1, -1):
454 k = b"".join(bytes((c ^ i,)) for c in iter(key))
455 user_password = Arcfour(k).decrypt(user_password)
456 return self.authenticate_user_password(user_password)
458 def decrypt(
459 self,
460 objid: int,
461 genno: int,
462 data: bytes,
463 attrs: dict[str, Any] | None = None,
464 ) -> bytes:
465 return self.decrypt_rc4(objid, genno, data)
467 def decrypt_rc4(self, objid: int, genno: int, data: bytes) -> bytes:
468 assert self.key is not None
469 key = self.key + struct.pack("<L", objid)[:3] + struct.pack("<L", genno)[:2]
470 hash = md5(key)
471 key = hash.digest()[: min(len(key), 16)]
472 return Arcfour(key).decrypt(data)
class PDFStandardSecurityHandlerV4(PDFStandardSecurityHandler):
    """Revision 4 handler: named crypt filters using RC4 (V2) or AES-128 (AESV2)."""

    supported_revisions: tuple[int, ...] = (4,)

    def init_params(self) -> None:
        """Read the crypt-filter parameters and build the name -> decryptor table.

        :raises PDFEncryptionError: if /StmF and /StrF differ, if a filter
            uses an unknown method, or if /StrF names an undefined filter.
        """
        super().init_params()
        self.length = 128
        self.cf = dict_value(self.param.get("CF"))
        self.stmf = literal_name(self.param["StmF"])
        self.strf = literal_name(self.param["StrF"])
        self.encrypt_metadata = bool(self.param.get("EncryptMetadata", True))
        if self.stmf != self.strf:
            error_msg = f"Unsupported crypt filter: param={self.param!r}"
            raise PDFEncryptionError(error_msg)
        self.cfm = {}
        for filter_name, filter_dict in self.cf.items():
            decryptor = self.get_cfm(literal_name(filter_dict["CFM"]))
            if decryptor is None:
                error_msg = f"Unknown crypt filter method: param={self.param!r}"
                raise PDFEncryptionError(error_msg)
            self.cfm[filter_name] = decryptor
        # Registered last, so it replaces any user-defined "Identity" entry.
        self.cfm["Identity"] = self.decrypt_identity
        if self.strf not in self.cfm:
            error_msg = f"Undefined crypt filter: param={self.param!r}"
            raise PDFEncryptionError(error_msg)

    def get_cfm(self, name: str) -> Callable[[int, int, bytes], bytes] | None:
        """Map a crypt filter method name to its decryptor (None if unknown)."""
        if name == "V2":
            return self.decrypt_rc4
        if name == "AESV2":
            return self.decrypt_aes128
        return None

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: dict[str, Any] | None = None,
        name: str | None = None,
    ) -> bytes:
        """Decrypt ``data`` with crypt filter ``name`` (default: the /StrF filter).

        When metadata is not encrypted, data whose ``attrs`` carry
        /Type Metadata is returned untouched.
        """
        if attrs is not None and not self.encrypt_metadata:
            type_entry = attrs.get("Type")
            if type_entry is not None and literal_name(type_entry) == "Metadata":
                return data
        filter_name = self.strf if name is None else name
        return self.cfm[filter_name](objid, genno, data)

    def decrypt_identity(self, objid: int, genno: int, data: bytes) -> bytes:
        return data

    def decrypt_aes128(self, objid: int, genno: int, data: bytes) -> bytes:
        assert self.key is not None
        seed = (
            self.key
            + struct.pack("<L", objid)[:3]
            + struct.pack("<L", genno)[:2]
            + b"sAlT"
        )
        object_key = md5(seed).digest()[: min(len(seed), 16)]
        # The first 16 bytes of the data are used as the CBC IV.
        initialization_vector, ciphertext = data[:16], data[16:]
        cipher = Cipher(
            algorithms.AES(object_key),
            modes.CBC(initialization_vector),
            backend=default_backend(),
        )  # type: ignore
        plaintext = cipher.decryptor().update(ciphertext)  # type: ignore
        return unpad_aes(plaintext)
class PDFStandardSecurityHandlerV5(PDFStandardSecurityHandlerV4):
    """Revision 5 and 6 handler: SHA-2 based password hashing and AES-256 (AESV3)."""

    supported_revisions = (5, 6)

    def init_params(self) -> None:
        """Read /OE and /UE and split /O and /U into hash, validation salt and key salt."""
        super().init_params()
        self.length = 256
        self.oe = str_value(self.param["OE"])
        self.ue = str_value(self.param["UE"])
        owner, user = self.o, self.u
        self.o_hash = owner[:32]
        self.o_validation_salt = owner[32:40]
        self.o_key_salt = owner[40:]
        self.u_hash = user[:32]
        self.u_validation_salt = user[32:40]
        self.u_key_salt = user[40:]

    def get_cfm(self, name: str) -> Callable[[int, int, bytes], bytes] | None:
        return self.decrypt_aes256 if name == "AESV3" else None

    def authenticate(self, password: str) -> bytes | None:
        """Return the file key if ``password`` is the owner or user password.

        The owner password is tried first; None is returned when neither matches.
        """
        password_b = self._normalize_password(password)
        if self._password_hash(password_b, self.o_validation_salt, self.u) == self.o_hash:
            wrapping_key = self._password_hash(password_b, self.o_key_salt, self.u)
            return self._unwrap_file_key(wrapping_key, self.oe)
        if self._password_hash(password_b, self.u_validation_salt) == self.u_hash:
            wrapping_key = self._password_hash(password_b, self.u_key_salt)
            return self._unwrap_file_key(wrapping_key, self.ue)
        return None

    def _unwrap_file_key(self, wrapping_key: bytes, wrapped: bytes) -> bytes:
        """AES-CBC decrypt ``wrapped`` with ``wrapping_key`` and an all-zero IV."""
        cipher = Cipher(
            algorithms.AES(wrapping_key),
            modes.CBC(b"\0" * 16),
            backend=default_backend(),
        )  # type: ignore
        return cipher.decryptor().update(wrapped)  # type: ignore

    def _normalize_password(self, password: str) -> bytes:
        """Encode the password as UTF-8 truncated to 127 bytes (saslprep'd for r6)."""
        if self.r == 6:
            # saslprep expects non-empty strings, apparently
            if not password:
                return b""
            from pdfminer._saslprep import saslprep

            password = saslprep(password)
        encoded = password.encode("utf-8")
        return encoded[:127]

    def _password_hash(
        self,
        password: bytes,
        salt: bytes,
        vector: bytes | None = None,
    ) -> bytes:
        """Compute password hash depending on revision number"""
        if self.r != 5:
            return self._r6_password(password, salt[0:8], vector)
        return self._r5_password(password, salt, vector)

    def _r5_password(
        self,
        password: bytes,
        salt: bytes,
        vector: bytes | None = None,
    ) -> bytes:
        """Compute the password for revision 5"""
        hasher = sha256(password)
        hasher.update(salt)
        if vector is not None:
            hasher.update(vector)
        return hasher.digest()

    def _r6_password(
        self,
        password: bytes,
        salt: bytes,
        vector: bytes | None = None,
    ) -> bytes:
        """Compute the password for revision 6"""
        seed = sha256(password)
        seed.update(salt)
        if vector is not None:
            seed.update(vector)
        k = seed.digest()
        hashes = (sha256, sha384, sha512)
        suffix = vector or b""
        round_no = 0
        last_byte_val = 0
        # At least 64 rounds; afterwards keep going while the last byte of
        # the round's ciphertext exceeds (rounds done - 32).
        while round_no < 64 or last_byte_val > round_no - 32:
            k1 = (password + k + suffix) * 64
            e = self._aes_cbc_encrypt(key=k[:16], iv=k[16:32], data=k1)
            # compute the first 16 bytes of e,
            # interpreted as an unsigned integer mod 3
            next_hash = hashes[self._bytes_mod_3(e[:16])]
            k = next_hash(e).digest()
            last_byte_val = e[-1]
            round_no += 1
        return k[:32]

    @staticmethod
    def _bytes_mod_3(input_bytes: bytes) -> int:
        """Return the big-endian integer value of ``input_bytes`` modulo 3."""
        # 256 is 1 mod 3, so we can just sum 'em
        return sum(input_bytes) % 3

    def _aes_cbc_encrypt(self, key: bytes, iv: bytes, data: bytes) -> bytes:
        encryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).encryptor()  # type: ignore
        head = encryptor.update(data)  # type: ignore
        return head + encryptor.finalize()  # type: ignore

    def decrypt_aes256(self, objid: int, genno: int, data: bytes) -> bytes:
        """Decrypt ``data`` with the file key; objid/genno are not used."""
        initialization_vector, ciphertext = data[:16], data[16:]
        assert self.key is not None
        cipher = Cipher(
            algorithms.AES(self.key),
            modes.CBC(initialization_vector),
            backend=default_backend(),
        )  # type: ignore
        plaintext = cipher.decryptor().update(ciphertext)  # type: ignore
        return unpad_aes(plaintext)
class PDFDocument:
    """PDFDocument object represents a PDF document.

    Since a PDF file can be very big, normally it is not loaded at
    once. So PDF document has to cooperate with a PDF parser in order to
    dynamically import the data as processing goes.

    Typical usage:
      doc = PDFDocument(parser, password)
      obj = doc.getobj(objid)

    """

    # Maps the /V entry of the encryption dictionary to a handler class.
    security_handler_registry: ClassVar[dict[int, type[PDFStandardSecurityHandler]]] = {
        1: PDFStandardSecurityHandler,
        2: PDFStandardSecurityHandler,
        4: PDFStandardSecurityHandlerV4,
        5: PDFStandardSecurityHandlerV5,
    }

    def __init__(
        self,
        parser: PDFParser,
        password: str = "",
        caching: bool = True,
        fallback: bool = True,
    ) -> None:
        """Set the document to use a given PDFParser object.

        :param parser: parser positioned on the PDF data.
        :param password: password used if the trailer has an /Encrypt entry.
        :param caching: keep resolved objects and parsed object streams.
        :param fallback: if no valid xref is found, rebuild one by scanning
            the file (PDFXRefFallback).
        :raises PDFSyntaxError: if no trailer provides a /Root entry, or (in
            strict mode) the root's /Type is not Catalog.
        """
        self.caching = caching
        self.xrefs: list[PDFBaseXRef] = []
        self.info: list[dict[str, Any]] = []
        self.catalog: dict[str, Any] = {}
        self.encryption: tuple[Any, Any] | None = None
        self.decipher: DecipherCallable | None = None
        self._cached_objs: dict[int, tuple[object, int]] = {}
        self._parsed_objs: dict[int, tuple[list[object], int]] = {}
        self._parser: PDFParser | None = parser
        parser.set_document(self)
        self.is_printable = self.is_modifiable = self.is_extractable = True
        # Retrieve the information of each header that was appended
        # (maybe multiple times) at the end of the document.
        # Start offsets already visited, used to detect circular chains.
        self._xrefpos: set[int] = set()
        try:
            pos = self.find_xref(parser)
            self.read_xref_from(parser, pos, self.xrefs)
        except PDFNoValidXRef:
            if fallback:
                parser.fallback = True
                newxref = PDFXRefFallback()
                newxref.load(parser)
                self.xrefs.append(newxref)

        for xref in self.xrefs:
            trailer = xref.get_trailer()
            if not trailer:
                continue
            # If there's an encryption info, remember it.
            if "Encrypt" in trailer:
                # Some documents may not have a /ID, use two empty
                # byte strings instead. Solves
                # https://github.com/pdfminer/pdfminer.six/issues/594
                id_value = list_value(trailer["ID"]) if "ID" in trailer else (b"", b"")
                self.encryption = (id_value, dict_value(trailer["Encrypt"]))
                self._initialize_password(password)
            if "Info" in trailer:
                self.info.append(dict_value(trailer["Info"]))
            if "Root" in trailer:
                # Every PDF file must have exactly one /Root dictionary.
                self.catalog = dict_value(trailer["Root"])
                break
        else:
            raise PDFSyntaxError("No /Root object! - Is this really a PDF?")
        if self.catalog.get("Type") is not LITERAL_CATALOG and settings.STRICT:
            raise PDFSyntaxError("Catalog not found!")

    KEYWORD_OBJ = KWD(b"obj")

    def _initialize_password(self, password: str = "") -> None:
        """Set up decryption with ``password`` from ``self.encryption``.

        Installs the handler's ``decrypt`` as ``self.decipher`` and copies
        its permission flags onto the document.

        :raises PDFEncryptionError: if /Filter is not Standard or /V has no
            registered handler; the handler itself may raise further
            encryption errors.
        """
        assert self.encryption is not None
        (docid, param) = self.encryption
        if literal_name(param.get("Filter")) != "Standard":
            raise PDFEncryptionError(f"Unknown filter: param={param!r}")
        v = int_value(param.get("V", 0))
        factory = self.security_handler_registry.get(v)
        if factory is None:
            raise PDFEncryptionError(f"Unknown algorithm: param={param!r}")
        handler = factory(docid, param, password)
        self.decipher = handler.decrypt
        self.is_printable = handler.is_printable()
        self.is_modifiable = handler.is_modifiable()
        self.is_extractable = handler.is_extractable()
        assert self._parser is not None
        self._parser.fallback = False  # need to read streams with exact length

    def _getobj_objstm(self, stream: PDFStream, index: int, objid: int) -> object:
        """Return member number ``index`` of object stream ``stream``.

        :raises PDFSyntaxError: if the stream holds too few objects.
        """
        if stream.objid in self._parsed_objs:
            (objs, n) = self._parsed_objs[stream.objid]
        else:
            (objs, n) = self._get_objects(stream)
            if self.caching:
                assert stream.objid is not None
                self._parsed_objs[stream.objid] = (objs, n)
        # The parsed list starts with n (objid, offset) pairs; the member
        # objects follow them.
        i = n * 2 + index
        try:
            obj = objs[i]
        except IndexError as err:
            raise PDFSyntaxError(f"index too big: {index!r}") from err
        return obj

    def _get_objects(self, stream: PDFStream) -> tuple[list[object], int]:
        """Parse every object in ``stream`` and return them with its /N value.

        Outside strict mode a missing /N is treated as 0.
        """
        if stream.get("Type") is not LITERAL_OBJSTM and settings.STRICT:
            raise PDFSyntaxError(f"Not a stream object: {stream!r}")
        try:
            n = cast(int, stream["N"])
        except KeyError:
            if settings.STRICT:
                raise PDFSyntaxError(f"N is not defined: {stream!r}") from None
            n = 0
        parser = PDFStreamParser(stream.get_data())
        parser.set_document(self)
        objs: list[object] = []
        try:
            while True:
                (_, obj) = parser.nextobject()
                objs.append(obj)
        except PSEOF:
            pass
        return (objs, n)

    def _getobj_parse(self, pos: int, objid: int) -> object:
        """Parse the indirect object expected at file offset ``pos``.

        :raises PDFSyntaxError: if the object id found there does not match
            ``objid`` or the ``obj`` keyword is missing.
        """
        assert self._parser is not None
        self._parser.seek(pos)
        (_, objid1) = self._parser.nexttoken()  # objid
        (_, _genno) = self._parser.nexttoken()  # genno
        (_, kwd) = self._parser.nexttoken()
        # hack around malformed pdf files
        # copied from https://github.com/jaepil/pdfminer3k/blob/master/
        # pdfminer/pdfparser.py#L399
        # to solve https://github.com/pdfminer/pdfminer.six/issues/56
        # assert objid1 == objid, str((objid1, objid))
        if objid1 != objid:
            # Skip ahead to the next "obj" keyword and take the token two
            # places before it as the object id.
            x = []
            while kwd is not self.KEYWORD_OBJ:
                (_, kwd) = self._parser.nexttoken()
                x.append(kwd)
            if len(x) >= 2:
                objid1 = x[-2]
        # #### end hack around malformed pdf files
        if objid1 != objid:
            raise PDFSyntaxError(f"objid mismatch: {objid1!r}={objid!r}")

        if kwd != KWD(b"obj"):
            raise PDFSyntaxError(f"Invalid object spec: offset={pos!r}")
        (_, obj) = self._parser.nextobject()
        return obj

    # can raise PDFObjectNotFound
    def getobj(self, objid: int) -> object:
        """Get object from PDF

        Each xref is tried in order; an xref that does not know the object,
        or whose location fails to parse, is skipped.

        :raises PDFException if PDFDocument is not initialized
        :raises PDFObjectNotFound if objid does not exist in PDF
        """
        if not self.xrefs:
            raise PDFException("PDFDocument is not initialized")
        log.debug("getobj: objid=%r", objid)
        obj: object  # always assigned in one of the branches below
        genno: int
        if objid in self._cached_objs:
            (obj, genno) = self._cached_objs[objid]
        else:
            for xref in self.xrefs:
                try:
                    (strmid, index, genno) = xref.get_pos(objid)
                except KeyError:
                    continue
                try:
                    if strmid is not None:
                        stream = stream_value(self.getobj(strmid))
                        obj = self._getobj_objstm(stream, index, objid)
                    else:
                        obj = self._getobj_parse(index, objid)
                        if self.decipher:
                            obj = decipher_all(self.decipher, objid, genno, obj)

                    if isinstance(obj, PDFStream):
                        obj.set_objid(objid, genno)
                    break
                except (PSEOF, PDFSyntaxError):
                    continue
            else:
                raise PDFObjectNotFound(objid)
            log.debug("register: objid=%r: %r", objid, obj)
            if self.caching:
                self._cached_objs[objid] = (obj, genno)
        return obj

    # (level, title, dest, action, se)
    OutlineType = tuple[Any, Any, Any, Any, Any]

    def get_outlines(self) -> Iterator[OutlineType]:
        """Iterate over outline entries as ``(level, title, dest, action, se)``.

        Only entries with a /Title and either /A or /Dest are yielded.

        :raises PDFNoOutlines: if the catalog has no /Outlines entry.
        """
        if "Outlines" not in self.catalog:
            raise PDFNoOutlines

        def search(entry: object, level: int) -> Iterator[PDFDocument.OutlineType]:
            entry = dict_value(entry)
            if "Title" in entry and ("A" in entry or "Dest" in entry):
                title = decode_text(str_value(entry["Title"]))
                dest = entry.get("Dest")
                action = entry.get("A")
                se = entry.get("SE")
                yield (level, title, dest, action, se)
            if "First" in entry and "Last" in entry:
                yield from search(entry["First"], level + 1)
            if "Next" in entry:
                yield from search(entry["Next"], level)

        return search(self.catalog["Outlines"], 0)

    def get_page_labels(self) -> Iterator[str]:
        """Generate page label strings for the PDF document.

        If the document includes page labels, generates strings, one per page.
        If not, raises PDFNoPageLabels.

        The resulting iteration is unbounded.
        """
        assert self.catalog is not None

        try:
            page_labels = PageLabels(self.catalog["PageLabels"])
        except (PDFTypeError, KeyError) as err:
            raise PDFNoPageLabels from err

        return page_labels.labels

    def lookup_name(self, cat: str, key: str | bytes) -> Any:
        """Look up ``key`` in the name tree ``cat`` of the catalog's /Names.

        :raises PDFKeyError: if the catalog has no usable /Names dictionary
            or the key is not found in the tree.
        :raises KeyError: if /Names has no ``cat`` entry, or a leaf node
            reached during the search does not contain ``key``.
        """
        try:
            names = dict_value(self.catalog["Names"])
        except (PDFTypeError, KeyError) as err:
            raise PDFKeyError((cat, key)) from err
        # may raise KeyError
        d0 = dict_value(names[cat])

        def lookup(d: dict[str, Any]) -> Any:
            if "Limits" in d:
                (k1, k2) = list_value(d["Limits"])
                if key < k1 or k2 < key:
                    # Key is outside this node's range.
                    return None
            if "Names" in d:
                objs = list_value(d["Names"])
                names = dict(
                    cast(Iterator[tuple[str | bytes, Any]], choplist(2, objs)),
                )
                return names[key]
            if "Kids" in d:
                for c in list_value(d["Kids"]):
                    v = lookup(dict_value(c))
                    # None means "not in this kid"; any other value, even a
                    # falsy one, is a real hit.
                    if v is not None:
                        return v
            raise PDFKeyError((cat, key))

        return lookup(d0)

    def get_dest(self, name: str | bytes) -> Any:
        """Resolve the named destination ``name``.

        :raises PDFDestinationNotFound: if neither the name tree nor the
            catalog's /Dests dictionary contains it.
        """
        try:
            # PDF-1.2 or later
            obj = self.lookup_name("Dests", name)
        except KeyError:
            # PDF-1.1 or prior
            if "Dests" not in self.catalog:
                raise PDFDestinationNotFound(name) from None
            d0 = dict_value(self.catalog["Dests"])
            if name not in d0:
                raise PDFDestinationNotFound(name) from None
            obj = d0[name]
        return obj

    # find_xref
    def find_xref(self, parser: PDFParser) -> int:
        """Internal function used to locate the first XRef.

        :returns: the offset found on the non-empty line that follows the
            last ``startxref`` line of the file.
        :raises PDFNoValidXRef: if there is no ``startxref`` or its offset
            is not a non-negative number below 2**31.
        """
        # search the last xref table by scanning the file backwards.
        prev = b""
        for line in parser.revreadlines():
            line = line.strip()
            log.debug("find_xref: %r", line)

            if line == b"startxref":
                log.debug("xref found: pos=%r", prev)

                if not prev.isdigit():
                    raise PDFNoValidXRef(f"Invalid xref position, no digit: {prev!r}")

                start = int(prev)

                if not start >= 0:
                    raise PDFNoValidXRef(f"Invalid xref position, negative: {start}")

                # The xref start needs to fit in a C ssize_t to be a proper file offset
                if start >= 2**31:
                    raise PDFNoValidXRef(f"Invalid xref position, too large: {start!r}")

                return start

            if line:
                prev = line

        raise PDFNoValidXRef("Unexpected EOF")

    # read xref table
    def read_xref_from(
        self,
        parser: PDFParser,
        start: int,
        xrefs: list[PDFBaseXRef],
    ) -> None:
        """Reads XRefs from the given location.

        The xref found at ``start`` is appended to ``xrefs``; the /XRefStm
        and /Prev entries of its trailer are then followed recursively.

        :raises PDFNoValidXRef: if ``start`` was already visited (circular
            chain) or the data there cannot be read as an xref.
        """
        if start in self._xrefpos:
            raise PDFNoValidXRef(f"Detected circular xref chain at {start}")
        self._xrefpos.add(start)
        parser.seek(start)
        parser.reset()
        try:
            (pos, token) = parser.nexttoken()
        except PSEOF as err:
            raise PDFNoValidXRef("Unexpected EOF") from err
        log.debug("read_xref_from: start=%d, token=%r", start, token)
        if isinstance(token, int):
            # XRefStream: PDF-1.5
            parser.seek(pos)
            parser.reset()
            xref: PDFBaseXRef = PDFXRefStream()
            xref.load(parser)
        else:
            if token is parser.KEYWORD_XREF:
                parser.nextline()
            xref = PDFXRef()
            xref.load(parser)
        xrefs.append(xref)
        trailer = xref.get_trailer()
        log.debug("trailer: %r", trailer)
        if "XRefStm" in trailer:
            pos = int_value(trailer["XRefStm"])
            self.read_xref_from(parser, pos, xrefs)
        if "Prev" in trailer:
            # find previous xref
            pos = int_value(trailer["Prev"])
            self.read_xref_from(parser, pos, xrefs)
class PageLabels(NumberTree):
    """PageLabels from the document catalog.

    See Section 8.3.1 in the PDF Reference.
    """

    @property
    def labels(self) -> Iterator[str]:
        """Generate one label per page; the last range never terminates."""
        ranges = self.values

        # The tree must begin with page index 0
        if not ranges or ranges[0][0] != 0:
            if settings.STRICT:
                raise PDFSyntaxError("PageLabels is missing page index 0")
            # Try to cope, by assuming empty labels for the initial pages
            ranges.insert(0, (0, {}))

        range_count = len(ranges)
        for following, (start, label_dict_unchecked) in enumerate(ranges, 1):
            label_dict = dict_value(label_dict_unchecked)
            style = label_dict.get("S")
            prefix = decode_text(str_value(label_dict.get("P", b"")))
            first_value = int_value(label_dict.get("St", 1))

            values: Iterable[int]
            if following < range_count:
                # Bounded by the start index of the next range.
                next_start, _ = ranges[following]
                values = range(first_value, first_value + (next_start - start))
            else:
                # This is the last specified range. It continues until the end
                # of the document.
                values = itertools.count(first_value)

            for value in values:
                yield prefix + self._format_page_label(value, style)

    @staticmethod
    def _format_page_label(value: int, style: Any) -> str:
        """Format page label value in a specific style"""
        if style is None:
            return ""
        if style is LIT("D"):  # Decimal arabic numerals
            return str(value)
        if style is LIT("R"):  # Uppercase roman numerals
            return format_int_roman(value).upper()
        if style is LIT("r"):  # Lowercase roman numerals
            return format_int_roman(value)
        if style is LIT("A"):  # Uppercase letters A-Z, AA-ZZ...
            return format_int_alpha(value).upper()
        if style is LIT("a"):  # Lowercase letters a-z, aa-zz...
            return format_int_alpha(value)
        log.warning("Unknown page label style: %r", style)
        return ""