Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/pdfdocument.py: 62%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import itertools
2import logging
3import re
4import struct
5from collections.abc import Callable, Iterable, Iterator, KeysView, Sequence
6from hashlib import md5, sha256, sha384, sha512
7from typing import (
8 Any,
9 ClassVar,
10 cast,
11)
13from cryptography.hazmat.backends import default_backend
14from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
16from pdfminer import settings
17from pdfminer.arcfour import Arcfour
18from pdfminer.casting import safe_int
19from pdfminer.data_structures import NumberTree
20from pdfminer.pdfexceptions import (
21 PDFException,
22 PDFKeyError,
23 PDFObjectNotFound,
24 PDFTypeError,
25)
26from pdfminer.pdfparser import PDFParser, PDFStreamParser, PDFSyntaxError
27from pdfminer.pdftypes import (
28 DecipherCallable,
29 PDFStream,
30 decipher_all,
31 dict_value,
32 int_value,
33 list_value,
34 str_value,
35 stream_value,
36 uint_value,
37)
38from pdfminer.psexceptions import PSEOF
39from pdfminer.psparser import KWD, LIT, literal_name
40from pdfminer.utils import (
41 choplist,
42 decode_text,
43 format_int_alpha,
44 format_int_roman,
45 nunpack,
46 unpad_aes,
47)
# Module-level logger, named after this module.
log = logging.getLogger(__name__)
class PDFNoValidXRef(PDFSyntaxError):
    """Raised when no usable cross-reference section can be found or parsed."""
class PDFNoValidXRefWarning(SyntaxWarning):
    """Legacy warning category for a missing xref.

    No longer emitted, because warnings.warn was replaced by
    logger.Logger.warn.
    """
class PDFNoOutlines(PDFException):
    """Raised by PDFDocument.get_outlines when the catalog has no Outlines."""
class PDFNoPageLabels(PDFException):
    """Raised by PDFDocument.get_page_labels when no page labels are usable."""
class PDFDestinationNotFound(PDFException):
    """Raised by PDFDocument.get_dest when a named destination is unknown."""
class PDFEncryptionError(PDFException):
    """Base class for encryption-related failures.

    Raised in this module for unsupported revisions, unknown filters or
    algorithms, and unusable crypt filters.
    """
class PDFPasswordIncorrect(PDFEncryptionError):
    """Raised when the supplied password does not authenticate."""
class PDFEncryptionWarning(UserWarning):
    """Legacy warning category for failed decryption.

    No longer emitted, because warnings.warn was replaced by
    logger.Logger.warn.
    """
class PDFTextExtractionNotAllowedWarning(UserWarning):
    """Legacy warning category for a PDF that does not allow extraction.

    No longer emitted, because warnings.warn was replaced by
    logger.Logger.warn.
    """
class PDFTextExtractionNotAllowed(PDFEncryptionError):
    """Encryption error type for documents that disallow text extraction.

    NOTE(review): not raised by the code visible in this file; presumably
    raised by callers that check ``PDFDocument.is_extractable`` - confirm.
    """
# Predefined name literals, compared by identity against the /Type of
# object streams, cross-reference streams and the document catalog.
LITERAL_OBJSTM = LIT("ObjStm")
LITERAL_XREF = LIT("XRef")
LITERAL_CATALOG = LIT("Catalog")
class PDFBaseXRef:
    """Common interface of every cross-reference implementation."""

    def get_trailer(self) -> dict[str, Any]:
        """Return the trailer dictionary; must be overridden."""
        raise NotImplementedError

    def get_objids(self) -> Iterable[int]:
        """Return the object ids this xref knows about (none by default)."""
        return []

    def get_pos(self, objid: int) -> tuple[int | None, int, int]:
        """Locate ``objid``.

        Implementations must return ``(strmid, index, genno)``
        or ``(None, pos, genno)``.

        :raises PDFKeyError: always, in this base implementation.
        """
        raise PDFKeyError(objid)

    def load(self, parser: PDFParser) -> None:
        """Populate the xref from ``parser``; must be overridden."""
        raise NotImplementedError
