Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/pdfdocument.py: 62%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

720 statements  

1import itertools 

2import logging 

3import re 

4import struct 

5from collections.abc import Callable, Iterable, Iterator, KeysView, Sequence 

6from hashlib import md5, sha256, sha384, sha512 

7from typing import ( 

8 Any, 

9 ClassVar, 

10 cast, 

11) 

12 

13from cryptography.hazmat.backends import default_backend 

14from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes 

15 

16from pdfminer import settings 

17from pdfminer.arcfour import Arcfour 

18from pdfminer.casting import safe_int 

19from pdfminer.data_structures import NumberTree 

20from pdfminer.pdfexceptions import ( 

21 PDFException, 

22 PDFKeyError, 

23 PDFObjectNotFound, 

24 PDFTypeError, 

25) 

26from pdfminer.pdfparser import PDFParser, PDFStreamParser, PDFSyntaxError 

27from pdfminer.pdftypes import ( 

28 DecipherCallable, 

29 PDFStream, 

30 decipher_all, 

31 dict_value, 

32 int_value, 

33 list_value, 

34 str_value, 

35 stream_value, 

36 uint_value, 

37) 

38from pdfminer.psexceptions import PSEOF 

39from pdfminer.psparser import KWD, LIT, literal_name 

40from pdfminer.utils import ( 

41 choplist, 

42 decode_text, 

43 format_int_alpha, 

44 format_int_roman, 

45 nunpack, 

46 unpad_aes, 

47) 

48 

49log = logging.getLogger(__name__) 

50 

51 

52class PDFNoValidXRef(PDFSyntaxError): 

53 pass 

54 

55 

56class PDFNoValidXRefWarning(SyntaxWarning): 

57 """Legacy warning for missing xref. 

58 

59 Not used anymore because warnings.warn is replaced by logger.Logger.warn. 

60 """ 

61 

62 

63class PDFNoOutlines(PDFException): 

64 pass 

65 

66 

67class PDFNoPageLabels(PDFException): 

68 pass 

69 

70 

71class PDFDestinationNotFound(PDFException): 

72 pass 

73 

74 

75class PDFEncryptionError(PDFException): 

76 pass 

77 

78 

79class PDFPasswordIncorrect(PDFEncryptionError): 

80 pass 

81 

82 

83class PDFEncryptionWarning(UserWarning): 

84 """Legacy warning for failed decryption. 

85 

86 Not used anymore because warnings.warn is replaced by logger.Logger.warn. 

87 """ 

88 

89 

90class PDFTextExtractionNotAllowedWarning(UserWarning): 

91 """Legacy warning for PDF that does not allow extraction. 

92 

93 Not used anymore because warnings.warn is replaced by logger.Logger.warn. 

94 """ 

95 

96 

97class PDFTextExtractionNotAllowed(PDFEncryptionError): 

98 pass 

99 

100 

101# some predefined literals and keywords. 

102LITERAL_OBJSTM = LIT("ObjStm") 

103LITERAL_XREF = LIT("XRef") 

104LITERAL_CATALOG = LIT("Catalog") 

105 

106 

107class PDFBaseXRef: 

108 def get_trailer(self) -> dict[str, Any]: 

109 raise NotImplementedError 

110 

111 def get_objids(self) -> Iterable[int]: 

112 return [] 

113 

114 # Must return 

115 # (strmid, index, genno) 

116 # or (None, pos, genno) 

117 def get_pos(self, objid: int) -> tuple[int | None, int, int]: 

118 raise PDFKeyError(objid) 

119 

120 def load(self, parser: PDFParser) -> None: 

121 raise NotImplementedError 

122 

123 

124class PDFXRef(PDFBaseXRef): 

125 def __init__(self) -> None: 

126 self.offsets: dict[int, tuple[int | None, int, int]] = {} 

127 self.trailer: dict[str, Any] = {} 

128 

129 def __repr__(self) -> str: 

130 return f"<PDFXRef: offsets={self.offsets.keys()!r}>" 

131 

132 def load(self, parser: PDFParser) -> None: 

133 while True: 

134 try: 

135 (pos, line) = parser.nextline() 

136 line = line.strip() 

137 if not line: 

138 continue 

139 except PSEOF as err: 

140 raise PDFNoValidXRef("Unexpected EOF - file corrupted?") from err 

141 if line.startswith(b"trailer"): 

142 parser.seek(pos) 

143 break 

144 f = line.split(b" ") 

145 if len(f) != 2: 

146 error_msg = f"Trailer not found: {parser!r}: line={line!r}" 

147 raise PDFNoValidXRef(error_msg) 

148 try: 

149 (start, nobjs) = map(int, f) 

150 except ValueError as err: 

151 error_msg = f"Invalid line: {parser!r}: line={line!r}" 

152 raise PDFNoValidXRef(error_msg) from err 

153 for objid in range(start, start + nobjs): 

154 try: 

155 (_, line) = parser.nextline() 

156 line = line.strip() 

157 except PSEOF as err: 

158 raise PDFNoValidXRef("Unexpected EOF - file corrupted?") from err 

159 f = line.split(b" ") 

160 if len(f) != 3: 

161 error_msg = f"Invalid XRef format: {parser!r}, line={line!r}" 

