Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/pdfdocument.py: 80%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import itertools
2import logging
3import re
4import struct
5from hashlib import md5, sha256, sha384, sha512
6from typing import (
7 Any,
8 Callable,
9 Dict,
10 Iterable,
11 Iterator,
12 KeysView,
13 List,
14 Optional,
15 Sequence,
16 Tuple,
17 Type,
18 Union,
19 cast,
20)
22from cryptography.hazmat.backends import default_backend
23from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
25from pdfminer import settings
26from pdfminer.arcfour import Arcfour
27from pdfminer.data_structures import NumberTree
28from pdfminer.pdfexceptions import (
29 PDFException,
30 PDFKeyError,
31 PDFObjectNotFound,
32 PDFTypeError,
33)
34from pdfminer.pdfparser import PDFParser, PDFStreamParser, PDFSyntaxError
35from pdfminer.pdftypes import (
36 DecipherCallable,
37 PDFStream,
38 decipher_all,
39 dict_value,
40 int_value,
41 list_value,
42 str_value,
43 stream_value,
44 uint_value,
45)
46from pdfminer.psexceptions import PSEOF
47from pdfminer.psparser import KWD, LIT, literal_name
48from pdfminer.utils import (
49 choplist,
50 decode_text,
51 format_int_alpha,
52 format_int_roman,
53 nunpack,
54)
# Module-level logger, named after this module, used for the debug and
# warning output emitted throughout the file.
log = logging.getLogger(__name__)
class PDFNoValidXRef(PDFSyntaxError):
    """Raised when a cross-reference section cannot be located or parsed."""
class PDFNoValidXRefWarning(SyntaxWarning):
    """Legacy warning category for a missing xref.

    Kept only for backward compatibility: the condition is now reported
    through the logger instead of ``warnings.warn``.
    """
class PDFNoOutlines(PDFException):
    """Raised when the document catalog has no /Outlines entry."""
class PDFNoPageLabels(PDFException):
    """Raised when the document catalog has no usable /PageLabels entry."""
class PDFDestinationNotFound(PDFException):
    """Raised when a named destination cannot be resolved."""
class PDFEncryptionError(PDFException):
    """Raised for unsupported or inconsistent encryption parameters."""
class PDFPasswordIncorrect(PDFEncryptionError):
    """Raised when the supplied password yields no encryption key."""
class PDFEncryptionWarning(UserWarning):
    """Legacy warning category for failed decryption.

    Kept only for backward compatibility: the condition is now reported
    through the logger instead of ``warnings.warn``.
    """
class PDFTextExtractionNotAllowedWarning(UserWarning):
    """Legacy warning category for a PDF that does not allow extraction.

    Kept only for backward compatibility: the condition is now reported
    through the logger instead of ``warnings.warn``.
    """
class PDFTextExtractionNotAllowed(PDFEncryptionError):
    """Error for documents that do not permit text extraction.

    NOTE(review): not raised by the code in this module; presumably raised
    by callers that check ``PDFDocument.is_extractable`` - confirm.
    """
# Predefined name literals compared (by identity) against the /Type entry
# of object streams, cross-reference streams and the document catalog.
LITERAL_OBJSTM = LIT("ObjStm")
LITERAL_XREF = LIT("XRef")
LITERAL_CATALOG = LIT("Catalog")
class PDFBaseXRef:
    """Interface shared by every cross-reference implementation.

    Subclasses must override ``load`` and ``get_trailer``; the defaults for
    ``get_objids`` and ``get_pos`` describe an xref that knows no objects.
    """

    def load(self, parser: PDFParser) -> None:
        """Populate this xref from *parser*; must be overridden."""
        raise NotImplementedError

    def get_trailer(self) -> Dict[str, Any]:
        """Return the trailer dictionary; must be overridden."""
        raise NotImplementedError

    def get_objids(self) -> Iterable[int]:
        """Return the ids of the objects known to this xref (none here)."""
        return []

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Locate *objid*.

        Implementations must return either ``(strmid, index, genno)`` or
        ``(None, pos, genno)``. The base class knows no objects and always
        raises ``PDFKeyError``.
        """
        raise PDFKeyError(objid)
class PDFXRef(PDFBaseXRef):
    """Classic cross-reference table: subsections followed by a trailer."""

    def __init__(self) -> None:
        # objid -> (None, byte offset, generation number)
        self.offsets: Dict[int, Tuple[Optional[int], int, int]] = {}
        self.trailer: Dict[str, Any] = {}

    def __repr__(self) -> str:
        return "<PDFXRef: offsets=%r>" % (self.offsets.keys())

    def load(self, parser: PDFParser) -> None:
        """Read xref subsections from *parser* up to the ``trailer`` keyword.

        Each subsection starts with a ``start nobjs`` header line followed by
        ``nobjs`` entries of the form ``offset genno flag``. Only entries
        whose flag is ``n`` are recorded. The trailer is loaded afterwards.

        :raises PDFNoValidXRef: on premature EOF or any malformed line.
        """
        while True:
            try:
                (pos, line) = parser.nextline()
            except PSEOF:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted?")
            line = line.strip()
            if not line:
                continue
            if line.startswith(b"trailer"):
                # Rewind so that load_trailer() sees the keyword itself.
                parser.seek(pos)
                break
            f = line.split(b" ")
            if len(f) != 2:
                error_msg = f"Trailer not found: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg)
            try:
                (start, nobjs) = map(int, f)
            except ValueError:
                error_msg = f"Invalid line: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg)
            for objid in range(start, start + nobjs):
                try:
                    (_, line) = parser.nextline()
                except PSEOF:
                    raise PDFNoValidXRef("Unexpected EOF - file corrupted?")
                line = line.strip()
                f = line.split(b" ")
                if len(f) != 3:
                    error_msg = f"Invalid XRef format: {parser!r}, line={line!r}"
                    raise PDFNoValidXRef(error_msg)
                (pos_b, genno_b, use_b) = f
                if use_b != b"n":
                    continue
                try:
                    entry = (None, int(pos_b), int(genno_b))
                except ValueError:
                    # Non-numeric offset/generation: report it as a broken
                    # xref so the caller can fall back, instead of leaking
                    # a bare ValueError.
                    error_msg = f"Invalid XRef format: {parser!r}, line={line!r}"
                    raise PDFNoValidXRef(error_msg)
                self.offsets[objid] = entry
        log.debug("xref objects: %r", self.offsets)
        self.load_trailer(parser)

    def load_trailer(self, parser: PDFParser) -> None:
        """Parse the ``trailer`` keyword and merge its dictionary into ours.

        :raises PDFNoValidXRef: if the keyword is missing or the input ends
            before any dictionary could be read.
        """
        try:
            (_, kwd) = parser.nexttoken()
            # Explicit check rather than ``assert`` so that validation is
            # still performed when Python runs with -O.
            if kwd is not KWD(b"trailer"):
                raise PDFNoValidXRef(f"Trailer keyword expected: {kwd!r}")
            (_, dic) = parser.nextobject()
        except PSEOF:
            # EOF while reading the dictionary: use whatever object the
            # parser had already completed, if any.
            x = parser.pop(1)
            if not x:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted")
            (_, dic) = x[0]
        self.trailer.update(dict_value(dic))
        log.debug("trailer=%r", self.trailer)

    def get_trailer(self) -> Dict[str, Any]:
        return self.trailer

    def get_objids(self) -> KeysView[int]:
        return self.offsets.keys()

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Return ``(None, offset, genno)`` for *objid* (KeyError if unknown)."""
        return self.offsets[objid]
