Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/pdfdocument.py: 87%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

724 statements  

1import itertools 

2import logging 

3import re 

4import struct 

5from collections.abc import Callable, Iterable, Iterator, KeysView, Sequence 

6from hashlib import md5, sha256, sha384, sha512 

7from typing import ( 

8 Any, 

9 ClassVar, 

10 cast, 

11) 

12 

13from cryptography.hazmat.backends import default_backend 

14from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes 

15 

16from pdfminer import settings 

17from pdfminer.arcfour import Arcfour 

18from pdfminer.casting import safe_int 

19from pdfminer.data_structures import NumberTree 

20from pdfminer.pdfexceptions import ( 

21 PDFException, 

22 PDFKeyError, 

23 PDFObjectNotFound, 

24 PDFTypeError, 

25) 

26from pdfminer.pdfparser import PDFParser, PDFStreamParser, PDFSyntaxError 

27from pdfminer.pdftypes import ( 

28 DecipherCallable, 

29 PDFStream, 

30 decipher_all, 

31 dict_value, 

32 int_value, 

33 list_value, 

34 str_value, 

35 stream_value, 

36 uint_value, 

37) 

38from pdfminer.psexceptions import PSEOF 

39from pdfminer.psparser import KWD, LIT, literal_name 

40from pdfminer.utils import ( 

41 choplist, 

42 decode_text, 

43 format_int_alpha, 

44 format_int_roman, 

45 nunpack, 

46 unpad_aes, 

47) 

48 

# Module-level logger, named after this module.
log = logging.getLogger(name=__name__)

50 

51 

class PDFNoValidXRef(PDFSyntaxError):
    """Raised when a cross-reference section cannot be located or parsed."""

54 

55 

class PDFNoValidXRefWarning(SyntaxWarning):
    """Legacy warning category for a missing xref.

    Kept only for backwards compatibility: warnings.warn has been replaced
    by logging calls, so this module no longer emits it.
    """

61 

62 

class PDFNoOutlines(PDFException):
    """Raised when the document catalog has no ``Outlines`` entry."""

65 

66 

class PDFNoPageLabels(PDFException):
    """Raised when the document catalog has no usable ``PageLabels`` entry."""

69 

70 

class PDFDestinationNotFound(PDFException):
    """Raised when a named destination cannot be resolved."""

73 

74 

class PDFEncryptionError(PDFException):
    """Base class for encryption-related failures (unsupported filter, etc.)."""

77 

78 

class PDFPasswordIncorrect(PDFEncryptionError):
    """Raised when the supplied password yields no encryption key."""

81 

82 

class PDFEncryptionWarning(UserWarning):
    """Legacy warning category for failed decryption.

    Kept only for backwards compatibility: warnings.warn has been replaced
    by logging calls, so this module no longer emits it.
    """

88 

89 

class PDFTextExtractionNotAllowedWarning(UserWarning):
    """Legacy warning category for a PDF that forbids text extraction.

    Kept only for backwards compatibility: warnings.warn has been replaced
    by logging calls, so this module no longer emits it.
    """

95 

96 

class PDFTextExtractionNotAllowed(PDFEncryptionError):
    """Encryption error for documents that do not permit text extraction.

    NOTE(review): nothing in this module raises it; presumably raised by
    callers that check ``PDFDocument.is_extractable`` - confirm.
    """

99 

100 

# Predefined name literals, compared by identity against the /Type entry
# of streams and of the document catalog.
LITERAL_OBJSTM = LIT("ObjStm")
LITERAL_XREF = LIT("XRef")
LITERAL_CATALOG = LIT("Catalog")

105 

106 

class PDFBaseXRef:
    """Interface shared by the cross-reference implementations below."""

    def get_trailer(self) -> dict[str, Any]:
        """Return the trailer dictionary; must be overridden."""
        raise NotImplementedError

    def get_objids(self) -> Iterable[int]:
        """Return the object ids this xref knows about (none by default)."""
        return list()

    def get_pos(self, objid: int) -> tuple[int | None, int, int]:
        """Locate ``objid``.

        Implementations must return ``(strmid, index, genno)`` or
        ``(None, pos, genno)``.  The base class knows no objects, so it
        always raises ``PDFKeyError``.
        """
        raise PDFKeyError(objid)

    def load(self, parser: PDFParser) -> None:
        """Populate the xref from ``parser``; must be overridden."""
        raise NotImplementedError

122 

123 