class PDFXRef(PDFBaseXRef):
    """Cross-reference table in the classic textual form, plus its trailer."""

    def __init__(self) -> None:
        # objid -> (None, byte position, generation number)
        self.offsets: dict[int, tuple[int | None, int, int]] = {}
        self.trailer: dict[str, Any] = {}

    def __repr__(self) -> str:
        return f"<PDFXRef: offsets={self.offsets.keys()!r}>"

    def load(self, parser: PDFParser) -> None:
        """Read subsections until the ``trailer`` keyword, then the trailer.

        :raises PDFNoValidXRef: on premature EOF or a malformed line.
        """
        while True:
            try:
                (pos, raw_line) = parser.nextline()
            except PSEOF as err:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted?") from err
            line = raw_line.strip()
            if not line:
                continue
            if line.startswith(b"trailer"):
                # Rewind so load_trailer() sees the keyword itself.
                parser.seek(pos)
                break
            header = line.split(b" ")
            if len(header) != 2:
                error_msg = f"Trailer not found: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg)
            try:
                (start, nobjs) = (int(field) for field in header)
            except ValueError as err:
                error_msg = f"Invalid line: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg) from err
            for objid in range(start, start + nobjs):
                try:
                    (_, raw_entry) = parser.nextline()
                except PSEOF as err:
                    raise PDFNoValidXRef("Unexpected EOF - file corrupted?") from err
                line = raw_entry.strip()
                entry = line.split(b" ")
                if len(entry) != 3:
                    error_msg = f"Invalid XRef format: {parser!r}, line={line!r}"
                    raise PDFNoValidXRef(error_msg)
                (pos_b, genno_b, use_b) = entry
                # Only in-use ("n") entries are recorded.
                if use_b != b"n":
                    continue

                pos_i = safe_int(pos_b)
                genno_i = safe_int(genno_b)
                if pos_i is None or genno_i is None:
                    log.warning(
                        "Not adding object %s to xref because position %r "
                        "or generation number %r cannot be parsed as an int",
                        objid,
                        pos_b,
                        genno_b,
                    )
                    continue
                self.offsets[objid] = (None, pos_i, genno_i)

        log.debug("xref objects: %r", self.offsets)
        self.load_trailer(parser)

    def load_trailer(self, parser: PDFParser) -> None:
        """Parse the ``trailer`` keyword and merge its dictionary into ours.

        :raises PDFNoValidXRef: if EOF is hit and nothing is left on the
            parser to use as the trailer dictionary.
        """
        try:
            (_, kwd) = parser.nexttoken()
            assert kwd is KWD(b"trailer"), str(kwd)
            (_, dic) = parser.nextobject()
        except PSEOF:
            # Fall back to whatever the parser had collected before EOF.
            leftovers = parser.pop(1)
            if not leftovers:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted") from None
            (_, dic) = leftovers[0]
        self.trailer.update(dict_value(dic))
        log.debug("trailer=%r", self.trailer)

    def get_trailer(self) -> dict[str, Any]:
        return self.trailer

    def get_objids(self) -> KeysView[int]:
        return self.offsets.keys()

    def get_pos(self, objid: int) -> tuple[int | None, int, int]:
        """Return ``(None, pos, genno)``; raises KeyError for unknown ids."""
        return self.offsets[objid]
