Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/pdfdocument.py: 84%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

718 statements  

1import itertools 

2import logging 

3import re 

4import struct 

5from collections.abc import Callable, Iterable, Iterator, KeysView, Sequence 

6from hashlib import md5, sha256, sha384, sha512 

7from typing import ( 

8 Any, 

9 ClassVar, 

10 cast, 

11) 

12 

13from cryptography.hazmat.backends import default_backend 

14from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes 

15 

16from pdfminer import settings 

17from pdfminer.arcfour import Arcfour 

18from pdfminer.casting import safe_int 

19from pdfminer.data_structures import NumberTree 

20from pdfminer.pdfexceptions import ( 

21 PDFException, 

22 PDFKeyError, 

23 PDFObjectNotFound, 

24 PDFTypeError, 

25) 

26from pdfminer.pdfparser import PDFParser, PDFStreamParser, PDFSyntaxError 

27from pdfminer.pdftypes import ( 

28 DecipherCallable, 

29 PDFStream, 

30 decipher_all, 

31 dict_value, 

32 int_value, 

33 list_value, 

34 str_value, 

35 stream_value, 

36 uint_value, 

37) 

38from pdfminer.psexceptions import PSEOF 

39from pdfminer.psparser import KWD, LIT, literal_name 

40from pdfminer.utils import ( 

41 choplist, 

42 decode_text, 

43 format_int_alpha, 

44 format_int_roman, 

45 nunpack, 

46 unpad_aes, 

47) 

48 

# Module-level logger shared by every class in this file.
log = logging.getLogger(__name__)

50 

51 

class PDFNoValidXRef(PDFSyntaxError):
    """Raised when a cross-reference table cannot be located or parsed."""

54 

55 

class PDFNoValidXRefWarning(SyntaxWarning):
    """Legacy warning category for a missing xref.

    Kept only for backward compatibility: the condition is now reported
    through the logger (logger.Logger.warn) instead of warnings.warn.
    """

61 

62 

class PDFNoOutlines(PDFException):
    """Raised by ``PDFDocument.get_outlines`` when the catalog has no /Outlines."""

65 

66 

class PDFNoPageLabels(PDFException):
    """Raised by ``PDFDocument.get_page_labels`` when no usable /PageLabels exist."""

69 

70 

class PDFDestinationNotFound(PDFException):
    """Raised by ``PDFDocument.get_dest`` when a named destination is unknown."""

73 

74 

class PDFEncryptionError(PDFException):
    """Raised for unsupported or malformed encryption parameters."""

77 

78 

class PDFPasswordIncorrect(PDFEncryptionError):
    """Raised when the supplied password authenticates as neither user nor owner."""

81 

82 

class PDFEncryptionWarning(UserWarning):
    """Legacy warning category for failed decryption.

    Kept only for backward compatibility: the condition is now reported
    through the logger (logger.Logger.warn) instead of warnings.warn.
    """

88 

89 

class PDFTextExtractionNotAllowedWarning(UserWarning):
    """Legacy warning category for a PDF that does not allow extraction.

    Kept only for backward compatibility: the condition is now reported
    through the logger (logger.Logger.warn) instead of warnings.warn.
    """

95 

96 

class PDFTextExtractionNotAllowed(PDFEncryptionError):
    """Encryption error type for text extraction that is not permitted.

    NOTE(review): not raised anywhere in this module; presumably used by
    callers that consult ``PDFDocument.is_extractable`` - confirm.
    """

99 

100 

# some predefined literals and keywords.
# These are compared by identity (``is``) against the /Type entry of object
# streams, cross-reference streams and the document catalog respectively.
LITERAL_OBJSTM = LIT("ObjStm")
LITERAL_XREF = LIT("XRef")
LITERAL_CATALOG = LIT("Catalog")

105 

106 

class PDFBaseXRef:
    """Interface shared by every cross-reference source.

    The defaults describe an xref that knows no objects; concrete subclasses
    override ``get_trailer`` and ``load`` and usually the lookup methods too.
    """

    def get_trailer(self) -> dict[str, Any]:
        """Return the trailer dictionary; subclasses must implement this."""
        raise NotImplementedError

    def get_objids(self) -> Iterable[int]:
        """Return the ids of the objects this xref knows (none by default)."""
        return []

    def get_pos(self, objid: int) -> tuple[int | None, int, int]:
        """Locate ``objid``.

        Implementations must return either ``(strmid, index, genno)`` or
        ``(None, pos, genno)``.

        :raises PDFKeyError: always, in this base implementation.
        """
        raise PDFKeyError(objid)

    def load(self, parser: PDFParser) -> None:
        """Populate the xref from ``parser``; subclasses must implement this."""
        raise NotImplementedError

122 

123 

