Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/pdfdocument.py: 88%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

723 statements  

1import itertools 

2import logging 

3import re 

4import struct 

5from hashlib import md5, sha256, sha384, sha512 

6from typing import ( 

7 Any, 

8 Callable, 

9 Dict, 

10 Iterable, 

11 Iterator, 

12 KeysView, 

13 List, 

14 Optional, 

15 Sequence, 

16 Tuple, 

17 Type, 

18 Union, 

19 cast, 

20) 

21 

22from cryptography.hazmat.backends import default_backend 

23from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes 

24 

25from pdfminer import settings 

26from pdfminer.arcfour import Arcfour 

27from pdfminer.casting import safe_int 

28from pdfminer.data_structures import NumberTree 

29from pdfminer.pdfexceptions import ( 

30 PDFException, 

31 PDFKeyError, 

32 PDFObjectNotFound, 

33 PDFTypeError, 

34) 

35from pdfminer.pdfparser import PDFParser, PDFStreamParser, PDFSyntaxError 

36from pdfminer.pdftypes import ( 

37 DecipherCallable, 

38 PDFStream, 

39 decipher_all, 

40 dict_value, 

41 int_value, 

42 list_value, 

43 str_value, 

44 stream_value, 

45 uint_value, 

46) 

47from pdfminer.psexceptions import PSEOF 

48from pdfminer.psparser import KWD, LIT, literal_name 

49from pdfminer.utils import ( 

50 choplist, 

51 decode_text, 

52 format_int_alpha, 

53 format_int_roman, 

54 nunpack, 

55 unpad_aes, 

56) 

57 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)

59 

60 

class PDFNoValidXRef(PDFSyntaxError):
    """Raised when a cross-reference table or stream cannot be located or read."""

63 

64 

class PDFNoValidXRefWarning(SyntaxWarning):
    """Legacy warning category for a missing xref.

    Kept for backward compatibility only: the code now reports this
    condition through the logger instead of ``warnings.warn``.
    """

70 

71 

class PDFNoOutlines(PDFException):
    """Raised when the document catalog has no /Outlines entry."""

74 

75 

class PDFNoPageLabels(PDFException):
    """Raised when the document catalog has no usable /PageLabels entry."""

78 

79 

class PDFDestinationNotFound(PDFException):
    """Raised when a named destination cannot be resolved."""

82 

83 

class PDFEncryptionError(PDFException):
    """Base class for errors raised while setting up document decryption."""

86 

87 

class PDFPasswordIncorrect(PDFEncryptionError):
    """Raised when the supplied password does not authenticate."""

90 

91 

class PDFEncryptionWarning(UserWarning):
    """Legacy warning category for failed decryption.

    Kept for backward compatibility only: the code now reports this
    condition through the logger instead of ``warnings.warn``.
    """

97 

98 

class PDFTextExtractionNotAllowedWarning(UserWarning):
    """Legacy warning category for a PDF that does not allow extraction.

    Kept for backward compatibility only: the code now reports this
    condition through the logger instead of ``warnings.warn``.
    """

104 

105 

class PDFTextExtractionNotAllowed(PDFEncryptionError):
    """Error type for documents that forbid text extraction.

    Not raised anywhere in this module; presumably used by callers that
    check ``PDFDocument.is_extractable``.
    """

108 

109 

# Predefined name literals. They are compared by identity (``is``) against
# the /Type entry of streams and of the document catalog.
LITERAL_OBJSTM = LIT("ObjStm")
LITERAL_XREF = LIT("XRef")
LITERAL_CATALOG = LIT("Catalog")

114 

115 