class PDFXRefFallback(PDFXRef):
    """Xref rebuilt by scanning the whole file for ``N G obj`` headers."""

    def __repr__(self) -> str:
        return f"<PDFXRefFallback: offsets={self.offsets.keys()!r}>"

    PDFOBJ_CUE = re.compile(r"^(\d+)\s+(\d+)\s+obj\b")

    def load(self, parser: PDFParser) -> None:
        """Scan from offset 0, recording objects and expanding object streams.

        Scanning stops at EOF or after the first ``trailer`` line is loaded.
        """
        parser.seek(0)
        while True:
            try:
                (pos, line_bytes) = parser.nextline()
            except PSEOF:
                break
            if line_bytes.startswith(b"trailer"):
                parser.seek(pos)
                self.load_trailer(parser)
                log.debug("trailer: %r", self.trailer)
                break
            text = line_bytes.decode("latin-1")  # default pdf encoding
            match = self.PDFOBJ_CUE.match(text)
            if not match:
                continue
            (objid_s, genno_s) = match.groups()
            objid = int(objid_s)
            genno = int(genno_s)
            self.offsets[objid] = (None, pos, genno)
            # expand ObjStm.
            parser.seek(pos)
            (_, obj) = parser.nextobject()
            if not (isinstance(obj, PDFStream) and obj.get("Type") is LITERAL_OBJSTM):
                continue
            stream = stream_value(obj)
            try:
                n = stream["N"]
            except KeyError:
                if settings.STRICT:
                    raise PDFSyntaxError(f"N is not defined: {stream!r}") from None
                n = 0
            stream_parser = PDFStreamParser(stream.get_data())
            objs: list[int] = []
            try:
                while True:
                    (_, obj) = stream_parser.nextobject()
                    objs.append(cast(int, obj))
            except PSEOF:
                pass
            # The stream starts with (objid, offset) pairs; never trust N
            # beyond the number of pairs actually parsed.
            n = min(n, len(objs) // 2)
            for index in range(n):
                contained_objid = objs[index * 2]
                self.offsets[contained_objid] = (objid, index, 0)
class PDFXRefStream(PDFBaseXRef):
    """Cross-reference information stored in an XRef stream (PDF-1.5)."""

    def __init__(self) -> None:
        self.data: bytes | None = None
        # Total width in bytes of one entry (fl1 + fl2 + fl3).
        self.entlen: int | None = None
        # Byte widths of the three entry fields, from the stream's /W array.
        self.fl1: int | None = None
        self.fl2: int | None = None
        self.fl3: int | None = None
        # (first objid, count) pairs from /Index.
        self.ranges: list[tuple[int, int]] = []
        # Defined up front so get_trailer() is safe before load().
        self.trailer: dict[str, Any] = {}

    def __repr__(self) -> str:
        return f"<PDFXRefStream: ranges={self.ranges!r}>"

    def load(self, parser: PDFParser) -> None:
        """Read the XRef stream object at the parser's current position.

        :raises PDFNoValidXRef: if the object is not a stream of type XRef.
        :raises PDFSyntaxError: if /Index has an odd number of elements.
        """
        (_, _objid) = parser.nexttoken()  # ignored
        (_, _genno) = parser.nexttoken()  # ignored
        (_, _kwd) = parser.nexttoken()
        (_, stream) = parser.nextobject()
        if not isinstance(stream, PDFStream) or stream.get("Type") is not LITERAL_XREF:
            raise PDFNoValidXRef("Invalid PDF stream spec.")
        size = stream["Size"]
        index_array = stream.get("Index", (0, size))
        if len(index_array) % 2 != 0:
            raise PDFSyntaxError("Invalid index number")
        self.ranges.extend(cast(Iterator[tuple[int, int]], choplist(2, index_array)))
        (self.fl1, self.fl2, self.fl3) = stream["W"]
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        self.data = stream.get_data()
        self.entlen = self.fl1 + self.fl2 + self.fl3
        self.trailer = stream.attrs
        log.debug(
            "xref stream: objid=%s, fields=%d,%d,%d",
            ", ".join(map(repr, self.ranges)),
            self.fl1,
            self.fl2,
            self.fl3,
        )

    def get_trailer(self) -> dict[str, Any]:
        return self.trailer

    def get_objids(self) -> Iterator[int]:
        """Yield the ids of all entries whose type field is 1 or 2.

        Entries of every range are stored back to back in the stream data,
        so the entry index keeps counting across ranges, exactly as
        get_pos() does.
        """
        base = 0  # number of entries belonging to the preceding ranges
        for start, nobjs in self.ranges:
            for i in range(nobjs):
                assert self.entlen is not None
                assert self.data is not None
                offset = self.entlen * (base + i)
                ent = self.data[offset : offset + self.entlen]
                f1 = nunpack(ent[: self.fl1], 1)
                if f1 == 1 or f1 == 2:
                    yield start + i
            base += nobjs

    def get_pos(self, objid: int) -> tuple[int | None, int, int]:
        """Return ``(None, pos, genno)`` or ``(strmid, index, 0)`` for ``objid``.

        :raises PDFKeyError: if the id is outside every range or the entry
            is neither type 1 nor type 2 (a free object).
        """
        index = 0
        for start, nobjs in self.ranges:
            if start <= objid < start + nobjs:
                index += objid - start
                break
            index += nobjs
        else:
            raise PDFKeyError(objid)
        assert self.entlen is not None
        assert self.data is not None
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        offset = self.entlen * index
        ent = self.data[offset : offset + self.entlen]
        f1 = nunpack(ent[: self.fl1], 1)
        f2 = nunpack(ent[self.fl1 : self.fl1 + self.fl2])
        f3 = nunpack(ent[self.fl1 + self.fl2 :])
        if f1 == 1:
            return (None, f2, f3)
        elif f1 == 2:
            return (f2, f3, 0)
        else:
            # this is a free object
            raise PDFKeyError(objid)
335class PDFStandardSecurityHandler:
336 PASSWORD_PADDING = (
337 b"(\xbfN^Nu\x8aAd\x00NV\xff\xfa\x01\x08..\x00\xb6\xd0h>\x80/\x0c\xa9\xfedSiz"
338 )
339 supported_revisions: tuple[int, ...] = (2, 3)
341 def __init__(
342 self,
343 docid: Sequence[bytes],
344 param: dict[str, Any],
345 password: str = "",
346 ) -> None:
347 self.docid = docid
348 self.param = param
349 self.password = password
350 self.init()
352 def init(self) -> None:
353 self.init_params()
354 if self.r not in self.supported_revisions:
355 error_msg = f"Unsupported revision: param={self.param!r}"
356 raise PDFEncryptionError(error_msg)
357 self.init_key()
359 def init_params(self) -> None:
360 self.v = int_value(self.param.get("V", 0))
361 self.r = int_value(self.param["R"])
362 self.p = uint_value(self.param["P"], 32)
363 self.o = str_value(self.param["O"])
364 self.u = str_value(self.param["U"])
365 self.length = int_value(self.param.get("Length", 40))
367 def init_key(self) -> None:
368 self.key = self.authenticate(self.password)
369 if self.key is None:
370 raise PDFPasswordIncorrect
372 def is_printable(self) -> bool:
373 return bool(self.p & 4)
375 def is_modifiable(self) -> bool:
376 return bool(self.p & 8)
378 def is_extractable(self) -> bool:
379 return bool(self.p & 16)
381 def compute_u(self, key: bytes) -> bytes:
382 if self.r == 2:
383 # Algorithm 3.4
384 return Arcfour(key).encrypt(self.PASSWORD_PADDING) # 2
385 else:
386 # Algorithm 3.5
387 hash = md5(self.PASSWORD_PADDING) # 2
388 hash.update(self.docid[0]) # 3
389 result = Arcfour(key).encrypt(hash.digest()) # 4
390 for i in range(1, 20): # 5
391 k = b"".join(bytes((c ^ i,)) for c in iter(key))
392 result = Arcfour(k).encrypt(result)
393 result += result # 6
394 return result
396 def compute_encryption_key(self, password: bytes) -> bytes:
397 # Algorithm 3.2
398 password = (password + self.PASSWORD_PADDING)[:32] # 1
399 hash = md5(password) # 2
400 hash.update(self.o) # 3
401 # See https://github.com/pdfminer/pdfminer.six/issues/186
402 hash.update(struct.pack("<L", self.p)) # 4
403 hash.update(self.docid[0]) # 5
404 if (
405 self.r >= 4
406 and not cast(PDFStandardSecurityHandlerV4, self).encrypt_metadata
407 ):
408 hash.update(b"\xff\xff\xff\xff")
409 result = hash.digest()
410 n = 5
411 if self.r >= 3:
412 n = self.length // 8
413 for _ in range(50):
414 result = md5(result[:n]).digest()
415 return result[:n]
417 def authenticate(self, password: str) -> bytes | None:
418 password_bytes = password.encode("latin1")
419 key = self.authenticate_user_password(password_bytes)
420 if key is None:
421 key = self.authenticate_owner_password(password_bytes)
422 return key
424 def authenticate_user_password(self, password: bytes) -> bytes | None:
425 key = self.compute_encryption_key(password)
426 if self.verify_encryption_key(key):
427 return key
428 else:
429 return None
431 def verify_encryption_key(self, key: bytes) -> bool:
432 # Algorithm 3.6
433 u = self.compute_u(key)
434 if self.r == 2:
435 return u == self.u
436 return u[:16] == self.u[:16]
438 def authenticate_owner_password(self, password: bytes) -> bytes | None:
439 # Algorithm 3.7
440 password = (password + self.PASSWORD_PADDING)[:32]
441 hash = md5(password)
442 if self.r >= 3:
443 for _ in range(50):
444 hash = md5(hash.digest())
445 n = 5
446 if self.r >= 3:
447 n = self.length // 8
448 key = hash.digest()[:n]
449 if self.r == 2:
450 user_password = Arcfour(key).decrypt(self.o)
451 else:
452 user_password = self.o
453 for i in range(19, -1, -1):
454 k = b"".join(bytes((c ^ i,)) for c in iter(key))
455 user_password = Arcfour(k).decrypt(user_password)
456 return self.authenticate_user_password(user_password)
458 def decrypt(
459 self,
460 objid: int,
461 genno: int,
462 data: bytes,
463 attrs: dict[str, Any] | None = None,
464 ) -> bytes:
465 return self.decrypt_rc4(objid, genno, data)
467 def decrypt_rc4(self, objid: int, genno: int, data: bytes) -> bytes:
468 assert self.key is not None
469 key = self.key + struct.pack("<L", objid)[:3] + struct.pack("<L", genno)[:2]
470 hash = md5(key)
471 key = hash.digest()[: min(len(key), 16)]
472 return Arcfour(key).decrypt(data)
class PDFStandardSecurityHandlerV4(PDFStandardSecurityHandler):
    """Security handler for revision 4, driven by named crypt filters."""

    supported_revisions: tuple[int, ...] = (4,)

    def init_params(self) -> None:
        """Read the crypt-filter configuration on top of the base parameters.

        :raises PDFEncryptionError: if the stream and string filters differ,
            a filter uses an unknown method, or the string filter is undefined.
        """
        super().init_params()
        self.length = 128
        params = self.param
        self.cf = dict_value(params.get("CF"))
        self.stmf = literal_name(params["StmF"])
        self.strf = literal_name(params["StrF"])
        self.encrypt_metadata = bool(params.get("EncryptMetadata", True))
        if self.stmf != self.strf:
            error_msg = f"Unsupported crypt filter: param={self.param!r}"
            raise PDFEncryptionError(error_msg)
        # Crypt filter name -> decryption callable.
        self.cfm = {}
        for filter_name, filter_spec in self.cf.items():
            method = self.get_cfm(literal_name(filter_spec["CFM"]))
            if method is None:
                error_msg = f"Unknown crypt filter method: param={self.param!r}"
                raise PDFEncryptionError(error_msg)
            self.cfm[filter_name] = method
        self.cfm["Identity"] = self.decrypt_identity
        if self.strf not in self.cfm:
            error_msg = f"Undefined crypt filter: param={self.param!r}"
            raise PDFEncryptionError(error_msg)

    def get_cfm(self, name: str) -> Callable[[int, int, bytes], bytes] | None:
        """Map a crypt filter method name to its decryptor, or None."""
        methods = {
            "V2": self.decrypt_rc4,
            "AESV2": self.decrypt_aes128,
        }
        return methods.get(name)

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: dict[str, Any] | None = None,
        name: str | None = None,
    ) -> bytes:
        """Decrypt ``data`` with the crypt filter ``name`` (default: /StrF).

        Data whose ``attrs`` mark it as Metadata is returned unchanged when
        metadata is not encrypted.
        """
        if not self.encrypt_metadata and attrs is not None:
            obj_type = attrs.get("Type")
            if obj_type is not None and literal_name(obj_type) == "Metadata":
                return data
        filter_name = self.strf if name is None else name
        return self.cfm[filter_name](objid, genno, data)

    def decrypt_identity(self, objid: int, genno: int, data: bytes) -> bytes:
        """Return ``data`` untouched (the Identity filter)."""
        return data

    def decrypt_aes128(self, objid: int, genno: int, data: bytes) -> bytes:
        """AES-CBC decrypt ``data``; its first 16 bytes are the IV."""
        assert self.key is not None
        seed = (
            self.key
            + struct.pack("<L", objid)[:3]
            + struct.pack("<L", genno)[:2]
            + b"sAlT"
        )
        object_key = md5(seed).digest()[: min(len(seed), 16)]
        initialization_vector, ciphertext = data[:16], data[16:]
        cipher = Cipher(
            algorithms.AES(object_key),
            modes.CBC(initialization_vector),
            backend=default_backend(),
        )  # type: ignore
        plaintext = cipher.decryptor().update(ciphertext)  # type: ignore
        return unpad_aes(plaintext)
