Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/pdfdocument.py: 88%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import itertools
2import logging
3import re
4import struct
5from hashlib import md5, sha256, sha384, sha512
6from typing import (
7 Any,
8 Callable,
9 Dict,
10 Iterable,
11 Iterator,
12 KeysView,
13 List,
14 Optional,
15 Sequence,
16 Tuple,
17 Type,
18 Union,
19 cast,
20)
22from cryptography.hazmat.backends import default_backend
23from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
25from pdfminer import settings
26from pdfminer.arcfour import Arcfour
27from pdfminer.casting import safe_int
28from pdfminer.data_structures import NumberTree
29from pdfminer.pdfexceptions import (
30 PDFException,
31 PDFKeyError,
32 PDFObjectNotFound,
33 PDFTypeError,
34)
35from pdfminer.pdfparser import PDFParser, PDFStreamParser, PDFSyntaxError
36from pdfminer.pdftypes import (
37 DecipherCallable,
38 PDFStream,
39 decipher_all,
40 dict_value,
41 int_value,
42 list_value,
43 str_value,
44 stream_value,
45 uint_value,
46)
47from pdfminer.psexceptions import PSEOF
48from pdfminer.psparser import KWD, LIT, literal_name
49from pdfminer.utils import (
50 choplist,
51 decode_text,
52 format_int_alpha,
53 format_int_roman,
54 nunpack,
55 unpad_aes,
56)
58log = logging.getLogger(__name__)
class PDFNoValidXRef(PDFSyntaxError):
    """Raised when no usable cross-reference section can be read."""


class PDFNoValidXRefWarning(SyntaxWarning):
    """Legacy warning for missing xref.

    Not used anymore because warnings.warn is replaced by logger.Logger.warn.
    """


class PDFNoOutlines(PDFException):
    """Raised when the document catalog has no /Outlines entry."""


class PDFNoPageLabels(PDFException):
    """Raised when the document catalog has no usable /PageLabels entry."""


class PDFDestinationNotFound(PDFException):
    """Raised when a named destination cannot be resolved."""


class PDFEncryptionError(PDFException):
    """Raised when the encryption parameters are unsupported or invalid."""


class PDFPasswordIncorrect(PDFEncryptionError):
    """Raised when the supplied password does not authenticate."""


class PDFEncryptionWarning(UserWarning):
    """Legacy warning for failed decryption.

    Not used anymore because warnings.warn is replaced by logger.Logger.warn.
    """


class PDFTextExtractionNotAllowedWarning(UserWarning):
    """Legacy warning for PDF that does not allow extraction.

    Not used anymore because warnings.warn is replaced by logger.Logger.warn.
    """


class PDFTextExtractionNotAllowed(PDFEncryptionError):
    """Error for a PDF that does not allow text extraction.

    NOTE(review): not raised anywhere in this module; presumably raised by
    callers that check ``PDFDocument.is_extractable`` - confirm.
    """
# Some predefined literals and keywords.
# They are compared by identity (``is``) against the /Type entry of parsed
# objects further down in this module.
LITERAL_OBJSTM = LIT("ObjStm")  # /Type of an object stream
LITERAL_XREF = LIT("XRef")  # /Type of a cross-reference stream
LITERAL_CATALOG = LIT("Catalog")  # /Type of the document catalog
class PDFBaseXRef:
    """Interface shared by every cross-reference implementation.

    Subclasses are expected to override ``load`` and ``get_trailer``; the
    defaults for ``get_objids`` and ``get_pos`` describe an empty table.
    """

    def load(self, parser: PDFParser) -> None:
        """Read the cross-reference data from *parser*."""
        raise NotImplementedError()

    def get_trailer(self) -> Dict[str, Any]:
        """Return the trailer dictionary that belongs to this section."""
        raise NotImplementedError()

    def get_objids(self) -> Iterable[int]:
        """Return the object ids known to this section (none by default)."""
        return list()

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Locate *objid*.

        Must return
          (strmid, index, genno)
          or (None, pos, genno)

        :raises PDFKeyError: always, in this base implementation.
        """
        raise PDFKeyError(objid)
class PDFXRef(PDFBaseXRef):
    """Cross-reference section read from a plain-text ``xref`` table.

    ``offsets`` maps an object id to ``(None, position, generation)`` and
    ``trailer`` accumulates the entries of the trailer dictionary.
    """

    def __init__(self) -> None:
        self.offsets: Dict[int, Tuple[Optional[int], int, int]] = {}
        self.trailer: Dict[str, Any] = {}

    def __repr__(self) -> str:
        return "<PDFXRef: offsets=%r>" % (self.offsets.keys())

    def load(self, parser: PDFParser) -> None:
        """Read subsections until the ``trailer`` line, then the trailer.

        Each subsection starts with a "<first objid> <count>" header line
        followed by ``count`` entry lines.

        :raises PDFNoValidXRef: on EOF or on a malformed header/entry line.
        """
        while True:
            try:
                (pos, line) = parser.nextline()
                line = line.strip()
            except PSEOF:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted?")
            if not line:
                # Blank lines between subsections are skipped.
                continue
            if line.startswith(b"trailer"):
                # Rewind so that load_trailer() sees the keyword itself.
                parser.seek(pos)
                break
            header = line.split(b" ")
            if len(header) != 2:
                error_msg = f"Trailer not found: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg)
            try:
                first_objid, count = (int(field) for field in header)
            except ValueError:
                error_msg = f"Invalid line: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg)
            for objid in range(first_objid, first_objid + count):
                self._load_entry(parser, objid)
        log.debug("xref objects: %r", self.offsets)
        self.load_trailer(parser)

    def _load_entry(self, parser: PDFParser, objid: int) -> None:
        """Read one "<pos> <genno> <n|f>" line and record it for *objid*."""
        try:
            (_, line) = parser.nextline()
            line = line.strip()
        except PSEOF:
            raise PDFNoValidXRef("Unexpected EOF - file corrupted?")
        entry = line.split(b" ")
        if len(entry) != 3:
            error_msg = f"Invalid XRef format: {parser!r}, line={line!r}"
            raise PDFNoValidXRef(error_msg)
        (pos_b, genno_b, use_b) = entry
        if use_b != b"n":
            # Only in-use ("n") entries are recorded.
            return

        pos_i = safe_int(pos_b)
        genno_i = safe_int(genno_b)
        if pos_i is None or genno_i is None:
            log.warning(
                f"Not adding object {objid} to xref because position {pos_b!r} "
                f"or generation number {genno_b!r} cannot be parsed as an int"
            )
            return
        self.offsets[objid] = (None, pos_i, genno_i)

    def load_trailer(self, parser: PDFParser) -> None:
        """Parse the ``trailer`` keyword and merge its dictionary.

        If EOF is hit while parsing, the object left on the parser stack (if
        any) is used as the trailer dictionary.

        :raises PDFNoValidXRef: when EOF is hit and the stack is empty.
        """
        try:
            (_, kwd) = parser.nexttoken()
            assert kwd is KWD(b"trailer"), str(kwd)
            (_, dic) = parser.nextobject()
        except PSEOF:
            leftover = parser.pop(1)
            if not leftover:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted")
            (_, dic) = leftover[0]
        self.trailer.update(dict_value(dic))
        log.debug("trailer=%r", self.trailer)

    def get_trailer(self) -> Dict[str, Any]:
        """Return the trailer dictionary collected by ``load_trailer``."""
        return self.trailer

    def get_objids(self) -> KeysView[int]:
        """Return a view of all recorded object ids."""
        return self.offsets.keys()

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Return the recorded location of *objid* (``KeyError`` if absent)."""
        return self.offsets[objid]