class PDFBaseXRef:
    """Interface shared by the cross-reference implementations in this module."""

    def load(self, parser: PDFParser) -> None:
        """Populate the table from ``parser``; subclasses must override."""
        raise NotImplementedError()

    def get_trailer(self) -> Dict[str, Any]:
        """Return the trailer dictionary; subclasses must override."""
        raise NotImplementedError()

    def get_objids(self) -> Iterable[int]:
        """Return the object ids known to this table (none for the base class)."""
        return list()

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Locate ``objid``.

        Implementations must return either ``(strmid, index, genno)`` or
        ``(None, pos, genno)``. The base class knows no objects, so it
        always raises ``PDFKeyError``.
        """
        raise PDFKeyError(objid)

131 

132 

class PDFXRef(PDFBaseXRef):
    """Classic ``xref`` table together with its trailer dictionary."""

    def __init__(self) -> None:
        # objid -> (None, byte offset, generation number)
        self.offsets: Dict[int, Tuple[Optional[int], int, int]] = {}
        self.trailer: Dict[str, Any] = {}

    def __repr__(self) -> str:
        return "<PDFXRef: offsets=%r>" % (self.offsets.keys())

    def load(self, parser: PDFParser) -> None:
        """Read xref subsections from ``parser`` and then the trailer.

        Each subsection starts with a ``start count`` header line followed
        by ``count`` entry lines of the form ``offset generation flag``.
        Only entries flagged ``n`` (in use) are recorded. Fields are split
        on any run of whitespace, so files that separate them with tabs or
        repeated spaces are still accepted.

        :raises PDFNoValidXRef: on premature EOF or a malformed line.
        """
        while True:
            try:
                (pos, line) = parser.nextline()
            except PSEOF as err:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted?") from err
            line = line.strip()
            if not line:
                continue
            if line.startswith(b"trailer"):
                # Rewind so that load_trailer() sees the keyword itself.
                parser.seek(pos)
                break
            header = line.split()
            if len(header) != 2:
                error_msg = f"Trailer not found: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg)
            try:
                (start, nobjs) = map(int, header)
            except ValueError as err:
                error_msg = f"Invalid line: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg) from err
            for objid in range(start, start + nobjs):
                try:
                    (_, line) = parser.nextline()
                except PSEOF as err:
                    raise PDFNoValidXRef("Unexpected EOF - file corrupted?") from err
                line = line.strip()
                entry = line.split()
                if len(entry) != 3:
                    error_msg = f"Invalid XRef format: {parser!r}, line={line!r}"
                    raise PDFNoValidXRef(error_msg)
                (pos_b, genno_b, use_b) = entry
                if use_b != b"n":
                    # Anything that is not an in-use entry is skipped.
                    continue

                pos_i = safe_int(pos_b)
                genno_i = safe_int(genno_b)
                if pos_i is not None and genno_i is not None:
                    self.offsets[objid] = (None, pos_i, genno_i)
                else:
                    log.warning(
                        f"Not adding object {objid} to xref because position {pos_b!r} "
                        f"or generation number {genno_b!r} cannot be parsed as an int"
                    )

        log.debug("xref objects: %r", self.offsets)
        self.load_trailer(parser)

    def load_trailer(self, parser: PDFParser) -> None:
        """Read the ``trailer`` keyword and merge its dictionary into ``self.trailer``.

        If the parser hits EOF before the dictionary is complete, the last
        object left on the parser stack is used instead.

        :raises PDFNoValidXRef: if EOF is hit and the parser stack is empty.
        """
        try:
            (_, kwd) = parser.nexttoken()
            assert kwd is KWD(b"trailer"), str(kwd)
            (_, dic) = parser.nextobject()
        except PSEOF:
            x = parser.pop(1)
            if not x:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted")
            (_, dic) = x[0]
        self.trailer.update(dict_value(dic))
        log.debug("trailer=%r", self.trailer)

    def get_trailer(self) -> Dict[str, Any]:
        """Return the trailer dictionary loaded so far."""
        return self.trailer

    def get_objids(self) -> KeysView[int]:
        """Return a view of all recorded object ids."""
        return self.offsets.keys()

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Return the ``(strmid, pos/index, genno)`` tuple stored for ``objid``.

        :raises KeyError: if ``objid`` is not in the table.
        """
        return self.offsets[objid]

210 

211 