class PDFXRef(PDFBaseXRef):
    """Cross-reference data read from a plain-text xref table and trailer."""

    def __init__(self) -> None:
        # objid -> (None, byte position, generation number)
        self.offsets: dict[int, tuple[int | None, int, int]] = {}
        self.trailer: dict[str, Any] = {}

    def __repr__(self) -> str:
        return f"<PDFXRef: offsets={self.offsets.keys()!r}>"

    def load(self, parser: PDFParser) -> None:
        """Read xref subsections until the ``trailer`` line, then the trailer.

        :raises PDFNoValidXRef: on premature EOF or a malformed line.
        """
        while True:
            try:
                (pos, header) = parser.nextline()
            except PSEOF as err:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted?") from err
            header = header.strip()
            if not header:
                continue
            if header.startswith(b"trailer"):
                # Rewind so load_trailer() sees the keyword itself.
                parser.seek(pos)
                break
            fields = header.split(b" ")
            if len(fields) != 2:
                error_msg = f"Trailer not found: {parser!r}: line={header!r}"
                raise PDFNoValidXRef(error_msg)
            try:
                start, nobjs = int(fields[0]), int(fields[1])
            except ValueError as err:
                error_msg = f"Invalid line: {parser!r}: line={header!r}"
                raise PDFNoValidXRef(error_msg) from err
            for objid in range(start, start + nobjs):
                try:
                    (_, entry) = parser.nextline()
                except PSEOF as err:
                    raise PDFNoValidXRef("Unexpected EOF - file corrupted?") from err
                entry = entry.strip()
                parts = entry.split(b" ")
                if len(parts) != 3:
                    error_msg = f"Invalid XRef format: {parser!r}, line={entry!r}"
                    raise PDFNoValidXRef(error_msg)
                (pos_b, genno_b, use_b) = parts
                if use_b != b"n":
                    # Only in-use ("n") entries are recorded.
                    continue

                pos_i = safe_int(pos_b)
                genno_i = safe_int(genno_b)
                if pos_i is None or genno_i is None:
                    log.warning(
                        "Not adding object %s to xref because position %r "
                        "or generation number %r cannot be parsed as an int",
                        objid,
                        pos_b,
                        genno_b,
                    )
                else:
                    self.offsets[objid] = (None, pos_i, genno_i)

        log.debug("xref objects: %r", self.offsets)
        self.load_trailer(parser)

    def load_trailer(self, parser: PDFParser) -> None:
        """Read the ``trailer`` keyword and merge its dictionary into the trailer."""
        try:
            (_, keyword) = parser.nexttoken()
            assert keyword is KWD(b"trailer"), str(keyword)
            (_, trailer_obj) = parser.nextobject()
        except PSEOF:
            # EOF hit mid-object: fall back to whatever the parser can pop.
            popped = parser.pop(1)
            if not popped:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted") from None
            (_, trailer_obj) = popped[0]
        self.trailer.update(dict_value(trailer_obj))
        log.debug("trailer=%r", self.trailer)

    def get_trailer(self) -> dict[str, Any]:
        return self.trailer

    def get_objids(self) -> KeysView[int]:
        return self.offsets.keys()

    def get_pos(self, objid: int) -> tuple[int | None, int, int]:
        """Return the stored triple for ``objid``; KeyError if unknown."""
        return self.offsets[objid]

204 

205 