class PDFStandardSecurityHandlerV5(PDFStandardSecurityHandlerV4):
    """Security handler for revisions 5 and 6 (AESV3, SHA-2 password hashes)."""

    supported_revisions = (5, 6)

    def init_params(self) -> None:
        """Read /OE and /UE and split /O and /U into hash and salts."""
        super().init_params()
        self.length = 256
        self.oe = str_value(self.param["OE"])
        self.ue = str_value(self.param["UE"])
        # Layout of /O and /U: 32-byte hash, 8-byte validation salt, key salt.
        owner, user = self.o, self.u
        self.o_hash = owner[:32]
        self.o_validation_salt = owner[32:40]
        self.o_key_salt = owner[40:]
        self.u_hash = user[:32]
        self.u_validation_salt = user[32:40]
        self.u_key_salt = user[40:]

    def get_cfm(self, name: str) -> Callable[[int, int, bytes], bytes] | None:
        """Only the AESV3 crypt filter method is recognised."""
        return self.decrypt_aes256 if name == "AESV3" else None

    def authenticate(self, password: str) -> bytes | None:
        """Return the file key if ``password`` is the owner or user password.

        The owner password is tried first; None is returned if both fail.
        """
        password_b = self._normalize_password(password)
        # (validation salt, key salt, extra vector, expected hash, wrapped key)
        attempts = (
            (self.o_validation_salt, self.o_key_salt, self.u, self.o_hash, self.oe),
            (self.u_validation_salt, self.u_key_salt, None, self.u_hash, self.ue),
        )
        for validation_salt, key_salt, vector, expected, wrapped_key in attempts:
            if self._password_hash(password_b, validation_salt, vector) != expected:
                continue
            intermediate = self._password_hash(password_b, key_salt, vector)
            cipher = Cipher(
                algorithms.AES(intermediate),
                modes.CBC(b"\0" * 16),
                backend=default_backend(),
            )  # type: ignore
            return cipher.decryptor().update(wrapped_key)  # type: ignore
        return None

    def _normalize_password(self, password: str) -> bytes:
        """Encode ``password`` as UTF-8 (max 127 bytes), SASLprep'd for R6."""
        if self.r == 6:
            # saslprep expects non-empty strings, apparently
            if not password:
                return b""
            from pdfminer._saslprep import saslprep

            password = saslprep(password)
        encoded = password.encode("utf-8")
        return encoded[:127]

    def _password_hash(
        self,
        password: bytes,
        salt: bytes,
        vector: bytes | None = None,
    ) -> bytes:
        """Compute password hash depending on revision number"""
        if self.r == 5:
            return self._r5_password(password, salt, vector)
        return self._r6_password(password, salt[0:8], vector)

    def _r5_password(
        self,
        password: bytes,
        salt: bytes,
        vector: bytes | None = None,
    ) -> bytes:
        """Compute the password for revision 5"""
        digest = sha256(password)
        digest.update(salt)
        if vector is not None:
            digest.update(vector)
        return digest.digest()

    def _r6_password(
        self,
        password: bytes,
        salt: bytes,
        vector: bytes | None = None,
    ) -> bytes:
        """Compute the password for revision 6"""
        seed = sha256(password)
        seed.update(salt)
        if vector is not None:
            seed.update(vector)
        k = seed.digest()
        hashes = (sha256, sha384, sha512)
        round_no = 0
        last_byte_val = 0
        # At least 64 rounds; afterwards keep going while the last byte of
        # the previous round's ciphertext exceeds (round number - 32).
        while round_no < 64 or last_byte_val > round_no - 32:
            k1 = (password + k + (vector or b"")) * 64
            e = self._aes_cbc_encrypt(key=k[:16], iv=k[16:32], data=k1)
            # compute the first 16 bytes of e,
            # interpreted as an unsigned integer mod 3
            next_hash = hashes[self._bytes_mod_3(e[:16])]
            k = next_hash(e).digest()
            last_byte_val = e[-1]
            round_no += 1
        return k[:32]

    @staticmethod
    def _bytes_mod_3(input_bytes: bytes) -> int:
        """Return the big integer spelled by ``input_bytes``, modulo 3."""
        # 256 is 1 mod 3, so we can just sum 'em
        return sum(input_bytes) % 3

    def _aes_cbc_encrypt(self, key: bytes, iv: bytes, data: bytes) -> bytes:
        """AES-CBC encrypt ``data`` without padding."""
        encryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).encryptor()  # type: ignore
        return encryptor.update(data) + encryptor.finalize()  # type: ignore

    def decrypt_aes256(self, objid: int, genno: int, data: bytes) -> bytes:
        """AES-CBC decrypt ``data`` with the file key; first 16 bytes are the IV."""
        initialization_vector, ciphertext = data[:16], data[16:]
        assert self.key is not None
        cipher = Cipher(
            algorithms.AES(self.key),
            modes.CBC(initialization_vector),
            backend=default_backend(),
        )  # type: ignore
        plaintext = cipher.decryptor().update(ciphertext)  # type: ignore
        return unpad_aes(plaintext)
