Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/pdfdocument.py: 84%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import itertools
2import logging
3import re
4import struct
5from collections.abc import Callable, Iterable, Iterator, KeysView, Sequence
6from hashlib import md5, sha256, sha384, sha512
7from typing import (
8 Any,
9 ClassVar,
10 cast,
11)
13from cryptography.hazmat.backends import default_backend
14from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
16from pdfminer import settings
17from pdfminer.arcfour import Arcfour
18from pdfminer.casting import safe_int
19from pdfminer.data_structures import NumberTree
20from pdfminer.pdfexceptions import (
21 PDFException,
22 PDFKeyError,
23 PDFObjectNotFound,
24 PDFTypeError,
25)
26from pdfminer.pdfparser import PDFParser, PDFStreamParser, PDFSyntaxError
27from pdfminer.pdftypes import (
28 DecipherCallable,
29 PDFStream,
30 decipher_all,
31 dict_value,
32 int_value,
33 list_value,
34 str_value,
35 stream_value,
36 uint_value,
37)
38from pdfminer.psexceptions import PSEOF
39from pdfminer.psparser import KWD, LIT, literal_name
40from pdfminer.utils import (
41 choplist,
42 decode_text,
43 format_int_alpha,
44 format_int_roman,
45 nunpack,
46 unpad_aes,
47)
# Module-level logger, named after this module's import path.
log = logging.getLogger(__name__)
class PDFNoValidXRef(PDFSyntaxError):
    """Raised when a cross-reference table or stream cannot be located or read."""
class PDFNoValidXRefWarning(SyntaxWarning):
    """Legacy warning for a missing xref.

    No longer emitted: calls to ``warnings.warn`` were replaced by logging
    through ``logging.Logger.warning``.
    """
class PDFNoOutlines(PDFException):
    """Raised when the document catalog has no ``Outlines`` entry."""
class PDFNoPageLabels(PDFException):
    """Raised when page labels cannot be read from the document catalog."""
class PDFDestinationNotFound(PDFException):
    """Raised when a named destination is not present in the document."""
class PDFEncryptionError(PDFException):
    """Base class for errors raised while setting up document decryption."""
class PDFPasswordIncorrect(PDFEncryptionError):
    """Raised when the supplied password yields no encryption key."""
class PDFEncryptionWarning(UserWarning):
    """Legacy warning for failed decryption.

    No longer emitted: calls to ``warnings.warn`` were replaced by logging
    through ``logging.Logger.warning``.
    """
class PDFTextExtractionNotAllowedWarning(UserWarning):
    """Legacy warning for a PDF that does not allow extraction.

    No longer emitted: calls to ``warnings.warn`` were replaced by logging
    through ``logging.Logger.warning``.
    """
class PDFTextExtractionNotAllowed(PDFEncryptionError):
    """Error type for documents whose permissions forbid text extraction."""
# Predefined name literals. They are compared by identity (``is``) against
# the ``Type`` entry of parsed streams and of the document catalog.
LITERAL_OBJSTM = LIT("ObjStm")
LITERAL_XREF = LIT("XRef")
LITERAL_CATALOG = LIT("Catalog")
class PDFBaseXRef:
    """Interface shared by every cross-reference implementation in this module."""

    def load(self, parser: PDFParser) -> None:
        """Populate the xref from ``parser``; subclasses must implement this."""
        raise NotImplementedError

    def get_trailer(self) -> dict[str, Any]:
        """Return the trailer dictionary; subclasses must implement this."""
        raise NotImplementedError

    def get_objids(self) -> Iterable[int]:
        """Return the object ids this xref knows about (none in the base class)."""
        return []

    def get_pos(self, objid: int) -> tuple[int | None, int, int]:
        """Locate ``objid``.

        Implementations must return either ``(strmid, index, genno)`` or
        ``(None, pos, genno)``. The base class knows no objects, so it always
        raises ``PDFKeyError``.
        """
        raise PDFKeyError(objid)