class PDFXRefFallback(PDFXRef):
    """Xref rebuilt by scanning the whole file for ``N G obj`` headers."""

    # Matches the "<objid> <genno> obj" cue at the start of a line.
    PDFOBJ_CUE = re.compile(r"^(\d+)\s+(\d+)\s+obj\b")

    def __repr__(self) -> str:
        return f"<PDFXRefFallback: offsets={self.offsets.keys()!r}>"

    def load(self, parser: PDFParser) -> None:
        """Scan from offset 0, recording every object header found.

        Scanning stops at EOF or after the first ``trailer`` line, which is
        parsed via ``load_trailer``.  Objects packed in an ObjStm stream are
        registered as ``(stream objid, index, 0)``.
        """
        parser.seek(0)
        while True:
            try:
                (pos, raw_line) = parser.nextline()
            except PSEOF:
                break
            if raw_line.startswith(b"trailer"):
                parser.seek(pos)
                self.load_trailer(parser)
                log.debug("trailer: %r", self.trailer)
                break
            # latin-1 is the default pdf encoding
            match = self.PDFOBJ_CUE.match(raw_line.decode("latin-1"))
            if match is None:
                continue
            objid = int(match.group(1))
            genno = int(match.group(2))
            self.offsets[objid] = (None, pos, genno)
            # Re-read the object itself so object streams can be expanded.
            parser.seek(pos)
            (_, obj) = parser.nextobject()
            if not (isinstance(obj, PDFStream) and obj.get("Type") is LITERAL_OBJSTM):
                continue
            stream = stream_value(obj)
            try:
                n = stream["N"]
            except KeyError:
                if settings.STRICT:
                    raise PDFSyntaxError(f"N is not defined: {stream!r}") from None
                n = 0
            stream_parser = PDFStreamParser(stream.get_data())
            objs: list[int] = []
            try:
                while True:
                    (_, item) = stream_parser.nextobject()
                    objs.append(cast(int, item))
            except PSEOF:
                pass
            # Never trust N beyond the number of pairs actually parsed.
            n = min(n, len(objs) // 2)
            for index in range(n):
                self.offsets[objs[index * 2]] = (objid, index, 0)

255 

256 

class PDFXRefStream(PDFBaseXRef):
    """Cross-reference data stored in an ``XRef`` stream object.

    The decoded stream is a sequence of fixed-width entries.  Each entry has
    three fields whose byte widths come from the stream's ``W`` array.
    ``ranges`` holds the ``(first objid, count)`` pairs taken from ``Index``;
    entries for all ranges are laid out back to back in ``data``.
    """

    def __init__(self) -> None:
        self.data: bytes | None = None
        self.entlen: int | None = None
        self.fl1: int | None = None
        self.fl2: int | None = None
        self.fl3: int | None = None
        self.ranges: list[tuple[int, int]] = []
        # Initialised here so get_trailer() is safe before load().
        self.trailer: dict[str, Any] = {}

    def __repr__(self) -> str:
        return f"<PDFXRefStream: ranges={self.ranges!r}>"

    def load(self, parser: PDFParser) -> None:
        """Read ``<objid> <genno> obj`` followed by the xref stream itself.

        :raises PDFNoValidXRef: if the object is not a stream typed ``XRef``.
        :raises PDFSyntaxError: if ``Index`` has an odd number of items.
        """
        (_, _objid) = parser.nexttoken()  # ignored
        (_, _genno) = parser.nexttoken()  # ignored
        (_, _kwd) = parser.nexttoken()
        (_, stream) = parser.nextobject()
        if not isinstance(stream, PDFStream) or stream.get("Type") is not LITERAL_XREF:
            raise PDFNoValidXRef("Invalid PDF stream spec.")
        size = stream["Size"]
        # Without /Index there is a single range covering 0..Size-1.
        index_array = stream.get("Index", (0, size))
        if len(index_array) % 2 != 0:
            raise PDFSyntaxError("Invalid index number")
        self.ranges.extend(cast(Iterator[tuple[int, int]], choplist(2, index_array)))
        (self.fl1, self.fl2, self.fl3) = stream["W"]
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        self.data = stream.get_data()
        self.entlen = self.fl1 + self.fl2 + self.fl3
        self.trailer = stream.attrs
        log.debug(
            "xref stream: objid=%s, fields=%d,%d,%d",
            ", ".join(map(repr, self.ranges)),
            self.fl1,
            self.fl2,
            self.fl3,
        )

    def get_trailer(self) -> dict[str, Any]:
        return self.trailer

    def get_objids(self) -> Iterator[int]:
        """Yield the ids of all entries whose type field is 1 or 2."""
        # Running entry number across ALL ranges.  Entries of later ranges
        # follow those of earlier ones in the data, exactly as get_pos()
        # assumes, so the offset must not restart at 0 for each range.
        entry_no = 0
        for start, nobjs in self.ranges:
            for i in range(nobjs):
                assert self.entlen is not None
                assert self.data is not None
                offset = self.entlen * entry_no
                entry_no += 1
                ent = self.data[offset : offset + self.entlen]
                f1 = nunpack(ent[: self.fl1], 1)
                if f1 in (1, 2):
                    yield start + i

    def get_pos(self, objid: int) -> tuple[int | None, int, int]:
        """Return the location triple for ``objid``.

        Type-1 entries give ``(None, f2, f3)`` and type-2 entries give
        ``(f2, f3, 0)``, where f2/f3 are the second and third entry fields.

        :raises PDFKeyError: if ``objid`` lies in no range or its entry has
            any other type (a free object).
        """
        index = 0
        for start, nobjs in self.ranges:
            if start <= objid < start + nobjs:
                index += objid - start
                break
            # Skip over this whole range's entries.
            index += nobjs
        else:
            raise PDFKeyError(objid)
        assert self.entlen is not None
        assert self.data is not None
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        offset = self.entlen * index
        ent = self.data[offset : offset + self.entlen]
        f1 = nunpack(ent[: self.fl1], 1)
        f2 = nunpack(ent[self.fl1 : self.fl1 + self.fl2])
        f3 = nunpack(ent[self.fl1 + self.fl2 :])
        if f1 == 1:
            return (None, f2, f3)
        elif f1 == 2:
            return (f2, f3, 0)
        else:
            # this is a free object
            raise PDFKeyError(objid)

333 

334 

335class PDFStandardSecurityHandler: 

336 PASSWORD_PADDING = ( 

337 b"(\xbfN^Nu\x8aAd\x00NV\xff\xfa\x01\x08..\x00\xb6\xd0h>\x80/\x0c\xa9\xfedSiz" 

338 ) 

339 supported_revisions: tuple[int, ...] = (2, 3) 

340 

341 def __init__( 

342 self, 

343 docid: Sequence[bytes], 

344 param: dict[str, Any], 

345 password: str = "", 

346 ) -> None: 

347 self.docid = docid 

348 self.param = param 

349 self.password = password 

350 self.init() 

351 

352 def init(self) -> None: 

353 self.init_params() 

354 if self.r not in self.supported_revisions: 

355 error_msg = f"Unsupported revision: param={self.param!r}" 

356 raise PDFEncryptionError(error_msg) 

357 self.init_key() 

358 

359 def init_params(self) -> None: 

360 self.v = int_value(self.param.get("V", 0)) 

361 self.r = int_value(self.param["R"]) 

362 self.p = uint_value(self.param["P"], 32) 

363 self.o = str_value(self.param["O"]) 

364 self.u = str_value(self.param["U"]) 

365 self.length = int_value(self.param.get("Length", 40)) 

366 

367 def init_key(self) -> None: 

368 self.key = self.authenticate(self.password) 

369 if self.key is None: 

370 raise PDFPasswordIncorrect 

371 

372 def is_printable(self) -> bool: 

373 return bool(self.p & 4) 

374 

375 def is_modifiable(self) -> bool: 

376 return bool(self.p & 8) 

377 

378 def is_extractable(self) -> bool: 

379 return bool(self.p & 16) 

380 

381 def compute_u(self, key: bytes) -> bytes: 

382 if self.r == 2: 

383 # Algorithm 3.4 

384 return Arcfour(key).encrypt(self.PASSWORD_PADDING) # 2 

385 else: 

386 # Algorithm 3.5 

387 hash = md5(self.PASSWORD_PADDING) # 2 

388 hash.update(self.docid[0]) # 3 

389 result = Arcfour(key).encrypt(hash.digest()) # 4 

390 for i in range(1, 20): # 5 

391 k = b"".join(bytes((c ^ i,)) for c in iter(key)) 

392 result = Arcfour(k).encrypt(result) 

393 result += result # 6 

394 return result 

395 

396 def compute_encryption_key(self, password: bytes) -> bytes: 

397 # Algorithm 3.2 

398 password = (password + self.PASSWORD_PADDING)[:32] # 1 

399 hash = md5(password) # 2 

400 hash.update(self.o) # 3 

401 # See https://github.com/pdfminer/pdfminer.six/issues/186 

402 hash.update(struct.pack("<L", self.p)) # 4 

403 hash.update(self.docid[0]) # 5 

404 if ( 

405 self.r >= 4 

406 and not cast(PDFStandardSecurityHandlerV4, self).encrypt_metadata 

407 ): 

408 hash.update(b"\xff\xff\xff\xff") 

409 result = hash.digest() 

410 n = 5 

411 if self.r >= 3: 

412 n = self.length // 8 

413 for _ in range(50): 

414 result = md5(result[:n]).digest() 

415 return result[:n] 

416 

417 def authenticate(self, password: str) -> bytes | None: 

418 password_bytes = password.encode("latin1") 

419 key = self.authenticate_user_password(password_bytes) 

420 if key is None: 

421 key = self.authenticate_owner_password(password_bytes) 

422 return key 

423 

424 def authenticate_user_password(self, password: bytes) -> bytes | None: 

425 key = self.compute_encryption_key(password) 

426 if self.verify_encryption_key(key): 

427 return key 

428 else: 

429 return None 

430 

431 def verify_encryption_key(self, key: bytes) -> bool: 

432 # Algorithm 3.6 

433 u = self.compute_u(key) 

434 if self.r == 2: 

435 return u == self.u 

436 return u[:16] == self.u[:16] 

437 

438 def authenticate_owner_password(self, password: bytes) -> bytes | None: 

439 # Algorithm 3.7 

440 password = (password + self.PASSWORD_PADDING)[:32] 

441 hash = md5(password) 

442 if self.r >= 3: 

443 for _ in range(50): 

444 hash = md5(hash.digest()) 

445 n = 5 

446 if self.r >= 3: 

447 n = self.length // 8 

448 key = hash.digest()[:n] 

449 if self.r == 2: 

450 user_password = Arcfour(key).decrypt(self.o) 

451 else: 

452 user_password = self.o 

453 for i in range(19, -1, -1): 

454 k = b"".join(bytes((c ^ i,)) for c in iter(key)) 

455 user_password = Arcfour(k).decrypt(user_password) 

456 return self.authenticate_user_password(user_password) 

457 

458 def decrypt( 

459 self, 

460 objid: int, 

461 genno: int, 

462 data: bytes, 

463 attrs: dict[str, Any] | None = None, 

464 ) -> bytes: 

465 return self.decrypt_rc4(objid, genno, data) 

466 

467 def decrypt_rc4(self, objid: int, genno: int, data: bytes) -> bytes: 

468 assert self.key is not None 

469 key = self.key + struct.pack("<L", objid)[:3] + struct.pack("<L", genno)[:2] 

470 hash = md5(key) 

471 key = hash.digest()[: min(len(key), 16)] 

472 return Arcfour(key).decrypt(data) 

473 

474 

class PDFStandardSecurityHandlerV4(PDFStandardSecurityHandler):
    """Revision 4 handler: named crypt filters (RC4 ``V2`` or ``AESV2``)."""

    supported_revisions: tuple[int, ...] = (4,)

    def init_params(self) -> None:
        """Read the crypt-filter settings on top of the base parameters.

        :raises PDFEncryptionError: if StmF and StrF differ, a filter uses an
            unknown method, or StrF names an undefined filter.
        """
        super().init_params()
        param = self.param
        self.length = 128
        self.cf = dict_value(param.get("CF"))
        self.stmf = literal_name(param["StmF"])
        self.strf = literal_name(param["StrF"])
        self.encrypt_metadata = bool(param.get("EncryptMetadata", True))
        if self.stmf != self.strf:
            error_msg = f"Unsupported crypt filter: param={self.param!r}"
            raise PDFEncryptionError(error_msg)
        # filter name -> decrypt callable
        self.cfm = {}
        for filter_name, filter_spec in self.cf.items():
            method = self.get_cfm(literal_name(filter_spec["CFM"]))
            if method is None:
                error_msg = f"Unknown crypt filter method: param={self.param!r}"
                raise PDFEncryptionError(error_msg)
            self.cfm[filter_name] = method
        self.cfm["Identity"] = self.decrypt_identity
        if self.strf not in self.cfm:
            error_msg = f"Undefined crypt filter: param={self.param!r}"
            raise PDFEncryptionError(error_msg)

    def get_cfm(self, name: str) -> Callable[[int, int, bytes], bytes] | None:
        """Map a crypt filter method name to its decrypt function."""
        methods = {
            "V2": self.decrypt_rc4,
            "AESV2": self.decrypt_aes128,
        }
        return methods.get(name)

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: dict[str, Any] | None = None,
        name: str | None = None,
    ) -> bytes:
        """Decrypt ``data`` with filter ``name`` (defaults to StrF).

        Metadata-typed objects are returned untouched when the document
        declares that metadata is not encrypted.
        """
        if attrs is not None and not self.encrypt_metadata:
            obj_type = attrs.get("Type")
            if obj_type is not None and literal_name(obj_type) == "Metadata":
                return data
        filter_name = self.strf if name is None else name
        return self.cfm[filter_name](objid, genno, data)

    def decrypt_identity(self, objid: int, genno: int, data: bytes) -> bytes:
        return data

    def decrypt_aes128(self, objid: int, genno: int, data: bytes) -> bytes:
        assert self.key is not None
        seed = (
            self.key
            + struct.pack("<L", objid)[:3]
            + struct.pack("<L", genno)[:2]
            + b"sAlT"
        )
        object_key = md5(seed).digest()[: min(len(seed), 16)]
        # The first 16 bytes of the data are the CBC initialization vector.
        initialization_vector, ciphertext = data[:16], data[16:]
        cipher = Cipher(
            algorithms.AES(object_key),
            modes.CBC(initialization_vector),
            backend=default_backend(),
        )  # type: ignore
        plaintext = cipher.decryptor().update(ciphertext)  # type: ignore
        return unpad_aes(plaintext)