class PDFXRef(PDFBaseXRef):
    """Plain-text cross-reference table followed by a ``trailer`` dictionary."""

    def __init__(self) -> None:
        # objid -> (None, byte offset of the object, generation number)
        self.offsets: dict[int, tuple[int | None, int, int]] = {}
        self.trailer: dict[str, Any] = {}

    def __repr__(self) -> str:
        return f"<PDFXRef: offsets={self.offsets.keys()!r}>"

    def load(self, parser: PDFParser) -> None:
        """Read xref subsections from ``parser`` until the trailer, then load it.

        Each subsection starts with a ``start nobjs`` header line followed by
        ``nobjs`` entry lines of the form ``pos genno use``.  Only entries
        whose ``use`` field is ``n`` are recorded.

        :raises PDFNoValidXRef: on EOF or on a line that does not have the
            expected shape.
        """
        while True:
            try:
                (pos, line) = parser.nextline()
                line = line.strip()
                if not line:
                    continue
            except PSEOF as err:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted?") from err
            if line.startswith(b"trailer"):
                # Rewind so that load_trailer() sees the keyword itself.
                parser.seek(pos)
                break
            f = line.split(b" ")
            if len(f) != 2:
                error_msg = f"Trailer not found: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg)
            try:
                (start, nobjs) = map(int, f)
            except ValueError as err:
                error_msg = f"Invalid line: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg) from err
            for objid in range(start, start + nobjs):
                try:
                    (_, line) = parser.nextline()
                    line = line.strip()
                except PSEOF as err:
                    raise PDFNoValidXRef("Unexpected EOF - file corrupted?") from err
                f = line.split(b" ")
                if len(f) != 3:
                    error_msg = f"Invalid XRef format: {parser!r}, line={line!r}"
                    raise PDFNoValidXRef(error_msg)
                (pos_b, genno_b, use_b) = f
                if use_b != b"n":
                    # Anything other than an in-use entry is skipped.
                    continue

                pos_i = safe_int(pos_b)
                genno_i = safe_int(genno_b)
                if pos_i is not None and genno_i is not None:
                    self.offsets[objid] = (None, pos_i, genno_i)
                else:
                    log.warning(
                        f"Not adding object {objid} to xref because position {pos_b!r} "
                        f"or generation number {genno_b!r} cannot be parsed as an int"
                    )

        log.debug("xref objects: %r", self.offsets)
        self.load_trailer(parser)

    def load_trailer(self, parser: PDFParser) -> None:
        """Parse the ``trailer`` keyword and its dictionary into ``self.trailer``.

        :raises PDFNoValidXRef: if the next token is not the ``trailer``
            keyword, or if EOF is hit with nothing left on the parser stack.
        """
        try:
            (_, kwd) = parser.nexttoken()
            # Explicit check instead of ``assert``: this validates file
            # content, must survive ``python -O``, and should be reported as
            # an xref problem rather than an AssertionError.
            if kwd is not KWD(b"trailer"):
                raise PDFNoValidXRef(f"Trailer keyword expected: {kwd!r}")
            (_, dic) = parser.nextobject()
        except PSEOF:
            # EOF before the object completed: use whatever is on the stack.
            x = parser.pop(1)
            if not x:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted") from None
            (_, dic) = x[0]
        self.trailer.update(dict_value(dic))
        log.debug("trailer=%r", self.trailer)

    def get_trailer(self) -> dict[str, Any]:
        """Return the trailer dictionary collected by ``load_trailer``."""
        return self.trailer

    def get_objids(self) -> KeysView[int]:
        """Return a view of all object ids recorded in ``offsets``."""
        return self.offsets.keys()

    def get_pos(self, objid: int) -> tuple[int | None, int, int]:
        """Return the recorded location of ``objid``; KeyError if unknown."""
        return self.offsets[objid]

201 

202 