class PDFXRef(PDFBaseXRef):
    """Cross-reference table read from a plain-text xref section and trailer."""

    def __init__(self) -> None:
        # objid -> (None, byte offset, generation number)
        self.offsets: dict[int, tuple[int | None, int, int]] = {}
        self.trailer: dict[str, Any] = {}

    def __repr__(self) -> str:
        return f"<PDFXRef: offsets={self.offsets.keys()!r}>"

    def load(self, parser: PDFParser) -> None:
        """Read xref subsections line by line, then the trailer.

        Each subsection starts with a ``start nobjs`` header line followed by
        ``nobjs`` entries of the form ``pos genno use``. Only entries whose
        ``use`` field is ``n`` are recorded.

        :raises PDFNoValidXRef: on EOF or on a malformed header/entry line.
        """
        while True:
            try:
                (pos, line) = parser.nextline()
                line = line.strip()
                if not line:
                    continue
            except PSEOF as err:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted?") from err
            if line.startswith(b"trailer"):
                # Rewind so that load_trailer() sees the keyword itself.
                parser.seek(pos)
                break
            f = line.split(b" ")
            if len(f) != 2:
                error_msg = f"Trailer not found: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg)
            try:
                (start, nobjs) = map(int, f)
            except ValueError as err:
                error_msg = f"Invalid line: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg) from err
            for objid in range(start, start + nobjs):
                try:
                    (_, line) = parser.nextline()
                    line = line.strip()
                except PSEOF as err:
                    raise PDFNoValidXRef("Unexpected EOF - file corrupted?") from err
                f = line.split(b" ")
                if len(f) != 3:
                    error_msg = f"Invalid XRef format: {parser!r}, line={line!r}"
                    raise PDFNoValidXRef(error_msg)
                (pos_b, genno_b, use_b) = f
                if use_b != b"n":
                    continue

                pos_i = safe_int(pos_b)
                genno_i = safe_int(genno_b)
                if pos_i is not None and genno_i is not None:
                    self.offsets[objid] = (None, pos_i, genno_i)
                else:
                    # Lazy %-formatting: the message is only built when the
                    # record is actually emitted.
                    log.warning(
                        "Not adding object %s to xref because position %r "
                        "or generation number %r cannot be parsed as an int",
                        objid,
                        pos_b,
                        genno_b,
                    )

        log.debug("xref objects: %r", self.offsets)
        self.load_trailer(parser)

    def load_trailer(self, parser: PDFParser) -> None:
        """Parse the ``trailer`` keyword and dictionary and merge it into ``self.trailer``.

        :raises PDFNoValidXRef: if the next token is not the ``trailer``
            keyword, or if EOF is hit and no object is left on the parser stack.
        """
        try:
            (_, kwd) = parser.nexttoken()
            # Explicit check instead of ``assert``: this validates file
            # content, must survive ``python -O``, and should be reported as
            # an xref problem rather than an AssertionError.
            if kwd is not KWD(b"trailer"):
                raise PDFNoValidXRef(f"Trailer keyword expected, got {kwd!r}")
            (_, dic) = parser.nextobject()
        except PSEOF:
            # EOF while reading the dictionary: use whatever object the
            # parser has already completed, if any.
            x = parser.pop(1)
            if not x:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted") from None
            (_, dic) = x[0]
        self.trailer.update(dict_value(dic))
        log.debug("trailer=%r", self.trailer)

    def get_trailer(self) -> dict[str, Any]:
        return self.trailer

    def get_objids(self) -> KeysView[int]:
        return self.offsets.keys()

    def get_pos(self, objid: int) -> tuple[int | None, int, int]:
        """Return the stored location of ``objid``; ``KeyError`` if unknown."""
        return self.offsets[objid]