546 

547 

class PDFStandardSecurityHandlerV5(PDFStandardSecurityHandlerV4):
    """Revision 5/6 handler: SHA-2 password hashes and the ``AESV3`` filter."""

    supported_revisions = (5, 6)

    def init_params(self) -> None:
        """Read OE/UE and split O and U into hash, validation and key salts."""
        super().init_params()
        self.length = 256
        self.oe = str_value(self.param["OE"])
        self.ue = str_value(self.param["UE"])
        owner, user = self.o, self.u
        self.o_hash = owner[:32]
        self.o_validation_salt = owner[32:40]
        self.o_key_salt = owner[40:]
        self.u_hash = user[:32]
        self.u_validation_salt = user[32:40]
        self.u_key_salt = user[40:]

    def get_cfm(self, name: str) -> Callable[[int, int, bytes], bytes] | None:
        """Only ``AESV3`` is recognised by this handler."""
        return self.decrypt_aes256 if name == "AESV3" else None

    def authenticate(self, password: str) -> bytes | None:
        """Return the file key if ``password`` is the owner or user password."""
        password_b = self._normalize_password(password)
        # Owner check first; the owner hashes also mix in the whole U string.
        owner_hash = self._password_hash(password_b, self.o_validation_salt, self.u)
        if owner_hash == self.o_hash:
            wrap_key = self._password_hash(password_b, self.o_key_salt, self.u)
            cipher = Cipher(
                algorithms.AES(wrap_key),
                modes.CBC(b"\0" * 16),
                backend=default_backend(),
            )  # type: ignore
            return cipher.decryptor().update(self.oe)  # type: ignore
        user_hash = self._password_hash(password_b, self.u_validation_salt)
        if user_hash == self.u_hash:
            wrap_key = self._password_hash(password_b, self.u_key_salt)
            cipher = Cipher(
                algorithms.AES(wrap_key),
                modes.CBC(b"\0" * 16),
                backend=default_backend(),
            )  # type: ignore
            return cipher.decryptor().update(self.ue)  # type: ignore
        return None

    def _normalize_password(self, password: str) -> bytes:
        """Encode the password as UTF-8 (max 127 bytes); saslprep for R6."""
        if self.r == 6:
            # saslprep expects non-empty strings, apparently
            if not password:
                return b""
            from pdfminer._saslprep import saslprep

            password = saslprep(password)
        encoded = password.encode("utf-8")
        return encoded[:127]

    def _password_hash(
        self,
        password: bytes,
        salt: bytes,
        vector: bytes | None = None,
    ) -> bytes:
        """Compute password hash depending on revision number"""
        if self.r != 5:
            return self._r6_password(password, salt[0:8], vector)
        return self._r5_password(password, salt, vector)

    def _r5_password(
        self,
        password: bytes,
        salt: bytes,
        vector: bytes | None = None,
    ) -> bytes:
        """Compute the password for revision 5"""
        sha = sha256(password)
        sha.update(salt)
        if vector is not None:
            sha.update(vector)
        return sha.digest()

    def _r6_password(
        self,
        password: bytes,
        salt: bytes,
        vector: bytes | None = None,
    ) -> bytes:
        """Compute the password for revision 6"""
        seed = sha256(password)
        seed.update(salt)
        if vector is not None:
            seed.update(vector)
        k = seed.digest()
        hashes = (sha256, sha384, sha512)
        suffix = vector or b""
        round_no = 0
        last_byte_val = 0
        # At least 64 rounds, then continue while the last byte of the
        # round's ciphertext exceeds round_no - 32.
        while round_no < 64 or last_byte_val > round_no - 32:
            k1 = (password + k + suffix) * 64
            e = self._aes_cbc_encrypt(key=k[:16], iv=k[16:32], data=k1)
            # compute the first 16 bytes of e,
            # interpreted as an unsigned integer mod 3
            next_hash = hashes[self._bytes_mod_3(e[:16])]
            k = next_hash(e).digest()
            last_byte_val = e[-1]
            round_no += 1
        return k[:32]

    @staticmethod
    def _bytes_mod_3(input_bytes: bytes) -> int:
        # 256 is 1 mod 3, so we can just sum 'em
        total = 0
        for byte in input_bytes:
            total += byte % 3
        return total % 3

    def _aes_cbc_encrypt(self, key: bytes, iv: bytes, data: bytes) -> bytes:
        encryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).encryptor()  # type: ignore
        return encryptor.update(data) + encryptor.finalize()  # type: ignore

    def decrypt_aes256(self, objid: int, genno: int, data: bytes) -> bytes:
        # The first 16 bytes of the data are the CBC initialization vector;
        # the file key is used directly (objid/genno play no part).
        initialization_vector, ciphertext = data[:16], data[16:]
        assert self.key is not None
        cipher = Cipher(
            algorithms.AES(self.key),
            modes.CBC(initialization_vector),
            backend=default_backend(),
        )  # type: ignore
        plaintext = cipher.decryptor().update(ciphertext)  # type: ignore
        return unpad_aes(plaintext)