class PDFDocument:
    """PDFDocument object represents a PDF document.

    Since a PDF file can be very big, normally it is not loaded at
    once. So PDF document has to cooperate with a PDF parser in order to
    dynamically import the data as processing goes.

    Typical usage:
      doc = PDFDocument(parser, password)
      obj = doc.getobj(objid)

    """

    # Value of /V in the encryption dictionary -> handler class.
    security_handler_registry: ClassVar[dict[int, type[PDFStandardSecurityHandler]]] = {
        1: PDFStandardSecurityHandler,
        2: PDFStandardSecurityHandler,
        4: PDFStandardSecurityHandlerV4,
        5: PDFStandardSecurityHandlerV5,
    }

    def __init__(
        self,
        parser: PDFParser,
        password: str = "",
        caching: bool = True,
        fallback: bool = True,
    ) -> None:
        """Set the document to use a given PDFParser object.

        :param parser: parser positioned on the PDF data.
        :param password: password used if the document is encrypted.
        :param caching: whether resolved objects are kept in memory.
        :param fallback: whether to rebuild the xref by scanning the file
            when no valid xref is found.
        :raises PDFSyntaxError: if no trailer provides a /Root entry, or (in
            strict mode) the root is not a Catalog.
        """
        self.caching = caching
        self.xrefs: list[PDFBaseXRef] = []
        self.info = []
        self.catalog: dict[str, Any] = {}
        self.encryption: tuple[Any, Any] | None = None
        self.decipher: DecipherCallable | None = None
        self._parser = None
        # objid -> (object, genno)
        self._cached_objs: dict[int, tuple[object, int]] = {}
        # object stream objid -> (parsed objects, N)
        self._parsed_objs: dict[int, tuple[list[object], int]] = {}
        self._parser = parser
        self._parser.set_document(self)
        self.is_printable = self.is_modifiable = self.is_extractable = True
        # Retrieve the information of each header that was appended
        # (maybe multiple times) at the end of the document.
        try:
            pos = self.find_xref(parser)
            self.read_xref_from(parser, pos, self.xrefs)
        except PDFNoValidXRef:
            if fallback:
                parser.fallback = True
                newxref = PDFXRefFallback()
                newxref.load(parser)
                self.xrefs.append(newxref)

        for xref in self.xrefs:
            trailer = xref.get_trailer()
            if not trailer:
                continue
            # If there's an encryption info, remember it.
            if "Encrypt" in trailer:
                # Some documents may not have a /ID, use two empty
                # byte strings instead. Solves
                # https://github.com/pdfminer/pdfminer.six/issues/594
                id_value = list_value(trailer["ID"]) if "ID" in trailer else (b"", b"")
                self.encryption = (id_value, dict_value(trailer["Encrypt"]))
                self._initialize_password(password)
            if "Info" in trailer:
                self.info.append(dict_value(trailer["Info"]))
            if "Root" in trailer:
                # Every PDF file must have exactly one /Root dictionary.
                self.catalog = dict_value(trailer["Root"])
                break
        else:
            raise PDFSyntaxError("No /Root object! - Is this really a PDF?")
        if self.catalog.get("Type") is not LITERAL_CATALOG and settings.STRICT:
            raise PDFSyntaxError("Catalog not found!")

    KEYWORD_OBJ = KWD(b"obj")

    def _initialize_password(self, password: str = "") -> None:
        """Perform the initialization with a given password.

        :raises PDFEncryptionError: for a non-Standard filter or a /V value
            without a registered handler.
        """
        assert self.encryption is not None
        (docid, param) = self.encryption
        if literal_name(param.get("Filter")) != "Standard":
            raise PDFEncryptionError(f"Unknown filter: param={param!r}")
        v = int_value(param.get("V", 0))
        factory = self.security_handler_registry.get(v)
        if factory is None:
            raise PDFEncryptionError(f"Unknown algorithm: param={param!r}")
        handler = factory(docid, param, password)
        self.decipher = handler.decrypt
        self.is_printable = handler.is_printable()
        self.is_modifiable = handler.is_modifiable()
        self.is_extractable = handler.is_extractable()
        assert self._parser is not None
        self._parser.fallback = False  # need to read streams with exact length

    def _getobj_objstm(self, stream: PDFStream, index: int, objid: int) -> object:
        """Return the ``index``-th object stored inside an object stream.

        :raises PDFSyntaxError: if the stream holds too few objects.
        """
        if stream.objid in self._parsed_objs:
            (objs, n) = self._parsed_objs[stream.objid]
        else:
            (objs, n) = self._get_objects(stream)
            if self.caching:
                assert stream.objid is not None
                self._parsed_objs[stream.objid] = (objs, n)
        # The first 2*N parsed items are the (objid, offset) header pairs.
        i = n * 2 + index
        try:
            obj = objs[i]
        except IndexError as err:
            raise PDFSyntaxError(f"index too big: {index!r}") from err
        return obj

    def _get_objects(self, stream: PDFStream) -> tuple[list[object], int]:
        """Parse everything in an object stream; return ``(objects, N)``."""
        if stream.get("Type") is not LITERAL_OBJSTM and settings.STRICT:
            raise PDFSyntaxError(f"Not a stream object: {stream!r}")
        try:
            n = cast(int, stream["N"])
        except KeyError:
            if settings.STRICT:
                raise PDFSyntaxError(f"N is not defined: {stream!r}") from None
            n = 0
        parser = PDFStreamParser(stream.get_data())
        parser.set_document(self)
        objs: list[object] = []
        try:
            while True:
                (_, obj) = parser.nextobject()
                objs.append(obj)
        except PSEOF:
            pass
        return (objs, n)

    def _getobj_parse(self, pos: int, objid: int) -> object:
        """Parse the indirect object ``objid`` located at file offset ``pos``.

        :raises PDFSyntaxError: on an object id mismatch or a missing
            ``obj`` keyword.
        """
        assert self._parser is not None
        self._parser.seek(pos)
        (_, objid1) = self._parser.nexttoken()  # objid
        (_, _genno) = self._parser.nexttoken()  # genno
        (_, kwd) = self._parser.nexttoken()
        # hack around malformed pdf files
        # copied from https://github.com/jaepil/pdfminer3k/blob/master/
        # pdfminer/pdfparser.py#L399
        # to solve https://github.com/pdfminer/pdfminer.six/issues/56
        # assert objid1 == objid, str((objid1, objid))
        if objid1 != objid:
            x = []
            while kwd is not self.KEYWORD_OBJ:
                (_, kwd) = self._parser.nexttoken()
                x.append(kwd)
            if len(x) >= 2:
                objid1 = x[-2]
        # #### end hack around malformed pdf files
        if objid1 != objid:
            raise PDFSyntaxError(f"objid mismatch: {objid1!r}={objid!r}")

        if kwd != KWD(b"obj"):
            raise PDFSyntaxError(f"Invalid object spec: offset={pos!r}")
        (_, obj) = self._parser.nextobject()
        return obj

    # can raise PDFObjectNotFound
    def getobj(self, objid: int) -> object:
        """Get object from PDF

        :raises PDFException if PDFDocument is not initialized
        :raises PDFObjectNotFound if objid does not exist in PDF
        """
        if not self.xrefs:
            raise PDFException("PDFDocument is not initialized")
        log.debug("getobj: objid=%r", objid)
        obj: object  # Initialize to satisfy mypy; always assigned in branches below
        genno: int
        if objid in self._cached_objs:
            (obj, genno) = self._cached_objs[objid]
        else:
            # The first xref that yields a parseable object wins.
            for xref in self.xrefs:
                try:
                    (strmid, index, genno) = xref.get_pos(objid)
                except KeyError:
                    continue
                try:
                    if strmid is not None:
                        stream = stream_value(self.getobj(strmid))
                        obj = self._getobj_objstm(stream, index, objid)
                    else:
                        obj = self._getobj_parse(index, objid)
                        if self.decipher:
                            obj = decipher_all(self.decipher, objid, genno, obj)

                    if isinstance(obj, PDFStream):
                        obj.set_objid(objid, genno)
                    break
                except (PSEOF, PDFSyntaxError):
                    continue
            else:
                raise PDFObjectNotFound(objid)
            log.debug("register: objid=%r: %r", objid, obj)
            if self.caching:
                self._cached_objs[objid] = (obj, genno)
        return obj

    # (level, title, dest, action, se)
    OutlineType = tuple[Any, Any, Any, Any, Any]

    def get_outlines(self) -> Iterator[OutlineType]:
        """Iterate over the outline as ``(level, title, dest, action, se)``.

        :raises PDFNoOutlines: if the catalog has no /Outlines entry.
        """
        if "Outlines" not in self.catalog:
            raise PDFNoOutlines

        def search(entry: object, level: int) -> Iterator[PDFDocument.OutlineType]:
            entry = dict_value(entry)
            if "Title" in entry and ("A" in entry or "Dest" in entry):
                title = decode_text(str_value(entry["Title"]))
                dest = entry.get("Dest")
                action = entry.get("A")
                se = entry.get("SE")
                yield (level, title, dest, action, se)
            if "First" in entry and "Last" in entry:
                yield from search(entry["First"], level + 1)
            if "Next" in entry:
                yield from search(entry["Next"], level)

        return search(self.catalog["Outlines"], 0)

    def get_page_labels(self) -> Iterator[str]:
        """Generate page label strings for the PDF document.

        If the document includes page labels, generates strings, one per page.
        If not, raises PDFNoPageLabels.

        The resulting iteration is unbounded.
        """
        assert self.catalog is not None

        try:
            page_labels = PageLabels(self.catalog["PageLabels"])
        except (PDFTypeError, KeyError) as err:
            raise PDFNoPageLabels from err

        return page_labels.labels

    def lookup_name(self, cat: str, key: str | bytes) -> Any:
        """Look ``key`` up in the name tree ``cat`` of the catalog's /Names.

        :raises PDFKeyError: if /Names is missing or the key is not found.
        """
        try:
            names = dict_value(self.catalog["Names"])
        except (PDFTypeError, KeyError) as err:
            raise PDFKeyError((cat, key)) from err
        # may raise KeyError
        d0 = dict_value(names[cat])

        def lookup(d: dict[str, Any]) -> Any:
            if "Limits" in d:
                (k1, k2) = list_value(d["Limits"])
                if key < k1 or k2 < key:
                    return None
            if "Names" in d:
                objs = list_value(d["Names"])
                names = dict(
                    cast(Iterator[tuple[str | bytes, Any]], choplist(2, objs)),
                )
                return names[key]
            if "Kids" in d:
                for c in list_value(d["Kids"]):
                    v = lookup(dict_value(c))
                    if v:
                        return v
            raise PDFKeyError((cat, key))

        return lookup(d0)

    def get_dest(self, name: str | bytes) -> Any:
        """Resolve a named destination.

        :raises PDFDestinationNotFound: if neither the name tree nor the
            catalog's /Dests dictionary knows ``name``.
        """
        try:
            # PDF-1.2 or later
            obj = self.lookup_name("Dests", name)
        except KeyError:
            # PDF-1.1 or prior
            if "Dests" not in self.catalog:
                raise PDFDestinationNotFound(name) from None
            d0 = dict_value(self.catalog["Dests"])
            if name not in d0:
                raise PDFDestinationNotFound(name) from None
            obj = d0[name]
        return obj

    # find_xref
    def find_xref(self, parser: PDFParser) -> int:
        """Internal function used to locate the first XRef.

        :raises PDFNoValidXRef: if no ``startxref`` line is found or the
            offset preceding it (in reverse reading order) is unusable.
        """
        # search the last xref table by scanning the file backwards.
        prev = b""
        for line in parser.revreadlines():
            line = line.strip()
            log.debug("find_xref: %r", line)

            if line == b"startxref":
                log.debug("xref found: pos=%r", prev)

                if not prev.isdigit():
                    raise PDFNoValidXRef(f"Invalid xref position, no digit: {prev!r}")

                start = int(prev)

                if not start >= 0:
                    raise PDFNoValidXRef(f"Invalid xref position, negative: {start}")

                # The xref start needs to fit in a C ssize_t to be a proper file offset
                if start >= 2**31:
                    raise PDFNoValidXRef(f"Invalid xref position, too large: {start!r}")

                return start

            if line:
                prev = line

        raise PDFNoValidXRef("Unexpected EOF")

    # read xref table
    def read_xref_from(
        self,
        parser: PDFParser,
        start: int,
        xrefs: list[PDFBaseXRef],
    ) -> None:
        """Reads XRefs from the given location.

        Follows /XRefStm and /Prev links and appends every xref found to
        ``xrefs``. An offset that was already visited is skipped, so a
        malformed file whose links form a cycle cannot recurse forever.

        :raises PDFNoValidXRef: if EOF is hit before any token is read.
        """
        self._read_xref_from(parser, start, xrefs, set())

    def _read_xref_from(
        self,
        parser: PDFParser,
        start: int,
        xrefs: list[PDFBaseXRef],
        visited: set[int],
    ) -> None:
        """Recursive worker of read_xref_from; ``visited`` holds seen offsets."""
        if start in visited:
            log.warning("Ignoring already visited xref position: %r", start)
            return
        visited.add(start)
        parser.seek(start)
        parser.reset()
        try:
            (pos, token) = parser.nexttoken()
        except PSEOF as err:
            raise PDFNoValidXRef("Unexpected EOF") from err
        log.debug("read_xref_from: start=%d, token=%r", start, token)
        if isinstance(token, int):
            # XRefStream: PDF-1.5
            parser.seek(pos)
            parser.reset()
            xref: PDFBaseXRef = PDFXRefStream()
            xref.load(parser)
        else:
            if token is parser.KEYWORD_XREF:
                parser.nextline()
            xref = PDFXRef()
            xref.load(parser)
        xrefs.append(xref)
        trailer = xref.get_trailer()
        log.debug("trailer: %r", trailer)
        if "XRefStm" in trailer:
            pos = int_value(trailer["XRefStm"])
            self._read_xref_from(parser, pos, xrefs, visited)
        if "Prev" in trailer:
            # find previous xref
            pos = int_value(trailer["Prev"])
            self._read_xref_from(parser, pos, xrefs, visited)