class PDFXRefFallback(PDFXRef):
    """Xref rebuilt by scanning the whole file for ``N G obj`` headers."""

    # Matches an object header such as "12 0 obj" at the start of a line.
    PDFOBJ_CUE = re.compile(r"^(\d+)\s+(\d+)\s+obj\b")

    def __repr__(self) -> str:
        return f"<PDFXRefFallback: offsets={self.offsets.keys()!r}>"

    def load(self, parser: PDFParser) -> None:
        """Scan from offset 0, recording every object header that is found.

        Scanning stops at EOF or at the first line starting with ``trailer``
        (which is then loaded).  Objects that turn out to be object streams
        are expanded so that the objects they contain are registered too.
        """
        parser.seek(0)
        while True:
            try:
                (line_pos, raw_line) = parser.nextline()
            except PSEOF:
                break
            if raw_line.startswith(b"trailer"):
                parser.seek(line_pos)
                self.load_trailer(parser)
                log.debug("trailer: %r", self.trailer)
                break
            # default pdf encoding
            header = self.PDFOBJ_CUE.match(raw_line.decode("latin-1"))
            if not header:
                continue
            (objid_s, genno_s) = header.groups()
            objid = int(objid_s)
            self.offsets[objid] = (None, line_pos, int(genno_s))
            # expand ObjStm.
            parser.seek(line_pos)
            (_, parsed) = parser.nextobject()
            if not (
                isinstance(parsed, PDFStream) and parsed.get("Type") is LITERAL_OBJSTM
            ):
                continue
            stream = stream_value(parsed)
            try:
                n = stream["N"]
            except KeyError:
                if settings.STRICT:
                    raise PDFSyntaxError(f"N is not defined: {stream!r}") from None
                n = 0
            stream_parser = PDFStreamParser(stream.get_data())
            numbers: list[int] = []
            try:
                while True:
                    (_, item) = stream_parser.nextobject()
                    numbers.append(cast(int, item))
            except PSEOF:
                pass
            # The parsed values are read as (objid, offset) pairs; never trust
            # N beyond the number of pairs that were actually parsed.
            n = min(n, len(numbers) // 2)
            for index in range(n):
                self.offsets[numbers[index * 2]] = (objid, index, 0)

252 

253 

class PDFXRefStream(PDFBaseXRef):
    """Cross-reference information stored in an /XRef stream object."""

    def __init__(self) -> None:
        self.data: bytes | None = None  # decoded stream data (fixed-size entries)
        self.entlen: int | None = None  # bytes per entry: fl1 + fl2 + fl3
        self.fl1: int | None = None  # field widths taken from /W
        self.fl2: int | None = None
        self.fl3: int | None = None
        # (first objid, count) pairs taken from /Index
        self.ranges: list[tuple[int, int]] = []
        # Initialised here so get_trailer() is safe before load() has run.
        self.trailer: dict[str, Any] = {}

    def __repr__(self) -> str:
        return f"<PDFXRefStream: ranges={self.ranges!r}>"

    def load(self, parser: PDFParser) -> None:
        """Read the ``objid genno obj`` header and the xref stream behind it.

        :raises PDFNoValidXRef: if the object is not a stream of type /XRef.
        :raises PDFSyntaxError: if /Index holds an odd number of values.
        """
        (_, _objid) = parser.nexttoken()  # ignored
        (_, _genno) = parser.nexttoken()  # ignored
        (_, _kwd) = parser.nexttoken()
        (_, stream) = parser.nextobject()
        if not isinstance(stream, PDFStream) or stream.get("Type") is not LITERAL_XREF:
            raise PDFNoValidXRef("Invalid PDF stream spec.")
        size = stream["Size"]
        # Without /Index the stream describes one range: objects 0..Size-1.
        index_array = stream.get("Index", (0, size))
        if len(index_array) % 2 != 0:
            raise PDFSyntaxError("Invalid index number")
        self.ranges.extend(cast(Iterator[tuple[int, int]], choplist(2, index_array)))
        (self.fl1, self.fl2, self.fl3) = stream["W"]
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        self.data = stream.get_data()
        self.entlen = self.fl1 + self.fl2 + self.fl3
        self.trailer = stream.attrs
        log.debug(
            "xref stream: objid=%s, fields=%d,%d,%d",
            ", ".join(map(repr, self.ranges)),
            self.fl1,
            self.fl2,
            self.fl3,
        )

    def get_trailer(self) -> dict[str, Any]:
        """Return the stream attributes, which serve as the trailer."""
        return self.trailer

    def get_objids(self) -> Iterator[int]:
        """Yield the id of every entry whose type field is 1 or 2.

        Entries of all ranges are stored back to back in ``data``, so the
        entry index keeps counting across ranges (as ``get_pos`` does).
        """
        index = 0  # entries belonging to the ranges already processed
        for start, nobjs in self.ranges:
            for i in range(nobjs):
                assert self.entlen is not None
                assert self.data is not None
                offset = self.entlen * (index + i)
                ent = self.data[offset : offset + self.entlen]
                f1 = nunpack(ent[: self.fl1], 1)
                if f1 == 1 or f1 == 2:
                    yield start + i
            index += nobjs

    def get_pos(self, objid: int) -> tuple[int | None, int, int]:
        """Return the location of ``objid``.

        Type-1 entries give ``(None, f2, f3)``; type-2 entries give
        ``(f2, f3, 0)``.

        :raises PDFKeyError: if ``objid`` is outside every range or its entry
            has any other type (a free object).
        """
        index = 0
        for start, nobjs in self.ranges:
            if start <= objid and objid < start + nobjs:
                index += objid - start
                break
            else:
                index += nobjs
        else:
            raise PDFKeyError(objid)
        assert self.entlen is not None
        assert self.data is not None
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        offset = self.entlen * index
        ent = self.data[offset : offset + self.entlen]
        f1 = nunpack(ent[: self.fl1], 1)
        f2 = nunpack(ent[self.fl1 : self.fl1 + self.fl2])
        f3 = nunpack(ent[self.fl1 + self.fl2 :])
        if f1 == 1:
            return (None, f2, f3)
        elif f1 == 2:
            return (f2, f3, 0)
        else:
            # this is a free object
            raise PDFKeyError(objid)

330 

331 

class PDFStandardSecurityHandler:
    """Password check and RC4 decryption for encryption revisions 2 and 3.

    Construction reads the encryption dictionary, rejects unsupported
    revisions and derives ``self.key`` from the password; see ``init``.
    """

    # Padding appended to passwords before truncating them to 32 bytes.
    PASSWORD_PADDING = (
        b"(\xbfN^Nu\x8aAd\x00NV\xff\xfa\x01\x08..\x00\xb6\xd0h>\x80/\x0c\xa9\xfedSiz"
    )
    supported_revisions: tuple[int, ...] = (2, 3)

    def __init__(
        self,
        docid: Sequence[bytes],
        param: dict[str, Any],
        password: str = "",
    ) -> None:
        """Store the arguments and run ``init``.

        :param docid: document id values; only ``docid[0]`` is hashed.
        :param param: the encryption dictionary.
        :param password: password tried first as user, then as owner password.
        :raises PDFEncryptionError: if the revision is not supported.
        :raises PDFPasswordIncorrect: if the password does not authenticate.
        """
        self.docid = docid
        self.param = param
        self.password = password
        self.init()

    def init(self) -> None:
        """Load parameters, check the revision, then derive the key."""
        self.init_params()
        if self.r not in self.supported_revisions:
            error_msg = f"Unsupported revision: param={self.param!r}"
            raise PDFEncryptionError(error_msg)
        self.init_key()

    def init_params(self) -> None:
        """Copy the V, R, P, O, U and Length entries onto the instance."""
        self.v = int_value(self.param.get("V", 0))
        self.r = int_value(self.param["R"])
        self.p = uint_value(self.param["P"], 32)
        self.o = str_value(self.param["O"])
        self.u = str_value(self.param["U"])
        self.length = int_value(self.param.get("Length", 40))

    def init_key(self) -> None:
        """Authenticate ``self.password`` and keep the resulting key.

        :raises PDFPasswordIncorrect: if authentication yields no key.
        """
        self.key = self.authenticate(self.password)
        if self.key is None:
            raise PDFPasswordIncorrect

    def is_printable(self) -> bool:
        """Return True if the bit with value 4 is set in P."""
        return bool(self.p & 4)

    def is_modifiable(self) -> bool:
        """Return True if the bit with value 8 is set in P."""
        return bool(self.p & 8)

    def is_extractable(self) -> bool:
        """Return True if the bit with value 16 is set in P."""
        return bool(self.p & 16)

    def compute_u(self, key: bytes) -> bytes:
        """Compute the value that U should have for the given ``key``."""
        if self.r == 2:
            # Algorithm 3.4
            return Arcfour(key).encrypt(self.PASSWORD_PADDING)  # 2
        else:
            # Algorithm 3.5
            hash = md5(self.PASSWORD_PADDING)  # 2
            hash.update(self.docid[0])  # 3
            result = Arcfour(key).encrypt(hash.digest())  # 4
            for i in range(1, 20):  # 5
                # Each pass uses the key with every byte XORed with i.
                k = b"".join(bytes((c ^ i,)) for c in iter(key))
                result = Arcfour(k).encrypt(result)
            result += result  # 6
            return result

    def compute_encryption_key(self, password: bytes) -> bytes:
        """Derive the encryption key from a (user) password."""
        # Algorithm 3.2
        password = (password + self.PASSWORD_PADDING)[:32]  # 1
        hash = md5(password)  # 2
        hash.update(self.o)  # 3
        # See https://github.com/pdfminer/pdfminer.six/issues/186
        hash.update(struct.pack("<L", self.p))  # 4
        hash.update(self.docid[0])  # 5
        if (
            self.r >= 4
            and not cast(PDFStandardSecurityHandlerV4, self).encrypt_metadata
        ):
            hash.update(b"\xff\xff\xff\xff")
        result = hash.digest()
        # Key length in bytes: 5 unless the revision is 3 or later.
        n = 5
        if self.r >= 3:
            n = self.length // 8
            for _ in range(50):
                result = md5(result[:n]).digest()
        return result[:n]

    def authenticate(self, password: str) -> bytes | None:
        """Try ``password`` as user password, then as owner password.

        :return: the encryption key, or None if neither attempt succeeds.
        """
        password_bytes = password.encode("latin1")
        key = self.authenticate_user_password(password_bytes)
        if key is None:
            key = self.authenticate_owner_password(password_bytes)
        return key

    def authenticate_user_password(self, password: bytes) -> bytes | None:
        """Return the key derived from ``password`` if it verifies, else None."""
        key = self.compute_encryption_key(password)
        if self.verify_encryption_key(key):
            return key
        else:
            return None

    def verify_encryption_key(self, key: bytes) -> bool:
        """Check ``key`` by comparing the computed U against the stored U."""
        # Algorithm 3.6
        u = self.compute_u(key)
        if self.r == 2:
            return u == self.u
        # For other revisions only the first 16 bytes are compared.
        return u[:16] == self.u[:16]

    def authenticate_owner_password(self, password: bytes) -> bytes | None:
        """Recover a user password from O and authenticate with it.

        :return: the encryption key, or None if the recovered password fails.
        """
        # Algorithm 3.7
        password = (password + self.PASSWORD_PADDING)[:32]
        hash = md5(password)
        if self.r >= 3:
            for _ in range(50):
                hash = md5(hash.digest())
        n = 5
        if self.r >= 3:
            n = self.length // 8
        key = hash.digest()[:n]
        if self.r == 2:
            user_password = Arcfour(key).decrypt(self.o)
        else:
            user_password = self.o
            # Undo the passes in reverse order, from i = 19 down to 0.
            for i in range(19, -1, -1):
                k = b"".join(bytes((c ^ i,)) for c in iter(key))
                user_password = Arcfour(k).decrypt(user_password)
        return self.authenticate_user_password(user_password)

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: dict[str, Any] | None = None,
    ) -> bytes:
        """Decrypt ``data`` belonging to object ``objid``/``genno``.

        ``attrs`` is accepted but not used by this handler.
        """
        return self.decrypt_rc4(objid, genno, data)

    def decrypt_rc4(self, objid: int, genno: int, data: bytes) -> bytes:
        """Decrypt ``data`` with Arcfour using a per-object key."""
        assert self.key is not None
        # Per-object key: file key + low 3 bytes of objid + low 2 bytes of genno.
        key = self.key + struct.pack("<L", objid)[:3] + struct.pack("<L", genno)[:2]
        hash = md5(key)
        key = hash.digest()[: min(len(key), 16)]
        return Arcfour(key).decrypt(data)