162 raise PDFNoValidXRef(error_msg) 

163 (pos_b, genno_b, use_b) = f 

164 if use_b != b"n": 

165 continue 

166 

167 pos_i = safe_int(pos_b) 

168 genno_i = safe_int(genno_b) 

169 if pos_i is not None and genno_i is not None: 

170 self.offsets[objid] = (None, pos_i, genno_i) 

171 else: 

172 log.warning( 

173 "Not adding object %s to xref because position %r " 

174 "or generation number %r cannot be parsed as an int", 

175 objid, 

176 pos_b, 

177 genno_b, 

178 ) 

179 

180 log.debug("xref objects: %r", self.offsets) 

181 self.load_trailer(parser) 

182 

183 def load_trailer(self, parser: PDFParser) -> None: 

184 try: 

185 (_, kwd) = parser.nexttoken() 

186 assert kwd is KWD(b"trailer"), str(kwd) 

187 (_, dic) = parser.nextobject() 

188 except PSEOF: 

189 x = parser.pop(1) 

190 if not x: 

191 raise PDFNoValidXRef("Unexpected EOF - file corrupted") from None 

192 (_, dic) = x[0] 

193 self.trailer.update(dict_value(dic)) 

194 log.debug("trailer=%r", self.trailer) 

195 

196 def get_trailer(self) -> dict[str, Any]: 

197 return self.trailer 

198 

199 def get_objids(self) -> KeysView[int]: 

200 return self.offsets.keys() 

201 

202 def get_pos(self, objid: int) -> tuple[int | None, int, int]: 

203 return self.offsets[objid] 

204 

205 

206class PDFXRefFallback(PDFXRef): 

207 def __repr__(self) -> str: 

208 return f"<PDFXRefFallback: offsets={self.offsets.keys()!r}>" 

209 

210 PDFOBJ_CUE = re.compile(r"^(\d+)\s+(\d+)\s+obj\b") 

211 

212 def load(self, parser: PDFParser) -> None: 

213 parser.seek(0) 

214 while 1: 

215 try: 

216 (pos, line_bytes) = parser.nextline() 

217 except PSEOF: 

218 break 

219 if line_bytes.startswith(b"trailer"): 

220 parser.seek(pos) 

221 self.load_trailer(parser) 

222 log.debug("trailer: %r", self.trailer) 

223 break 

224 line = line_bytes.decode("latin-1") # default pdf encoding 

225 m = self.PDFOBJ_CUE.match(line) 

226 if not m: 

227 continue 

228 (objid_s, genno_s) = m.groups() 

229 objid = int(objid_s) 

230 genno = int(genno_s) 

231 self.offsets[objid] = (None, pos, genno) 

232 # expand ObjStm. 

233 parser.seek(pos) 

234 (_, obj) = parser.nextobject() 

235 if isinstance(obj, PDFStream) and obj.get("Type") is LITERAL_OBJSTM: 

236 stream = stream_value(obj) 

237 try: 

238 n = stream["N"] 

239 except KeyError: 

240 if settings.STRICT: 

241 raise PDFSyntaxError(f"N is not defined: {stream!r}") from None 

242 n = 0 

243 parser1 = PDFStreamParser(stream.get_data()) 

244 objs: list[int] = [] 

245 try: 

246 while 1: 

247 (_, obj) = parser1.nextobject() 

248 objs.append(cast(int, obj)) 

249 except PSEOF: 

250 pass 