class PDFXRefFallback(PDFXRef):
    """Xref rebuilt by scanning the file from the start for ``N G obj`` lines."""

    # Matches the "<objid> <genno> obj" header at the beginning of a line.
    PDFOBJ_CUE = re.compile(r"^(\d+)\s+(\d+)\s+obj\b")

    def __repr__(self) -> str:
        return f"<PDFXRefFallback: offsets={self.offsets.keys()!r}>"

    def load(self, parser: PDFParser) -> None:
        """Scan every line, recording object offsets, until a trailer or EOF.

        Objects that turn out to be object streams are expanded so that the
        objects they contain are registered too.
        """
        parser.seek(0)
        while True:
            try:
                (pos, raw_line) = parser.nextline()
            except PSEOF:
                break
            if raw_line.startswith(b"trailer"):
                parser.seek(pos)
                self.load_trailer(parser)
                log.debug("trailer: %r", self.trailer)
                break
            text = raw_line.decode("latin-1")  # default pdf encoding
            cue = self.PDFOBJ_CUE.match(text)
            if not cue:
                continue
            objid = int(cue.group(1))
            genno = int(cue.group(2))
            self.offsets[objid] = (None, pos, genno)
            # expand ObjStm.
            parser.seek(pos)
            (_, obj) = parser.nextobject()
            if not (isinstance(obj, PDFStream) and obj.get("Type") is LITERAL_OBJSTM):
                continue
            stream = stream_value(obj)
            try:
                n = stream["N"]
            except KeyError:
                if settings.STRICT:
                    raise PDFSyntaxError(f"N is not defined: {stream!r}") from None
                n = 0
            stream_parser = PDFStreamParser(stream.get_data())
            objs: list[int] = []
            try:
                while True:
                    (_, item) = stream_parser.nextobject()
                    objs.append(cast(int, item))
            except PSEOF:
                pass
            # The parsed values are consumed as pairs; never read more pairs
            # than were actually parsed.
            n = min(n, len(objs) // 2)
            for index in range(n):
                contained_id = objs[index * 2]
                self.offsets[contained_id] = (objid, index, 0)
class PDFXRefStream(PDFBaseXRef):
    """Cross-reference information read from an ``XRef`` stream object.

    Entries are fixed-width records of three big-endian fields whose widths
    come from the stream's ``W`` array. The records of all ``Index``
    subsections are stored back to back in the stream data.
    """

    def __init__(self) -> None:
        self.data: bytes | None = None
        self.entlen: int | None = None
        self.fl1: int | None = None
        self.fl2: int | None = None
        self.fl3: int | None = None
        # (first objid, number of objects) per subsection
        self.ranges: list[tuple[int, int]] = []
        # Defined up front so get_trailer() works even before load().
        self.trailer: dict[str, Any] = {}

    def __repr__(self) -> str:
        return f"<PDFXRefStream: ranges={self.ranges!r}>"

    def load(self, parser: PDFParser) -> None:
        """Read the xref stream object at the parser's current position.

        :raises PDFNoValidXRef: if the object is not a stream of type XRef.
        :raises PDFSyntaxError: if ``Index`` has an odd number of items.
        """
        (_, _objid) = parser.nexttoken()  # ignored
        (_, _genno) = parser.nexttoken()  # ignored
        (_, _kwd) = parser.nexttoken()
        (_, stream) = parser.nextobject()
        if not isinstance(stream, PDFStream) or stream.get("Type") is not LITERAL_XREF:
            raise PDFNoValidXRef("Invalid PDF stream spec.")
        size = stream["Size"]
        # Without an explicit Index there is one subsection covering 0..Size.
        index_array = stream.get("Index", (0, size))
        if len(index_array) % 2 != 0:
            raise PDFSyntaxError("Invalid index number")
        self.ranges.extend(cast(Iterator[tuple[int, int]], choplist(2, index_array)))
        (self.fl1, self.fl2, self.fl3) = stream["W"]
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        self.data = stream.get_data()
        self.entlen = self.fl1 + self.fl2 + self.fl3
        self.trailer = stream.attrs
        log.debug(
            "xref stream: objid=%s, fields=%d,%d,%d",
            ", ".join(map(repr, self.ranges)),
            self.fl1,
            self.fl2,
            self.fl3,
        )

    def get_trailer(self) -> dict[str, Any]:
        return self.trailer

    def get_objids(self) -> Iterator[int]:
        """Yield the ids of all in-use (type 1) and compressed (type 2) objects."""
        # Records of consecutive subsections follow each other in ``data``,
        # so the record index must keep counting across subsections (the same
        # accounting get_pos() uses) instead of restarting at 0 for each one.
        first_record = 0
        for start, nobjs in self.ranges:
            for i in range(nobjs):
                assert self.entlen is not None
                assert self.data is not None
                offset = self.entlen * (first_record + i)
                ent = self.data[offset : offset + self.entlen]
                f1 = nunpack(ent[: self.fl1], 1)
                if f1 == 1 or f1 == 2:
                    yield start + i
            first_record += nobjs

    def get_pos(self, objid: int) -> tuple[int | None, int, int]:
        """Return the location of ``objid``.

        :return: ``(None, f2, f3)`` for a type 1 entry, or ``(f2, f3, 0)``
            for a type 2 entry, where f2/f3 are the record's second and
            third fields.
        :raises PDFKeyError: if ``objid`` is in no subsection or its entry
            has any other type (a free object).
        """
        index = 0
        for start, nobjs in self.ranges:
            if start <= objid and objid < start + nobjs:
                index += objid - start
                break
            else:
                # Skip over all records of this subsection.
                index += nobjs
        else:
            raise PDFKeyError(objid)
        assert self.entlen is not None
        assert self.data is not None
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        offset = self.entlen * index
        ent = self.data[offset : offset + self.entlen]
        f1 = nunpack(ent[: self.fl1], 1)
        f2 = nunpack(ent[self.fl1 : self.fl1 + self.fl2])
        f3 = nunpack(ent[self.fl1 + self.fl2 :])
        if f1 == 1:
            return (None, f2, f3)
        elif f1 == 2:
            return (f2, f3, 0)
        else:
            # this is a free object
            raise PDFKeyError(objid)
class PDFStandardSecurityHandler:
    """Security handler for revisions 2 and 3 (MD5 key derivation, RC4 cipher)."""

    PASSWORD_PADDING = (
        b"(\xbfN^Nu\x8aAd\x00NV\xff\xfa\x01\x08..\x00\xb6\xd0h>\x80/\x0c\xa9\xfedSiz"
    )
    supported_revisions: tuple[int, ...] = (2, 3)

    def __init__(
        self,
        docid: Sequence[bytes],
        param: dict[str, Any],
        password: str = "",
    ) -> None:
        self.docid = docid
        self.param = param
        self.password = password
        self.init()

    def init(self) -> None:
        """Read the parameters, check the revision, then derive the key."""
        self.init_params()
        if self.r not in self.supported_revisions:
            raise PDFEncryptionError(f"Unsupported revision: param={self.param!r}")
        self.init_key()

    def init_params(self) -> None:
        """Copy the entries of the encryption dictionary onto the instance."""
        params = self.param
        self.v = int_value(params.get("V", 0))
        self.r = int_value(params["R"])
        self.p = uint_value(params["P"], 32)
        self.o = str_value(params["O"])
        self.u = str_value(params["U"])
        self.length = int_value(params.get("Length", 40))

    def init_key(self) -> None:
        """Authenticate with the stored password.

        :raises PDFPasswordIncorrect: if authentication yields no key.
        """
        self.key = self.authenticate(self.password)
        if self.key is None:
            raise PDFPasswordIncorrect

    # The three permission checks test individual bits of ``self.p``.
    def is_printable(self) -> bool:
        return bool(self.p & 4)

    def is_modifiable(self) -> bool:
        return bool(self.p & 8)

    def is_extractable(self) -> bool:
        return bool(self.p & 16)

    def compute_u(self, key: bytes) -> bytes:
        """Compute the value that is compared against the stored ``U`` entry."""
        if self.r == 2:
            # Algorithm 3.4
            return Arcfour(key).encrypt(self.PASSWORD_PADDING)  # 2
        # Algorithm 3.5
        digest = md5(self.PASSWORD_PADDING)  # 2
        digest.update(self.docid[0])  # 3
        result = Arcfour(key).encrypt(digest.digest())  # 4
        for i in range(1, 20):  # 5
            round_key = bytes(c ^ i for c in key)
            result = Arcfour(round_key).encrypt(result)
        return result + result  # 6

    def compute_encryption_key(self, password: bytes) -> bytes:
        """Derive the encryption key from a candidate user password."""
        # Algorithm 3.2
        padded = (password + self.PASSWORD_PADDING)[:32]  # 1
        digest = md5(padded)  # 2
        digest.update(self.o)  # 3
        # See https://github.com/pdfminer/pdfminer.six/issues/186
        digest.update(struct.pack("<L", self.p))  # 4
        digest.update(self.docid[0])  # 5
        if (
            self.r >= 4
            and not cast(PDFStandardSecurityHandlerV4, self).encrypt_metadata
        ):
            digest.update(b"\xff\xff\xff\xff")
        result = digest.digest()
        if self.r < 3:
            return result[:5]
        n = self.length // 8
        for _ in range(50):
            result = md5(result[:n]).digest()
        return result[:n]

    def authenticate(self, password: str) -> bytes | None:
        """Try ``password`` as the user password, then as the owner password.

        :return: the encryption key, or ``None`` if neither attempt succeeds.
        """
        candidate = password.encode("latin1")
        user_key = self.authenticate_user_password(candidate)
        if user_key is not None:
            return user_key
        return self.authenticate_owner_password(candidate)

    def authenticate_user_password(self, password: bytes) -> bytes | None:
        key = self.compute_encryption_key(password)
        return key if self.verify_encryption_key(key) else None

    def verify_encryption_key(self, key: bytes) -> bool:
        """Check ``key`` by comparing the computed and stored ``U`` values."""
        # Algorithm 3.6
        computed = self.compute_u(key)
        if self.r == 2:
            return computed == self.u
        # For later revisions only the first 16 bytes are compared.
        return computed[:16] == self.u[:16]

    def authenticate_owner_password(self, password: bytes) -> bytes | None:
        """Recover the user password from ``O`` and authenticate with it."""
        # Algorithm 3.7
        padded = (password + self.PASSWORD_PADDING)[:32]
        digest = md5(padded)
        if self.r >= 3:
            for _ in range(50):
                digest = md5(digest.digest())
            n = self.length // 8
        else:
            n = 5
        key = digest.digest()[:n]
        if self.r == 2:
            user_password = Arcfour(key).decrypt(self.o)
        else:
            user_password = self.o
            for i in range(19, -1, -1):
                round_key = bytes(c ^ i for c in key)
                user_password = Arcfour(round_key).decrypt(user_password)
        return self.authenticate_user_password(user_password)

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: dict[str, Any] | None = None,
    ) -> bytes:
        """Decrypt ``data`` belonging to object ``objid``/``genno`` (``attrs`` is unused)."""
        return self.decrypt_rc4(objid, genno, data)

    def decrypt_rc4(self, objid: int, genno: int, data: bytes) -> bytes:
        assert self.key is not None
        # Per-object key material: document key + low 3 bytes of the object
        # id + low 2 bytes of the generation number.
        material = self.key + struct.pack("<L", objid)[:3] + struct.pack("<L", genno)[:2]
        object_key = md5(material).digest()[: min(len(material), 16)]
        return Arcfour(object_key).decrypt(data)