470 

471 

class PDFStandardSecurityHandlerV4(PDFStandardSecurityHandler):
    """Handler for revision 4: named crypt filters (V2, AESV2, Identity)."""

    supported_revisions: tuple[int, ...] = (4,)

    def init_params(self) -> None:
        """Read the crypt-filter entries on top of the base parameters.

        :raises PDFEncryptionError: if StmF and StrF differ, if a filter
            names an unknown method, or if StrF names an undefined filter.
        """
        super().init_params()
        self.length = 128
        self.cf = dict_value(self.param.get("CF"))
        self.stmf = literal_name(self.param["StmF"])
        self.strf = literal_name(self.param["StrF"])
        self.encrypt_metadata = bool(self.param.get("EncryptMetadata", True))
        if self.stmf != self.strf:
            raise PDFEncryptionError(f"Unsupported crypt filter: param={self.param!r}")
        # Maps each crypt filter name to the method that decrypts with it.
        self.cfm = {}
        for filter_name, filter_dict in self.cf.items():
            decryptor = self.get_cfm(literal_name(filter_dict["CFM"]))
            if decryptor is None:
                raise PDFEncryptionError(
                    f"Unknown crypt filter method: param={self.param!r}"
                )
            self.cfm[filter_name] = decryptor
        self.cfm["Identity"] = self.decrypt_identity
        if self.strf not in self.cfm:
            raise PDFEncryptionError(f"Undefined crypt filter: param={self.param!r}")

    def get_cfm(self, name: str) -> Callable[[int, int, bytes], bytes] | None:
        """Return the decrypt method for crypt filter method ``name``, or None."""
        if name == "V2":
            return self.decrypt_rc4
        if name == "AESV2":
            return self.decrypt_aes128
        return None

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: dict[str, Any] | None = None,
        name: str | None = None,
    ) -> bytes:
        """Decrypt ``data`` with the crypt filter ``name`` (default: StrF).

        Data whose ``attrs`` declare /Type /Metadata is returned untouched
        when the document says metadata is not encrypted.
        """
        if not self.encrypt_metadata and attrs is not None:
            declared_type = attrs.get("Type")
            if declared_type is not None and literal_name(declared_type) == "Metadata":
                return data
        chosen = self.strf if name is None else name
        return self.cfm[chosen](objid, genno, data)

    def decrypt_identity(self, objid: int, genno: int, data: bytes) -> bytes:
        """Return ``data`` unchanged (the Identity filter)."""
        return data

    def decrypt_aes128(self, objid: int, genno: int, data: bytes) -> bytes:
        """Decrypt ``data`` with AES in CBC mode using a per-object key.

        The first 16 bytes of ``data`` are the initialization vector.
        """
        assert self.key is not None
        objid_part = struct.pack("<L", objid)[:3]
        genno_part = struct.pack("<L", genno)[:2]
        key_material = self.key + objid_part + genno_part + b"sAlT"
        object_key = md5(key_material).digest()[: min(len(key_material), 16)]
        initialization_vector, ciphertext = data[:16], data[16:]
        cipher = Cipher(
            algorithms.AES(object_key),
            modes.CBC(initialization_vector),
            backend=default_backend(),
        )  # type: ignore
        plaintext = cipher.decryptor().update(ciphertext)  # type: ignore
        return unpad_aes(plaintext)