class PDFXRefFallback(PDFXRef):
    """Cross-reference data rebuilt by scanning the file for ``N G obj``.

    Used when no valid xref section can be read: the file is read line by
    line from offset 0, every object header is recorded, and object streams
    met on the way are expanded.
    """

    def __repr__(self) -> str:
        return "<PDFXRefFallback: offsets=%r>" % (self.offsets.keys())

    PDFOBJ_CUE = re.compile(r"^(\d+)\s+(\d+)\s+obj\b")

    def load(self, parser: PDFParser) -> None:
        """Scan the file, stopping at EOF or after the first trailer."""
        parser.seek(0)
        while True:
            try:
                (pos, raw_line) = parser.nextline()
            except PSEOF:
                break
            if raw_line.startswith(b"trailer"):
                parser.seek(pos)
                self.load_trailer(parser)
                log.debug("trailer: %r", self.trailer)
                break
            text = raw_line.decode("latin-1")  # default pdf encoding
            match = self.PDFOBJ_CUE.match(text)
            if not match:
                continue
            objid = int(match.group(1))
            genno = int(match.group(2))
            self.offsets[objid] = (None, pos, genno)
            # expand ObjStm.
            parser.seek(pos)
            (_, obj) = parser.nextobject()
            if isinstance(obj, PDFStream) and obj.get("Type") is LITERAL_OBJSTM:
                self._expand_objstm(objid, obj)

    def _expand_objstm(self, objid: int, obj: PDFStream) -> None:
        """Record the objects stored inside object stream *objid*."""
        stream = stream_value(obj)
        try:
            n = stream["N"]
        except KeyError:
            if settings.STRICT:
                raise PDFSyntaxError("N is not defined: %r" % stream)
            n = 0
        inner_parser = PDFStreamParser(stream.get_data())
        objs: List[int] = []
        try:
            while True:
                (_, item) = inner_parser.nextobject()
                objs.append(cast(int, item))
        except PSEOF:
            pass
        # Never trust /N beyond the number of pairs actually parsed.
        n = min(n, len(objs) // 2)
        for index in range(n):
            # Even slots are read as the embedded object ids.
            self.offsets[objs[index * 2]] = (objid, index, 0)
class PDFXRefStream(PDFBaseXRef):
    """Cross-reference section stored as a stream (PDF 1.5).

    ``ranges`` holds the ``(first objid, count)`` pairs from /Index, and
    ``data`` holds the fixed-width entries of all ranges back to back, each
    ``entlen = fl1 + fl2 + fl3`` bytes long (field widths taken from /W).
    """

    def __init__(self) -> None:
        self.data: Optional[bytes] = None
        self.entlen: Optional[int] = None
        self.fl1: Optional[int] = None
        self.fl2: Optional[int] = None
        self.fl3: Optional[int] = None
        self.ranges: List[Tuple[int, int]] = []
        # Defined up front so get_trailer() is safe before load().
        self.trailer: Dict[str, Any] = {}

    def __repr__(self) -> str:
        return "<PDFXRefStream: ranges=%r>" % (self.ranges)

    def load(self, parser: PDFParser) -> None:
        """Read the stream object at the parser position.

        :raises PDFNoValidXRef: if the object is not a /Type /XRef stream.
        :raises PDFSyntaxError: if /Index has an odd number of elements.
        """
        # The three tokens of the "objid genno obj" header are consumed only
        # to advance the parser; their values are not used.
        parser.nexttoken()
        parser.nexttoken()
        parser.nexttoken()
        (_, stream) = parser.nextobject()
        if not isinstance(stream, PDFStream) or stream.get("Type") is not LITERAL_XREF:
            raise PDFNoValidXRef("Invalid PDF stream spec.")
        size = stream["Size"]
        # Without /Index there is a single range covering 0..Size-1.
        index_array = stream.get("Index", (0, size))
        if len(index_array) % 2 != 0:
            raise PDFSyntaxError("Invalid index number")
        self.ranges.extend(cast(Iterator[Tuple[int, int]], choplist(2, index_array)))
        (self.fl1, self.fl2, self.fl3) = stream["W"]
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        self.data = stream.get_data()
        self.entlen = self.fl1 + self.fl2 + self.fl3
        self.trailer = stream.attrs
        log.debug(
            "xref stream: objid=%s, fields=%d,%d,%d",
            ", ".join(map(repr, self.ranges)),
            self.fl1,
            self.fl2,
            self.fl3,
        )

    def get_trailer(self) -> Dict[str, Any]:
        """Return the stream attributes, which act as the trailer."""
        return self.trailer

    def get_objids(self) -> Iterator[int]:
        """Yield the ids of all entries of type 1 or 2."""
        # Entries of every range are stored consecutively, so the entry
        # index keeps counting across ranges (same scheme as get_pos).
        index = 0
        for start, nobjs in self.ranges:
            for i in range(nobjs):
                assert self.entlen is not None
                assert self.data is not None
                offset = self.entlen * index
                index += 1
                ent = self.data[offset : offset + self.entlen]
                f1 = nunpack(ent[: self.fl1], 1)
                if f1 == 1 or f1 == 2:
                    yield start + i

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Return the location of *objid*.

        Type 1 entries give ``(None, f2, f3)`` and type 2 entries give
        ``(f2, f3, 0)``, matching the two tuple shapes of
        ``PDFBaseXRef.get_pos``.

        :raises PDFKeyError: if *objid* is outside every range or its entry
            has any other type (a free object).
        """
        index = 0
        for start, nobjs in self.ranges:
            if start <= objid and objid < start + nobjs:
                index += objid - start
                break
            else:
                index += nobjs
        else:
            raise PDFKeyError(objid)
        assert self.entlen is not None
        assert self.data is not None
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        offset = self.entlen * index
        ent = self.data[offset : offset + self.entlen]
        # NOTE(review): the second argument of nunpack is presumably the
        # value used when the type field is empty - confirm in utils.
        f1 = nunpack(ent[: self.fl1], 1)
        f2 = nunpack(ent[self.fl1 : self.fl1 + self.fl2])
        f3 = nunpack(ent[self.fl1 + self.fl2 :])
        if f1 == 1:
            return (None, f2, f3)
        elif f1 == 2:
            return (f2, f3, 0)
        else:
            # this is a free object
            raise PDFKeyError(objid)
class PDFStandardSecurityHandler:
    """Standard security handler for revisions 2 and 3 (MD5 + RC4).

    The handler authenticates the password on construction and afterwards
    decrypts strings and streams through ``decrypt``.
    """

    PASSWORD_PADDING = (
        b"(\xbfN^Nu\x8aAd\x00NV\xff\xfa\x01\x08"
        b"..\x00\xb6\xd0h>\x80/\x0c\xa9\xfedSiz"
    )
    supported_revisions: Tuple[int, ...] = (2, 3)

    def __init__(
        self,
        docid: Sequence[bytes],
        param: Dict[str, Any],
        password: str = "",
    ) -> None:
        """Store the arguments and authenticate immediately.

        :param docid: the document /ID array; only element 0 is used.
        :param param: the /Encrypt dictionary.
        :param password: user or owner password.
        :raises PDFEncryptionError: for an unsupported revision.
        :raises PDFPasswordIncorrect: if the password does not authenticate.
        """
        self.docid = docid
        self.param = param
        self.password = password
        self.init()

    def init(self) -> None:
        """Read the parameters, check the revision and derive the key."""
        self.init_params()
        if self.r not in self.supported_revisions:
            error_msg = "Unsupported revision: param=%r" % self.param
            raise PDFEncryptionError(error_msg)
        self.init_key()

    def init_params(self) -> None:
        """Copy V, R, P, O, U and Length out of the /Encrypt dictionary."""
        param = self.param
        self.v = int_value(param.get("V", 0))
        self.r = int_value(param["R"])
        self.p = uint_value(param["P"], 32)
        self.o = str_value(param["O"])
        self.u = str_value(param["U"])
        self.length = int_value(param.get("Length", 40))

    def init_key(self) -> None:
        """Authenticate ``self.password`` and keep the resulting key."""
        self.key = self.authenticate(self.password)
        if self.key is None:
            raise PDFPasswordIncorrect

    def is_printable(self) -> bool:
        """Return whether bit value 4 of the permission flags is set."""
        return (self.p & 4) != 0

    def is_modifiable(self) -> bool:
        """Return whether bit value 8 of the permission flags is set."""
        return (self.p & 8) != 0

    def is_extractable(self) -> bool:
        """Return whether bit value 16 of the permission flags is set."""
        return (self.p & 16) != 0

    def compute_u(self, key: bytes) -> bytes:
        """Compute the /U value that *key* would produce."""
        if self.r == 2:
            # Algorithm 3.4
            return Arcfour(key).encrypt(self.PASSWORD_PADDING)  # 2
        # Algorithm 3.5
        digest = md5(self.PASSWORD_PADDING)  # 2
        digest.update(self.docid[0])  # 3
        result = Arcfour(key).encrypt(digest.digest())  # 4
        for i in range(1, 20):  # 5
            round_key = bytes(c ^ i for c in key)
            result = Arcfour(round_key).encrypt(result)
        # 6: the 16-byte result is doubled; only the first 16 bytes are
        # compared by verify_encryption_key.
        return result + result

    def compute_encryption_key(self, password: bytes) -> bytes:
        """Derive the file encryption key from *password*."""
        # Algorithm 3.2
        padded = (password + self.PASSWORD_PADDING)[:32]  # 1
        digest = md5(padded)  # 2
        digest.update(self.o)  # 3
        # See https://github.com/pdfminer/pdfminer.six/issues/186
        digest.update(struct.pack("<L", self.p))  # 4
        digest.update(self.docid[0])  # 5
        if (
            self.r >= 4
            and not cast(PDFStandardSecurityHandlerV4, self).encrypt_metadata
        ):
            digest.update(b"\xff\xff\xff\xff")
        result = digest.digest()
        if self.r < 3:
            # Revision 2 always uses a 5-byte key and no extra rounds.
            return result[:5]
        key_len = self.length // 8
        for _ in range(50):
            result = md5(result[:key_len]).digest()
        return result[:key_len]

    def authenticate(self, password: str) -> Optional[bytes]:
        """Try *password* as user password, then as owner password.

        :return: the encryption key, or None if neither attempt succeeds.
        """
        password_bytes = password.encode("latin1")
        key = self.authenticate_user_password(password_bytes)
        if key is not None:
            return key
        return self.authenticate_owner_password(password_bytes)

    def authenticate_user_password(self, password: bytes) -> Optional[bytes]:
        """Return the key derived from *password* if it verifies, else None."""
        key = self.compute_encryption_key(password)
        return key if self.verify_encryption_key(key) else None

    def verify_encryption_key(self, key: bytes) -> bool:
        """Check *key* against the stored /U value."""
        # Algorithm 3.6
        computed = self.compute_u(key)
        if self.r == 2:
            return computed == self.u
        return computed[:16] == self.u[:16]

    def authenticate_owner_password(self, password: bytes) -> Optional[bytes]:
        """Recover the user password from /O and authenticate with it."""
        # Algorithm 3.7
        padded = (password + self.PASSWORD_PADDING)[:32]
        digest = md5(padded)
        key_len = 5
        if self.r >= 3:
            for _ in range(50):
                digest = md5(digest.digest())
            key_len = self.length // 8
        key = digest.digest()[:key_len]
        if self.r == 2:
            user_password = Arcfour(key).decrypt(self.o)
        else:
            user_password = self.o
            for i in reversed(range(20)):
                round_key = bytes(c ^ i for c in key)
                user_password = Arcfour(round_key).decrypt(user_password)
        return self.authenticate_user_password(user_password)

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: Optional[Dict[str, Any]] = None,
    ) -> bytes:
        """Decrypt *data* belonging to object (*objid*, *genno*).

        *attrs* is accepted for interface compatibility and ignored here.
        """
        return self.decrypt_rc4(objid, genno, data)

    def decrypt_rc4(self, objid: int, genno: int, data: bytes) -> bytes:
        """RC4-decrypt *data* with the per-object key."""
        assert self.key is not None
        # Per-object key material: file key + low 3 bytes of the object id
        # + low 2 bytes of the generation number.
        material = (
            self.key + struct.pack("<L", objid)[:3] + struct.pack("<L", genno)[:2]
        )
        object_key = md5(material).digest()[: min(len(material), 16)]
        return Arcfour(object_key).decrypt(data)
class PDFStandardSecurityHandlerV4(PDFStandardSecurityHandler):
    """Security handler for revision 4: crypt filters with RC4 or AES-128."""

    supported_revisions: Tuple[int, ...] = (4,)

    def init_params(self) -> None:
        """Read the crypt-filter parameters on top of the base ones.

        :raises PDFEncryptionError: if /StmF and /StrF differ, if a filter
            uses an unknown /CFM, or if /StrF names an undefined filter.
        """
        super().init_params()
        self.length = 128
        self.cf = dict_value(self.param.get("CF"))
        self.stmf = literal_name(self.param["StmF"])
        self.strf = literal_name(self.param["StrF"])
        self.encrypt_metadata = bool(self.param.get("EncryptMetadata", True))
        if self.stmf != self.strf:
            # Only one filter for both strings and streams is supported.
            error_msg = "Unsupported crypt filter: param=%r" % self.param
            raise PDFEncryptionError(error_msg)
        self.cfm = {}
        for filter_name, filter_spec in self.cf.items():
            method = self.get_cfm(literal_name(filter_spec["CFM"]))
            if method is None:
                error_msg = "Unknown crypt filter method: param=%r" % self.param
                raise PDFEncryptionError(error_msg)
            self.cfm[filter_name] = method
        # "Identity" is always available and passes data through unchanged.
        self.cfm["Identity"] = self.decrypt_identity
        if self.strf not in self.cfm:
            error_msg = "Undefined crypt filter: param=%r" % self.param
            raise PDFEncryptionError(error_msg)

    def get_cfm(self, name: str) -> Optional[Callable[[int, int, bytes], bytes]]:
        """Map a /CFM name to its decryption method (None if unknown)."""
        if name == "V2":
            return self.decrypt_rc4
        if name == "AESV2":
            return self.decrypt_aes128
        return None

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: Optional[Dict[str, Any]] = None,
        name: Optional[str] = None,
    ) -> bytes:
        """Decrypt *data* with crypt filter *name* (default: /StrF).

        Data whose *attrs* say /Type /Metadata is returned untouched when
        the document does not encrypt metadata.
        """
        if attrs is not None and not self.encrypt_metadata:
            obj_type = attrs.get("Type")
            if obj_type is not None and literal_name(obj_type) == "Metadata":
                return data
        filter_name = self.strf if name is None else name
        return self.cfm[filter_name](objid, genno, data)

    def decrypt_identity(self, objid: int, genno: int, data: bytes) -> bytes:
        """Return *data* unchanged."""
        return data

    def decrypt_aes128(self, objid: int, genno: int, data: bytes) -> bytes:
        """AES-CBC-decrypt *data*; the first 16 bytes are the IV."""
        assert self.key is not None
        # Same per-object key material as RC4, plus the "sAlT" suffix.
        material = (
            self.key
            + struct.pack("<L", objid)[:3]
            + struct.pack("<L", genno)[:2]
            + b"sAlT"
        )
        object_key = md5(material).digest()[: min(len(material), 16)]
        iv, ciphertext = data[:16], data[16:]
        cipher = Cipher(
            algorithms.AES(object_key),
            modes.CBC(iv),
            backend=default_backend(),
        )  # type: ignore
        plaintext = cipher.decryptor().update(ciphertext)  # type: ignore
        return unpad_aes(plaintext)
class PDFStandardSecurityHandlerV5(PDFStandardSecurityHandlerV4):
    """Security handler for revisions 5 and 6 (AES-256)."""

    supported_revisions = (5, 6)

    def init_params(self) -> None:
        """Read /OE and /UE and split /O and /U into hash and salts."""
        super().init_params()
        self.length = 256
        self.oe = str_value(self.param["OE"])
        self.ue = str_value(self.param["UE"])
        # /O and /U layout: 32-byte hash, 8-byte validation salt, key salt.
        owner, user = self.o, self.u
        self.o_hash = owner[:32]
        self.o_validation_salt = owner[32:40]
        self.o_key_salt = owner[40:]
        self.u_hash = user[:32]
        self.u_validation_salt = user[32:40]
        self.u_key_salt = user[40:]

    def get_cfm(self, name: str) -> Optional[Callable[[int, int, bytes], bytes]]:
        """Only the "AESV3" crypt filter method is accepted."""
        return self.decrypt_aes256 if name == "AESV3" else None

    def authenticate(self, password: str) -> Optional[bytes]:
        """Try *password* as owner password, then as user password.

        :return: the decrypted file key, or None if neither hash matches.
        """
        password_b = self._normalize_password(password)
        # The owner hashes additionally mix in the full /U string.
        owner_check = self._password_hash(password_b, self.o_validation_salt, self.u)
        if owner_check == self.o_hash:
            wrapping_key = self._password_hash(password_b, self.o_key_salt, self.u)
            return self._unwrap_file_key(wrapping_key, self.oe)
        user_check = self._password_hash(password_b, self.u_validation_salt)
        if user_check == self.u_hash:
            wrapping_key = self._password_hash(password_b, self.u_key_salt)
            return self._unwrap_file_key(wrapping_key, self.ue)
        return None

    def _unwrap_file_key(self, wrapping_key: bytes, wrapped: bytes) -> bytes:
        """AES-CBC-decrypt *wrapped* with *wrapping_key* and a zero IV."""
        cipher = Cipher(
            algorithms.AES(wrapping_key),
            modes.CBC(b"\0" * 16),
            backend=default_backend(),
        )  # type: ignore
        return cipher.decryptor().update(wrapped)  # type: ignore

    def _normalize_password(self, password: str) -> bytes:
        """Return *password* as at most 127 bytes of UTF-8.

        For revision 6 the password goes through saslprep first.
        """
        if self.r == 6:
            # saslprep expects non-empty strings, apparently
            if not password:
                return b""
            from pdfminer._saslprep import saslprep

            password = saslprep(password)
        encoded = password.encode("utf-8")
        return encoded[:127]

    def _password_hash(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute password hash depending on revision number"""
        if self.r != 5:
            return self._r6_password(password, salt[0:8], vector)
        return self._r5_password(password, salt, vector)

    def _r5_password(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute the password for revision 5"""
        hasher = sha256(password)
        hasher.update(salt)
        if vector is not None:
            hasher.update(vector)
        return hasher.digest()

    def _r6_password(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute the password for revision 6"""
        seed = sha256(password)
        seed.update(salt)
        if vector is not None:
            seed.update(vector)
        k = seed.digest()
        hashes = (sha256, sha384, sha512)
        suffix = vector or b""
        round_no = 0
        last_byte_val = 0
        # At least 64 rounds; afterwards continue while the last ciphertext
        # byte is greater than (round number - 32).
        while round_no < 64 or last_byte_val > round_no - 32:
            k1 = (password + k + suffix) * 64
            e = self._aes_cbc_encrypt(key=k[:16], iv=k[16:32], data=k1)
            # compute the first 16 bytes of e,
            # interpreted as an unsigned integer mod 3
            next_hash = hashes[self._bytes_mod_3(e[:16])]
            k = next_hash(e).digest()
            last_byte_val = e[-1]
            round_no += 1
        return k[:32]

    @staticmethod
    def _bytes_mod_3(input_bytes: bytes) -> int:
        """Return the big integer spelled by *input_bytes*, modulo 3."""
        # 256 is 1 mod 3, so we can just sum 'em
        total = 0
        for byte in input_bytes:
            total += byte % 3
        return total % 3

    def _aes_cbc_encrypt(self, key: bytes, iv: bytes, data: bytes) -> bytes:
        """AES-CBC-encrypt *data* with *key* and *iv*."""
        encryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).encryptor()  # type: ignore
        return encryptor.update(data) + encryptor.finalize()  # type: ignore

    def decrypt_aes256(self, objid: int, genno: int, data: bytes) -> bytes:
        """AES-CBC-decrypt *data* with the file key; first 16 bytes are the IV.

        *objid* and *genno* are not used: the file key is applied directly.
        """
        iv, ciphertext = data[:16], data[16:]
        assert self.key is not None
        cipher = Cipher(
            algorithms.AES(self.key),
            modes.CBC(iv),
            backend=default_backend(),
        )  # type: ignore
        plaintext = cipher.decryptor().update(ciphertext)  # type: ignore
        return unpad_aes(plaintext)
class PDFDocument:
    """PDFDocument object represents a PDF document.

    Since a PDF file can be very big, normally it is not loaded at
    once. So PDF document has to cooperate with a PDF parser in order to
    dynamically import the data as processing goes.

    Typical usage:
      doc = PDFDocument(parser, password)
      obj = doc.getobj(objid)

    """

    # Maps the /V entry of the /Encrypt dictionary to a handler class.
    security_handler_registry: Dict[int, Type[PDFStandardSecurityHandler]] = {
        1: PDFStandardSecurityHandler,
        2: PDFStandardSecurityHandler,
        4: PDFStandardSecurityHandlerV4,
        5: PDFStandardSecurityHandlerV5,
    }

    def __init__(
        self,
        parser: PDFParser,
        password: str = "",
        caching: bool = True,
        fallback: bool = True,
    ) -> None:
        """Set the document to use a given PDFParser object.

        :param parser: parser positioned on the PDF data.
        :param password: password used if the document is encrypted.
        :param caching: keep resolved objects and parsed object streams.
        :param fallback: if no valid xref can be read, rebuild one by
            scanning the file (PDFXRefFallback).
        :raises PDFSyntaxError: if no trailer contains a /Root entry, or
            (strict mode only) the root is not a /Catalog.
        """
        self.caching = caching
        self.xrefs: List[PDFBaseXRef] = []
        self.info: List[Dict[str, Any]] = []
        self.catalog: Dict[str, Any] = {}
        self.encryption: Optional[Tuple[Any, Any]] = None
        self.decipher: Optional[DecipherCallable] = None
        self._parser = None
        self._cached_objs: Dict[int, Tuple[object, int]] = {}
        self._parsed_objs: Dict[int, Tuple[List[object], int]] = {}
        self._parser = parser
        self._parser.set_document(self)
        self.is_printable = self.is_modifiable = self.is_extractable = True
        # Retrieve the information of each header that was appended
        # (maybe multiple times) at the end of the document.
        try:
            pos = self.find_xref(parser)
            self.read_xref_from(parser, pos, self.xrefs)
        except PDFNoValidXRef:
            if fallback:
                parser.fallback = True
                newxref = PDFXRefFallback()
                newxref.load(parser)
                self.xrefs.append(newxref)

        for xref in self.xrefs:
            trailer = xref.get_trailer()
            if not trailer:
                continue
            # If there's an encryption info, remember it.
            if "Encrypt" in trailer:
                if "ID" in trailer:
                    id_value = list_value(trailer["ID"])
                else:
                    # Some documents may not have a /ID, use two empty
                    # byte strings instead. Solves
                    # https://github.com/pdfminer/pdfminer.six/issues/594
                    id_value = (b"", b"")
                self.encryption = (id_value, dict_value(trailer["Encrypt"]))
                self._initialize_password(password)
            if "Info" in trailer:
                self.info.append(dict_value(trailer["Info"]))
            if "Root" in trailer:
                # Every PDF file must have exactly one /Root dictionary.
                self.catalog = dict_value(trailer["Root"])
                break
        else:
            raise PDFSyntaxError("No /Root object! - Is this really a PDF?")
        if self.catalog.get("Type") is not LITERAL_CATALOG:
            if settings.STRICT:
                raise PDFSyntaxError("Catalog not found!")

    KEYWORD_OBJ = KWD(b"obj")

    # _initialize_password(password=b'')
    #   Perform the initialization with a given password.
    def _initialize_password(self, password: str = "") -> None:
        """Create the security handler and install its decrypt callable.

        :raises PDFEncryptionError: if /Filter is not "Standard" or /V has
            no registered handler.
        """
        assert self.encryption is not None
        (docid, param) = self.encryption
        if literal_name(param.get("Filter")) != "Standard":
            raise PDFEncryptionError("Unknown filter: param=%r" % param)
        v = int_value(param.get("V", 0))
        factory = self.security_handler_registry.get(v)
        if factory is None:
            raise PDFEncryptionError("Unknown algorithm: param=%r" % param)
        handler = factory(docid, param, password)
        self.decipher = handler.decrypt
        self.is_printable = handler.is_printable()
        self.is_modifiable = handler.is_modifiable()
        self.is_extractable = handler.is_extractable()
        assert self._parser is not None
        self._parser.fallback = False  # need to read streams with exact length

    def _getobj_objstm(self, stream: PDFStream, index: int, objid: int) -> object:
        """Return the *index*-th object stored in object stream *stream*.

        :raises PDFSyntaxError: if *index* is past the parsed objects.
        """
        if stream.objid in self._parsed_objs:
            (objs, n) = self._parsed_objs[stream.objid]
        else:
            (objs, n) = self._get_objects(stream)
            if self.caching:
                assert stream.objid is not None
                self._parsed_objs[stream.objid] = (objs, n)
        # The first 2*n parsed items are skipped (the same slots that
        # PDFXRefFallback reads as the id table); the objects follow.
        i = n * 2 + index
        try:
            obj = objs[i]
        except IndexError:
            raise PDFSyntaxError("index too big: %r" % index)
        return obj

    def _get_objects(self, stream: PDFStream) -> Tuple[List[object], int]:
        """Parse every item of an object stream.

        :return: ``(items, n)`` where *n* is the stream's /N value (0 if it
            is missing and strict mode is off).
        """
        if stream.get("Type") is not LITERAL_OBJSTM:
            if settings.STRICT:
                raise PDFSyntaxError("Not a stream object: %r" % stream)
        try:
            n = cast(int, stream["N"])
        except KeyError:
            if settings.STRICT:
                raise PDFSyntaxError("N is not defined: %r" % stream)
            n = 0
        parser = PDFStreamParser(stream.get_data())
        parser.set_document(self)
        objs: List[object] = []
        try:
            while 1:
                (_, obj) = parser.nextobject()
                objs.append(obj)
        except PSEOF:
            pass
        return (objs, n)

    def _getobj_parse(self, pos: int, objid: int) -> object:
        """Parse the indirect object *objid* located at file offset *pos*.

        :raises PDFSyntaxError: on an object id mismatch or when the
            ``obj`` keyword is missing.
        """
        assert self._parser is not None
        self._parser.seek(pos)
        (_, objid1) = self._parser.nexttoken()  # objid
        (_, genno) = self._parser.nexttoken()  # genno
        (_, kwd) = self._parser.nexttoken()
        # hack around malformed pdf files
        # copied from https://github.com/jaepil/pdfminer3k/blob/master/
        # pdfminer/pdfparser.py#L399
        # to solve https://github.com/pdfminer/pdfminer.six/issues/56
        # assert objid1 == objid, str((objid1, objid))
        if objid1 != objid:
            x = []
            # Skip forward to the next "obj" keyword and take the token two
            # places before it as the object id.
            while kwd is not self.KEYWORD_OBJ:
                (_, kwd) = self._parser.nexttoken()
                x.append(kwd)
            if len(x) >= 2:
                objid1 = x[-2]
        # #### end hack around malformed pdf files
        if objid1 != objid:
            raise PDFSyntaxError(f"objid mismatch: {objid1!r}={objid!r}")

        if kwd != KWD(b"obj"):
            raise PDFSyntaxError("Invalid object spec: offset=%r" % pos)
        (_, obj) = self._parser.nextobject()
        return obj

    # can raise PDFObjectNotFound
    def getobj(self, objid: int) -> object:
        """Get object from PDF

        :raises PDFException if PDFDocument is not initialized
        :raises PDFObjectNotFound if objid does not exist in PDF
        """
        if not self.xrefs:
            raise PDFException("PDFDocument is not initialized")
        log.debug("getobj: objid=%r", objid)
        if objid in self._cached_objs:
            (obj, genno) = self._cached_objs[objid]
        else:
            # Try each xref section in order; the first one that yields a
            # parsable object wins.
            for xref in self.xrefs:
                try:
                    (strmid, index, genno) = xref.get_pos(objid)
                except KeyError:
                    continue
                try:
                    if strmid is not None:
                        stream = stream_value(self.getobj(strmid))
                        obj = self._getobj_objstm(stream, index, objid)
                    else:
                        obj = self._getobj_parse(index, objid)
                        if self.decipher:
                            obj = decipher_all(self.decipher, objid, genno, obj)

                    if isinstance(obj, PDFStream):
                        obj.set_objid(objid, genno)
                    break
                except (PSEOF, PDFSyntaxError):
                    continue
            else:
                raise PDFObjectNotFound(objid)
            log.debug("register: objid=%r: %r", objid, obj)
            if self.caching:
                self._cached_objs[objid] = (obj, genno)
        return obj

    OutlineType = Tuple[Any, Any, Any, Any, Any]

    def get_outlines(self) -> Iterator[OutlineType]:
        """Iterate over outline entries as (level, title, dest, action, se).

        Only entries with a /Title and either /A or /Dest are yielded.

        :raises PDFNoOutlines: if the catalog has no /Outlines entry.
        """
        if "Outlines" not in self.catalog:
            raise PDFNoOutlines

        def search(entry: object, level: int) -> Iterator[PDFDocument.OutlineType]:
            entry = dict_value(entry)
            if "Title" in entry:
                if "A" in entry or "Dest" in entry:
                    title = decode_text(str_value(entry["Title"]))
                    dest = entry.get("Dest")
                    action = entry.get("A")
                    se = entry.get("SE")
                    yield (level, title, dest, action, se)
            # Children are one level deeper; siblings stay on this level.
            if "First" in entry and "Last" in entry:
                yield from search(entry["First"], level + 1)
            if "Next" in entry:
                yield from search(entry["Next"], level)

        return search(self.catalog["Outlines"], 0)

    def get_page_labels(self) -> Iterator[str]:
        """Generate page label strings for the PDF document.

        If the document includes page labels, generates strings, one per page.
        If not, raises PDFNoPageLabels.

        The resulting iteration is unbounded.
        """
        assert self.catalog is not None

        try:
            page_labels = PageLabels(self.catalog["PageLabels"])
        except (PDFTypeError, KeyError):
            raise PDFNoPageLabels

        return page_labels.labels

    def lookup_name(self, cat: str, key: Union[str, bytes]) -> Any:
        """Look up *key* in the name tree *cat* of the catalog's /Names.

        :raises PDFKeyError: if /Names is missing or the key is not found
            after searching the tree.
        :raises KeyError: if *cat* is missing, or a leaf covering *key*
            does not contain it.
        """
        try:
            names = dict_value(self.catalog["Names"])
        except (PDFTypeError, KeyError):
            raise PDFKeyError((cat, key))
        # may raise KeyError
        d0 = dict_value(names[cat])

        def lookup(d: Dict[str, Any]) -> Any:
            if "Limits" in d:
                # Skip subtrees whose key range cannot contain the key.
                (k1, k2) = list_value(d["Limits"])
                if key < k1 or k2 < key:
                    return None
            if "Names" in d:
                objs = list_value(d["Names"])
                names = dict(
                    cast(Iterator[Tuple[Union[str, bytes], Any]], choplist(2, objs)),
                )
                return names[key]
            if "Kids" in d:
                for c in list_value(d["Kids"]):
                    v = lookup(dict_value(c))
                    if v:
                        return v
            raise PDFKeyError((cat, key))

        return lookup(d0)

    def get_dest(self, name: Union[str, bytes]) -> Any:
        """Resolve the named destination *name*.

        :raises PDFDestinationNotFound: if neither lookup finds *name*.
        """
        try:
            # PDF-1.2 or later
            obj = self.lookup_name("Dests", name)
        except KeyError:
            # PDF-1.1 or prior
            if "Dests" not in self.catalog:
                raise PDFDestinationNotFound(name)
            d0 = dict_value(self.catalog["Dests"])
            if name not in d0:
                raise PDFDestinationNotFound(name)
            obj = d0[name]
        return obj

    # find_xref
    def find_xref(self, parser: PDFParser) -> int:
        """Internal function used to locate the first XRef."""
        # search the last xref table by scanning the file backwards.
        prev = b""
        for line in parser.revreadlines():
            line = line.strip()
            log.debug("find_xref: %r", line)

            if line == b"startxref":
                # Scanning backwards, the previous non-empty line is the
                # one that follows "startxref" in the file.
                log.debug("xref found: pos=%r", prev)

                if not prev.isdigit():
                    raise PDFNoValidXRef(f"Invalid xref position: {prev!r}")

                start = int(prev)

                if not start >= 0:
                    raise PDFNoValidXRef(f"Invalid negative xref position: {start}")

                return start

            if line:
                prev = line

        raise PDFNoValidXRef("Unexpected EOF")

    # read xref table
    def read_xref_from(
        self,
        parser: PDFParser,
        start: int,
        xrefs: List[PDFBaseXRef],
    ) -> None:
        """Reads XRefs from the given location.

        The section at *start* is loaded and appended to *xrefs*, followed
        by the sections reachable through /XRefStm and then /Prev, in
        depth-first order. Every offset is read at most once, so a
        malformed file whose /Prev or /XRefStm chain loops back on itself
        cannot cause unbounded recursion.

        :raises PDFNoValidXRef: if a section cannot be read.
        """
        # Explicit stack instead of recursion; the last pushed offset is
        # processed first.
        pending: List[int] = [start]
        visited = {start}
        while pending:
            current = pending.pop()
            parser.seek(current)
            parser.reset()
            try:
                (pos, token) = parser.nexttoken()
            except PSEOF:
                raise PDFNoValidXRef("Unexpected EOF")
            log.debug("read_xref_from: start=%d, token=%r", current, token)
            if isinstance(token, int):
                # XRefStream: PDF-1.5
                parser.seek(pos)
                parser.reset()
                xref: PDFBaseXRef = PDFXRefStream()
                xref.load(parser)
            else:
                if token is parser.KEYWORD_XREF:
                    parser.nextline()
                xref = PDFXRef()
                xref.load(parser)
            xrefs.append(xref)
            trailer = xref.get_trailer()
            log.debug("trailer: %r", trailer)
            targets = []
            if "XRefStm" in trailer:
                targets.append(int_value(trailer["XRefStm"]))
            if "Prev" in trailer:
                # find previous xref
                targets.append(int_value(trailer["Prev"]))
            # Push in reverse so /XRefStm (and everything below it) is read
            # before /Prev.
            for target in reversed(targets):
                if target in visited:
                    log.warning(
                        "Ignoring xref section at %r: it was already read",
                        target,
                    )
                    continue
                visited.add(target)
                pending.append(target)
class PageLabels(NumberTree):
    """PageLabels from the document catalog.

    See Section 8.3.1 in the PDF Reference.
    """

    @property
    def labels(self) -> Iterator[str]:
        """Yield one label per page; the last range never ends."""
        ranges = self.values

        # The tree must begin with page index 0
        starts_at_zero = len(ranges) != 0 and ranges[0][0] == 0
        if not starts_at_zero:
            if settings.STRICT:
                raise PDFSyntaxError("PageLabels is missing page index 0")
            # Try to cope, by assuming empty labels for the initial pages
            ranges.insert(0, (0, {}))

        last_position = len(ranges)
        for following, (start, label_dict_unchecked) in enumerate(ranges, 1):
            label_dict = dict_value(label_dict_unchecked)
            style = label_dict.get("S")
            prefix = decode_text(str_value(label_dict.get("P", b"")))
            first_value = int_value(label_dict.get("St", 1))

            values: Iterable[int]
            if following == last_position:
                # This is the last specified range. It continues until the end
                # of the document.
                values = itertools.count(first_value)
            else:
                # The range ends where the following one starts.
                end, _ = ranges[following]
                values = range(first_value, first_value + (end - start))

            for value in values:
                yield prefix + self._format_page_label(value, style)

    @staticmethod
    def _format_page_label(value: int, style: Any) -> str:
        """Format page label value in a specific style"""
        if style is None:
            return ""
        if style is LIT("D"):  # Decimal arabic numerals
            return str(value)
        if style is LIT("R"):  # Uppercase roman numerals
            return format_int_roman(value).upper()
        if style is LIT("r"):  # Lowercase roman numerals
            return format_int_roman(value)
        if style is LIT("A"):  # Uppercase letters A-Z, AA-ZZ...
            return format_int_alpha(value).upper()
        if style is LIT("a"):  # Lowercase letters a-z, aa-zz...
            return format_int_alpha(value)
        log.warning("Unknown page label style: %r", style)
        return ""