class PDFStandardSecurityHandlerV4(PDFStandardSecurityHandler):
    """Security handler for revision 4, which selects ciphers via crypt filters."""

    supported_revisions: tuple[int, ...] = (4,)

    def init_params(self) -> None:
        """Read the crypt-filter entries and build the name -> decryptor table.

        :raises PDFEncryptionError: if the stream and string filters differ,
            a filter uses an unknown method, or the string filter is undefined.
        """
        super().init_params()
        self.length = 128
        self.cf = dict_value(self.param.get("CF"))
        self.stmf = literal_name(self.param["StmF"])
        self.strf = literal_name(self.param["StrF"])
        self.encrypt_metadata = bool(self.param.get("EncryptMetadata", True))
        if self.stmf != self.strf:
            raise PDFEncryptionError(f"Unsupported crypt filter: param={self.param!r}")
        self.cfm = {}
        for filter_name, filter_dict in self.cf.items():
            decryptor = self.get_cfm(literal_name(filter_dict["CFM"]))
            if decryptor is None:
                raise PDFEncryptionError(
                    f"Unknown crypt filter method: param={self.param!r}"
                )
            self.cfm[filter_name] = decryptor
        self.cfm["Identity"] = self.decrypt_identity
        if self.strf not in self.cfm:
            raise PDFEncryptionError(f"Undefined crypt filter: param={self.param!r}")

    def get_cfm(self, name: str) -> Callable[[int, int, bytes], bytes] | None:
        """Map a crypt filter method name to its decryptor, or ``None`` if unknown."""
        known = {
            "V2": self.decrypt_rc4,
            "AESV2": self.decrypt_aes128,
        }
        return known.get(name)

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: dict[str, Any] | None = None,
        name: str | None = None,
    ) -> bytes:
        """Decrypt ``data`` with the crypt filter ``name`` (default: the string filter).

        Data of objects whose ``Type`` is ``Metadata`` is returned unchanged
        when metadata is not encrypted.
        """
        if attrs is not None and not self.encrypt_metadata:
            obj_type = attrs.get("Type")
            if obj_type is not None and literal_name(obj_type) == "Metadata":
                return data
        filter_name = self.strf if name is None else name
        return self.cfm[filter_name](objid, genno, data)

    def decrypt_identity(self, objid: int, genno: int, data: bytes) -> bytes:
        return data

    def decrypt_aes128(self, objid: int, genno: int, data: bytes) -> bytes:
        assert self.key is not None
        material = (
            self.key
            + struct.pack("<L", objid)[:3]
            + struct.pack("<L", genno)[:2]
            + b"sAlT"
        )
        object_key = md5(material).digest()[: min(len(material), 16)]
        # The first 16 bytes of the data are the IV, the rest is ciphertext.
        iv, ciphertext = data[:16], data[16:]
        cipher = Cipher(
            algorithms.AES(object_key),
            modes.CBC(iv),
            backend=default_backend(),
        )  # type: ignore
        plaintext = cipher.decryptor().update(ciphertext)  # type: ignore
        return unpad_aes(plaintext)