671 

672 

class PDFDocument:
    """PDFDocument object represents a PDF document.

    Since a PDF file can be very big, normally it is not loaded at
    once. So PDF document has to cooperate with a PDF parser in order to
    dynamically import the data as processing goes.

    Typical usage:
      doc = PDFDocument(parser, password)
      obj = doc.getobj(objid)

    """

    # Maps the /V entry of the encryption dictionary to a handler class.
    security_handler_registry: ClassVar[dict[int, type[PDFStandardSecurityHandler]]] = {
        1: PDFStandardSecurityHandler,
        2: PDFStandardSecurityHandler,
        4: PDFStandardSecurityHandlerV4,
        5: PDFStandardSecurityHandlerV5,
    }

    def __init__(
        self,
        parser: PDFParser,
        password: str = "",
        caching: bool = True,
        fallback: bool = True,
    ) -> None:
        """Set the document to use a given PDFParser object.

        :param parser: parser positioned on the PDF data.
        :param password: password tried if the trailer has /Encrypt.
        :param caching: keep resolved objects and parsed object streams.
        :param fallback: if no valid xref is found, rebuild one by scanning
            the file with PDFXRefFallback.
        :raises PDFSyntaxError: if no trailer provides /Root, or (in strict
            mode) the catalog is not typed ``Catalog``.
        """
        self.caching = caching
        self.xrefs: list[PDFBaseXRef] = []
        self.info: list[dict[str, Any]] = []
        self.catalog: dict[str, Any] = {}
        self.encryption: tuple[Any, Any] | None = None
        self.decipher: DecipherCallable | None = None
        self._cached_objs: dict[int, tuple[object, int]] = {}
        self._parsed_objs: dict[int, tuple[list[object], int]] = {}
        self._parser: PDFParser | None = parser
        parser.set_document(self)
        self.is_printable = self.is_modifiable = self.is_extractable = True
        # Retrieve the information of each header that was appended
        # (maybe multiple times) at the end of the document.
        # Start offsets already visited, used to detect circular chains.
        self._xrefpos: set[int] = set()
        try:
            pos = self.find_xref(parser)
            self.read_xref_from(parser, pos, self.xrefs)
        except PDFNoValidXRef:
            if fallback:
                parser.fallback = True
                newxref = PDFXRefFallback()
                newxref.load(parser)
                self.xrefs.append(newxref)

        for xref in self.xrefs:
            trailer = xref.get_trailer()
            if not trailer:
                continue
            # If there's an encryption info, remember it.
            if "Encrypt" in trailer:
                # Some documents may not have a /ID, use two empty
                # byte strings instead. Solves
                # https://github.com/pdfminer/pdfminer.six/issues/594
                id_value = list_value(trailer["ID"]) if "ID" in trailer else (b"", b"")
                self.encryption = (id_value, dict_value(trailer["Encrypt"]))
                self._initialize_password(password)
            if "Info" in trailer:
                self.info.append(dict_value(trailer["Info"]))
            if "Root" in trailer:
                # Every PDF file must have exactly one /Root dictionary.
                self.catalog = dict_value(trailer["Root"])
                break
        else:
            raise PDFSyntaxError("No /Root object! - Is this really a PDF?")
        if self.catalog.get("Type") is not LITERAL_CATALOG and settings.STRICT:
            raise PDFSyntaxError("Catalog not found!")

    KEYWORD_OBJ = KWD(b"obj")

    def _initialize_password(self, password: str = "") -> None:
        """Create the security handler for ``self.encryption``.

        Installs the handler's ``decrypt`` as ``self.decipher`` and copies
        its permission flags onto the document.

        :raises PDFEncryptionError: for a non-Standard filter or a /V value
            with no registered handler.
        """
        assert self.encryption is not None
        (docid, param) = self.encryption
        if literal_name(param.get("Filter")) != "Standard":
            raise PDFEncryptionError(f"Unknown filter: param={param!r}")
        v = int_value(param.get("V", 0))
        factory = self.security_handler_registry.get(v)
        if factory is None:
            raise PDFEncryptionError(f"Unknown algorithm: param={param!r}")
        handler = factory(docid, param, password)
        self.decipher = handler.decrypt
        self.is_printable = handler.is_printable()
        self.is_modifiable = handler.is_modifiable()
        self.is_extractable = handler.is_extractable()
        assert self._parser is not None
        self._parser.fallback = False  # need to read streams with exact length

    def _getobj_objstm(self, stream: PDFStream, index: int, objid: int) -> object:
        """Return the ``index``-th object stored inside object stream ``stream``."""
        if stream.objid in self._parsed_objs:
            (objs, n) = self._parsed_objs[stream.objid]
        else:
            (objs, n) = self._get_objects(stream)
            if self.caching:
                assert stream.objid is not None
                self._parsed_objs[stream.objid] = (objs, n)
        # The parsed list starts with n (objid, offset) pairs, i.e. 2*n
        # integers, followed by the objects themselves.
        i = n * 2 + index
        try:
            obj = objs[i]
        except IndexError as err:
            raise PDFSyntaxError(f"index too big: {index!r}") from err
        return obj

    def _get_objects(self, stream: PDFStream) -> tuple[list[object], int]:
        """Parse every object in ``stream``; return them with the stream's N."""
        if stream.get("Type") is not LITERAL_OBJSTM and settings.STRICT:
            raise PDFSyntaxError(f"Not a stream object: {stream!r}")
        try:
            n = cast(int, stream["N"])
        except KeyError:
            if settings.STRICT:
                raise PDFSyntaxError(f"N is not defined: {stream!r}") from None
            n = 0
        parser = PDFStreamParser(stream.get_data())
        parser.set_document(self)
        objs: list[object] = []
        try:
            while True:
                (_, obj) = parser.nextobject()
                objs.append(obj)
        except PSEOF:
            pass
        return (objs, n)

    def _getobj_parse(self, pos: int, objid: int) -> object:
        """Parse the indirect object that starts at byte offset ``pos``.

        :raises PDFSyntaxError: if the object id found there does not match
            ``objid`` or the ``obj`` keyword is missing.
        """
        assert self._parser is not None
        self._parser.seek(pos)
        (_, objid1) = self._parser.nexttoken()  # objid
        (_, _genno) = self._parser.nexttoken()  # genno
        (_, kwd) = self._parser.nexttoken()
        # hack around malformed pdf files
        # copied from https://github.com/jaepil/pdfminer3k/blob/master/
        # pdfminer/pdfparser.py#L399
        # to solve https://github.com/pdfminer/pdfminer.six/issues/56
        # assert objid1 == objid, str((objid1, objid))
        if objid1 != objid:
            # Skip ahead to the next "obj" keyword and take the token two
            # places before it as the object id.
            x = []
            while kwd is not self.KEYWORD_OBJ:
                (_, kwd) = self._parser.nexttoken()
                x.append(kwd)
            if len(x) >= 2:
                objid1 = x[-2]
        # #### end hack around malformed pdf files
        if objid1 != objid:
            raise PDFSyntaxError(f"objid mismatch: {objid1!r}={objid!r}")

        if kwd != self.KEYWORD_OBJ:
            raise PDFSyntaxError(f"Invalid object spec: offset={pos!r}")
        (_, obj) = self._parser.nextobject()
        return obj

    # can raise PDFObjectNotFound
    def getobj(self, objid: int) -> object:
        """Get object from PDF

        :raises PDFException if PDFDocument is not initialized
        :raises PDFObjectNotFound if objid does not exist in PDF
        """
        if not self.xrefs:
            raise PDFException("PDFDocument is not initialized")
        log.debug("getobj: objid=%r", objid)
        obj: object  # Initialize to satisfy mypy; always assigned in branches below
        genno: int
        if objid in self._cached_objs:
            (obj, genno) = self._cached_objs[objid]
        else:
            # Try each xref in order; the first that resolves and parses wins.
            for xref in self.xrefs:
                try:
                    (strmid, index, genno) = xref.get_pos(objid)
                except KeyError:
                    continue
                try:
                    if strmid is not None:
                        stream = stream_value(self.getobj(strmid))
                        obj = self._getobj_objstm(stream, index, objid)
                    else:
                        obj = self._getobj_parse(index, objid)
                        if self.decipher:
                            obj = decipher_all(self.decipher, objid, genno, obj)

                    if isinstance(obj, PDFStream):
                        obj.set_objid(objid, genno)
                    break
                except (PSEOF, PDFSyntaxError):
                    continue
            else:
                raise PDFObjectNotFound(objid)
            log.debug("register: objid=%r: %r", objid, obj)
            if self.caching:
                self._cached_objs[objid] = (obj, genno)
        return obj

    OutlineType = tuple[Any, Any, Any, Any, Any]

    def get_outlines(self) -> Iterator[OutlineType]:
        """Iterate over outline entries as (level, title, dest, action, se).

        :raises PDFNoOutlines: if the catalog has no ``Outlines`` entry.
        """
        if "Outlines" not in self.catalog:
            raise PDFNoOutlines

        def search(entry: object, level: int) -> Iterator[PDFDocument.OutlineType]:
            entry = dict_value(entry)
            if "Title" in entry and ("A" in entry or "Dest" in entry):
                title = decode_text(str_value(entry["Title"]))
                dest = entry.get("Dest")
                action = entry.get("A")
                se = entry.get("SE")
                yield (level, title, dest, action, se)
            # Children first (one level deeper), then following siblings.
            if "First" in entry and "Last" in entry:
                yield from search(entry["First"], level + 1)
            if "Next" in entry:
                yield from search(entry["Next"], level)

        return search(self.catalog["Outlines"], 0)

    def get_page_labels(self) -> Iterator[str]:
        """Generate page label strings for the PDF document.

        If the document includes page labels, generates strings, one per page.
        If not, raises PDFNoPageLabels.

        The resulting iteration is unbounded.
        """
        assert self.catalog is not None

        try:
            page_labels = PageLabels(self.catalog["PageLabels"])
        except (PDFTypeError, KeyError) as err:
            raise PDFNoPageLabels from err

        return page_labels.labels

    def lookup_name(self, cat: str, key: str | bytes) -> Any:
        """Look up ``key`` in the name tree ``cat`` of the catalog's /Names.

        :raises PDFKeyError: if /Names is missing or the key is not found.
        """
        try:
            names = dict_value(self.catalog["Names"])
        except (PDFTypeError, KeyError) as err:
            raise PDFKeyError((cat, key)) from err
        # may raise KeyError
        d0 = dict_value(names[cat])

        def lookup(d: dict[str, Any]) -> Any:
            if "Limits" in d:
                (k1, k2) = list_value(d["Limits"])
                if key < k1 or k2 < key:
                    # Key lies outside this node: tell the parent to move on.
                    return None
            if "Names" in d:
                objs = list_value(d["Names"])
                entries = dict(
                    cast(Iterator[tuple[str | bytes, Any]], choplist(2, objs)),
                )
                return entries[key]
            if "Kids" in d:
                for c in list_value(d["Kids"]):
                    v = lookup(dict_value(c))
                    # None is the "not in this subtree" marker; other falsy
                    # values are real results and must be returned.
                    if v is not None:
                        return v
            raise PDFKeyError((cat, key))

        return lookup(d0)

    def get_dest(self, name: str | bytes) -> Any:
        """Resolve a named destination.

        Tries the /Names tree first, then the catalog's /Dests dictionary.

        :raises PDFDestinationNotFound: if neither contains ``name``.
        """
        try:
            # PDF-1.2 or later
            obj = self.lookup_name("Dests", name)
        except KeyError:
            # PDF-1.1 or prior
            if "Dests" not in self.catalog:
                raise PDFDestinationNotFound(name) from None
            d0 = dict_value(self.catalog["Dests"])
            if name not in d0:
                raise PDFDestinationNotFound(name) from None
            obj = d0[name]
        return obj

    # find_xref
    def find_xref(self, parser: PDFParser) -> int:
        """Internal function used to locate the first XRef."""
        # search the last xref table by scanning the file backwards.
        # Scanning backwards, `prev` is the non-empty line that FOLLOWS the
        # current one in the file, i.e. the offset after "startxref".
        prev = b""
        for line in parser.revreadlines():
            line = line.strip()
            log.debug("find_xref: %r", line)

            if line == b"startxref":
                log.debug("xref found: pos=%r", prev)

                if not prev.isdigit():
                    raise PDFNoValidXRef(f"Invalid xref position, no digit: {prev!r}")

                start = int(prev)

                if not start >= 0:
                    raise PDFNoValidXRef(f"Invalid xref position, negative: {start}")

                # The xref start needs to fit in a C ssize_t to be a proper file offset
                if start >= 2**31:
                    raise PDFNoValidXRef(f"Invalid xref position, too large: {start!r}")

                return start

            if line:
                prev = line

        raise PDFNoValidXRef("Unexpected EOF")

    # read xref table
    def read_xref_from(
        self,
        parser: PDFParser,
        start: int,
        xrefs: list[PDFBaseXRef],
    ) -> None:
        """Reads XRefs from the given location.

        Appends the xref found at ``start`` to ``xrefs`` and then follows
        the trailer's /XRefStm and /Prev entries recursively.

        :raises PDFNoValidXRef: on EOF or when ``start`` was already visited
            (circular chain).
        """
        if start in self._xrefpos:
            raise PDFNoValidXRef(f"Detected circular xref chain at {start}")
        self._xrefpos.add(start)
        parser.seek(start)
        parser.reset()
        try:
            (pos, token) = parser.nexttoken()
        except PSEOF as err:
            raise PDFNoValidXRef("Unexpected EOF") from err
        log.debug("read_xref_from: start=%d, token=%r", start, token)
        if isinstance(token, int):
            # XRefStream: PDF-1.5
            parser.seek(pos)
            parser.reset()
            xref: PDFBaseXRef = PDFXRefStream()
            xref.load(parser)
        else:
            if token is parser.KEYWORD_XREF:
                # Skip the rest of the "xref" line.
                parser.nextline()
            xref = PDFXRef()
            xref.load(parser)
        xrefs.append(xref)
        trailer = xref.get_trailer()
        log.debug("trailer: %r", trailer)
        if "XRefStm" in trailer:
            pos = int_value(trailer["XRefStm"])
            self.read_xref_from(parser, pos, xrefs)
        if "Prev" in trailer:
            # find previous xref
            pos = int_value(trailer["Prev"])
            self.read_xref_from(parser, pos, xrefs)