543 

544 

class PDFStandardSecurityHandlerV5(PDFStandardSecurityHandlerV4):
    """Handler for revisions 5 and 6 (AESV3 crypt filter, SHA-2 hashing)."""

    supported_revisions = (5, 6)

    def init_params(self) -> None:
        """Read OE/UE and split O and U into hash, validation salt, key salt."""
        super().init_params()
        self.length = 256
        self.oe = str_value(self.param["OE"])
        self.ue = str_value(self.param["UE"])
        # Layout of O and U: bytes 0-31 hash, 32-39 validation salt, 40+ key salt.
        self.o_hash, self.o_validation_salt, self.o_key_salt = (
            self.o[:32],
            self.o[32:40],
            self.o[40:],
        )
        self.u_hash, self.u_validation_salt, self.u_key_salt = (
            self.u[:32],
            self.u[32:40],
            self.u[40:],
        )

    def get_cfm(self, name: str) -> Callable[[int, int, bytes], bytes] | None:
        """Return the AES-256 decrypt method for "AESV3", otherwise None."""
        return self.decrypt_aes256 if name == "AESV3" else None

    def authenticate(self, password: str) -> bytes | None:
        """Try ``password`` as owner password, then as user password.

        :return: the key decrypted from OE or UE, or None if neither the
            owner hash nor the user hash matches.
        """
        pw = self._normalize_password(password)
        # Owner check: U is mixed into the hash as an extra vector.
        if self._password_hash(pw, self.o_validation_salt, self.u) == self.o_hash:
            wrapping_key = self._password_hash(pw, self.o_key_salt, self.u)
            return self._aes_cbc_zero_iv_decrypt(wrapping_key, self.oe)
        # User check: no extra vector.
        if self._password_hash(pw, self.u_validation_salt) == self.u_hash:
            wrapping_key = self._password_hash(pw, self.u_key_salt)
            return self._aes_cbc_zero_iv_decrypt(wrapping_key, self.ue)
        return None

    @staticmethod
    def _aes_cbc_zero_iv_decrypt(key: bytes, data: bytes) -> bytes:
        """Decrypt ``data`` with AES-CBC under ``key`` and an all-zero IV."""
        cipher = Cipher(
            algorithms.AES(key),
            modes.CBC(b"\0" * 16),
            backend=default_backend(),
        )  # type: ignore
        return cipher.decryptor().update(data)  # type: ignore

    def _normalize_password(self, password: str) -> bytes:
        """Return the UTF-8 password, capped at 127 bytes.

        For revision 6 a non-empty password is first passed through saslprep.
        """
        if self.r == 6:
            # saslprep expects non-empty strings, apparently
            if not password:
                return b""
            from pdfminer._saslprep import saslprep

            password = saslprep(password)
        encoded = password.encode("utf-8")
        return encoded[:127]

    def _password_hash(
        self,
        password: bytes,
        salt: bytes,
        vector: bytes | None = None,
    ) -> bytes:
        """Compute password hash depending on revision number"""
        if self.r != 5:
            return self._r6_password(password, salt[0:8], vector)
        return self._r5_password(password, salt, vector)

    def _r5_password(
        self,
        password: bytes,
        salt: bytes,
        vector: bytes | None = None,
    ) -> bytes:
        """Compute the password for revision 5"""
        digest = sha256(password)
        digest.update(salt)
        if vector is not None:
            digest.update(vector)
        return digest.digest()

    def _r6_password(
        self,
        password: bytes,
        salt: bytes,
        vector: bytes | None = None,
    ) -> bytes:
        """Compute the password for revision 6"""
        seed = sha256(password)
        seed.update(salt)
        if vector is not None:
            seed.update(vector)
        k = seed.digest()
        hashes = (sha256, sha384, sha512)
        suffix = vector or b""
        rounds_done = 0
        while True:
            k1 = (password + k + suffix) * 64
            e = self._aes_cbc_encrypt(key=k[:16], iv=k[16:32], data=k1)
            # compute the first 16 bytes of e,
            # interpreted as an unsigned integer mod 3
            k = hashes[self._bytes_mod_3(e[:16])](e).digest()
            rounds_done += 1
            # Always run 64 rounds; afterwards stop once the last byte of e
            # no longer exceeds (rounds done - 32).
            if rounds_done >= 64 and e[-1] <= rounds_done - 32:
                break
        return k[:32]

    @staticmethod
    def _bytes_mod_3(input_bytes: bytes) -> int:
        """Return the big-endian integer value of ``input_bytes`` modulo 3."""
        # 256 is 1 mod 3, so the byte values can simply be summed.
        return sum(input_bytes) % 3

    def _aes_cbc_encrypt(self, key: bytes, iv: bytes, data: bytes) -> bytes:
        """Encrypt ``data`` with AES-CBC under ``key`` and ``iv``."""
        encryptor = Cipher(algorithms.AES(key), modes.CBC(iv)).encryptor()  # type: ignore
        return encryptor.update(data) + encryptor.finalize()  # type: ignore

    def decrypt_aes256(self, objid: int, genno: int, data: bytes) -> bytes:
        """Decrypt ``data`` with AES-CBC using the file key directly.

        The first 16 bytes of ``data`` are the initialization vector;
        ``objid`` and ``genno`` are not used.
        """
        initialization_vector, ciphertext = data[:16], data[16:]
        assert self.key is not None
        cipher = Cipher(
            algorithms.AES(self.key),
            modes.CBC(initialization_vector),
            backend=default_backend(),
        )  # type: ignore
        plaintext = cipher.decryptor().update(ciphertext)  # type: ignore
        return unpad_aes(plaintext)