class PDFXRefFallback(PDFXRef):
    """Cross-reference rebuilt by scanning the whole file for objects.

    Used when no valid xref section can be read: every line that looks like
    ``<objid> <genno> obj`` is recorded, and object streams found along the
    way are expanded so that their members can be located as well.
    """

    PDFOBJ_CUE = re.compile(r"^(\d+)\s+(\d+)\s+obj\b")

    def __repr__(self) -> str:
        return "<PDFXRefFallback: offsets=%r>" % (self.offsets.keys())

    def load(self, parser: PDFParser) -> None:
        """Scan *parser* from offset 0 until EOF or the first ``trailer``."""
        parser.seek(0)
        while True:
            try:
                (pos, raw_line) = parser.nextline()
            except PSEOF:
                break
            if raw_line.startswith(b"trailer"):
                parser.seek(pos)
                self.load_trailer(parser)
                log.debug("trailer: %r", self.trailer)
                break
            # latin-1 is the default pdf encoding
            cue = self.PDFOBJ_CUE.match(raw_line.decode("latin-1"))
            if cue is None:
                continue
            objid = int(cue.group(1))
            genno = int(cue.group(2))
            self.offsets[objid] = (None, pos, genno)
            # Re-read the object itself to find out whether it is an ObjStm.
            parser.seek(pos)
            (_, obj) = parser.nextobject()
            if not (isinstance(obj, PDFStream) and obj.get("Type") is LITERAL_OBJSTM):
                continue
            stream = stream_value(obj)
            try:
                n = stream["N"]
            except KeyError:
                if settings.STRICT:
                    raise PDFSyntaxError("N is not defined: %r" % stream)
                n = 0
            # Read every object of the stream; the leading ones form the
            # (objid, offset) pair table.
            member_parser = PDFStreamParser(stream.get_data())
            objs: List[int] = []
            try:
                while True:
                    (_, member) = member_parser.nextobject()
                    objs.append(cast(int, member))
            except PSEOF:
                pass
            # Never trust /N beyond the number of pairs actually present.
            n = min(n, len(objs) // 2)
            for index in range(n):
                self.offsets[objs[index * 2]] = (objid, index, 0)
class PDFXRefStream(PDFBaseXRef):
    """Cross-reference information stored in an XRef stream object."""

    def __init__(self) -> None:
        self.data: Optional[bytes] = None
        self.entlen: Optional[int] = None
        # Byte widths of the three fields of one entry (the /W array).
        self.fl1: Optional[int] = None
        self.fl2: Optional[int] = None
        self.fl3: Optional[int] = None
        # (first objid, count) pairs taken from /Index.
        self.ranges: List[Tuple[int, int]] = []
        # Defined up front so get_trailer() is usable before load().
        self.trailer: Dict[str, Any] = {}

    def __repr__(self) -> str:
        return "<PDFXRefStream: ranges=%r>" % (self.ranges)

    def load(self, parser: PDFParser) -> None:
        """Read the xref stream object located at the parser position.

        :raises PDFNoValidXRef: if the object is not a stream whose /Type
            is XRef.
        :raises PDFSyntaxError: if /Index has an odd number of items.
        """
        # "<objid> <genno> obj" precedes the stream; the values are unused.
        parser.nexttoken()
        parser.nexttoken()
        parser.nexttoken()
        (_, stream) = parser.nextobject()
        if not isinstance(stream, PDFStream) or stream.get("Type") is not LITERAL_XREF:
            raise PDFNoValidXRef("Invalid PDF stream spec.")
        size = stream["Size"]
        # Without /Index the stream holds one subsection covering 0..size-1.
        index_array = stream.get("Index", (0, size))
        if len(index_array) % 2 != 0:
            raise PDFSyntaxError("Invalid index number")
        self.ranges.extend(cast(Iterator[Tuple[int, int]], choplist(2, index_array)))
        (self.fl1, self.fl2, self.fl3) = stream["W"]
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        self.data = stream.get_data()
        self.entlen = self.fl1 + self.fl2 + self.fl3
        self.trailer = stream.attrs
        log.debug(
            "xref stream: objid=%s, fields=%d,%d,%d",
            ", ".join(map(repr, self.ranges)),
            self.fl1,
            self.fl2,
            self.fl3,
        )

    def get_trailer(self) -> Dict[str, Any]:
        return self.trailer

    def get_objids(self) -> Iterator[int]:
        """Yield the ids of all entries whose type field is 1 or 2.

        Entries of all subsections are stored back to back in the stream
        data, so the entry index keeps counting across subsections, exactly
        as ``get_pos`` does.
        """
        index = 0
        for start, nobjs in self.ranges:
            for i in range(nobjs):
                assert self.entlen is not None
                assert self.data is not None
                offset = self.entlen * (index + i)
                ent = self.data[offset : offset + self.entlen]
                f1 = nunpack(ent[: self.fl1], 1)
                if f1 == 1 or f1 == 2:
                    yield start + i
            index += nobjs

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Return where *objid* is stored.

        Type-1 entries give ``(None, f2, f3)``; type-2 entries give
        ``(f2, f3, 0)``.

        :raises PDFKeyError: if *objid* is outside every subsection or its
            entry is of any other type (a free object).
        """
        index = 0
        for start, nobjs in self.ranges:
            if start <= objid and objid < start + nobjs:
                index += objid - start
                break
            else:
                # Skip over the entries of this whole subsection.
                index += nobjs
        else:
            raise PDFKeyError(objid)
        assert self.entlen is not None
        assert self.data is not None
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        offset = self.entlen * index
        ent = self.data[offset : offset + self.entlen]
        f1 = nunpack(ent[: self.fl1], 1)
        f2 = nunpack(ent[self.fl1 : self.fl1 + self.fl2])
        f3 = nunpack(ent[self.fl1 + self.fl2 :])
        if f1 == 1:
            return (None, f2, f3)
        elif f1 == 2:
            return (f2, f3, 0)
        else:
            # this is a free object
            raise PDFKeyError(objid)
class PDFStandardSecurityHandler:
    """Standard security handler for revisions 2 and 3 (RC4 based).

    Construction reads the encryption parameters, checks the revision and
    derives the document key from the supplied password.
    """

    PASSWORD_PADDING = (
        b"(\xbfN^Nu\x8aAd\x00NV\xff\xfa\x01\x08"
        b"..\x00\xb6\xd0h>\x80/\x0c\xa9\xfedSiz"
    )
    supported_revisions: Tuple[int, ...] = (2, 3)

    def __init__(
        self,
        docid: Sequence[bytes],
        param: Dict[str, Any],
        password: str = "",
    ) -> None:
        self.docid = docid
        self.param = param
        self.password = password
        self.init()

    def init(self) -> None:
        """Load parameters, validate the revision and compute the key.

        :raises PDFEncryptionError: if /R is not in ``supported_revisions``.
        """
        self.init_params()
        if self.r not in self.supported_revisions:
            raise PDFEncryptionError("Unsupported revision: param=%r" % self.param)
        self.init_key()

    def init_params(self) -> None:
        """Copy the entries of the encryption dictionary onto ``self``."""
        param = self.param
        self.v = int_value(param.get("V", 0))
        self.r = int_value(param["R"])
        self.p = uint_value(param["P"], 32)
        self.o = str_value(param["O"])
        self.u = str_value(param["U"])
        self.length = int_value(param.get("Length", 40))

    def init_key(self) -> None:
        """Authenticate with the stored password.

        :raises PDFPasswordIncorrect: if no key could be derived.
        """
        self.key = self.authenticate(self.password)
        if self.key is None:
            raise PDFPasswordIncorrect

    # Permission checks: each one tests a single bit of /P.

    def is_printable(self) -> bool:
        return (self.p & 4) != 0

    def is_modifiable(self) -> bool:
        return (self.p & 8) != 0

    def is_extractable(self) -> bool:
        return (self.p & 16) != 0

    def compute_u(self, key: bytes) -> bytes:
        """Compute the /U value that *key* would produce."""
        if self.r == 2:
            # Algorithm 3.4
            return Arcfour(key).encrypt(self.PASSWORD_PADDING)
        # Algorithm 3.5
        digest = md5(self.PASSWORD_PADDING)
        digest.update(self.docid[0])
        result = Arcfour(key).encrypt(digest.digest())
        for i in range(1, 20):
            round_key = bytes(c ^ i for c in key)
            result = Arcfour(round_key).encrypt(result)
        # The 16 result bytes are doubled to fill 32 bytes.
        return result + result

    def compute_encryption_key(self, password: bytes) -> bytes:
        """Derive the encryption key from *password* (Algorithm 3.2)."""
        padded = (password + self.PASSWORD_PADDING)[:32]
        digest = md5(padded)
        digest.update(self.o)
        # See https://github.com/pdfminer/pdfminer.six/issues/186
        digest.update(struct.pack("<L", self.p))
        digest.update(self.docid[0])
        if self.r >= 4:
            if not cast(PDFStandardSecurityHandlerV4, self).encrypt_metadata:
                digest.update(b"\xff\xff\xff\xff")
        result = digest.digest()
        if self.r < 3:
            return result[:5]
        # Revision 3 and later: key of /Length bits, re-hashed 50 times.
        n = self.length // 8
        for _ in range(50):
            result = md5(result[:n]).digest()
        return result[:n]

    def authenticate(self, password: str) -> Optional[bytes]:
        """Return the key for *password*, tried as user then owner password.

        Returns None when neither attempt verifies.
        """
        password_bytes = password.encode("latin1")
        key = self.authenticate_user_password(password_bytes)
        if key is not None:
            return key
        return self.authenticate_owner_password(password_bytes)

    def authenticate_user_password(self, password: bytes) -> Optional[bytes]:
        key = self.compute_encryption_key(password)
        return key if self.verify_encryption_key(key) else None

    def verify_encryption_key(self, key: bytes) -> bool:
        """Check *key* against the stored /U value (Algorithm 3.6)."""
        u = self.compute_u(key)
        if self.r == 2:
            return u == self.u
        # Revision 3+: only the first 16 bytes are significant.
        return u[:16] == self.u[:16]

    def authenticate_owner_password(self, password: bytes) -> Optional[bytes]:
        """Recover the user password from /O and authenticate with it.

        Implements Algorithm 3.7.
        """
        padded = (password + self.PASSWORD_PADDING)[:32]
        digest = md5(padded).digest()
        if self.r >= 3:
            for _ in range(50):
                digest = md5(digest).digest()
            n = self.length // 8
        else:
            n = 5
        key = digest[:n]
        if self.r == 2:
            user_password = Arcfour(key).decrypt(self.o)
        else:
            user_password = self.o
            for i in range(19, -1, -1):
                round_key = bytes(c ^ i for c in key)
                user_password = Arcfour(round_key).decrypt(user_password)
        return self.authenticate_user_password(user_password)

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: Optional[Dict[str, Any]] = None,
    ) -> bytes:
        """Decrypt *data* belonging to object (*objid*, *genno*).

        *attrs* is accepted for interface compatibility and ignored here.
        """
        return self.decrypt_rc4(objid, genno, data)

    def decrypt_rc4(self, objid: int, genno: int, data: bytes) -> bytes:
        """RC4-decrypt *data* with the per-object key."""
        assert self.key is not None
        # Document key + low 3 bytes of objid + low 2 bytes of genno.
        seed = self.key + struct.pack("<L", objid)[:3] + struct.pack("<L", genno)[:2]
        object_key = md5(seed).digest()[: min(len(seed), 16)]
        return Arcfour(object_key).decrypt(data)
class PDFStandardSecurityHandlerV4(PDFStandardSecurityHandler):
    """Security handler for revision 4: named crypt filters (RC4 / AES-128)."""

    supported_revisions: Tuple[int, ...] = (4,)

    def init_params(self) -> None:
        """Read the crypt-filter parameters in addition to the base ones.

        :raises PDFEncryptionError: if /StmF and /StrF differ, if a filter
            uses an unknown /CFM, or if the selected filter is not defined.
        """
        super().init_params()
        self.length = 128
        self.cf = dict_value(self.param.get("CF"))
        self.stmf = literal_name(self.param["StmF"])
        self.strf = literal_name(self.param["StrF"])
        self.encrypt_metadata = bool(self.param.get("EncryptMetadata", True))
        if self.stmf != self.strf:
            raise PDFEncryptionError("Unsupported crypt filter: param=%r" % self.param)
        # Crypt filter name -> decryption callable.
        self.cfm = {}
        for filter_name, filter_dict in self.cf.items():
            method = self.get_cfm(literal_name(filter_dict["CFM"]))
            if method is None:
                raise PDFEncryptionError(
                    "Unknown crypt filter method: param=%r" % self.param
                )
            self.cfm[filter_name] = method
        self.cfm["Identity"] = self.decrypt_identity
        if self.strf not in self.cfm:
            raise PDFEncryptionError("Undefined crypt filter: param=%r" % self.param)

    def get_cfm(self, name: str) -> Optional[Callable[[int, int, bytes], bytes]]:
        """Map a /CFM name to its decryption method, or None if unknown."""
        methods = {
            "V2": self.decrypt_rc4,
            "AESV2": self.decrypt_aes128,
        }
        return methods.get(name)

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: Optional[Dict[str, Any]] = None,
        name: Optional[str] = None,
    ) -> bytes:
        """Decrypt *data* with the crypt filter *name* (default: /StrF).

        When metadata encryption is disabled, data whose *attrs* declare
        /Type /Metadata is returned untouched.
        """
        if attrs is not None and not self.encrypt_metadata:
            obj_type = attrs.get("Type")
            if obj_type is not None and literal_name(obj_type) == "Metadata":
                return data
        filter_name = self.strf if name is None else name
        return self.cfm[filter_name](objid, genno, data)

    def decrypt_identity(self, objid: int, genno: int, data: bytes) -> bytes:
        """Identity filter: hand the data back unchanged."""
        return data

    def decrypt_aes128(self, objid: int, genno: int, data: bytes) -> bytes:
        """AES-CBC decrypt *data*; its first 16 bytes are the IV."""
        assert self.key is not None
        seed = b"".join(
            (
                self.key,
                struct.pack("<L", objid)[:3],
                struct.pack("<L", genno)[:2],
                b"sAlT",
            )
        )
        object_key = md5(seed).digest()[: min(len(seed), 16)]
        iv, ciphertext = data[:16], data[16:]
        cipher = Cipher(
            algorithms.AES(object_key),
            modes.CBC(iv),
            backend=default_backend(),
        )  # type: ignore
        return cipher.decryptor().update(ciphertext)  # type: ignore
class PDFStandardSecurityHandlerV5(PDFStandardSecurityHandlerV4):
    """Security handler for revisions 5 and 6 (AES-256)."""

    supported_revisions = (5, 6)

    def init_params(self) -> None:
        """Read /OE and /UE and split /O and /U into hash and salts."""
        super().init_params()
        self.length = 256
        self.oe = str_value(self.param["OE"])
        self.ue = str_value(self.param["UE"])
        # Layout of /O and /U: 32-byte hash, 8-byte validation salt, key salt.
        self.o_hash, self.o_validation_salt, self.o_key_salt = (
            self.o[:32],
            self.o[32:40],
            self.o[40:],
        )
        self.u_hash, self.u_validation_salt, self.u_key_salt = (
            self.u[:32],
            self.u[32:40],
            self.u[40:],
        )

    def get_cfm(self, name: str) -> Optional[Callable[[int, int, bytes], bytes]]:
        """Only the AESV3 crypt filter method is recognised."""
        return self.decrypt_aes256 if name == "AESV3" else None

    def authenticate(self, password: str) -> Optional[bytes]:
        """Return the file key for *password*, or None if it matches neither.

        The owner password is checked first (its hashes also mix in the /U
        string), then the user password.
        """
        password_b = self._normalize_password(password)
        owner_check = self._password_hash(password_b, self.o_validation_salt, self.u)
        if owner_check == self.o_hash:
            owner_key = self._password_hash(password_b, self.o_key_salt, self.u)
            return self._unwrap_file_key(owner_key, self.oe)
        user_check = self._password_hash(password_b, self.u_validation_salt)
        if user_check == self.u_hash:
            user_key = self._password_hash(password_b, self.u_key_salt)
            return self._unwrap_file_key(user_key, self.ue)
        return None

    def _unwrap_file_key(self, wrapping_key: bytes, wrapped: bytes) -> bytes:
        """AES-CBC decrypt *wrapped* with *wrapping_key* and an all-zero IV."""
        cipher = Cipher(
            algorithms.AES(wrapping_key),
            modes.CBC(b"\0" * 16),
            backend=default_backend(),
        )  # type: ignore
        return cipher.decryptor().update(wrapped)  # type: ignore

    def _normalize_password(self, password: str) -> bytes:
        """Encode *password* as UTF-8, truncated to 127 bytes.

        For revision 6 the password is first run through saslprep; an empty
        password short-circuits to ``b""``.
        """
        if self.r == 6:
            # saslprep expects non-empty strings, apparently
            if not password:
                return b""
            from pdfminer._saslprep import saslprep

            password = saslprep(password)
        return password.encode("utf-8")[:127]

    def _password_hash(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute password hash depending on revision number"""
        if self.r != 5:
            return self._r6_password(password, salt[0:8], vector)
        return self._r5_password(password, salt, vector)

    def _r5_password(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute the password for revision 5"""
        digest = sha256(password)
        digest.update(salt)
        if vector is not None:
            digest.update(vector)
        return digest.digest()

    def _r6_password(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute the password for revision 6"""
        seed = sha256(password)
        seed.update(salt)
        if vector is not None:
            seed.update(vector)
        k = seed.digest()
        hashes = (sha256, sha384, sha512)
        suffix = vector or b""
        round_no = 0
        last_byte_val = 0
        # At least 64 rounds; afterwards keep going while the last byte of
        # the round output exceeds round_no - 32.
        while round_no < 64 or last_byte_val > round_no - 32:
            k1 = (password + k + suffix) * 64
            e = self._aes_cbc_encrypt(key=k[:16], iv=k[16:32], data=k1)
            # the first 16 bytes of e, interpreted as an unsigned integer
            # mod 3, select the hash function for this round
            k = hashes[self._bytes_mod_3(e[:16])](e).digest()
            last_byte_val = e[-1]
            round_no += 1
        return k[:32]

    @staticmethod
    def _bytes_mod_3(input_bytes: bytes) -> int:
        # 256 is 1 mod 3, so the big-endian integer is congruent to the
        # sum of its bytes
        return sum(input_bytes) % 3

    def _aes_cbc_encrypt(self, key: bytes, iv: bytes, data: bytes) -> bytes:
        encryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).encryptor()  # type: ignore
        head = encryptor.update(data)
        return head + encryptor.finalize()  # type: ignore

    def decrypt_aes256(self, objid: int, genno: int, data: bytes) -> bytes:
        """AES-CBC decrypt *data* with the file key; first 16 bytes are the IV."""
        iv, ciphertext = data[:16], data[16:]
        assert self.key is not None
        cipher = Cipher(
            algorithms.AES(self.key),
            modes.CBC(iv),
            backend=default_backend(),
        )  # type: ignore
        return cipher.decryptor().update(ciphertext)  # type: ignore
class PDFDocument:
    """PDFDocument object represents a PDF document.

    Since a PDF file can be very big, normally it is not loaded at
    once. So PDF document has to cooperate with a PDF parser in order to
    dynamically import the data as processing goes.

    Typical usage:
      doc = PDFDocument(parser, password)
      obj = doc.getobj(objid)

    """

    # Value of /V in the encryption dictionary -> handler class.
    security_handler_registry: Dict[int, Type[PDFStandardSecurityHandler]] = {
        1: PDFStandardSecurityHandler,
        2: PDFStandardSecurityHandler,
        4: PDFStandardSecurityHandlerV4,
        5: PDFStandardSecurityHandlerV5,
    }

    def __init__(
        self,
        parser: PDFParser,
        password: str = "",
        caching: bool = True,
        fallback: bool = True,
    ) -> None:
        """Set the document to use a given PDFParser object.

        :param parser: parser positioned on the PDF data.
        :param password: password used if the document is encrypted.
        :param caching: keep resolved objects in memory for reuse.
        :param fallback: when no valid xref can be read, rebuild one by
            scanning the whole file.
        :raises PDFSyntaxError: if no trailer provides a /Root entry, or
            (strict mode only) if the catalog has the wrong /Type.
        """
        self.caching = caching
        self.xrefs: List[PDFBaseXRef] = []
        self.info: List[Dict[str, Any]] = []
        self.catalog: Dict[str, Any] = {}
        self.encryption: Optional[Tuple[Any, Any]] = None
        self.decipher: Optional[DecipherCallable] = None
        # objid -> (object, genno)
        self._cached_objs: Dict[int, Tuple[object, int]] = {}
        # object stream id -> (objects parsed from the stream, its /N)
        self._parsed_objs: Dict[int, Tuple[List[object], int]] = {}
        self._parser: Optional[PDFParser] = parser
        parser.set_document(self)
        self.is_printable = self.is_modifiable = self.is_extractable = True
        # Retrieve the information of each header that was appended
        # (maybe multiple times) at the end of the document.
        try:
            pos = self.find_xref(parser)
            self.read_xref_from(parser, pos, self.xrefs)
        except PDFNoValidXRef:
            if fallback:
                parser.fallback = True
                newxref = PDFXRefFallback()
                newxref.load(parser)
                self.xrefs.append(newxref)

        for xref in self.xrefs:
            trailer = xref.get_trailer()
            if not trailer:
                continue
            # If there's an encryption info, remember it.
            if "Encrypt" in trailer:
                if "ID" in trailer:
                    id_value = list_value(trailer["ID"])
                else:
                    # Some documents may not have a /ID, use two empty
                    # byte strings instead. Solves
                    # https://github.com/pdfminer/pdfminer.six/issues/594
                    id_value = (b"", b"")
                self.encryption = (id_value, dict_value(trailer["Encrypt"]))
                self._initialize_password(password)
            if "Info" in trailer:
                self.info.append(dict_value(trailer["Info"]))
            if "Root" in trailer:
                # Every PDF file must have exactly one /Root dictionary.
                self.catalog = dict_value(trailer["Root"])
                break
        else:
            raise PDFSyntaxError("No /Root object! - Is this really a PDF?")
        if self.catalog.get("Type") is not LITERAL_CATALOG:
            if settings.STRICT:
                raise PDFSyntaxError("Catalog not found!")

    KEYWORD_OBJ = KWD(b"obj")

    def _initialize_password(self, password: str = "") -> None:
        """Set up decryption for the document with the given password.

        :raises PDFEncryptionError: if /Filter is not Standard or /V has no
            registered security handler.
        """
        assert self.encryption is not None
        (docid, param) = self.encryption
        if literal_name(param.get("Filter")) != "Standard":
            raise PDFEncryptionError("Unknown filter: param=%r" % param)
        v = int_value(param.get("V", 0))
        factory = self.security_handler_registry.get(v)
        if factory is None:
            raise PDFEncryptionError("Unknown algorithm: param=%r" % param)
        handler = factory(docid, param, password)
        self.decipher = handler.decrypt
        self.is_printable = handler.is_printable()
        self.is_modifiable = handler.is_modifiable()
        self.is_extractable = handler.is_extractable()
        assert self._parser is not None
        self._parser.fallback = False  # need to read streams with exact length

    def _getobj_objstm(self, stream: PDFStream, index: int, objid: int) -> object:
        """Return the *index*-th object stored inside object stream *stream*.

        :raises PDFSyntaxError: if the stream holds fewer objects.
        """
        if stream.objid in self._parsed_objs:
            (objs, n) = self._parsed_objs[stream.objid]
        else:
            (objs, n) = self._get_objects(stream)
            if self.caching:
                assert stream.objid is not None
                self._parsed_objs[stream.objid] = (objs, n)
        # The first n * 2 parsed items are the (objid, offset) pair table.
        i = n * 2 + index
        try:
            obj = objs[i]
        except IndexError:
            raise PDFSyntaxError("index too big: %r" % index)
        return obj

    def _get_objects(self, stream: PDFStream) -> Tuple[List[object], int]:
        """Parse everything in *stream*; return ``(objects, /N value)``.

        A missing /N counts as 0 unless strict mode is on.
        """
        if stream.get("Type") is not LITERAL_OBJSTM:
            if settings.STRICT:
                raise PDFSyntaxError("Not a stream object: %r" % stream)
        try:
            n = cast(int, stream["N"])
        except KeyError:
            if settings.STRICT:
                raise PDFSyntaxError("N is not defined: %r" % stream)
            n = 0
        parser = PDFStreamParser(stream.get_data())
        parser.set_document(self)
        objs: List[object] = []
        try:
            while True:
                (_, obj) = parser.nextobject()
                objs.append(obj)
        except PSEOF:
            pass
        return (objs, n)

    def _getobj_parse(self, pos: int, objid: int) -> object:
        """Parse the indirect object *objid* starting at byte offset *pos*.

        :raises PDFSyntaxError: if the id found there does not match or the
            ``obj`` keyword is missing.
        """
        assert self._parser is not None
        self._parser.seek(pos)
        (_, objid1) = self._parser.nexttoken()  # objid
        (_, genno) = self._parser.nexttoken()  # genno
        (_, kwd) = self._parser.nexttoken()
        # hack around malformed pdf files
        # copied from https://github.com/jaepil/pdfminer3k/blob/master/
        # pdfminer/pdfparser.py#L399
        # to solve https://github.com/pdfminer/pdfminer.six/issues/56
        # assert objid1 == objid, str((objid1, objid))
        if objid1 != objid:
            # Skip ahead to the next "obj" keyword and take the token two
            # places before it as the object id.
            x: List[object] = []
            while kwd is not self.KEYWORD_OBJ:
                (_, kwd) = self._parser.nexttoken()
                x.append(kwd)
            if len(x) >= 2:
                objid1 = x[-2]
        # #### end hack around malformed pdf files
        if objid1 != objid:
            raise PDFSyntaxError(f"objid mismatch: {objid1!r}={objid!r}")

        if kwd != self.KEYWORD_OBJ:
            raise PDFSyntaxError("Invalid object spec: offset=%r" % pos)
        (_, obj) = self._parser.nextobject()
        return obj

    # can raise PDFObjectNotFound
    def getobj(self, objid: int) -> object:
        """Get object from PDF

        :raises PDFException if PDFDocument is not initialized
        :raises PDFObjectNotFound if objid does not exist in PDF
        """
        if not self.xrefs:
            raise PDFException("PDFDocument is not initialized")
        log.debug("getobj: objid=%r", objid)
        if objid in self._cached_objs:
            (obj, genno) = self._cached_objs[objid]
        else:
            # Ask each xref in turn; one that does not know the object, or
            # whose location fails to parse, is skipped.
            for xref in self.xrefs:
                try:
                    (strmid, index, genno) = xref.get_pos(objid)
                except KeyError:
                    continue
                try:
                    if strmid is not None:
                        stream = stream_value(self.getobj(strmid))
                        obj = self._getobj_objstm(stream, index, objid)
                    else:
                        obj = self._getobj_parse(index, objid)
                        if self.decipher:
                            obj = decipher_all(self.decipher, objid, genno, obj)

                    if isinstance(obj, PDFStream):
                        obj.set_objid(objid, genno)
                    break
                except (PSEOF, PDFSyntaxError):
                    continue
            else:
                raise PDFObjectNotFound(objid)
            log.debug("register: objid=%r: %r", objid, obj)
            if self.caching:
                self._cached_objs[objid] = (obj, genno)
        return obj

    # (level, title, dest, action, se)
    OutlineType = Tuple[Any, Any, Any, Any, Any]

    def get_outlines(self) -> Iterator[OutlineType]:
        """Iterate over outline entries as (level, title, dest, action, se).

        :raises PDFNoOutlines: if the catalog has no /Outlines entry.
        """
        if "Outlines" not in self.catalog:
            raise PDFNoOutlines

        def search(entry: object, level: int) -> Iterator[PDFDocument.OutlineType]:
            entry = dict_value(entry)
            if "Title" in entry:
                # Only entries that lead somewhere are reported.
                if "A" in entry or "Dest" in entry:
                    title = decode_text(str_value(entry["Title"]))
                    dest = entry.get("Dest")
                    action = entry.get("A")
                    se = entry.get("SE")
                    yield (level, title, dest, action, se)
            if "First" in entry and "Last" in entry:
                yield from search(entry["First"], level + 1)
            if "Next" in entry:
                yield from search(entry["Next"], level)

        return search(self.catalog["Outlines"], 0)

    def get_page_labels(self) -> Iterator[str]:
        """Generate page label strings for the PDF document.

        If the document includes page labels, generates strings, one per page.
        If not, raises PDFNoPageLabels.

        The resulting iteration is unbounded.
        """
        assert self.catalog is not None

        try:
            page_labels = PageLabels(self.catalog["PageLabels"])
        except (PDFTypeError, KeyError):
            raise PDFNoPageLabels

        return page_labels.labels

    def lookup_name(self, cat: str, key: Union[str, bytes]) -> Any:
        """Look up *key* in the name tree *cat* of the catalog's /Names.

        :raises PDFKeyError: if the catalog has no usable /Names or the key
            is not found. A plain KeyError is raised if /Names lacks *cat*
            or a leaf inside the key's /Limits range lacks the key.
        """
        try:
            names = dict_value(self.catalog["Names"])
        except (PDFTypeError, KeyError):
            raise PDFKeyError((cat, key))
        # may raise KeyError
        d0 = dict_value(names[cat])

        def lookup(d: Dict[str, Any]) -> Any:
            if "Limits" in d:
                (k1, k2) = list_value(d["Limits"])
                if key < k1 or k2 < key:
                    # Key lies outside this node's range.
                    return None
            if "Names" in d:
                objs = list_value(d["Names"])
                names = dict(
                    cast(Iterator[Tuple[Union[str, bytes], Any]], choplist(2, objs)),
                )
                return names[key]
            if "Kids" in d:
                for c in list_value(d["Kids"]):
                    v = lookup(dict_value(c))
                    if v:
                        return v
            raise PDFKeyError((cat, key))

        return lookup(d0)

    def get_dest(self, name: Union[str, bytes]) -> Any:
        """Resolve the named destination *name*.

        :raises PDFDestinationNotFound: if neither the /Dests name tree nor
            the catalog's /Dests dictionary contains it.
        """
        try:
            # PDF-1.2 or later
            obj = self.lookup_name("Dests", name)
        except KeyError:
            # PDF-1.1 or prior
            if "Dests" not in self.catalog:
                raise PDFDestinationNotFound(name)
            d0 = dict_value(self.catalog["Dests"])
            if name not in d0:
                raise PDFDestinationNotFound(name)
            obj = d0[name]
        return obj

    # find_xref
    def find_xref(self, parser: PDFParser) -> int:
        """Internal function used to locate the first XRef."""
        # search the last xref table by scanning the file backwards.
        prev = b""
        for line in parser.revreadlines():
            line = line.strip()
            log.debug("find_xref: %r", line)

            if line == b"startxref":
                # Reading backwards, the last non-empty line seen is the
                # one that follows "startxref" in the file.
                log.debug("xref found: pos=%r", prev)

                if not prev.isdigit():
                    raise PDFNoValidXRef(f"Invalid xref position: {prev!r}")

                start = int(prev)

                if not start >= 0:
                    raise PDFNoValidXRef(f"Invalid negative xref position: {start}")

                return start

            if line:
                prev = line

        raise PDFNoValidXRef("Unexpected EOF")

    # read xref table
    def read_xref_from(
        self,
        parser: PDFParser,
        start: int,
        xrefs: List[PDFBaseXRef],
    ) -> None:
        """Reads XRefs from the given location.

        Follows the /XRefStm and /Prev entries of every trailer, appending
        each cross-reference found to *xrefs* (the section's own /XRefStm
        chain first, then /Prev). A position already read is skipped, so a
        chain that loops back on itself terminates instead of recursing
        without bound.

        :raises PDFNoValidXRef: if a section cannot be read.
        """
        pending: List[int] = [start]
        visited = set()  # byte offsets of the sections already loaded
        while pending:
            start = pending.pop()
            if start in visited:
                continue
            visited.add(start)
            parser.seek(start)
            parser.reset()
            try:
                (pos, token) = parser.nexttoken()
            except PSEOF:
                raise PDFNoValidXRef("Unexpected EOF")
            log.debug("read_xref_from: start=%d, token=%r", start, token)
            if isinstance(token, int):
                # XRefStream: PDF-1.5
                parser.seek(pos)
                parser.reset()
                xref: PDFBaseXRef = PDFXRefStream()
                xref.load(parser)
            else:
                if token is parser.KEYWORD_XREF:
                    parser.nextline()
                xref = PDFXRef()
                xref.load(parser)
            xrefs.append(xref)
            trailer = xref.get_trailer()
            log.debug("trailer: %r", trailer)
            followups: List[int] = []
            if "XRefStm" in trailer:
                followups.append(int_value(trailer["XRefStm"]))
            if "Prev" in trailer:
                # find previous xref
                followups.append(int_value(trailer["Prev"]))
            # ``pending`` is a stack: push in reverse so that /XRefStm (and
            # everything it leads to) is handled before /Prev.
            pending.extend(reversed(followups))
class PageLabels(NumberTree):
    """PageLabels from the document catalog.

    See Section 8.3.1 in the PDF Reference.
    """

    @property
    def labels(self) -> Iterator[str]:
        """Yield one label per page, without end.

        Each entry of the number tree starts a labelling range at a page
        index. The last range is continued indefinitely.
        """
        ranges = self.values

        # The tree must begin with page index 0
        if not ranges or ranges[0][0] != 0:
            if settings.STRICT:
                raise PDFSyntaxError("PageLabels is missing page index 0")
            # Try to cope, by assuming empty labels for the initial pages
            ranges.insert(0, (0, {}))

        last_position = len(ranges) - 1
        for position, (start, label_dict_unchecked) in enumerate(ranges):
            label_dict = dict_value(label_dict_unchecked)
            style = label_dict.get("S")
            prefix = decode_text(str_value(label_dict.get("P", b"")))
            first_value = int_value(label_dict.get("St", 1))

            values: Iterable[int]
            if position == last_position:
                # This is the last specified range. It continues until the
                # end of the document.
                values = itertools.count(first_value)
            else:
                # The range runs up to the start of the following one.
                end, _ = ranges[position + 1]
                values = range(first_value, first_value + (end - start))

            for value in values:
                yield prefix + self._format_page_label(value, style)

    @staticmethod
    def _format_page_label(value: int, style: Any) -> str:
        """Format page label value in a specific style"""
        if style is None:
            return ""
        if style is LIT("D"):  # Decimal arabic numerals
            return str(value)
        if style is LIT("R"):  # Uppercase roman numerals
            return format_int_roman(value).upper()
        if style is LIT("r"):  # Lowercase roman numerals
            return format_int_roman(value)
        if style is LIT("A"):  # Uppercase letters A-Z, AA-ZZ...
            return format_int_alpha(value).upper()
        if style is LIT("a"):  # Lowercase letters a-z, aa-zz...
            return format_int_alpha(value)
        log.warning("Unknown page label style: %r", style)
        return ""