class PDFStandardSecurityHandlerV5(PDFStandardSecurityHandlerV4):
    """Security handler for revisions 5 and 6 (SHA-2 password hashes, AES-256)."""

    supported_revisions = (5, 6)

    def init_params(self) -> None:
        super().init_params()
        self.length = 256
        self.oe = str_value(self.param["OE"])
        self.ue = str_value(self.param["UE"])
        # O and U are each split into: hash | validation salt | key salt.
        owner, user = self.o, self.u
        self.o_hash = owner[:32]
        self.o_validation_salt = owner[32:40]
        self.o_key_salt = owner[40:]
        self.u_hash = user[:32]
        self.u_validation_salt = user[32:40]
        self.u_key_salt = user[40:]

    def get_cfm(self, name: str) -> Callable[[int, int, bytes], bytes] | None:
        """Only the ``AESV3`` crypt filter method is recognised here."""
        return self.decrypt_aes256 if name == "AESV3" else None

    def authenticate(self, password: str) -> bytes | None:
        """Return the file key if ``password`` is the owner or user password.

        The owner password is checked first. ``None`` means neither matched.
        """
        password_b = self._normalize_password(password)

        def unwrap(aes_key: bytes, wrapped_key: bytes) -> bytes:
            # AES-CBC decryption with an all-zero IV.
            cipher = Cipher(
                algorithms.AES(aes_key),
                modes.CBC(b"\0" * 16),
                backend=default_backend(),
            )  # type: ignore
            return cipher.decryptor().update(wrapped_key)  # type: ignore

        owner_check = self._password_hash(password_b, self.o_validation_salt, self.u)
        if owner_check == self.o_hash:
            owner_key = self._password_hash(password_b, self.o_key_salt, self.u)
            return unwrap(owner_key, self.oe)
        user_check = self._password_hash(password_b, self.u_validation_salt)
        if user_check == self.u_hash:
            user_key = self._password_hash(password_b, self.u_key_salt)
            return unwrap(user_key, self.ue)
        return None

    def _normalize_password(self, password: str) -> bytes:
        """Encode the password as UTF-8 (at most 127 bytes); revision 6 applies saslprep first."""
        if self.r == 6:
            # saslprep expects non-empty strings, apparently
            if not password:
                return b""
            from pdfminer._saslprep import saslprep

            password = saslprep(password)
        encoded = password.encode("utf-8")
        return encoded[:127]

    def _password_hash(
        self,
        password: bytes,
        salt: bytes,
        vector: bytes | None = None,
    ) -> bytes:
        """Compute password hash depending on revision number"""
        if self.r != 5:
            return self._r6_password(password, salt[0:8], vector)
        return self._r5_password(password, salt, vector)

    def _r5_password(
        self,
        password: bytes,
        salt: bytes,
        vector: bytes | None = None,
    ) -> bytes:
        """Compute the password for revision 5"""
        digest = sha256(password)
        digest.update(salt)
        if vector is not None:
            digest.update(vector)
        return digest.digest()

    def _r6_password(
        self,
        password: bytes,
        salt: bytes,
        vector: bytes | None = None,
    ) -> bytes:
        """Compute the password for revision 6"""
        seed = sha256(password)
        seed.update(salt)
        if vector is not None:
            seed.update(vector)
        k = seed.digest()
        hashes = (sha256, sha384, sha512)
        extra = vector or b""
        round_no = 0
        last_byte_val = 0
        # At least 64 rounds; afterwards keep going while the last byte of
        # the most recent ciphertext exceeds (round number - 32).
        while round_no < 64 or last_byte_val > round_no - 32:
            block = (password + k + extra) * 64
            e = self._aes_cbc_encrypt(key=k[:16], iv=k[16:32], data=block)
            # compute the first 16 bytes of e,
            # interpreted as an unsigned integer mod 3
            chosen_hash = hashes[self._bytes_mod_3(e[:16])]
            k = chosen_hash(e).digest()
            last_byte_val = e[-1]
            round_no += 1
        return k[:32]

    @staticmethod
    def _bytes_mod_3(input_bytes: bytes) -> int:
        # 256 is 1 mod 3, so we can just sum 'em
        return sum(b % 3 for b in input_bytes) % 3

    def _aes_cbc_encrypt(self, key: bytes, iv: bytes, data: bytes) -> bytes:
        encryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).encryptor()  # type: ignore
        return encryptor.update(data) + encryptor.finalize()  # type: ignore

    def decrypt_aes256(self, objid: int, genno: int, data: bytes) -> bytes:
        """Decrypt ``data`` with the file key itself (``objid``/``genno`` are unused)."""
        iv, ciphertext = data[:16], data[16:]
        assert self.key is not None
        cipher = Cipher(
            algorithms.AES(self.key),
            modes.CBC(iv),
            backend=default_backend(),
        )  # type: ignore
        plaintext = cipher.decryptor().update(ciphertext)  # type: ignore
        return unpad_aes(plaintext)