668 

669 

class PDFDocument:
    """PDFDocument object represents a PDF document.

    Since a PDF file can be very big, normally it is not loaded at
    once. So PDF document has to cooperate with a PDF parser in order to
    dynamically import the data as processing goes.

    Typical usage:
      doc = PDFDocument(parser, password)
      obj = doc.getobj(objid)

    """

    # Maps the /V entry of the encryption dictionary to a handler class.
    security_handler_registry: ClassVar[dict[int, type[PDFStandardSecurityHandler]]] = {
        1: PDFStandardSecurityHandler,
        2: PDFStandardSecurityHandler,
        4: PDFStandardSecurityHandlerV4,
        5: PDFStandardSecurityHandlerV5,
    }

    def __init__(
        self,
        parser: PDFParser,
        password: str = "",
        caching: bool = True,
        fallback: bool = True,
    ) -> None:
        """Set the document to use a given PDFParser object.

        :param parser: parser positioned on the PDF data.
        :param password: password used if the trailer declares /Encrypt.
        :param caching: keep resolved objects and parsed object streams.
        :param fallback: if no valid xref can be read, rebuild one by
            scanning the file with PDFXRefFallback.
        :raises PDFSyntaxError: if no trailer provides a /Root entry, or (in
            strict mode) if the root is not of type /Catalog.
        """
        self.caching = caching
        self.xrefs: list[PDFBaseXRef] = []
        self.info: list[dict[str, Any]] = []
        self.catalog: dict[str, Any] = {}
        self.encryption: tuple[Any, Any] | None = None
        self.decipher: DecipherCallable | None = None
        self._parser = None
        # objid -> (object, genno)
        self._cached_objs: dict[int, tuple[object, int]] = {}
        # object-stream objid -> (objects parsed from the stream, N)
        self._parsed_objs: dict[int, tuple[list[object], int]] = {}
        self._parser = parser
        self._parser.set_document(self)
        self.is_printable = self.is_modifiable = self.is_extractable = True
        # Retrieve the information of each header that was appended
        # (maybe multiple times) at the end of the document.
        try:
            pos = self.find_xref(parser)
            self.read_xref_from(parser, pos, self.xrefs)
        except PDFNoValidXRef:
            if fallback:
                parser.fallback = True
                newxref = PDFXRefFallback()
                newxref.load(parser)
                self.xrefs.append(newxref)

        for xref in self.xrefs:
            trailer = xref.get_trailer()
            if not trailer:
                continue
            # If there's an encryption info, remember it.
            if "Encrypt" in trailer:
                # Some documents may not have a /ID, use two empty
                # byte strings instead. Solves
                # https://github.com/pdfminer/pdfminer.six/issues/594
                id_value = list_value(trailer["ID"]) if "ID" in trailer else (b"", b"")
                self.encryption = (id_value, dict_value(trailer["Encrypt"]))
                self._initialize_password(password)
            if "Info" in trailer:
                self.info.append(dict_value(trailer["Info"]))
            if "Root" in trailer:
                # Every PDF file must have exactly one /Root dictionary.
                self.catalog = dict_value(trailer["Root"])
                break
        else:
            raise PDFSyntaxError("No /Root object! - Is this really a PDF?")
        if self.catalog.get("Type") is not LITERAL_CATALOG and settings.STRICT:
            raise PDFSyntaxError("Catalog not found!")

    KEYWORD_OBJ = KWD(b"obj")

    # _initialize_password(password=b'')
    #   Perform the initialization with a given password.
    def _initialize_password(self, password: str = "") -> None:
        """Create the security handler and copy its permissions and decipher.

        :raises PDFEncryptionError: if /Filter is not "Standard" or /V has no
            registered handler.
        """
        assert self.encryption is not None
        (docid, param) = self.encryption
        if literal_name(param.get("Filter")) != "Standard":
            raise PDFEncryptionError(f"Unknown filter: param={param!r}")
        v = int_value(param.get("V", 0))
        factory = self.security_handler_registry.get(v)
        if factory is None:
            raise PDFEncryptionError(f"Unknown algorithm: param={param!r}")
        handler = factory(docid, param, password)
        self.decipher = handler.decrypt
        self.is_printable = handler.is_printable()
        self.is_modifiable = handler.is_modifiable()
        self.is_extractable = handler.is_extractable()
        assert self._parser is not None
        self._parser.fallback = False  # need to read streams with exact length

    def _getobj_objstm(self, stream: PDFStream, index: int, objid: int) -> object:
        """Return the ``index``-th object stored in the object stream ``stream``.

        :raises PDFSyntaxError: if the stream holds fewer objects than needed.
        """
        if stream.objid in self._parsed_objs:
            (objs, n) = self._parsed_objs[stream.objid]
        else:
            (objs, n) = self._get_objects(stream)
            if self.caching:
                assert stream.objid is not None
                self._parsed_objs[stream.objid] = (objs, n)
        # The first 2*n parsed values are skipped; the objects follow them.
        i = n * 2 + index
        try:
            obj = objs[i]
        except IndexError as err:
            raise PDFSyntaxError(f"index too big: {index!r}") from err
        return obj

    def _get_objects(self, stream: PDFStream) -> tuple[list[object], int]:
        """Parse everything in ``stream``; return ``(objects, N)``.

        A missing /N counts as 0 unless strict mode is on.
        """
        if stream.get("Type") is not LITERAL_OBJSTM and settings.STRICT:
            raise PDFSyntaxError(f"Not a stream object: {stream!r}")
        try:
            n = cast(int, stream["N"])
        except KeyError:
            if settings.STRICT:
                raise PDFSyntaxError(f"N is not defined: {stream!r}") from None
            n = 0
        parser = PDFStreamParser(stream.get_data())
        parser.set_document(self)
        objs: list[object] = []
        try:
            while 1:
                (_, obj) = parser.nextobject()
                objs.append(obj)
        except PSEOF:
            pass
        return (objs, n)

    def _getobj_parse(self, pos: int, objid: int) -> object:
        """Parse the indirect object expected at byte offset ``pos``.

        :raises PDFSyntaxError: if the object id found does not match
            ``objid`` or the ``obj`` keyword is missing.
        """
        assert self._parser is not None
        self._parser.seek(pos)
        (_, objid1) = self._parser.nexttoken()  # objid
        (_, _genno) = self._parser.nexttoken()  # genno
        (_, kwd) = self._parser.nexttoken()
        # hack around malformed pdf files
        # copied from https://github.com/jaepil/pdfminer3k/blob/master/
        # pdfminer/pdfparser.py#L399
        # to solve https://github.com/pdfminer/pdfminer.six/issues/56
        # assert objid1 == objid, str((objid1, objid))
        if objid1 != objid:
            x = []
            # Skip forward to the next ``obj`` keyword; the token two places
            # before it is then taken as the object id.
            while kwd is not self.KEYWORD_OBJ:
                (_, kwd) = self._parser.nexttoken()
                x.append(kwd)
            if len(x) >= 2:
                objid1 = x[-2]
        # #### end hack around malformed pdf files
        if objid1 != objid:
            raise PDFSyntaxError(f"objid mismatch: {objid1!r}={objid!r}")

        if kwd != KWD(b"obj"):
            raise PDFSyntaxError(f"Invalid object spec: offset={pos!r}")
        (_, obj) = self._parser.nextobject()
        return obj

    # can raise PDFObjectNotFound
    def getobj(self, objid: int) -> object:
        """Get object from PDF

        Each xref is tried in order; one that does not know the object, or
        whose data fails to parse, is skipped in favour of the next.

        :raises PDFException if PDFDocument is not initialized
        :raises PDFObjectNotFound if objid does not exist in PDF
        """
        if not self.xrefs:
            raise PDFException("PDFDocument is not initialized")
        log.debug("getobj: objid=%r", objid)
        obj: object  # Initialize to satisfy mypy; always assigned in branches below
        genno: int
        if objid in self._cached_objs:
            (obj, genno) = self._cached_objs[objid]
        else:
            for xref in self.xrefs:
                try:
                    (strmid, index, genno) = xref.get_pos(objid)
                except KeyError:
                    continue
                try:
                    if strmid is not None:
                        # The object lives inside object stream ``strmid``.
                        stream = stream_value(self.getobj(strmid))
                        obj = self._getobj_objstm(stream, index, objid)
                    else:
                        obj = self._getobj_parse(index, objid)
                        if self.decipher:
                            obj = decipher_all(self.decipher, objid, genno, obj)

                    if isinstance(obj, PDFStream):
                        obj.set_objid(objid, genno)
                    break
                except (PSEOF, PDFSyntaxError):
                    continue
            else:
                raise PDFObjectNotFound(objid)
            log.debug("register: objid=%r: %r", objid, obj)
            if self.caching:
                self._cached_objs[objid] = (obj, genno)
        return obj

    OutlineType = tuple[Any, Any, Any, Any, Any]

    def get_outlines(self) -> Iterator[OutlineType]:
        """Iterate over outline entries as ``(level, title, dest, action, se)``.

        Only entries that have a Title together with A or Dest are yielded.

        :raises PDFNoOutlines: if the catalog has no /Outlines entry.
        """
        if "Outlines" not in self.catalog:
            raise PDFNoOutlines

        def search(entry: object, level: int) -> Iterator[PDFDocument.OutlineType]:
            entry = dict_value(entry)
            if "Title" in entry and ("A" in entry or "Dest" in entry):
                title = decode_text(str_value(entry["Title"]))
                dest = entry.get("Dest")
                action = entry.get("A")
                se = entry.get("SE")
                yield (level, title, dest, action, se)
            if "First" in entry and "Last" in entry:
                yield from search(entry["First"], level + 1)
            if "Next" in entry:
                yield from search(entry["Next"], level)

        return search(self.catalog["Outlines"], 0)

    def get_page_labels(self) -> Iterator[str]:
        """Generate page label strings for the PDF document.

        If the document includes page labels, generates strings, one per page.
        If not, raises PDFNoPageLabels.

        The resulting iteration is unbounded.
        """
        assert self.catalog is not None

        try:
            page_labels = PageLabels(self.catalog["PageLabels"])
        except (PDFTypeError, KeyError) as err:
            raise PDFNoPageLabels from err

        return page_labels.labels

    def lookup_name(self, cat: str, key: str | bytes) -> Any:
        """Look up ``key`` in the name tree ``cat`` of the catalog's /Names.

        :raises PDFKeyError: if the catalog has no usable /Names or no leaf
            or kid yields a value for ``key``.
        """
        try:
            names = dict_value(self.catalog["Names"])
        except (PDFTypeError, KeyError) as err:
            raise PDFKeyError((cat, key)) from err
        # may raise KeyError
        d0 = dict_value(names[cat])

        def lookup(d: dict[str, Any]) -> Any:
            if "Limits" in d:
                # Skip a node whose key limits exclude ``key``.
                (k1, k2) = list_value(d["Limits"])
                if key < k1 or k2 < key:
                    return None
            if "Names" in d:
                objs = list_value(d["Names"])
                names = dict(
                    cast(Iterator[tuple[str | bytes, Any]], choplist(2, objs)),
                )
                return names[key]
            if "Kids" in d:
                for c in list_value(d["Kids"]):
                    v = lookup(dict_value(c))
                    if v:
                        return v
            raise PDFKeyError((cat, key))

        return lookup(d0)

    def get_dest(self, name: str | bytes) -> Any:
        """Resolve the named destination ``name``.

        :raises PDFDestinationNotFound: if neither the name tree nor the
            catalog's /Dests dictionary contains ``name``.
        """
        try:
            # PDF-1.2 or later
            obj = self.lookup_name("Dests", name)
        except KeyError:
            # PDF-1.1 or prior
            if "Dests" not in self.catalog:
                raise PDFDestinationNotFound(name) from None
            d0 = dict_value(self.catalog["Dests"])
            if name not in d0:
                raise PDFDestinationNotFound(name) from None
            obj = d0[name]
        return obj

    # find_xref
    def find_xref(self, parser: PDFParser) -> int:
        """Internal function used to locate the first XRef."""
        # search the last xref table by scanning the file backwards.
        prev = b""
        for line in parser.revreadlines():
            line = line.strip()
            log.debug("find_xref: %r", line)

            if line == b"startxref":
                # Reading backwards, ``prev`` is the non-empty line that
                # follows "startxref" in the file: the xref offset.
                log.debug("xref found: pos=%r", prev)

                if not prev.isdigit():
                    raise PDFNoValidXRef(f"Invalid xref position: {prev!r}")

                start = int(prev)

                if not start >= 0:
                    raise PDFNoValidXRef(f"Invalid negative xref position: {start}")

                return start

            if line:
                prev = line

        raise PDFNoValidXRef("Unexpected EOF")

    # read xref table
    def read_xref_from(
        self,
        parser: PDFParser,
        start: int,
        xrefs: list[PDFBaseXRef],
        visited: set[int] | None = None,
    ) -> None:
        """Reads XRefs from the given location.

        The xref found at ``start`` is appended to ``xrefs``, then the
        /XRefStm and /Prev entries of its trailer are followed recursively.

        :param visited: offsets already read during this traversal.  Callers
            normally leave it unset; it exists so that a trailer chain that
            points back at an earlier offset ends the traversal instead of
            recursing without bound.
        :raises PDFNoValidXRef: if no token can be read at ``start``.
        """
        if visited is None:
            visited = set()
        if start in visited:
            log.warning("Circular xref reference to position %d ignored", start)
            return
        visited.add(start)
        parser.seek(start)
        parser.reset()
        try:
            (pos, token) = parser.nexttoken()
        except PSEOF as err:
            raise PDFNoValidXRef("Unexpected EOF") from err
        log.debug("read_xref_from: start=%d, token=%r", start, token)
        if isinstance(token, int):
            # XRefStream: PDF-1.5
            parser.seek(pos)
            parser.reset()
            xref: PDFBaseXRef = PDFXRefStream()
            xref.load(parser)
        else:
            if token is parser.KEYWORD_XREF:
                parser.nextline()
            xref = PDFXRef()
            xref.load(parser)
        xrefs.append(xref)
        trailer = xref.get_trailer()
        log.debug("trailer: %r", trailer)
        if "XRefStm" in trailer:
            pos = int_value(trailer["XRefStm"])
            self.read_xref_from(parser, pos, xrefs, visited)
        if "Prev" in trailer:
            # find previous xref
            pos = int_value(trailer["Prev"])
            self.read_xref_from(parser, pos, xrefs, visited)