class PDFXRefFallback(PDFXRef):
    """Cross-reference table rebuilt by scanning the file for ``N G obj`` cues."""

    # Matches the start of an indirect object definition, e.g. "12 0 obj".
    PDFOBJ_CUE = re.compile(r"^(\d+)\s+(\d+)\s+obj\b")

    def __repr__(self) -> str:
        return "<PDFXRefFallback: offsets=%r>" % (self.offsets.keys())

    def load(self, parser: PDFParser) -> None:
        """Scan from the start of the file, recording every object found.

        Scanning stops at EOF or at the first line starting with
        ``trailer``, whose dictionary is then loaded.
        """
        parser.seek(0)
        while True:
            try:
                pos, raw_line = parser.nextline()
            except PSEOF:
                return
            if raw_line.startswith(b"trailer"):
                parser.seek(pos)
                self.load_trailer(parser)
                log.debug("trailer: %r", self.trailer)
                return
            # latin-1 is the default pdf encoding
            cue = self.PDFOBJ_CUE.match(raw_line.decode("latin-1"))
            if cue is None:
                continue
            objid, genno = (int(group) for group in cue.groups())
            self.offsets[objid] = (None, pos, genno)
            # Re-read the object itself so that object streams can be expanded.
            parser.seek(pos)
            _, candidate = parser.nextobject()
            if (
                isinstance(candidate, PDFStream)
                and candidate.get("Type") is LITERAL_OBJSTM
            ):
                self._register_objstm_members(objid, candidate)

    def _register_objstm_members(self, container_id: int, objstm: PDFStream) -> None:
        """Record the objects stored inside the object stream ``objstm``."""
        stream = stream_value(objstm)
        try:
            declared = stream["N"]
        except KeyError:
            if settings.STRICT:
                raise PDFSyntaxError("N is not defined: %r" % stream)
            declared = 0
        sub_parser = PDFStreamParser(stream.get_data())
        tokens: List[int] = []
        try:
            while True:
                _, item = sub_parser.nextobject()
                tokens.append(cast(int, item))
        except PSEOF:
            pass
        # Every even-indexed token among the first ``count`` pairs is taken
        # as an object id; never trust /N beyond what was actually parsed.
        count = min(declared, len(tokens) // 2)
        for index in range(count):
            self.offsets[tokens[index * 2]] = (container_id, index, 0)

261 

262 

class PDFXRefStream(PDFBaseXRef):
    """Cross-reference information stored in an ``/XRef`` stream."""

    def __init__(self) -> None:
        self.data: Optional[bytes] = None
        self.entlen: Optional[int] = None
        # Byte widths of the three fields of each entry (the /W array).
        self.fl1: Optional[int] = None
        self.fl2: Optional[int] = None
        self.fl3: Optional[int] = None
        # (first objid, count) pairs taken from /Index.
        self.ranges: List[Tuple[int, int]] = []
        # Initialised here so get_trailer() is safe before load() is called.
        self.trailer: Dict[str, Any] = {}

    def __repr__(self) -> str:
        return "<PDFXRefStream: ranges=%r>" % (self.ranges)

    def load(self, parser: PDFParser) -> None:
        """Read an xref stream object (``N G obj << ... >> stream``) from ``parser``.

        :raises PDFNoValidXRef: if the object is not a stream of /Type /XRef.
        :raises PDFSyntaxError: if /Index has an odd number of items.
        """
        parser.nexttoken()  # object id, ignored
        parser.nexttoken()  # generation number, ignored
        parser.nexttoken()  # "obj" keyword, ignored
        (_, stream) = parser.nextobject()
        if not isinstance(stream, PDFStream) or stream.get("Type") is not LITERAL_XREF:
            raise PDFNoValidXRef("Invalid PDF stream spec.")
        size = stream["Size"]
        index_array = stream.get("Index", (0, size))
        if len(index_array) % 2 != 0:
            raise PDFSyntaxError("Invalid index number")
        self.ranges.extend(cast(Iterator[Tuple[int, int]], choplist(2, index_array)))
        (self.fl1, self.fl2, self.fl3) = stream["W"]
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        self.data = stream.get_data()
        self.entlen = self.fl1 + self.fl2 + self.fl3
        self.trailer = stream.attrs
        log.debug(
            "xref stream: objid=%s, fields=%d,%d,%d",
            ", ".join(map(repr, self.ranges)),
            self.fl1,
            self.fl2,
            self.fl3,
        )

    def get_trailer(self) -> Dict[str, Any]:
        """Return the stream dictionary, which doubles as the trailer."""
        return self.trailer

    def get_objids(self) -> Iterator[int]:
        """Yield the ids of all entries whose type field is 1 or 2.

        Entries of all /Index subsections are stored back to back in the
        stream data, so the entry position is a running index across
        subsections (the same scheme get_pos() uses), not an index that
        restarts at zero for each subsection.
        """
        entry_index = 0
        for start, nobjs in self.ranges:
            for i in range(nobjs):
                assert self.entlen is not None
                assert self.data is not None
                offset = self.entlen * entry_index
                entry_index += 1
                ent = self.data[offset : offset + self.entlen]
                f1 = nunpack(ent[: self.fl1], 1)
                if f1 == 1 or f1 == 2:
                    yield start + i

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Return the location of ``objid``.

        Type 1 entries give ``(None, pos, genno)``; type 2 entries give
        ``(strmid, index, 0)``.

        :raises PDFKeyError: if ``objid`` is outside every range or its
            entry has any other type (a free object).
        """
        index = 0
        for start, nobjs in self.ranges:
            if start <= objid and objid < start + nobjs:
                index += objid - start
                break
            else:
                index += nobjs
        else:
            raise PDFKeyError(objid)
        assert self.entlen is not None
        assert self.data is not None
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        offset = self.entlen * index
        ent = self.data[offset : offset + self.entlen]
        # A zero-width type field defaults to 1.
        f1 = nunpack(ent[: self.fl1], 1)
        f2 = nunpack(ent[self.fl1 : self.fl1 + self.fl2])
        f3 = nunpack(ent[self.fl1 + self.fl2 :])
        if f1 == 1:
            return (None, f2, f3)
        elif f1 == 2:
            return (f2, f3, 0)
        else:
            # this is a free object
            raise PDFKeyError(objid)

339 

340 

class PDFStandardSecurityHandler:
    """Password authentication and RC4 decryption for revisions 2 and 3."""

    PASSWORD_PADDING = (
        b"(\xbfN^Nu\x8aAd\x00NV\xff\xfa\x01\x08"
        b"..\x00\xb6\xd0h>\x80/\x0c\xa9\xfedSiz"
    )
    supported_revisions: Tuple[int, ...] = (2, 3)

    def __init__(
        self,
        docid: Sequence[bytes],
        param: Dict[str, Any],
        password: str = "",
    ) -> None:
        """Store the inputs and immediately run :meth:`init`."""
        self.docid = docid
        self.param = param
        self.password = password
        self.init()

    def init(self) -> None:
        """Read the parameters, check the revision and derive the key.

        :raises PDFEncryptionError: if /R is not in ``supported_revisions``.
        :raises PDFPasswordIncorrect: if the password does not authenticate.
        """
        self.init_params()
        if self.r not in self.supported_revisions:
            raise PDFEncryptionError("Unsupported revision: param=%r" % self.param)
        self.init_key()

    def init_params(self) -> None:
        """Copy the entries of the encryption dictionary onto the instance."""
        params = self.param
        self.v = int_value(params.get("V", 0))
        self.r = int_value(params["R"])
        self.p = uint_value(params["P"], 32)
        self.o = str_value(params["O"])
        self.u = str_value(params["U"])
        self.length = int_value(params.get("Length", 40))

    def init_key(self) -> None:
        """Authenticate ``self.password`` and keep the resulting key."""
        derived = self.authenticate(self.password)
        self.key = derived
        if derived is None:
            raise PDFPasswordIncorrect()

    def is_printable(self) -> bool:
        """Return True if flag value 4 is set in /P."""
        return (self.p & 4) != 0

    def is_modifiable(self) -> bool:
        """Return True if flag value 8 is set in /P."""
        return (self.p & 8) != 0

    def is_extractable(self) -> bool:
        """Return True if flag value 16 is set in /P."""
        return (self.p & 16) != 0

    def compute_u(self, key: bytes) -> bytes:
        """Compute the /U value expected for ``key``."""
        if self.r == 2:
            # Algorithm 3.4
            return Arcfour(key).encrypt(self.PASSWORD_PADDING)  # 2

        # Algorithm 3.5
        digest = md5(self.PASSWORD_PADDING)  # 2
        digest.update(self.docid[0])  # 3
        output = Arcfour(key).encrypt(digest.digest())  # 4
        for round_no in range(1, 20):  # 5
            round_key = bytes(byte ^ round_no for byte in key)
            output = Arcfour(round_key).encrypt(output)
        return output + output  # 6

    def compute_encryption_key(self, password: bytes) -> bytes:
        """Derive the encryption key from a user password (Algorithm 3.2)."""
        padded = (password + self.PASSWORD_PADDING)[:32]  # 1
        digest = md5(padded)  # 2
        digest.update(self.o)  # 3
        # See https://github.com/pdfminer/pdfminer.six/issues/186
        digest.update(struct.pack("<L", self.p))  # 4
        digest.update(self.docid[0])  # 5
        if (
            self.r >= 4
            and not cast(PDFStandardSecurityHandlerV4, self).encrypt_metadata
        ):
            digest.update(b"\xff\xff\xff\xff")
        result = digest.digest()
        if self.r < 3:
            # Revision 2 keys are always five bytes and are not re-hashed.
            return result[:5]
        key_len = self.length // 8
        for _ in range(50):
            result = md5(result[:key_len]).digest()
        return result[:key_len]

    def authenticate(self, password: str) -> Optional[bytes]:
        """Return the key if ``password`` is the user or owner password, else None."""
        raw = password.encode("latin1")
        user_key = self.authenticate_user_password(raw)
        if user_key is not None:
            return user_key
        return self.authenticate_owner_password(raw)

    def authenticate_user_password(self, password: bytes) -> Optional[bytes]:
        """Return the key derived from ``password`` if it verifies against /U."""
        candidate = self.compute_encryption_key(password)
        return candidate if self.verify_encryption_key(candidate) else None

    def verify_encryption_key(self, key: bytes) -> bool:
        """Check ``key`` against the stored /U value (Algorithm 3.6)."""
        expected = self.compute_u(key)
        if self.r == 2:
            return expected == self.u
        # From revision 3 on only the first 16 bytes are compared.
        return expected[:16] == self.u[:16]

    def authenticate_owner_password(self, password: bytes) -> Optional[bytes]:
        """Recover the user password from /O and authenticate it (Algorithm 3.7)."""
        padded = (password + self.PASSWORD_PADDING)[:32]
        digest = md5(padded)
        key_len = 5
        if self.r >= 3:
            for _ in range(50):
                digest = md5(digest.digest())
            key_len = self.length // 8
        rc4_key = digest.digest()[:key_len]
        if self.r == 2:
            user_password = Arcfour(rc4_key).decrypt(self.o)
        else:
            user_password = self.o
            for round_no in reversed(range(20)):
                round_key = bytes(byte ^ round_no for byte in rc4_key)
                user_password = Arcfour(round_key).decrypt(user_password)
        return self.authenticate_user_password(user_password)

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: Optional[Dict[str, Any]] = None,
    ) -> bytes:
        """Decrypt ``data`` belonging to object ``objid``/``genno`` with RC4."""
        return self.decrypt_rc4(objid, genno, data)

    def decrypt_rc4(self, objid: int, genno: int, data: bytes) -> bytes:
        """RC4-decrypt ``data`` with a key derived for this particular object."""
        assert self.key is not None
        # Document key + low 3 bytes of objid + low 2 bytes of genno.
        seed = self.key + struct.pack("<L", objid)[:3] + struct.pack("<L", genno)[:2]
        object_key = md5(seed).digest()[: min(len(seed), 16)]
        return Arcfour(object_key).decrypt(data)

478 

479 

class PDFStandardSecurityHandlerV4(PDFStandardSecurityHandler):
    """Security handler for revision 4, which selects ciphers via crypt filters."""

    supported_revisions: Tuple[int, ...] = (4,)

    def init_params(self) -> None:
        """Read the crypt filter configuration on top of the base parameters.

        :raises PDFEncryptionError: if /StmF and /StrF differ, if a filter
            uses an unknown /CFM, or if the selected filter is undefined.
        """
        super().init_params()
        self.length = 128
        self.cf = dict_value(self.param.get("CF"))
        self.stmf = literal_name(self.param["StmF"])
        self.strf = literal_name(self.param["StrF"])
        self.encrypt_metadata = bool(self.param.get("EncryptMetadata", True))
        if self.stmf != self.strf:
            # Only one filter for both streams and strings is supported.
            raise PDFEncryptionError(
                "Unsupported crypt filter: param=%r" % self.param
            )
        self.cfm = {}
        for filter_name, filter_dict in self.cf.items():
            method = self.get_cfm(literal_name(filter_dict["CFM"]))
            if method is None:
                raise PDFEncryptionError(
                    "Unknown crypt filter method: param=%r" % self.param
                )
            self.cfm[filter_name] = method
        self.cfm["Identity"] = self.decrypt_identity
        if self.strf not in self.cfm:
            raise PDFEncryptionError(
                "Undefined crypt filter: param=%r" % self.param
            )

    def get_cfm(self, name: str) -> Optional[Callable[[int, int, bytes], bytes]]:
        """Map a /CFM name to its decrypt method, or None if unsupported."""
        methods = {
            "V2": self.decrypt_rc4,
            "AESV2": self.decrypt_aes128,
        }
        return methods.get(name)

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: Optional[Dict[str, Any]] = None,
        name: Optional[str] = None,
    ) -> bytes:
        """Decrypt ``data`` with the crypt filter ``name`` (default: /StrF).

        Data whose ``attrs`` have /Type /Metadata is returned unchanged
        when /EncryptMetadata is false.
        """
        if attrs is not None and not self.encrypt_metadata:
            type_entry = attrs.get("Type")
            if type_entry is not None and literal_name(type_entry) == "Metadata":
                return data
        filter_name = self.strf if name is None else name
        return self.cfm[filter_name](objid, genno, data)

    def decrypt_identity(self, objid: int, genno: int, data: bytes) -> bytes:
        """Identity filter: return ``data`` untouched."""
        return data

    def decrypt_aes128(self, objid: int, genno: int, data: bytes) -> bytes:
        """AES-CBC decrypt ``data``; its first 16 bytes are the IV."""
        assert self.key is not None
        seed = b"".join(
            (
                self.key,
                struct.pack("<L", objid)[:3],
                struct.pack("<L", genno)[:2],
                b"sAlT",
            )
        )
        object_key = md5(seed).digest()[: min(len(seed), 16)]
        iv, ciphertext = data[:16], data[16:]
        cipher = Cipher(
            algorithms.AES(object_key),
            modes.CBC(iv),
            backend=default_backend(),
        )  # type: ignore
        plaintext = cipher.decryptor().update(ciphertext)  # type: ignore
        return unpad_aes(plaintext)

551 

552 

class PDFStandardSecurityHandlerV5(PDFStandardSecurityHandlerV4):
    """Security handler for revisions 5 and 6 (AESV3 crypt filter)."""

    supported_revisions = (5, 6)

    def init_params(self) -> None:
        """Read /OE and /UE and split /O and /U into hash and salts."""
        super().init_params()
        self.length = 256
        self.oe = str_value(self.param["OE"])
        self.ue = str_value(self.param["UE"])
        # Layout of /O and /U: 32-byte hash, 8-byte validation salt, key salt.
        owner, user = self.o, self.u
        self.o_hash, self.o_validation_salt, self.o_key_salt = (
            owner[:32],
            owner[32:40],
            owner[40:],
        )
        self.u_hash, self.u_validation_salt, self.u_key_salt = (
            user[:32],
            user[32:40],
            user[40:],
        )

    def get_cfm(self, name: str) -> Optional[Callable[[int, int, bytes], bytes]]:
        """Only ``AESV3`` is recognised; any other name yields None."""
        return self.decrypt_aes256 if name == "AESV3" else None

    def authenticate(self, password: str) -> Optional[bytes]:
        """Return the file key if ``password`` matches, else None.

        The owner hash is tried first, then the user hash.
        """
        password_b = self._normalize_password(password)
        owner_check = self._password_hash(password_b, self.o_validation_salt, self.u)
        if owner_check == self.o_hash:
            wrapping_key = self._password_hash(password_b, self.o_key_salt, self.u)
            return self._decrypt_zero_iv(wrapping_key, self.oe)
        user_check = self._password_hash(password_b, self.u_validation_salt)
        if user_check == self.u_hash:
            wrapping_key = self._password_hash(password_b, self.u_key_salt)
            return self._decrypt_zero_iv(wrapping_key, self.ue)
        return None

    @staticmethod
    def _decrypt_zero_iv(key: bytes, blob: bytes) -> bytes:
        """AES-CBC decrypt ``blob`` under ``key`` with an all-zero IV."""
        cipher = Cipher(
            algorithms.AES(key),
            modes.CBC(b"\0" * 16),
            backend=default_backend(),
        )  # type: ignore
        return cipher.decryptor().update(blob)  # type: ignore

    def _normalize_password(self, password: str) -> bytes:
        """Encode ``password`` as UTF-8, truncated to 127 bytes.

        For revision 6 a non-empty password is first passed through
        saslprep.
        """
        if self.r == 6:
            # saslprep expects non-empty strings, apparently
            if not password:
                return b""
            from pdfminer._saslprep import saslprep

            password = saslprep(password)
        encoded = password.encode("utf-8")
        return encoded[:127]

    def _password_hash(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute password hash depending on revision number"""
        if self.r != 5:
            return self._r6_password(password, salt[0:8], vector)
        return self._r5_password(password, salt, vector)

    def _r5_password(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute the password for revision 5"""
        digest = sha256(password)
        for part in (salt, vector):
            if part is not None:
                digest.update(part)
        return digest.digest()

    def _r6_password(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute the password for revision 6"""
        suffix = vector or b""
        k = sha256(password + salt + suffix).digest()
        hashes = (sha256, sha384, sha512)
        round_no = 0
        last_byte_val = 0
        # At least 64 rounds; afterwards keep going while the last byte of
        # the previous round's ciphertext exceeds round_no - 32.
        while round_no < 64 or last_byte_val > round_no - 32:
            block = (password + k + suffix) * 64
            e = self._aes_cbc_encrypt(key=k[:16], iv=k[16:32], data=block)
            # the first 16 bytes of e, interpreted as an unsigned integer
            # mod 3, select the hash for this round
            k = hashes[self._bytes_mod_3(e[:16])](e).digest()
            last_byte_val = e[-1]
            round_no += 1
        return k[:32]

    @staticmethod
    def _bytes_mod_3(input_bytes: bytes) -> int:
        """Return ``input_bytes``, read as a big-endian unsigned integer, mod 3."""
        return int.from_bytes(input_bytes, "big") % 3

    def _aes_cbc_encrypt(self, key: bytes, iv: bytes, data: bytes) -> bytes:
        """AES-CBC encrypt ``data`` (no padding is added here)."""
        encryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).encryptor()  # type: ignore
        head = encryptor.update(data)  # type: ignore
        return head + encryptor.finalize()  # type: ignore

    def decrypt_aes256(self, objid: int, genno: int, data: bytes) -> bytes:
        """AES-CBC decrypt ``data`` with the file key; its first 16 bytes are the IV.

        ``objid`` and ``genno`` are accepted for interface compatibility
        but are not used.
        """
        iv, ciphertext = data[:16], data[16:]
        assert self.key is not None
        cipher = Cipher(
            algorithms.AES(self.key),
            modes.CBC(iv),
            backend=default_backend(),
        )  # type: ignore
        plaintext = cipher.decryptor().update(ciphertext)  # type: ignore
        return unpad_aes(plaintext)

676 

677 

class PDFDocument:
    """PDFDocument object represents a PDF document.

    Since a PDF file can be very big, normally it is not loaded at
    once. So PDF document has to cooperate with a PDF parser in order to
    dynamically import the data as processing goes.

    Typical usage:
      doc = PDFDocument(parser, password)
      obj = doc.getobj(objid)

    """

    # Maps the /V entry of the encryption dictionary to a handler class.
    security_handler_registry: Dict[int, Type[PDFStandardSecurityHandler]] = {
        1: PDFStandardSecurityHandler,
        2: PDFStandardSecurityHandler,
        4: PDFStandardSecurityHandlerV4,
        5: PDFStandardSecurityHandlerV5,
    }

    def __init__(
        self,
        parser: PDFParser,
        password: str = "",
        caching: bool = True,
        fallback: bool = True,
    ) -> None:
        """Set the document to use a given PDFParser object.

        :param parser: parser positioned on the PDF data.
        :param password: password used if the document is encrypted.
        :param caching: keep resolved objects in memory when True.
        :param fallback: when no valid xref can be read, rebuild one by
            scanning the whole file.
        :raises PDFSyntaxError: if no trailer provides a /Root entry, or
            (in strict mode) if the catalog has the wrong /Type.
        """
        self.caching = caching
        self.xrefs: List[PDFBaseXRef] = []
        self.info = []
        self.catalog: Dict[str, Any] = {}
        self.encryption: Optional[Tuple[Any, Any]] = None
        self.decipher: Optional[DecipherCallable] = None
        self._cached_objs: Dict[int, Tuple[object, int]] = {}
        self._parsed_objs: Dict[int, Tuple[List[object], int]] = {}
        self._parser: Optional[PDFParser] = parser
        parser.set_document(self)
        self.is_printable = self.is_modifiable = self.is_extractable = True
        # Retrieve the information of each header that was appended
        # (maybe multiple times) at the end of the document.
        try:
            pos = self.find_xref(parser)
            self.read_xref_from(parser, pos, self.xrefs)
        except PDFNoValidXRef:
            if fallback:
                parser.fallback = True
                newxref = PDFXRefFallback()
                newxref.load(parser)
                self.xrefs.append(newxref)

        for xref in self.xrefs:
            trailer = xref.get_trailer()
            if not trailer:
                continue
            # If there's an encryption info, remember it.
            if "Encrypt" in trailer:
                if "ID" in trailer:
                    id_value = list_value(trailer["ID"])
                else:
                    # Some documents may not have a /ID, use two empty
                    # byte strings instead. Solves
                    # https://github.com/pdfminer/pdfminer.six/issues/594
                    id_value = (b"", b"")
                self.encryption = (id_value, dict_value(trailer["Encrypt"]))
                self._initialize_password(password)
            if "Info" in trailer:
                self.info.append(dict_value(trailer["Info"]))
            if "Root" in trailer:
                # Every PDF file must have exactly one /Root dictionary.
                self.catalog = dict_value(trailer["Root"])
                break
        else:
            raise PDFSyntaxError("No /Root object! - Is this really a PDF?")
        if self.catalog.get("Type") is not LITERAL_CATALOG:
            if settings.STRICT:
                raise PDFSyntaxError("Catalog not found!")

    KEYWORD_OBJ = KWD(b"obj")

    def _initialize_password(self, password: str = "") -> None:
        """Set up decryption from ``self.encryption`` and ``password``.

        :raises PDFEncryptionError: if /Filter is not ``Standard`` or /V
            has no registered handler. The handler constructor may raise
            further encryption errors.
        """
        assert self.encryption is not None
        (docid, param) = self.encryption
        if literal_name(param.get("Filter")) != "Standard":
            raise PDFEncryptionError("Unknown filter: param=%r" % param)
        v = int_value(param.get("V", 0))
        factory = self.security_handler_registry.get(v)
        if factory is None:
            raise PDFEncryptionError("Unknown algorithm: param=%r" % param)
        handler = factory(docid, param, password)
        self.decipher = handler.decrypt
        self.is_printable = handler.is_printable()
        self.is_modifiable = handler.is_modifiable()
        self.is_extractable = handler.is_extractable()
        assert self._parser is not None
        self._parser.fallback = False  # need to read streams with exact length

    def _getobj_objstm(self, stream: PDFStream, index: int, objid: int) -> object:
        """Return the ``index``-th object stored in the object stream ``stream``.

        :raises PDFSyntaxError: if ``index`` is beyond the parsed objects.
        """
        if stream.objid in self._parsed_objs:
            (objs, n) = self._parsed_objs[stream.objid]
        else:
            (objs, n) = self._get_objects(stream)
            if self.caching:
                assert stream.objid is not None
                self._parsed_objs[stream.objid] = (objs, n)
        # The parsed list starts with n (objid, offset) pairs; the stored
        # objects follow them.
        i = n * 2 + index
        try:
            obj = objs[i]
        except IndexError:
            raise PDFSyntaxError("index too big: %r" % index)
        return obj

    def _get_objects(self, stream: PDFStream) -> Tuple[List[object], int]:
        """Parse every object in ``stream`` and return them with its /N value.

        In non-strict mode a missing /N is treated as 0 and a wrong /Type
        is tolerated.
        """
        if stream.get("Type") is not LITERAL_OBJSTM:
            if settings.STRICT:
                raise PDFSyntaxError("Not a stream object: %r" % stream)
        try:
            n = cast(int, stream["N"])
        except KeyError:
            if settings.STRICT:
                raise PDFSyntaxError("N is not defined: %r" % stream)
            n = 0
        parser = PDFStreamParser(stream.get_data())
        parser.set_document(self)
        objs: List[object] = []
        try:
            while True:
                (_, obj) = parser.nextobject()
                objs.append(obj)
        except PSEOF:
            pass
        return (objs, n)

    def _getobj_parse(self, pos: int, objid: int) -> object:
        """Parse the indirect object ``objid`` located at file offset ``pos``.

        :raises PDFSyntaxError: if the object header does not match.
        """
        assert self._parser is not None
        self._parser.seek(pos)
        (_, objid1) = self._parser.nexttoken()  # objid
        (_, genno) = self._parser.nexttoken()  # genno
        (_, kwd) = self._parser.nexttoken()
        # hack around malformed pdf files
        # copied from https://github.com/jaepil/pdfminer3k/blob/master/
        # pdfminer/pdfparser.py#L399
        # to solve https://github.com/pdfminer/pdfminer.six/issues/56
        # assert objid1 == objid, str((objid1, objid))
        if objid1 != objid:
            x = []
            while kwd is not self.KEYWORD_OBJ:
                (_, kwd) = self._parser.nexttoken()
                x.append(kwd)
            if len(x) >= 2:
                objid1 = x[-2]
        # #### end hack around malformed pdf files
        if objid1 != objid:
            raise PDFSyntaxError(f"objid mismatch: {objid1!r}={objid!r}")

        if kwd is not self.KEYWORD_OBJ:
            raise PDFSyntaxError("Invalid object spec: offset=%r" % pos)
        (_, obj) = self._parser.nextobject()
        return obj

    # can raise PDFObjectNotFound
    def getobj(self, objid: int) -> object:
        """Get object from PDF

        Each xref is consulted in order; an entry that fails to parse is
        skipped in favour of the next xref.

        :raises PDFException if PDFDocument is not initialized
        :raises PDFObjectNotFound if objid does not exist in PDF
        """
        if not self.xrefs:
            raise PDFException("PDFDocument is not initialized")
        log.debug("getobj: objid=%r", objid)
        if objid in self._cached_objs:
            (obj, genno) = self._cached_objs[objid]
        else:
            for xref in self.xrefs:
                try:
                    (strmid, index, genno) = xref.get_pos(objid)
                except KeyError:
                    continue
                try:
                    if strmid is not None:
                        stream = stream_value(self.getobj(strmid))
                        obj = self._getobj_objstm(stream, index, objid)
                    else:
                        obj = self._getobj_parse(index, objid)
                        # Only directly parsed objects are deciphered here.
                        if self.decipher:
                            obj = decipher_all(self.decipher, objid, genno, obj)

                    if isinstance(obj, PDFStream):
                        obj.set_objid(objid, genno)
                    break
                except (PSEOF, PDFSyntaxError):
                    continue
            else:
                raise PDFObjectNotFound(objid)
            log.debug("register: objid=%r: %r", objid, obj)
            if self.caching:
                self._cached_objs[objid] = (obj, genno)
        return obj

    OutlineType = Tuple[Any, Any, Any, Any, Any]

    def get_outlines(self) -> Iterator[OutlineType]:
        """Iterate over the outline as ``(level, title, dest, action, se)``.

        Only entries that have a /Title and either /A or /Dest are yielded.

        :raises PDFNoOutlines: if the catalog has no /Outlines entry.
        """
        if "Outlines" not in self.catalog:
            raise PDFNoOutlines

        def search(entry: object, level: int) -> Iterator[PDFDocument.OutlineType]:
            entry = dict_value(entry)
            if "Title" in entry:
                if "A" in entry or "Dest" in entry:
                    title = decode_text(str_value(entry["Title"]))
                    dest = entry.get("Dest")
                    action = entry.get("A")
                    se = entry.get("SE")
                    yield (level, title, dest, action, se)
            if "First" in entry and "Last" in entry:
                yield from search(entry["First"], level + 1)
            if "Next" in entry:
                yield from search(entry["Next"], level)

        return search(self.catalog["Outlines"], 0)

    def get_page_labels(self) -> Iterator[str]:
        """Generate page label strings for the PDF document.

        If the document includes page labels, generates strings, one per page.
        If not, raises PDFNoPageLabels.

        The resulting iteration is unbounded.
        """
        assert self.catalog is not None

        try:
            page_labels = PageLabels(self.catalog["PageLabels"])
        except (PDFTypeError, KeyError):
            raise PDFNoPageLabels

        return page_labels.labels

    def lookup_name(self, cat: str, key: Union[str, bytes]) -> Any:
        """Look ``key`` up in the name tree ``cat`` of the catalog's /Names.

        :raises PDFKeyError: if the catalog has no usable /Names or no
            subtree contains ``key``.
        :raises KeyError: if ``cat`` is missing from /Names or ``key`` is
            missing from a /Names array that was searched.
        """
        try:
            names = dict_value(self.catalog["Names"])
        except (PDFTypeError, KeyError):
            raise PDFKeyError((cat, key))
        # may raise KeyError
        d0 = dict_value(names[cat])

        def lookup(d: Dict[str, Any]) -> Any:
            if "Limits" in d:
                (k1, k2) = list_value(d["Limits"])
                if key < k1 or k2 < key:
                    return None
            if "Names" in d:
                objs = list_value(d["Names"])
                names = dict(
                    cast(Iterator[Tuple[Union[str, bytes], Any]], choplist(2, objs)),
                )
                return names[key]
            if "Kids" in d:
                for c in list_value(d["Kids"]):
                    v = lookup(dict_value(c))
                    if v:
                        return v
            raise PDFKeyError((cat, key))

        return lookup(d0)

    def get_dest(self, name: Union[str, bytes]) -> Any:
        """Resolve the named destination ``name``.

        :raises PDFDestinationNotFound: if neither the name tree nor the
            catalog's /Dests dictionary contains ``name``.
        """
        try:
            # PDF-1.2 or later
            obj = self.lookup_name("Dests", name)
        except KeyError:
            # PDF-1.1 or prior
            if "Dests" not in self.catalog:
                raise PDFDestinationNotFound(name)
            d0 = dict_value(self.catalog["Dests"])
            if name not in d0:
                raise PDFDestinationNotFound(name)
            obj = d0[name]
        return obj

    # find_xref
    def find_xref(self, parser: PDFParser) -> int:
        """Internal function used to locate the first XRef.

        :raises PDFNoValidXRef: if no ``startxref`` line is found or the
            line following it is not a decimal number.
        """
        # search the last xref table by scanning the file backwards.
        prev = b""
        for line in parser.revreadlines():
            line = line.strip()
            log.debug("find_xref: %r", line)

            if line == b"startxref":
                log.debug("xref found: pos=%r", prev)

                # isdigit() accepts only non-empty runs of ASCII digits, so
                # the converted offset can never be negative.
                if not prev.isdigit():
                    raise PDFNoValidXRef(f"Invalid xref position: {prev!r}")

                return int(prev)

            if line:
                prev = line

        raise PDFNoValidXRef("Unexpected EOF")

    # read xref table
    def read_xref_from(
        self,
        parser: PDFParser,
        start: int,
        xrefs: List[PDFBaseXRef],
    ) -> None:
        """Reads XRefs from the given location.

        The xref at ``start`` is appended to ``xrefs``, followed by those
        reachable through its /XRefStm and /Prev trailer entries. Each
        offset is read at most once, so a circular chain terminates
        instead of recursing without bound.

        :raises PDFNoValidXRef: if an xref cannot be read.
        """
        self._read_xref_chain(parser, start, xrefs, [])

    def _read_xref_chain(
        self,
        parser: PDFParser,
        start: int,
        xrefs: List[PDFBaseXRef],
        visited: List[int],
    ) -> None:
        """Recursive worker for read_xref_from(); ``visited`` holds offsets already read."""
        if start in visited:
            log.warning("Ignoring circular xref reference to offset %r", start)
            return
        visited.append(start)
        parser.seek(start)
        parser.reset()
        try:
            (pos, token) = parser.nexttoken()
        except PSEOF:
            raise PDFNoValidXRef("Unexpected EOF")
        log.debug("read_xref_from: start=%d, token=%r", start, token)
        if isinstance(token, int):
            # XRefStream: PDF-1.5
            parser.seek(pos)
            parser.reset()
            xref: PDFBaseXRef = PDFXRefStream()
            xref.load(parser)
        else:
            if token is parser.KEYWORD_XREF:
                parser.nextline()
            xref = PDFXRef()
            xref.load(parser)
        xrefs.append(xref)
        trailer = xref.get_trailer()
        log.debug("trailer: %r", trailer)
        if "XRefStm" in trailer:
            pos = int_value(trailer["XRefStm"])
            self._read_xref_chain(parser, pos, xrefs, visited)
        if "Prev" in trailer:
            # find previous xref
            pos = int_value(trailer["Prev"])
            self._read_xref_chain(parser, pos, xrefs, visited)

1024 

1025 

class PageLabels(NumberTree):
    """PageLabels from the document catalog.

    See Section 8.3.1 in the PDF Reference.
    """

    @property
    def labels(self) -> Iterator[str]:
        """Generate one label per page; the final range never ends."""
        ranges = self.values

        # The tree must begin with page index 0
        if len(ranges) == 0 or ranges[0][0] != 0:
            if settings.STRICT:
                raise PDFSyntaxError("PageLabels is missing page index 0")
            # Try to cope, by assuming empty labels for the initial pages
            ranges.insert(0, (0, {}))

        range_count = len(ranges)
        for following, (start, raw_label_dict) in enumerate(ranges, 1):
            label_dict = dict_value(raw_label_dict)
            style = label_dict.get("S")
            prefix = decode_text(str_value(label_dict.get("P", b"")))
            first_value = int_value(label_dict.get("St", 1))

            numbers: Iterable[int]
            if following != range_count:
                # A bounded range: it stops where the next one starts.
                next_start, _ = ranges[following]
                numbers = range(first_value, first_value + (next_start - start))
            else:
                # This is the last specified range. It continues until the end
                # of the document.
                numbers = itertools.count(first_value)

            for number in numbers:
                yield prefix + self._format_page_label(number, style)

    @staticmethod
    def _format_page_label(value: int, style: Any) -> str:
        """Format page label value in a specific style"""
        if style is None:
            return ""
        if style is LIT("D"):  # Decimal arabic numerals
            return str(value)
        if style is LIT("R"):  # Uppercase roman numerals
            return format_int_roman(value).upper()
        if style is LIT("r"):  # Lowercase roman numerals
            return format_int_roman(value)
        if style is LIT("A"):  # Uppercase letters A-Z, AA-ZZ...
            return format_int_alpha(value).upper()
        if style is LIT("a"):  # Lowercase letters a-z, aa-zz...
            return format_int_alpha(value)
        log.warning("Unknown page label style: %r", style)
        return ""