class PDFDocument:
    """PDFDocument object represents a PDF document.

    Since a PDF file can be very big, normally it is not loaded at
    once. So PDF document has to cooperate with a PDF parser in order to
    dynamically import the data as processing goes.

    Typical usage:
      doc = PDFDocument(parser, password)
      obj = doc.getobj(objid)

    """

    # Maps the /V entry of the encryption dictionary to a handler class.
    security_handler_registry: ClassVar[dict[int, type[PDFStandardSecurityHandler]]] = {
        1: PDFStandardSecurityHandler,
        2: PDFStandardSecurityHandler,
        4: PDFStandardSecurityHandlerV4,
        5: PDFStandardSecurityHandlerV5,
    }

    def __init__(
        self,
        parser: PDFParser,
        password: str = "",
        caching: bool = True,
        fallback: bool = True,
    ) -> None:
        """Set the document to use a given PDFParser object.

        :param parser: parser positioned on the PDF data.
        :param password: password used if the document is encrypted.
        :param caching: keep parsed objects and object streams in memory.
        :param fallback: if no valid xref is found, rebuild one by scanning
            the whole file.
        :raises PDFSyntaxError: if no trailer provides a /Root entry, or (in
            strict mode) the catalog's Type is not Catalog.
        """
        self.caching = caching
        self.xrefs: list[PDFBaseXRef] = []
        self.info: list[dict[str, Any]] = []
        self.catalog: dict[str, Any] = {}
        self.encryption: tuple[Any, Any] | None = None
        self.decipher: DecipherCallable | None = None
        self._parser = None
        # objid -> (object, generation number)
        self._cached_objs: dict[int, tuple[object, int]] = {}
        # object stream id -> (all values parsed from the stream, its N)
        self._parsed_objs: dict[int, tuple[list[object], int]] = {}
        self._parser = parser
        self._parser.set_document(self)
        self.is_printable = self.is_modifiable = self.is_extractable = True
        # Retrieve the information of each header that was appended
        # (maybe multiple times) at the end of the document.
        try:
            pos = self.find_xref(parser)
            self.read_xref_from(parser, pos, self.xrefs)
        except PDFNoValidXRef:
            if fallback:
                parser.fallback = True
                newxref = PDFXRefFallback()
                newxref.load(parser)
                self.xrefs.append(newxref)

        for xref in self.xrefs:
            trailer = xref.get_trailer()
            if not trailer:
                continue
            # If there's an encryption info, remember it.
            if "Encrypt" in trailer:
                # Some documents may not have a /ID, use two empty
                # byte strings instead. Solves
                # https://github.com/pdfminer/pdfminer.six/issues/594
                id_value = list_value(trailer["ID"]) if "ID" in trailer else (b"", b"")
                self.encryption = (id_value, dict_value(trailer["Encrypt"]))
                self._initialize_password(password)
            if "Info" in trailer:
                self.info.append(dict_value(trailer["Info"]))
            if "Root" in trailer:
                # Every PDF file must have exactly one /Root dictionary.
                self.catalog = dict_value(trailer["Root"])
                break
        else:
            raise PDFSyntaxError("No /Root object! - Is this really a PDF?")
        if self.catalog.get("Type") is not LITERAL_CATALOG and settings.STRICT:
            raise PDFSyntaxError("Catalog not found!")

    KEYWORD_OBJ = KWD(b"obj")

    def _initialize_password(self, password: str = "") -> None:
        """Set up decryption for ``self.encryption`` with the given password.

        Installs the handler's ``decrypt`` as ``self.decipher`` and copies its
        permission flags onto the document.

        :raises PDFEncryptionError: if the filter is not "Standard" or no
            handler is registered for the /V value.
        """
        assert self.encryption is not None
        (docid, param) = self.encryption
        if literal_name(param.get("Filter")) != "Standard":
            raise PDFEncryptionError(f"Unknown filter: param={param!r}")
        v = int_value(param.get("V", 0))
        factory = self.security_handler_registry.get(v)
        if factory is None:
            raise PDFEncryptionError(f"Unknown algorithm: param={param!r}")
        handler = factory(docid, param, password)
        self.decipher = handler.decrypt
        self.is_printable = handler.is_printable()
        self.is_modifiable = handler.is_modifiable()
        self.is_extractable = handler.is_extractable()
        assert self._parser is not None
        self._parser.fallback = False  # need to read streams with exact length

    def _getobj_objstm(self, stream: PDFStream, index: int, objid: int) -> object:
        """Return the ``index``-th object stored in the object stream ``stream``.

        :raises PDFSyntaxError: if ``index`` is beyond the parsed values.
        """
        if stream.objid in self._parsed_objs:
            (objs, n) = self._parsed_objs[stream.objid]
        else:
            (objs, n) = self._get_objects(stream)
            if self.caching:
                assert stream.objid is not None
                self._parsed_objs[stream.objid] = (objs, n)
        # Skip the first 2 * n parsed values; the objects follow them.
        i = n * 2 + index
        try:
            obj = objs[i]
        except IndexError as err:
            raise PDFSyntaxError(f"index too big: {index!r}") from err
        return obj

    def _get_objects(self, stream: PDFStream) -> tuple[list[object], int]:
        """Parse every value in ``stream`` and return them with the stream's N.

        A missing N is an error in strict mode and is treated as 0 otherwise.
        """
        if stream.get("Type") is not LITERAL_OBJSTM and settings.STRICT:
            raise PDFSyntaxError(f"Not a stream object: {stream!r}")
        try:
            n = cast(int, stream["N"])
        except KeyError:
            if settings.STRICT:
                raise PDFSyntaxError(f"N is not defined: {stream!r}") from None
            n = 0
        parser = PDFStreamParser(stream.get_data())
        parser.set_document(self)
        objs: list[object] = []
        try:
            while True:
                (_, obj) = parser.nextobject()
                objs.append(obj)
        except PSEOF:
            pass
        return (objs, n)

    def _getobj_parse(self, pos: int, objid: int) -> object:
        """Parse the object expected at byte offset ``pos``.

        :raises PDFSyntaxError: if the id found there does not match
            ``objid`` or the ``obj`` keyword is missing.
        """
        assert self._parser is not None
        self._parser.seek(pos)
        (_, objid1) = self._parser.nexttoken()  # objid
        (_, _genno) = self._parser.nexttoken()  # genno
        (_, kwd) = self._parser.nexttoken()
        # hack around malformed pdf files
        # copied from https://github.com/jaepil/pdfminer3k/blob/master/
        # pdfminer/pdfparser.py#L399
        # to solve https://github.com/pdfminer/pdfminer.six/issues/56
        # assert objid1 == objid, str((objid1, objid))
        if objid1 != objid:
            x = []
            while kwd is not self.KEYWORD_OBJ:
                (_, kwd) = self._parser.nexttoken()
                x.append(kwd)
            if len(x) >= 2:
                # NOTE(review): x[-1] is the `obj` keyword itself, so x[-2]
                # is the token directly before it - confirm that this is the
                # token intended to be used as the object id.
                objid1 = x[-2]
        # #### end hack around malformed pdf files
        if objid1 != objid:
            raise PDFSyntaxError(f"objid mismatch: {objid1!r}={objid!r}")

        if kwd != self.KEYWORD_OBJ:
            raise PDFSyntaxError(f"Invalid object spec: offset={pos!r}")
        (_, obj) = self._parser.nextobject()
        return obj

    # can raise PDFObjectNotFound
    def getobj(self, objid: int) -> object:
        """Get object from PDF

        :raises PDFException if PDFDocument is not initialized
        :raises PDFObjectNotFound if objid does not exist in PDF
        """
        if not self.xrefs:
            raise PDFException("PDFDocument is not initialized")
        log.debug("getobj: objid=%r", objid)
        obj: object  # Initialize to satisfy mypy; always assigned in branches below
        genno: int
        if objid in self._cached_objs:
            (obj, genno) = self._cached_objs[objid]
        else:
            # Ask each xref in order; the first one that both knows the
            # object and lets it be parsed wins.
            for xref in self.xrefs:
                try:
                    (strmid, index, genno) = xref.get_pos(objid)
                except KeyError:
                    continue
                try:
                    if strmid is not None:
                        stream = stream_value(self.getobj(strmid))
                        obj = self._getobj_objstm(stream, index, objid)
                    else:
                        obj = self._getobj_parse(index, objid)
                        if self.decipher:
                            obj = decipher_all(self.decipher, objid, genno, obj)

                    if isinstance(obj, PDFStream):
                        obj.set_objid(objid, genno)
                    break
                except (PSEOF, PDFSyntaxError):
                    continue
            else:
                raise PDFObjectNotFound(objid)
            log.debug("register: objid=%r: %r", objid, obj)
            if self.caching:
                self._cached_objs[objid] = (obj, genno)
        return obj

    # (level, title, dest, action, se)
    OutlineType = tuple[Any, Any, Any, Any, Any]

    def get_outlines(self) -> Iterator[OutlineType]:
        """Iterate over the outline entries as (level, title, dest, action, se).

        :raises PDFNoOutlines: if the catalog has no Outlines entry.
        """
        if "Outlines" not in self.catalog:
            raise PDFNoOutlines

        def search(entry: object, level: int) -> Iterator[PDFDocument.OutlineType]:
            entry = dict_value(entry)
            if "Title" in entry and ("A" in entry or "Dest" in entry):
                title = decode_text(str_value(entry["Title"]))
                dest = entry.get("Dest")
                action = entry.get("A")
                se = entry.get("SE")
                yield (level, title, dest, action, se)
            # Children are one level deeper; siblings stay on this level.
            if "First" in entry and "Last" in entry:
                yield from search(entry["First"], level + 1)
            if "Next" in entry:
                yield from search(entry["Next"], level)

        return search(self.catalog["Outlines"], 0)

    def get_page_labels(self) -> Iterator[str]:
        """Generate page label strings for the PDF document.

        If the document includes page labels, generates strings, one per page.
        If not, raises PDFNoPageLabels.

        The resulting iteration is unbounded.
        """
        assert self.catalog is not None

        try:
            page_labels = PageLabels(self.catalog["PageLabels"])
        except (PDFTypeError, KeyError) as err:
            raise PDFNoPageLabels from err

        return page_labels.labels

    def lookup_name(self, cat: str, key: str | bytes) -> Any:
        """Look up ``key`` in the name tree ``cat`` of the catalog's Names entry.

        :raises PDFKeyError: if the catalog has no usable Names entry or the
            key is not found. A plain ``KeyError`` is raised if ``cat`` is
            missing or a leaf node does not contain ``key``.
        """
        try:
            names = dict_value(self.catalog["Names"])
        except (PDFTypeError, KeyError) as err:
            raise PDFKeyError((cat, key)) from err
        # may raise KeyError
        d0 = dict_value(names[cat])

        def lookup(d: dict[str, Any]) -> Any:
            if "Limits" in d:
                (k1, k2) = list_value(d["Limits"])
                if key < k1 or k2 < key:
                    return None
            if "Names" in d:
                objs = list_value(d["Names"])
                names = dict(
                    cast(Iterator[tuple[str | bytes, Any]], choplist(2, objs)),
                )
                return names[key]
            if "Kids" in d:
                for c in list_value(d["Kids"]):
                    v = lookup(dict_value(c))
                    # A falsy result is treated as "not found in this kid".
                    if v:
                        return v
            raise PDFKeyError((cat, key))

        return lookup(d0)

    def get_dest(self, name: str | bytes) -> Any:
        """Resolve the named destination ``name``.

        :raises PDFDestinationNotFound: if neither lookup finds it.
        """
        try:
            # PDF-1.2 or later
            obj = self.lookup_name("Dests", name)
        except KeyError:
            # PDF-1.1 or prior
            if "Dests" not in self.catalog:
                raise PDFDestinationNotFound(name) from None
            d0 = dict_value(self.catalog["Dests"])
            if name not in d0:
                raise PDFDestinationNotFound(name) from None
            obj = d0[name]
        return obj

    # find_xref
    def find_xref(self, parser: PDFParser) -> int:
        """Internal function used to locate the first XRef."""
        # search the last xref table by scanning the file backwards.
        # Reading backwards, the offset line is seen before "startxref".
        prev = b""
        for line in parser.revreadlines():
            line = line.strip()
            log.debug("find_xref: %r", line)

            if line == b"startxref":
                log.debug("xref found: pos=%r", prev)

                if not prev.isdigit():
                    raise PDFNoValidXRef(f"Invalid xref position: {prev!r}")

                start = int(prev)

                if not start >= 0:
                    raise PDFNoValidXRef(f"Invalid negative xref position: {start}")

                return start

            if line:
                prev = line

        raise PDFNoValidXRef("Unexpected EOF")

    # read xref table
    def read_xref_from(
        self,
        parser: PDFParser,
        start: int,
        xrefs: list[PDFBaseXRef],
    ) -> None:
        """Reads XRefs from the given location.

        Appends the xref found at ``start`` to ``xrefs`` and then follows its
        trailer's XRefStm and Prev entries. Each position is read at most
        once, so a chain that loops back on itself terminates.
        """
        self._read_xref_chain(parser, start, xrefs, set())

    def _read_xref_chain(
        self,
        parser: PDFParser,
        start: int,
        xrefs: list[PDFBaseXRef],
        visited: set[int],
    ) -> None:
        """Recursive worker for read_xref_from; ``visited`` holds positions already read."""
        if start in visited:
            # Re-reading could only append a duplicate of an xref that is
            # already in the list, or recurse forever on a cyclic chain.
            log.warning("Ignoring already-visited xref position: %r", start)
            return
        visited.add(start)
        parser.seek(start)
        parser.reset()
        try:
            (pos, token) = parser.nexttoken()
        except PSEOF as err:
            raise PDFNoValidXRef("Unexpected EOF") from err
        log.debug("read_xref_from: start=%d, token=%r", start, token)
        if isinstance(token, int):
            # XRefStream: PDF-1.5
            parser.seek(pos)
            parser.reset()
            xref: PDFBaseXRef = PDFXRefStream()
            xref.load(parser)
        else:
            if token is parser.KEYWORD_XREF:
                parser.nextline()
            xref = PDFXRef()
            xref.load(parser)
        xrefs.append(xref)
        trailer = xref.get_trailer()
        log.debug("trailer: %r", trailer)
        if "XRefStm" in trailer:
            pos = int_value(trailer["XRefStm"])
            self._read_xref_chain(parser, pos, xrefs, visited)
        if "Prev" in trailer:
            # find previous xref
            pos = int_value(trailer["Prev"])
            self._read_xref_chain(parser, pos, xrefs, visited)