251 n = min(n, len(objs) // 2) 

252 for index in range(n): 

253 objid1 = objs[index * 2] 

254 self.offsets[objid1] = (objid, index, 0) 

255 

256 

257class PDFXRefStream(PDFBaseXRef): 

258 def __init__(self) -> None: 

259 self.data: bytes | None = None 

260 self.entlen: int | None = None 

261 self.fl1: int | None = None 

262 self.fl2: int | None = None 

263 self.fl3: int | None = None 

264 self.ranges: list[tuple[int, int]] = [] 

265 

266 def __repr__(self) -> str: 

267 return f"<PDFXRefStream: ranges={self.ranges!r}>" 

268 

269 def load(self, parser: PDFParser) -> None: 

270 (_, _objid) = parser.nexttoken() # ignored 

271 (_, _genno) = parser.nexttoken() # ignored 

272 (_, _kwd) = parser.nexttoken() 

273 (_, stream) = parser.nextobject() 

274 if not isinstance(stream, PDFStream) or stream.get("Type") is not LITERAL_XREF: 

275 raise PDFNoValidXRef("Invalid PDF stream spec.") 

276 size = stream["Size"] 

277 index_array = stream.get("Index", (0, size)) 

278 if len(index_array) % 2 != 0: 

279 raise PDFSyntaxError("Invalid index number") 

280 self.ranges.extend(cast(Iterator[tuple[int, int]], choplist(2, index_array))) 

281 (self.fl1, self.fl2, self.fl3) = stream["W"] 

282 assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None 

283 self.data = stream.get_data() 

284 self.entlen = self.fl1 + self.fl2 + self.fl3 

285 self.trailer = stream.attrs 

286 log.debug( 

287 "xref stream: objid=%s, fields=%d,%d,%d", 

288 ", ".join(map(repr, self.ranges)), 

289 self.fl1, 

290 self.fl2, 

291 self.fl3, 

292 ) 

293 

294 def get_trailer(self) -> dict[str, Any]: 

295 return self.trailer 

296 

297 def get_objids(self) -> Iterator[int]: 

298 for start, nobjs in self.ranges: 

299 for i in range(nobjs): 

300 assert self.entlen is not None 

301 assert self.data is not None 

302 offset = self.entlen * i 

303 ent = self.data[offset : offset + self.entlen] 

304 f1 = nunpack(ent[: self.fl1], 1) 

305 if f1 == 1 or f1 == 2: 

306 yield start + i 

307 

308 def get_pos(self, objid: int) -> tuple[int | None, int, int]: 

309 index = 0 

310 for start, nobjs in self.ranges: 

311 if start <= objid and objid < start + nobjs: 

312 index += objid - start 

313 break 

314 else: 

315 index += nobjs 

316 else: 

317 raise PDFKeyError(objid) 

318 assert self.entlen is not None 

319 assert self.data is not None 

320 assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None 

321 offset = self.entlen * index 

322 ent = self.data[offset : offset + self.entlen] 

323 f1 = nunpack(ent[: self.fl1], 1) 

324 f2 = nunpack(ent[self.fl1 : self.fl1 + self.fl2]) 

325 f3 = nunpack(ent[self.fl1 + self.fl2 :]) 

326 if f1 == 1: 

327 return (None, f2, f3) 

328 elif f1 == 2: 

329 return (f2, f3, 0) 

330 else: 

331 # this is a free object 

332 raise PDFKeyError(objid) 

333 

334 

335class PDFStandardSecurityHandler: 

336 PASSWORD_PADDING = ( 

337 b"(\xbfN^Nu\x8aAd\x00NV\xff\xfa\x01\x08..\x00\xb6\xd0h>\x80/\x0c\xa9\xfedSiz" 

338 ) 

339 supported_revisions: tuple[int, ...] = (2, 3) 

340 

341 def __init__( 

342 self, 

343 docid: Sequence[bytes], 

344 param: dict[str, Any], 

345 password: str = "", 

346 ) -> None: 

347 self.docid = docid 

348 self.param = param 

349 self.password = password 

350 self.init() 

351 

352 def init(self) -> None: 

353 self.init_params() 

354 if self.r not in self.supported_revisions: 

355 error_msg = f"Unsupported revision: param={self.param!r}" 

356 raise PDFEncryptionError(error_msg) 

357 self.init_key() 

358 

359 def init_params(self) -> None: 

360 self.v = int_value(self.param.get("V", 0)) 

361 self.r = int_value(self.param["R"]) 

362 self.p = uint_value(self.param["P"], 32) 

363 self.o = str_value(self.param["O"]) 

364 self.u = str_value(self.param["U"]) 

365 self.length = int_value(self.param.get("Length", 40)) 

366 

367 def init_key(self) -> None: 

368 self.key = self.authenticate(self.password) 

369 if self.key is None: 

370 raise PDFPasswordIncorrect 

371 

372 def is_printable(self) -> bool: 

373 return bool(self.p & 4) 

374 

375 def is_modifiable(self) -> bool: 

376 return bool(self.p & 8) 

377 

378 def is_extractable(self) -> bool: 

379 return bool(self.p & 16) 

380 

381 def compute_u(self, key: bytes) -> bytes: 

382 if self.r == 2: 

383 # Algorithm 3.4 

384 return Arcfour(key).encrypt(self.PASSWORD_PADDING) # 2 

385 else: 

386 # Algorithm 3.5 

387 hash = md5(self.PASSWORD_PADDING) # 2 

388 hash.update(self.docid[0]) # 3 

389 result = Arcfour(key).encrypt(hash.digest()) # 4 

390 for i in range(1, 20): # 5 

391 k = b"".join(bytes((c ^ i,)) for c in iter(key)) 

392 result = Arcfour(k).encrypt(result) 

393 result += result # 6 

394 return result 

395 

396 def compute_encryption_key(self, password: bytes) -> bytes: 

397 # Algorithm 3.2 

398 password = (password + self.PASSWORD_PADDING)[:32] # 1 

399 hash = md5(password) # 2 

400 hash.update(self.o) # 3 

401 # See https://github.com/pdfminer/pdfminer.six/issues/186 

402 hash.update(struct.pack("<L", self.p)) # 4 

403 hash.update(self.docid[0]) # 5 

404 if ( 

405 self.r >= 4 

406 and not cast(PDFStandardSecurityHandlerV4, self).encrypt_metadata 

407 ): 

408 hash.update(b"\xff\xff\xff\xff") 

409 result = hash.digest() 

410 n = 5 

411 if self.r >= 3: 

412 n = self.length // 8 

413 for _ in range(50): 

414 result = md5(result[:n]).digest() 

415 return result[:n] 

416 

417 def authenticate(self, password: str) -> bytes | None: 

418 password_bytes = password.encode("latin1") 

419 key = self.authenticate_user_password(password_bytes) 

420 if key is None: 

421 key = self.authenticate_owner_password(password_bytes) 

422 return key 

423 

424 def authenticate_user_password(self, password: bytes) -> bytes | None: 

425 key = self.compute_encryption_key(password) 

426 if self.verify_encryption_key(key): 

427 return key 

428 else: 

429 return None 

430 

431 def verify_encryption_key(self, key: bytes) -> bool: 

432 # Algorithm 3.6 

433 u = self.compute_u(key) 

434 if self.r == 2: 

435 return u == self.u 

436 return u[:16] == self.u[:16] 

437 

438 def authenticate_owner_password(self, password: bytes) -> bytes | None: 

439 # Algorithm 3.7 

440 password = (password + self.PASSWORD_PADDING)[:32] 

441 hash = md5(password) 

442 if self.r >= 3: 

443 for _ in range(50): 

444 hash = md5(hash.digest()) 

445 n = 5 

446 if self.r >= 3: 

447 n = self.length // 8 

448 key = hash.digest()[:n] 

449 if self.r == 2: 

450 user_password = Arcfour(key).decrypt(self.o) 

451 else: 

452 user_password = self.o 

453 for i in range(19, -1, -1): 

454 k = b"".join(bytes((c ^ i,)) for c in iter(key)) 

455 user_password = Arcfour(k).decrypt(user_password) 

456 return self.authenticate_user_password(user_password) 

457 

458 def decrypt( 

459 self, 

460 objid: int, 

461 genno: int, 

462 data: bytes, 

463 attrs: dict[str, Any] | None = None, 

464 ) -> bytes: 

465 return self.decrypt_rc4(objid, genno, data) 

466 

467 def decrypt_rc4(self, objid: int, genno: int, data: bytes) -> bytes: 

468 assert self.key is not None 

469 key = self.key + struct.pack("<L", objid)[:3] + struct.pack("<L", genno)[:2] 

470 hash = md5(key) 

471 key = hash.digest()[: min(len(key), 16)] 

472 return Arcfour(key).decrypt(data) 

473 

474 

475class PDFStandardSecurityHandlerV4(PDFStandardSecurityHandler): 

476 supported_revisions: tuple[int, ...] = (4,) 

477 

478 def init_params(self) -> None: 

479 super().init_params() 

480 self.length = 128 

481 self.cf = dict_value(self.param.get("CF")) 

482 self.stmf = literal_name(self.param["StmF"]) 

483 self.strf = literal_name(self.param["StrF"]) 

484 self.encrypt_metadata = bool(self.param.get("EncryptMetadata", True)) 

485 if self.stmf != self.strf: 

486 error_msg = f"Unsupported crypt filter: param={self.param!r}" 

487 raise PDFEncryptionError(error_msg) 

488 self.cfm = {} 

489 for k, v in self.cf.items(): 

490 f = self.get_cfm(literal_name(v["CFM"])) 

491 if f is None: 

492 error_msg = f"Unknown crypt filter method: param={self.param!r}" 

493 raise PDFEncryptionError(error_msg) 

494 self.cfm[k] = f 

495 self.cfm["Identity"] = self.decrypt_identity 

496 if self.strf not in self.cfm: 

497 error_msg = f"Undefined crypt filter: param={self.param!r}" 

498 raise PDFEncryptionError(error_msg) 

499 

500 def get_cfm(self, name: str) -> Callable[[int, int, bytes], bytes] | None: 

501 if name == "V2": 

502 return self.decrypt_rc4 

503 elif name == "AESV2": 

504 return self.decrypt_aes128 

505 else: 

506 return None 

507 

508 def decrypt( 

509 self, 

510 objid: int, 

511 genno: int, 

512 data: bytes, 

513 attrs: dict[str, Any] | None = None, 

514 name: str | None = None, 

515 ) -> bytes: 

516 if not self.encrypt_metadata and attrs is not None: 

517 t = attrs.get("Type") 

518 if t is not None and literal_name(t) == "Metadata": 

519 return data 

520 if name is None: 

521 name = self.strf 

522 return self.cfm[name](objid, genno, data) 

523 

524 def decrypt_identity(self, objid: int, genno: int, data: bytes) -> bytes: 

525 return data 

526 

527 def decrypt_aes128(self, objid: int, genno: int, data: bytes) -> bytes: 

528 assert self.key is not None 

529 key = ( 

530 self.key 

531 + struct.pack("<L", objid)[:3] 

532 + struct.pack("<L", genno)[:2] 

533 + b"sAlT" 

534 ) 

535 hash = md5(key) 

536 key = hash.digest()[: min(len(key), 16)] 

537 initialization_vector = data[:16] 

538 ciphertext = data[16:] 

539 cipher = Cipher( 

540 algorithms.AES(key), 

541 modes.CBC(initialization_vector), 

542 backend=default_backend(), 

543 ) # type: ignore 

544 plaintext = cipher.decryptor().update(ciphertext) # type: ignore 

545 return unpad_aes(plaintext) 

546 

547 

548class PDFStandardSecurityHandlerV5(PDFStandardSecurityHandlerV4): 

549 supported_revisions = (5, 6) 

550 

551 def init_params(self) -> None: 

552 super().init_params() 

553 self.length = 256 

554 self.oe = str_value(self.param["OE"]) 

555 self.ue = str_value(self.param["UE"]) 

556 self.o_hash = self.o[:32] 

557 self.o_validation_salt = self.o[32:40] 

558 self.o_key_salt = self.o[40:] 

559 self.u_hash = self.u[:32] 

560 self.u_validation_salt = self.u[32:40] 

561 self.u_key_salt = self.u[40:] 

562 

563 def get_cfm(self, name: str) -> Callable[[int, int, bytes], bytes] | None: 

564 if name == "AESV3": 

565 return self.decrypt_aes256 

566 else: 

567 return None 

568 

569 def authenticate(self, password: str) -> bytes | None: 

570 password_b = self._normalize_password(password) 

571 hash = self._password_hash(password_b, self.o_validation_salt, self.u) 

572 if hash == self.o_hash: 

573 hash = self._password_hash(password_b, self.o_key_salt, self.u) 

574 cipher = Cipher( 

575 algorithms.AES(hash), 

576 modes.CBC(b"\0" * 16), 

577 backend=default_backend(), 

578 ) # type: ignore 

579 return cipher.decryptor().update(self.oe) # type: ignore 

580 hash = self._password_hash(password_b, self.u_validation_salt) 

581 if hash == self.u_hash: 

582 hash = self._password_hash(password_b, self.u_key_salt) 

583 cipher = Cipher( 

584 algorithms.AES(hash), 

585 modes.CBC(b"\0" * 16), 

586 backend=default_backend(), 

587 ) # type: ignore 

588 return cipher.decryptor().update(self.ue) # type: ignore 

589 return None 

590 

591 def _normalize_password(self, password: str) -> bytes: 

592 if self.r == 6: 

593 # saslprep expects non-empty strings, apparently 

594 if not password: 

595 return b"" 

596 from pdfminer._saslprep import saslprep 

597 

598 password = saslprep(password) 

599 return password.encode("utf-8")[:127] 

600 

601 def _password_hash( 

602 self, 

603 password: bytes, 

604 salt: bytes, 

605 vector: bytes | None = None, 

606 ) -> bytes: 

607 """Compute password hash depending on revision number""" 

608 if self.r == 5: 

609 return self._r5_password(password, salt, vector) 

610 return self._r6_password(password, salt[0:8], vector) 

611 

612 def _r5_password( 

613 self, 

614 password: bytes, 

615 salt: bytes, 

616 vector: bytes | None = None, 

617 ) -> bytes: 

618 """Compute the password for revision 5""" 

619 hash = sha256(password) 

620 hash.update(salt) 

621 if vector is not None: 

622 hash.update(vector) 

623 return hash.digest() 

624 

625 def _r6_password( 

626 self, 

627 password: bytes, 

628 salt: bytes, 

629 vector: bytes | None = None, 

630 ) -> bytes: 

631 """Compute the password for revision 6""" 

632 initial_hash = sha256(password) 

633 initial_hash.update(salt) 

634 if vector is not None: 

635 initial_hash.update(vector) 

636 k = initial_hash.digest() 

637 hashes = (sha256, sha384, sha512) 

638 round_no = last_byte_val = 0 

639 while round_no < 64 or last_byte_val > round_no - 32: 

640 k1 = (password + k + (vector or b"")) * 64 

641 e = self._aes_cbc_encrypt(key=k[:16], iv=k[16:32], data=k1) 

642 # compute the first 16 bytes of e, 

643 # interpreted as an unsigned integer mod 3 

644 next_hash = hashes[self._bytes_mod_3(e[:16])] 

645 k = next_hash(e).digest() 

646 last_byte_val = e[len(e) - 1] 

647 round_no += 1 

648 return k[:32] 

649 

650 @staticmethod 

651 def _bytes_mod_3(input_bytes: bytes) -> int: 

652 # 256 is 1 mod 3, so we can just sum 'em 

653 return sum(b % 3 for b in input_bytes) % 3 

654 

655 def _aes_cbc_encrypt(self, key: bytes, iv: bytes, data: bytes) -> bytes: 

656 cipher = Cipher(algorithms.AES(key), modes.CBC(iv)) 

657 encryptor = cipher.encryptor() # type: ignore 

658 return encryptor.update(data) + encryptor.finalize() # type: ignore 

659 

660 def decrypt_aes256(self, objid: int, genno: int, data: bytes) -> bytes: 

661 initialization_vector = data[:16] 

662 ciphertext = data[16:] 

663 assert self.key is not None 

664 cipher = Cipher( 

665 algorithms.AES(self.key), 

666 modes.CBC(initialization_vector), 

667 backend=default_backend(), 

668 ) # type: ignore 

669 plaintext = cipher.decryptor().update(ciphertext) # type: ignore 

670 return unpad_aes(plaintext) 

671 

672 

673class PDFDocument: 

674 """PDFDocument object represents a PDF document. 

675 

676 Since a PDF file can be very big, normally it is not loaded at 

677 once. So PDF document has to cooperate with a PDF parser in order to 

678 dynamically import the data as processing goes. 

679 

680 Typical usage: 

681 doc = PDFDocument(parser, password) 

682 obj = doc.getobj(objid) 

683 

684 """ 

685 

686 security_handler_registry: ClassVar[dict[int, type[PDFStandardSecurityHandler]]] = { 

687 1: PDFStandardSecurityHandler, 

688 2: PDFStandardSecurityHandler, 

689 4: PDFStandardSecurityHandlerV4, 

690 5: PDFStandardSecurityHandlerV5, 

691 } 

692 

693 def __init__( 

694 self, 

695 parser: PDFParser, 

696 password: str = "", 

697 caching: bool = True, 

698 fallback: bool = True, 

699 ) -> None: 

700 """Set the document to use a given PDFParser object.""" 

701 self.caching = caching 

702 self.xrefs: list[PDFBaseXRef] = [] 

703 self.info = [] 

704 self.catalog: dict[str, Any] = {} 

705 self.encryption: tuple[Any, Any] | None = None 

706 self.decipher: DecipherCallable | None = None 

707 self._parser = None 

708 self._cached_objs: dict[int, tuple[object, int]] = {} 

709 self._parsed_objs: dict[int, tuple[list[object], int]] = {} 

710 self._parser = parser 

711 self._parser.set_document(self) 

712 self.is_printable = self.is_modifiable = self.is_extractable = True 

713 # Retrieve the information of each header that was appended 

714 # (maybe multiple times) at the end of the document. 

715 try: 

716 pos = self.find_xref(parser) 

717 self.read_xref_from(parser, pos, self.xrefs) 

718 except PDFNoValidXRef: 

719 if fallback: 

720 parser.fallback = True 

721 newxref = PDFXRefFallback() 

722 newxref.load(parser) 

723 self.xrefs.append(newxref) 

724 

725 for xref in self.xrefs: 

726 trailer = xref.get_trailer() 

727 if not trailer: 

728 continue 

729 # If there's an encryption info, remember it. 

730 if "Encrypt" in trailer: 

731 # Some documents may not have a /ID, use two empty 

732 # byte strings instead. Solves 

733 # https://github.com/pdfminer/pdfminer.six/issues/594 

734 id_value = list_value(trailer["ID"]) if "ID" in trailer else (b"", b"") 

735 self.encryption = (id_value, dict_value(trailer["Encrypt"])) 

736 self._initialize_password(password) 

737 if "Info" in trailer: 

738 self.info.append(dict_value(trailer["Info"])) 

739 if "Root" in trailer: 

740 # Every PDF file must have exactly one /Root dictionary. 

741 self.catalog = dict_value(trailer["Root"]) 

742 break 

743 else: 

744 raise PDFSyntaxError("No /Root object! - Is this really a PDF?") 

745 if self.catalog.get("Type") is not LITERAL_CATALOG and settings.STRICT: 

746 raise PDFSyntaxError("Catalog not found!") 

747 

748 KEYWORD_OBJ = KWD(b"obj") 

749 

750 # _initialize_password(password=b'') 

751 # Perform the initialization with a given password. 

752 def _initialize_password(self, password: str = "") -> None: 

753 assert self.encryption is not None 

754 (docid, param) = self.encryption 

755 if literal_name(param.get("Filter")) != "Standard": 

756 raise PDFEncryptionError(f"Unknown filter: param={param!r}") 

757 v = int_value(param.get("V", 0)) 

758 factory = self.security_handler_registry.get(v) 

759 if factory is None: 

760 raise PDFEncryptionError(f"Unknown algorithm: param={param!r}") 

761 handler = factory(docid, param, password) 

762 self.decipher = handler.decrypt 

763 self.is_printable = handler.is_printable() 

764 self.is_modifiable = handler.is_modifiable() 

765 self.is_extractable = handler.is_extractable() 

766 assert self._parser is not None 

767 self._parser.fallback = False # need to read streams with exact length 

768 

769 def _getobj_objstm(self, stream: PDFStream, index: int, objid: int) -> object: 

770 if stream.objid in self._parsed_objs: 

771 (objs, n) = self._parsed_objs[stream.objid] 

772 else: 

773 (objs, n) = self._get_objects(stream) 

774 if self.caching: 

775 assert stream.objid is not None 

776 self._parsed_objs[stream.objid] = (objs, n) 

777 i = n * 2 + index 

778 try: 

779 obj = objs[i] 

780 except IndexError as err: 

781 raise PDFSyntaxError(f"index too big: {index!r}") from err 

782 return obj 

783 

784 def _get_objects(self, stream: PDFStream) -> tuple[list[object], int]: 

785 if stream.get("Type") is not LITERAL_OBJSTM and settings.STRICT: 

786 raise PDFSyntaxError(f"Not a stream object: {stream!r}") 

787 try: 

788 n = cast(int, stream["N"]) 

789 except KeyError: 

790 if settings.STRICT: 

791 raise PDFSyntaxError(f"N is not defined: {stream!r}") from None 

792 n = 0 

793 parser = PDFStreamParser(stream.get_data()) 

794 parser.set_document(self) 

795 objs: list[object] = [] 

796 try: 

797 while 1: 

798 (_, obj) = parser.nextobject() 

799 objs.append(obj) 

800 except PSEOF: 

801 pass 

802 return (objs, n) 

803 

804 def _getobj_parse(self, pos: int, objid: int) -> object: 

805 assert self._parser is not None 

806 self._parser.seek(pos) 

807 (_, objid1) = self._parser.nexttoken() # objid 

808 (_, _genno) = self._parser.nexttoken() # genno 

809 (_, kwd) = self._parser.nexttoken() 

810 # hack around malformed pdf files 

811 # copied from https://github.com/jaepil/pdfminer3k/blob/master/ 

812 # pdfminer/pdfparser.py#L399 

813 # to solve https://github.com/pdfminer/pdfminer.six/issues/56 

814 # assert objid1 == objid, str((objid1, objid)) 

815 if objid1 != objid: 

816 x = [] 

817 while kwd is not self.KEYWORD_OBJ: 

818 (_, kwd) = self._parser.nexttoken() 

819 x.append(kwd) 

820 if len(x) >= 2: 

821 objid1 = x[-2] 

822 # #### end hack around malformed pdf files 

823 if objid1 != objid: 

824 raise PDFSyntaxError(f"objid mismatch: {objid1!r}={objid!r}") 

825 

826 if kwd != KWD(b"obj"): 

827 raise PDFSyntaxError(f"Invalid object spec: offset={pos!r}") 

828 (_, obj) = self._parser.nextobject() 

829 return obj 

830 

831 # can raise PDFObjectNotFound 

832 def getobj(self, objid: int) -> object: 

833 """Get object from PDF 

834 

835 :raises PDFException if PDFDocument is not initialized 

836 :raises PDFObjectNotFound if objid does not exist in PDF 

837 """ 

838 if not self.xrefs: 

839 raise PDFException("PDFDocument is not initialized") 

840 log.debug("getobj: objid=%r", objid) 

841 obj: object # Initialize to satisfy mypy; always assigned in branches below 

842 genno: int 

843 if objid in self._cached_objs: 

844 (obj, genno) = self._cached_objs[objid] 

845 else: 

846 for xref in self.xrefs: 

847 try: 

848 (strmid, index, genno) = xref.get_pos(objid) 

849 except KeyError: 

850 continue 

851 try: 

852 if strmid is not None: 

853 stream = stream_value(self.getobj(strmid)) 

854 obj = self._getobj_objstm(stream, index, objid) 

855 else: 

856 obj = self._getobj_parse(index, objid) 

857 if self.decipher: 

858 obj = decipher_all(self.decipher, objid, genno, obj) 

859 

860 if isinstance(obj, PDFStream): 

861 obj.set_objid(objid, genno) 

862 break 

863 except (PSEOF, PDFSyntaxError): 

864 continue 

865 else: 

866 raise PDFObjectNotFound(objid) 

867 log.debug("register: objid=%r: %r", objid, obj) 

868 if self.caching: 

869 self._cached_objs[objid] = (obj, genno) 

870 return obj 

871 

872 OutlineType = tuple[Any, Any, Any, Any, Any] 

873 

874 def get_outlines(self) -> Iterator[OutlineType]: 

875 if "Outlines" not in self.catalog: 

876 raise PDFNoOutlines 

877 

878 def search(entry: object, level: int) -> Iterator[PDFDocument.OutlineType]: 

879 entry = dict_value(entry) 

880 if "Title" in entry and ("A" in entry or "Dest" in entry): 

881 title = decode_text(str_value(entry["Title"])) 

882 dest = entry.get("Dest") 

883 action = entry.get("A") 

884 se = entry.get("SE") 

885 yield (level, title, dest, action, se) 

886 if "First" in entry and "Last" in entry: 

887 yield from search(entry["First"], level + 1) 

888 if "Next" in entry: 

889 yield from search(entry["Next"], level) 

890 

891 return search(self.catalog["Outlines"], 0) 

892 

893 def get_page_labels(self) -> Iterator[str]: 

894 """Generate page label strings for the PDF document. 

895 

896 If the document includes page labels, generates strings, one per page. 

897 If not, raises PDFNoPageLabels. 

898 

899 The resulting iteration is unbounded. 

900 """ 

901 assert self.catalog is not None 

902 

903 try: 

904 page_labels = PageLabels(self.catalog["PageLabels"]) 

905 except (PDFTypeError, KeyError) as err: 

906 raise PDFNoPageLabels from err 

907 

908 return page_labels.labels 

909 

910 def lookup_name(self, cat: str, key: str | bytes) -> Any: 

911 try: 

912 names = dict_value(self.catalog["Names"]) 

913 except (PDFTypeError, KeyError) as err: 

914 raise PDFKeyError((cat, key)) from err 

915 # may raise KeyError 

916 d0 = dict_value(names[cat]) 

917 

918 def lookup(d: dict[str, Any]) -> Any: 

919 if "Limits" in d: 

920 (k1, k2) = list_value(d["Limits"]) 

921 if key < k1 or k2 < key: 

922 return None 

923 if "Names" in d: 

924 objs = list_value(d["Names"]) 

925 names = dict( 

926 cast(Iterator[tuple[str | bytes, Any]], choplist(2, objs)), 

927 ) 

928 return names[key] 

929 if "Kids" in d: 

930 for c in list_value(d["Kids"]): 

931 v = lookup(dict_value(c)) 

932 if v: 

933 return v 

934 raise PDFKeyError((cat, key)) 

935 

936 return lookup(d0) 

937 

938 def get_dest(self, name: str | bytes) -> Any: 

939 try: 

940 # PDF-1.2 or later 

941 obj = self.lookup_name("Dests", name) 

942 except KeyError: 

943 # PDF-1.1 or prior 

944 if "Dests" not in self.catalog: 

945 raise PDFDestinationNotFound(name) from None 

946 d0 = dict_value(self.catalog["Dests"]) 

947 if name not in d0: 

948 raise PDFDestinationNotFound(name) from None 

949 obj = d0[name] 

950 return obj 

951 

952 # find_xref 

953 def find_xref(self, parser: PDFParser) -> int: 

954 """Internal function used to locate the first XRef.""" 

955 # search the last xref table by scanning the file backwards. 

956 prev = b"" 

957 for line in parser.revreadlines(): 

958 line = line.strip() 

959 log.debug("find_xref: %r", line) 

960 

961 if line == b"startxref": 

962 log.debug("xref found: pos=%r", prev) 

963 

964 if not prev.isdigit(): 

965 raise PDFNoValidXRef(f"Invalid xref position, no digit: {prev!r}") 

966 

967 start = int(prev) 

968 

969 if not start >= 0: 

970 raise PDFNoValidXRef(f"Invalid xref position, negative: {start}") 

971 

972 # The xref start needs to fit in a C ssize_t to be a proper file offset 

973 if start >= 2**31: 

974 raise PDFNoValidXRef(f"Invalid xref position, too large: {start!r}") 

975 

976 return start 

977 

978 if line: 

979 prev = line 

980 

981 raise PDFNoValidXRef("Unexpected EOF") 

982 

983 # read xref table 

984 def read_xref_from( 

985 self, 

986 parser: PDFParser, 

987 start: int, 

988 xrefs: list[PDFBaseXRef], 

989 ) -> None: 

990 """Reads XRefs from the given location.""" 

991 parser.seek(start) 

992 parser.reset() 

993 try: 

994 (pos, token) = parser.nexttoken() 

995 except PSEOF as err: 

996 raise PDFNoValidXRef("Unexpected EOF") from err 

997 log.debug("read_xref_from: start=%d, token=%r", start, token) 

998 if isinstance(token, int): 

999 # XRefStream: PDF-1.5 

1000 parser.seek(pos) 

1001 parser.reset() 

1002 xref: PDFBaseXRef = PDFXRefStream() 

1003 xref.load(parser) 

1004 else: 

1005 if token is parser.KEYWORD_XREF: 

1006 parser.nextline() 

1007 xref = PDFXRef() 

1008 xref.load(parser) 

1009 xrefs.append(xref) 

1010 trailer = xref.get_trailer() 

1011 log.debug("trailer: %r", trailer) 

1012 if "XRefStm" in trailer: 

1013 pos = int_value(trailer["XRefStm"]) 

1014 self.read_xref_from(parser, pos, xrefs) 

1015 if "Prev" in trailer: 

1016 # find previous xref 

1017 pos = int_value(trailer["Prev"]) 

1018 self.read_xref_from(parser, pos, xrefs) 

1019 

1020 

1021class PageLabels(NumberTree): 

1022 """PageLabels from the document catalog. 

1023 

1024 See Section 8.3.1 in the PDF Reference. 

1025 """ 

1026 

1027 @property 

1028 def labels(self) -> Iterator[str]: 

1029 ranges = self.values 

1030 

1031 # The tree must begin with page index 0 

1032 if len(ranges) == 0 or ranges[0][0] != 0: 

1033 if settings.STRICT: 

1034 raise PDFSyntaxError("PageLabels is missing page index 0") 

1035 else: 

1036 # Try to cope, by assuming empty labels for the initial pages 

1037 ranges.insert(0, (0, {})) 

1038 

1039 for next, (start, label_dict_unchecked) in enumerate(ranges, 1): 

1040 label_dict = dict_value(label_dict_unchecked) 

1041 style = label_dict.get("S") 

1042 prefix = decode_text(str_value(label_dict.get("P", b""))) 

1043 first_value = int_value(label_dict.get("St", 1)) 

1044 

1045 if next == len(ranges): 

1046 # This is the last specified range. It continues until the end 

1047 # of the document. 

1048 values: Iterable[int] = itertools.count(first_value) 

1049 else: 

1050 end, _ = ranges[next] 

1051 range_length = end - start 

1052 values = range(first_value, first_value + range_length) 

1053 

1054 for value in values: 

1055 label = self._format_page_label(value, style) 

1056 yield prefix + label 

1057 

1058 @staticmethod 

1059 def _format_page_label(value: int, style: Any) -> str: 

1060 """Format page label value in a specific style""" 

1061 if style is None: 

1062 label = "" 

1063 elif style is LIT("D"): # Decimal arabic numerals 

1064 label = str(value) 

1065 elif style is LIT("R"): # Uppercase roman numerals 

1066 label = format_int_roman(value).upper() 

1067 elif style is LIT("r"): # Lowercase roman numerals 

1068 label = format_int_roman(value) 

1069 elif style is LIT("A"): # Uppercase letters A-Z, AA-ZZ... 

1070 label = format_int_alpha(value).upper() 

1071 elif style is LIT("a"): # Lowercase letters a-z, aa-zz... 

1072 label = format_int_alpha(value) 

1073 else: 

1074 log.warning("Unknown page label style: %r", style) 

1075 label = "" 

1076 return label