1024 

1025 

class PageLabels(NumberTree):
    """PageLabels from the document catalog.

    See Section 8.3.1 in the PDF Reference.
    """

    @property
    def labels(self) -> Iterator[str]:
        """Yield one label per page; the last range never ends."""
        ranges = self.values

        # The tree must begin with page index 0
        if len(ranges) == 0 or ranges[0][0] != 0:
            if settings.STRICT:
                raise PDFSyntaxError("PageLabels is missing page index 0")
            # Try to cope, by assuming empty labels for the initial pages
            ranges.insert(0, (0, {}))

        for following, (start, raw_label_dict) in enumerate(ranges, 1):
            label_dict = dict_value(raw_label_dict)
            style = label_dict.get("S")
            prefix = decode_text(str_value(label_dict.get("P", b"")))
            first_value = int_value(label_dict.get("St", 1))

            values: Iterable[int]
            if following != len(ranges):
                # The range stops where the next one starts.
                end, _ = ranges[following]
                values = range(first_value, first_value + (end - start))
            else:
                # This is the last specified range. It continues until the end
                # of the document.
                values = itertools.count(first_value)

            for value in values:
                yield prefix + self._format_page_label(value, style)

    @staticmethod
    def _format_page_label(value: int, style: Any) -> str:
        """Format page label value in a specific style"""
        if style is None:
            return ""
        if style is LIT("D"):  # Decimal arabic numerals
            return str(value)
        if style is LIT("R"):  # Uppercase roman numerals
            return format_int_roman(value).upper()
        if style is LIT("r"):  # Lowercase roman numerals
            return format_int_roman(value)
        if style is LIT("A"):  # Uppercase letters A-Z, AA-ZZ...
            return format_int_alpha(value).upper()
        if style is LIT("a"):  # Lowercase letters a-z, aa-zz...
            return format_int_alpha(value)
        log.warning("Unknown page label style: %r", style)
        return ""

1081 return label