class PageLabels(NumberTree):
    """PageLabels from the document catalog.

    See Section 8.3.1 in the PDF Reference.
    """

    @property
    def labels(self) -> Iterator[str]:
        """Yield one label per page; the last range continues without end."""
        ranges = self.values

        # The tree must begin with page index 0
        if len(ranges) == 0 or ranges[0][0] != 0:
            if settings.STRICT:
                raise PDFSyntaxError("PageLabels is missing page index 0")
            # Try to cope, by assuming empty labels for the initial pages
            ranges.insert(0, (0, {}))

        range_count = len(ranges)
        for following, (start, raw_label_dict) in enumerate(ranges, 1):
            label_dict = dict_value(raw_label_dict)
            style = label_dict.get("S")
            prefix = decode_text(str_value(label_dict.get("P", b"")))
            first_value = int_value(label_dict.get("St", 1))

            values: Iterable[int]
            if following == range_count:
                # This is the last specified range. It continues until the end
                # of the document.
                values = itertools.count(first_value)
            else:
                # The range ends where the following one starts.
                end, _ = ranges[following]
                values = range(first_value, first_value + (end - start))

            for value in values:
                yield prefix + self._format_page_label(value, style)

    @staticmethod
    def _format_page_label(value: int, style: Any) -> str:
        """Format page label value in a specific style"""
        if style is None:
            return ""
        if style is LIT("D"):  # Decimal arabic numerals
            return str(value)
        if style is LIT("R"):  # Uppercase roman numerals
            return format_int_roman(value).upper()
        if style is LIT("r"):  # Lowercase roman numerals
            return format_int_roman(value)
        if style is LIT("A"):  # Uppercase letters A-Z, AA-ZZ...
            return format_int_alpha(value).upper()
        if style is LIT("a"):  # Lowercase letters a-z, aa-zz...
            return format_int_alpha(value)
        log.warning("Unknown page label style: %r", style)
        return ""