class PageLabels(NumberTree):
    """PageLabels from the document catalog.

    See Section 8.3.1 in the PDF Reference.
    """

    @property
    def labels(self) -> Iterator[str]:
        """Yield one label per page; the last range never ends."""
        ranges = self.values

        # The tree must begin with page index 0
        if len(ranges) == 0 or ranges[0][0] != 0:
            if settings.STRICT:
                raise PDFSyntaxError("PageLabels is missing page index 0")
            # Try to cope, by assuming empty labels for the initial pages
            ranges.insert(0, (0, {}))

        range_count = len(ranges)
        for following, (start, label_dict_unchecked) in enumerate(ranges, 1):
            label_dict = dict_value(label_dict_unchecked)
            style = label_dict.get("S")
            prefix = decode_text(str_value(label_dict.get("P", b"")))
            first_value = int_value(label_dict.get("St", 1))

            values: Iterable[int]
            if following == range_count:
                # This is the last specified range. It continues until the end
                # of the document.
                values = itertools.count(first_value)
            else:
                # The range ends where the next one starts.
                end, _ = ranges[following]
                values = range(first_value, first_value + (end - start))

            for value in values:
                yield prefix + self._format_page_label(value, style)

    @staticmethod
    def _format_page_label(value: int, style: Any) -> str:
        """Format page label value in a specific style"""
        if style is None:
            return ""
        if style is LIT("D"):  # Decimal arabic numerals
            return str(value)
        if style is LIT("R"):  # Uppercase roman numerals
            return format_int_roman(value).upper()
        if style is LIT("r"):  # Lowercase roman numerals
            return format_int_roman(value)
        if style is LIT("A"):  # Uppercase letters A-Z, AA-ZZ...
            return format_int_alpha(value).upper()
        if style is LIT("a"):  # Lowercase letters a-z, aa-zz...
            return format_int_alpha(value)
        log.warning("Unknown page label style: %r", style)
        return ""