1013 

class PageLabels(NumberTree):
    """PageLabels from the document catalog.

    See Section 8.3.1 in the PDF Reference.
    """

    @property
    def labels(self) -> Iterator[str]:
        """Yield one label per page, range by range.

        The last range never ends, so the iteration is unbounded.  In strict
        mode a tree that does not start at page index 0 raises
        PDFSyntaxError; otherwise an empty leading range is assumed.
        """
        ranges = self.values

        # The tree must begin with page index 0
        if not ranges or ranges[0][0] != 0:
            if settings.STRICT:
                raise PDFSyntaxError("PageLabels is missing page index 0")
            # Try to cope, by assuming empty labels for the initial pages
            ranges.insert(0, (0, {}))

        for following, (start, raw_label_dict) in enumerate(ranges, 1):
            label_dict = dict_value(raw_label_dict)
            style = label_dict.get("S")
            prefix = decode_text(str_value(label_dict.get("P", b"")))
            first_value = int_value(label_dict.get("St", 1))

            numbers: Iterable[int]
            if following == len(ranges):
                # This is the last specified range. It continues until the end
                # of the document.
                numbers = itertools.count(first_value)
            else:
                # The range lasts until the start index of the next range.
                next_start, _ = ranges[following]
                numbers = range(first_value, first_value + (next_start - start))

            for number in numbers:
                yield prefix + self._format_page_label(number, style)

    @staticmethod
    def _format_page_label(value: int, style: Any) -> str:
        """Format page label value in a specific style"""
        if style is None:
            return ""
        if style is LIT("D"):  # Decimal arabic numerals
            return str(value)
        if style is LIT("R"):  # Uppercase roman numerals
            return format_int_roman(value).upper()
        if style is LIT("r"):  # Lowercase roman numerals
            return format_int_roman(value)
        if style is LIT("A"):  # Uppercase letters A-Z, AA-ZZ...
            return format_int_alpha(value).upper()
        if style is LIT("a"):  # Lowercase letters a-z, aa-zz...
            return format_int_alpha(value)
        log.warning("Unknown page label style: %r", style)
        return ""

1069 return label