Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/pdfdocument.py: 80%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

712 statements  

1import itertools 

2import logging 

3import re 

4import struct 

5from hashlib import md5, sha256, sha384, sha512 

6from typing import ( 

7 Any, 

8 Callable, 

9 Dict, 

10 Iterable, 

11 Iterator, 

12 KeysView, 

13 List, 

14 Optional, 

15 Sequence, 

16 Tuple, 

17 Type, 

18 Union, 

19 cast, 

20) 

21 

22from cryptography.hazmat.backends import default_backend 

23from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes 

24 

25from pdfminer import settings 

26from pdfminer.arcfour import Arcfour 

27from pdfminer.data_structures import NumberTree 

28from pdfminer.pdfexceptions import ( 

29 PDFException, 

30 PDFKeyError, 

31 PDFObjectNotFound, 

32 PDFTypeError, 

33) 

34from pdfminer.pdfparser import PDFParser, PDFStreamParser, PDFSyntaxError 

35from pdfminer.pdftypes import ( 

36 DecipherCallable, 

37 PDFStream, 

38 decipher_all, 

39 dict_value, 

40 int_value, 

41 list_value, 

42 str_value, 

43 stream_value, 

44 uint_value, 

45) 

46from pdfminer.psexceptions import PSEOF 

47from pdfminer.psparser import KWD, LIT, literal_name 

48from pdfminer.utils import ( 

49 choplist, 

50 decode_text, 

51 format_int_alpha, 

52 format_int_roman, 

53 nunpack, 

54) 

55 

56log = logging.getLogger(__name__) 

57 

58 

class PDFNoValidXRef(PDFSyntaxError):
    """Raised when a cross-reference section cannot be located or parsed."""


class PDFNoValidXRefWarning(SyntaxWarning):
    """Legacy warning for missing xref.

    Not used anymore because warnings.warn is replaced by logger.Logger.warn.
    """


class PDFNoOutlines(PDFException):
    """Raised by ``PDFDocument.get_outlines`` when the catalog has no /Outlines."""


class PDFNoPageLabels(PDFException):
    """Raised by ``PDFDocument.get_page_labels`` when no page labels are usable."""


class PDFDestinationNotFound(PDFException):
    """Raised by ``PDFDocument.get_dest`` when a named destination is unknown."""


class PDFEncryptionError(PDFException):
    """Base class for failures while setting up document decryption."""


class PDFPasswordIncorrect(PDFEncryptionError):
    """Raised when the supplied password yields no encryption key."""


class PDFEncryptionWarning(UserWarning):
    """Legacy warning for failed decryption.

    Not used anymore because warnings.warn is replaced by logger.Logger.warn.
    """


class PDFTextExtractionNotAllowedWarning(UserWarning):
    """Legacy warning for PDF that does not allow extraction.

    Not used anymore because warnings.warn is replaced by logger.Logger.warn.
    """


class PDFTextExtractionNotAllowed(PDFEncryptionError):
    """Encryption-related error for a PDF that does not allow text extraction.

    NOTE(review): not raised anywhere in this module; presumably raised by
    callers that check ``PDFDocument.is_extractable`` - confirm.
    """

106 

107 

108# some predefined literals and keywords. 

109LITERAL_OBJSTM = LIT("ObjStm") 

110LITERAL_XREF = LIT("XRef") 

111LITERAL_CATALOG = LIT("Catalog") 

112 

113 

class PDFBaseXRef:
    """Interface shared by all cross-reference table implementations."""

    def get_trailer(self) -> Dict[str, Any]:
        """Return the trailer dictionary; subclasses must override this."""
        raise NotImplementedError

    def get_objids(self) -> Iterable[int]:
        """Return the object ids this table knows about (none by default)."""
        return []

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Locate ``objid``.

        Implementations must return either ``(strmid, index, genno)`` for an
        object stored inside an object stream, or ``(None, pos, genno)`` for
        an object stored directly in the file.  The base implementation
        knows no objects and always raises ``PDFKeyError``.
        """
        raise PDFKeyError(objid)

    def load(self, parser: PDFParser) -> None:
        """Populate the table from ``parser``; subclasses must override this."""
        raise NotImplementedError

129 

130 

class PDFXRef(PDFBaseXRef):
    """Classic textual cross-reference table followed by a ``trailer``."""

    def __init__(self) -> None:
        # objid -> (None, byte offset, generation number)
        self.offsets: Dict[int, Tuple[Optional[int], int, int]] = {}
        self.trailer: Dict[str, Any] = {}

    def __repr__(self) -> str:
        return "<PDFXRef: offsets=%r>" % (self.offsets.keys())

    def load(self, parser: PDFParser) -> None:
        """Read every xref subsection up to ``trailer``, then the trailer.

        :raises PDFNoValidXRef: on premature EOF or on any malformed
            subsection header or entry line.
        """
        while True:
            try:
                (pos, line) = parser.nextline()
                line = line.strip()
                if not line:
                    continue
            except PSEOF:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted?")
            if line.startswith(b"trailer"):
                # Rewind so load_trailer() sees the keyword itself.
                parser.seek(pos)
                break
            f = line.split(b" ")
            if len(f) != 2:
                error_msg = f"Trailer not found: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg)
            try:
                (start, nobjs) = map(int, f)
            except ValueError:
                error_msg = f"Invalid line: {parser!r}: line={line!r}"
                raise PDFNoValidXRef(error_msg)
            for objid in range(start, start + nobjs):
                try:
                    (_, line) = parser.nextline()
                    line = line.strip()
                except PSEOF:
                    raise PDFNoValidXRef("Unexpected EOF - file corrupted?")
                f = line.split(b" ")
                if len(f) != 3:
                    error_msg = f"Invalid XRef format: {parser!r}, line={line!r}"
                    raise PDFNoValidXRef(error_msg)
                (pos_b, genno_b, use_b) = f
                if use_b != b"n":
                    # Only in-use ("n") entries are recorded.
                    continue
                try:
                    entry = (None, int(pos_b), int(genno_b))
                except ValueError:
                    # Non-numeric offset/generation: report it the same way
                    # as other malformed lines so the caller's
                    # PDFNoValidXRef handling applies, instead of leaking a
                    # bare ValueError.
                    error_msg = f"Invalid XRef format: {parser!r}, line={line!r}"
                    raise PDFNoValidXRef(error_msg)
                self.offsets[objid] = entry
        log.debug("xref objects: %r", self.offsets)
        self.load_trailer(parser)

    def load_trailer(self, parser: PDFParser) -> None:
        """Parse the ``trailer`` keyword and merge its dictionary into ours."""
        try:
            (_, kwd) = parser.nexttoken()
            assert kwd is KWD(b"trailer"), str(kwd)
            (_, dic) = parser.nextobject()
        except PSEOF:
            # EOF while reading the dictionary: use whatever object the
            # parser already has on its stack, if any.
            x = parser.pop(1)
            if not x:
                raise PDFNoValidXRef("Unexpected EOF - file corrupted")
            (_, dic) = x[0]
        self.trailer.update(dict_value(dic))
        log.debug("trailer=%r", self.trailer)

    def get_trailer(self) -> Dict[str, Any]:
        return self.trailer

    def get_objids(self) -> KeysView[int]:
        return self.offsets.keys()

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Return ``(None, pos, genno)`` for ``objid``; KeyError if unknown."""
        return self.offsets[objid]

198 

199 

class PDFXRefFallback(PDFXRef):
    """Cross-reference table rebuilt by scanning the file for ``N G obj``."""

    def __repr__(self) -> str:
        return "<PDFXRefFallback: offsets=%r>" % (self.offsets.keys())

    PDFOBJ_CUE = re.compile(r"^(\d+)\s+(\d+)\s+obj\b")

    def load(self, parser: PDFParser) -> None:
        """Scan from offset 0, recording every object header found.

        Scanning stops at EOF or at the first line starting with
        ``trailer`` (which is then parsed).  Object streams met on the way
        are expanded so the objects they contain are indexed as well.
        """
        parser.seek(0)
        while True:
            try:
                (pos, raw_line) = parser.nextline()
            except PSEOF:
                break
            if raw_line.startswith(b"trailer"):
                parser.seek(pos)
                self.load_trailer(parser)
                log.debug("trailer: %r", self.trailer)
                break
            # latin-1 is the default pdf encoding
            cue = self.PDFOBJ_CUE.match(raw_line.decode("latin-1"))
            if cue is None:
                continue
            objid = int(cue.group(1))
            genno = int(cue.group(2))
            self.offsets[objid] = (None, pos, genno)
            # Re-read the object itself to see whether it is an ObjStm.
            parser.seek(pos)
            (_, candidate) = parser.nextobject()
            if not (
                isinstance(candidate, PDFStream)
                and candidate.get("Type") is LITERAL_OBJSTM
            ):
                continue
            stream = stream_value(candidate)
            try:
                n = stream["N"]
            except KeyError:
                if settings.STRICT:
                    raise PDFSyntaxError("N is not defined: %r" % stream)
                n = 0
            stream_parser = PDFStreamParser(stream.get_data())
            objs: List[int] = []
            try:
                while True:
                    (_, item) = stream_parser.nextobject()
                    objs.append(cast(int, item))
            except PSEOF:
                pass
            # The stream starts with (objid, offset) pairs; never index
            # more pairs than were actually parsed.
            n = min(n, len(objs) // 2)
            for index in range(n):
                contained_id = objs[index * 2]
                self.offsets[contained_id] = (objid, index, 0)

249 

250 

class PDFXRefStream(PDFBaseXRef):
    """Cross-reference information stored in an /XRef stream (PDF 1.5+)."""

    def __init__(self) -> None:
        self.data: Optional[bytes] = None
        self.entlen: Optional[int] = None
        self.fl1: Optional[int] = None
        self.fl2: Optional[int] = None
        self.fl3: Optional[int] = None
        # (first objid, count) pairs taken from /Index
        self.ranges: List[Tuple[int, int]] = []
        # Defined up front so get_trailer() is safe before load().
        self.trailer: Dict[str, Any] = {}

    def __repr__(self) -> str:
        return "<PDFXRefStream: ranges=%r>" % (self.ranges)

    def load(self, parser: PDFParser) -> None:
        """Read the ``N G obj`` header and the xref stream that follows.

        :raises PDFNoValidXRef: if the object is not a stream of /Type /XRef.
        :raises PDFSyntaxError: if /Index has an odd number of elements.
        """
        (_, objid) = parser.nexttoken()  # ignored
        (_, genno) = parser.nexttoken()  # ignored
        (_, kwd) = parser.nexttoken()
        (_, stream) = parser.nextobject()
        if not isinstance(stream, PDFStream) or stream.get("Type") is not LITERAL_XREF:
            raise PDFNoValidXRef("Invalid PDF stream spec.")
        size = stream["Size"]
        index_array = stream.get("Index", (0, size))
        if len(index_array) % 2 != 0:
            raise PDFSyntaxError("Invalid index number")
        self.ranges.extend(cast(Iterator[Tuple[int, int]], choplist(2, index_array)))
        (self.fl1, self.fl2, self.fl3) = stream["W"]
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        self.data = stream.get_data()
        self.entlen = self.fl1 + self.fl2 + self.fl3
        self.trailer = stream.attrs
        log.debug(
            "xref stream: objid=%s, fields=%d,%d,%d",
            ", ".join(map(repr, self.ranges)),
            self.fl1,
            self.fl2,
            self.fl3,
        )

    def get_trailer(self) -> Dict[str, Any]:
        return self.trailer

    def get_objids(self) -> Iterator[int]:
        """Yield the ids of all in-use (type 1 or type 2) entries."""
        # Entries of all subsections are stored back to back in the stream
        # data, so the entry index must keep counting across subsections
        # (get_pos() computes the index the same way).
        base_index = 0
        for start, nobjs in self.ranges:
            for i in range(nobjs):
                assert self.entlen is not None
                assert self.data is not None
                offset = self.entlen * (base_index + i)
                ent = self.data[offset : offset + self.entlen]
                f1 = nunpack(ent[: self.fl1], 1)
                if f1 == 1 or f1 == 2:
                    yield start + i
            base_index += nobjs

    def get_pos(self, objid: int) -> Tuple[Optional[int], int, int]:
        """Return the location of ``objid``.

        ``(None, pos, genno)`` for a type 1 entry, ``(strmid, index, 0)``
        for a type 2 entry.

        :raises PDFKeyError: if ``objid`` is outside every range or the
            entry has any other type (a free object).
        """
        index = 0
        for start, nobjs in self.ranges:
            if start <= objid and objid < start + nobjs:
                index += objid - start
                break
            else:
                index += nobjs
        else:
            raise PDFKeyError(objid)
        assert self.entlen is not None
        assert self.data is not None
        assert self.fl1 is not None and self.fl2 is not None and self.fl3 is not None
        offset = self.entlen * index
        ent = self.data[offset : offset + self.entlen]
        f1 = nunpack(ent[: self.fl1], 1)
        f2 = nunpack(ent[self.fl1 : self.fl1 + self.fl2])
        f3 = nunpack(ent[self.fl1 + self.fl2 :])
        if f1 == 1:
            return (None, f2, f3)
        elif f1 == 2:
            return (f2, f3, 0)
        else:
            # this is a free object
            raise PDFKeyError(objid)

327 

328 

class PDFStandardSecurityHandler:
    """Standard security handler for revisions 2 and 3 (MD5 + RC4)."""

    PASSWORD_PADDING = (
        b"(\xbfN^Nu\x8aAd\x00NV\xff\xfa\x01\x08"
        b"..\x00\xb6\xd0h>\x80/\x0c\xa9\xfedSiz"
    )
    supported_revisions: Tuple[int, ...] = (2, 3)

    def __init__(
        self,
        docid: Sequence[bytes],
        param: Dict[str, Any],
        password: str = "",
    ) -> None:
        self.docid = docid
        self.param = param
        self.password = password
        self.init()

    def init(self) -> None:
        """Read parameters, check the revision and derive the key."""
        self.init_params()
        if self.r not in self.supported_revisions:
            error_msg = "Unsupported revision: param=%r" % self.param
            raise PDFEncryptionError(error_msg)
        self.init_key()

    def init_params(self) -> None:
        """Copy the encryption dictionary entries used by this handler."""
        param = self.param
        self.v = int_value(param.get("V", 0))
        self.r = int_value(param["R"])
        self.p = uint_value(param["P"], 32)
        self.o = str_value(param["O"])
        self.u = str_value(param["U"])
        self.length = int_value(param.get("Length", 40))

    def init_key(self) -> None:
        """Authenticate ``self.password``; raise PDFPasswordIncorrect on failure."""
        self.key = self.authenticate(self.password)
        if self.key is None:
            raise PDFPasswordIncorrect

    def is_printable(self) -> bool:
        return (self.p & 4) != 0

    def is_modifiable(self) -> bool:
        return (self.p & 8) != 0

    def is_extractable(self) -> bool:
        return (self.p & 16) != 0

    def compute_u(self, key: bytes) -> bytes:
        """Compute the value to compare against /U for ``key``."""
        if self.r == 2:
            # Algorithm 3.4
            return Arcfour(key).encrypt(self.PASSWORD_PADDING)  # 2
        # Algorithm 3.5
        hasher = md5(self.PASSWORD_PADDING)  # 2
        hasher.update(self.docid[0])  # 3
        result = Arcfour(key).encrypt(hasher.digest())  # 4
        for i in range(1, 20):  # 5
            round_key = bytes(c ^ i for c in key)
            result = Arcfour(round_key).encrypt(result)
        return result + result  # 6

    def compute_encryption_key(self, password: bytes) -> bytes:
        """Derive the encryption key from ``password`` (Algorithm 3.2)."""
        padded = (password + self.PASSWORD_PADDING)[:32]  # 1
        hasher = md5(padded)  # 2
        hasher.update(self.o)  # 3
        # See https://github.com/pdfminer/pdfminer.six/issues/186
        hasher.update(struct.pack("<L", self.p))  # 4
        hasher.update(self.docid[0])  # 5
        if (
            self.r >= 4
            and not cast(PDFStandardSecurityHandlerV4, self).encrypt_metadata
        ):
            hasher.update(b"\xff\xff\xff\xff")
        result = hasher.digest()
        if self.r < 3:
            # Revision 2: fixed 5-byte key, no extra hashing rounds.
            return result[:5]
        n = self.length // 8
        for _ in range(50):
            result = md5(result[:n]).digest()
        return result[:n]

    def authenticate(self, password: str) -> Optional[bytes]:
        """Try ``password`` as user password, then as owner password."""
        password_bytes = password.encode("latin1")
        user_key = self.authenticate_user_password(password_bytes)
        if user_key is not None:
            return user_key
        return self.authenticate_owner_password(password_bytes)

    def authenticate_user_password(self, password: bytes) -> Optional[bytes]:
        candidate = self.compute_encryption_key(password)
        return candidate if self.verify_encryption_key(candidate) else None

    def verify_encryption_key(self, key: bytes) -> bool:
        """Check ``key`` against the stored /U entry (Algorithm 3.6)."""
        expected = self.compute_u(key)
        if self.r == 2:
            return expected == self.u
        # Revision 3 compares the first 16 bytes only.
        return expected[:16] == self.u[:16]

    def authenticate_owner_password(self, password: bytes) -> Optional[bytes]:
        """Recover the user password from /O and authenticate it (Algorithm 3.7)."""
        padded = (password + self.PASSWORD_PADDING)[:32]
        hasher = md5(padded)
        if self.r >= 3:
            for _ in range(50):
                hasher = md5(hasher.digest())
        n = self.length // 8 if self.r >= 3 else 5
        key = hasher.digest()[:n]
        if self.r == 2:
            user_password = Arcfour(key).decrypt(self.o)
        else:
            user_password = self.o
            for i in range(19, -1, -1):
                round_key = bytes(c ^ i for c in key)
                user_password = Arcfour(round_key).decrypt(user_password)
        return self.authenticate_user_password(user_password)

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: Optional[Dict[str, Any]] = None,
    ) -> bytes:
        """Decrypt ``data`` belonging to object ``objid``/``genno`` with RC4."""
        return self.decrypt_rc4(objid, genno, data)

    def decrypt_rc4(self, objid: int, genno: int, data: bytes) -> bytes:
        assert self.key is not None
        # Per-object key: document key + low 3 bytes of objid + low 2 of genno.
        seed = self.key + struct.pack("<L", objid)[:3] + struct.pack("<L", genno)[:2]
        object_key = md5(seed).digest()[: min(len(seed), 16)]
        return Arcfour(object_key).decrypt(data)

466 

467 

class PDFStandardSecurityHandlerV4(PDFStandardSecurityHandler):
    """Revision 4 handler: named crypt filters using RC4 or AES-128."""

    supported_revisions: Tuple[int, ...] = (4,)

    def init_params(self) -> None:
        """Read the crypt-filter configuration on top of the base parameters.

        :raises PDFEncryptionError: if /StmF and /StrF differ, a filter
            uses an unknown method, or /StrF names an undefined filter.
        """
        super().init_params()
        self.length = 128
        self.cf = dict_value(self.param.get("CF"))
        self.stmf = literal_name(self.param["StmF"])
        self.strf = literal_name(self.param["StrF"])
        self.encrypt_metadata = bool(self.param.get("EncryptMetadata", True))
        if self.stmf != self.strf:
            error_msg = "Unsupported crypt filter: param=%r" % self.param
            raise PDFEncryptionError(error_msg)
        # filter name -> decryption callable
        self.cfm = {}
        for filter_name, filter_dict in self.cf.items():
            method = self.get_cfm(literal_name(filter_dict["CFM"]))
            if method is None:
                error_msg = "Unknown crypt filter method: param=%r" % self.param
                raise PDFEncryptionError(error_msg)
            self.cfm[filter_name] = method
        self.cfm["Identity"] = self.decrypt_identity
        if self.strf not in self.cfm:
            error_msg = "Undefined crypt filter: param=%r" % self.param
            raise PDFEncryptionError(error_msg)

    def get_cfm(self, name: str) -> Optional[Callable[[int, int, bytes], bytes]]:
        """Map a /CFM name to its decryption method, or None if unknown."""
        if name == "V2":
            return self.decrypt_rc4
        if name == "AESV2":
            return self.decrypt_aes128
        return None

    def decrypt(
        self,
        objid: int,
        genno: int,
        data: bytes,
        attrs: Optional[Dict[str, Any]] = None,
        name: Optional[str] = None,
    ) -> bytes:
        """Decrypt ``data`` with crypt filter ``name`` (default: /StrF)."""
        if attrs is not None and not self.encrypt_metadata:
            # Metadata streams are stored in clear when /EncryptMetadata
            # is false.
            obj_type = attrs.get("Type")
            if obj_type is not None and literal_name(obj_type) == "Metadata":
                return data
        filter_name = self.strf if name is None else name
        return self.cfm[filter_name](objid, genno, data)

    def decrypt_identity(self, objid: int, genno: int, data: bytes) -> bytes:
        return data

    def decrypt_aes128(self, objid: int, genno: int, data: bytes) -> bytes:
        assert self.key is not None
        seed = (
            self.key
            + struct.pack("<L", objid)[:3]
            + struct.pack("<L", genno)[:2]
            + b"sAlT"
        )
        object_key = md5(seed).digest()[: min(len(seed), 16)]
        # The first 16 bytes of the data are the CBC initialization vector.
        initialization_vector, ciphertext = data[:16], data[16:]
        cipher = Cipher(
            algorithms.AES(object_key),
            modes.CBC(initialization_vector),
            backend=default_backend(),
        )  # type: ignore
        return cipher.decryptor().update(ciphertext)  # type: ignore

538 

539 

class PDFStandardSecurityHandlerV5(PDFStandardSecurityHandlerV4):
    """Revision 5/6 handler: SHA-2 password hashing and AES-256."""

    supported_revisions = (5, 6)

    def init_params(self) -> None:
        """Split /O and /U into hash, validation salt and key salt."""
        super().init_params()
        self.length = 256
        self.oe = str_value(self.param["OE"])
        self.ue = str_value(self.param["UE"])
        owner, user = self.o, self.u
        self.o_hash = owner[:32]
        self.o_validation_salt = owner[32:40]
        self.o_key_salt = owner[40:]
        self.u_hash = user[:32]
        self.u_validation_salt = user[32:40]
        self.u_key_salt = user[40:]

    def get_cfm(self, name: str) -> Optional[Callable[[int, int, bytes], bytes]]:
        """Only the AESV3 crypt filter method is recognised here."""
        return self.decrypt_aes256 if name == "AESV3" else None

    def authenticate(self, password: str) -> Optional[bytes]:
        """Return the file key if ``password`` is the owner or user password."""
        password_b = self._normalize_password(password)
        # Owner password first; its hashes also take the /U string as input.
        owner_check = self._password_hash(password_b, self.o_validation_salt, self.u)
        if owner_check == self.o_hash:
            aes_key = self._password_hash(password_b, self.o_key_salt, self.u)
            cipher = Cipher(
                algorithms.AES(aes_key),
                modes.CBC(b"\0" * 16),
                backend=default_backend(),
            )  # type: ignore
            return cipher.decryptor().update(self.oe)  # type: ignore
        user_check = self._password_hash(password_b, self.u_validation_salt)
        if user_check == self.u_hash:
            aes_key = self._password_hash(password_b, self.u_key_salt)
            cipher = Cipher(
                algorithms.AES(aes_key),
                modes.CBC(b"\0" * 16),
                backend=default_backend(),
            )  # type: ignore
            return cipher.decryptor().update(self.ue)  # type: ignore
        return None

    def _normalize_password(self, password: str) -> bytes:
        """Encode ``password`` as UTF-8 (SASLprep'd for revision 6), max 127 bytes."""
        if self.r == 6:
            # saslprep expects non-empty strings, apparently
            if not password:
                return b""
            from pdfminer._saslprep import saslprep

            password = saslprep(password)
        return password.encode("utf-8")[:127]

    def _password_hash(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute password hash depending on revision number"""
        if self.r == 5:
            return self._r5_password(password, salt, vector)
        return self._r6_password(password, salt[0:8], vector)

    def _r5_password(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute the password for revision 5"""
        hasher = sha256(password)
        hasher.update(salt)
        if vector is not None:
            hasher.update(vector)
        return hasher.digest()

    def _r6_password(
        self,
        password: bytes,
        salt: bytes,
        vector: Optional[bytes] = None,
    ) -> bytes:
        """Compute the password for revision 6"""
        seed = sha256(password)
        seed.update(salt)
        if vector is not None:
            seed.update(vector)
        k = seed.digest()
        hashes = (sha256, sha384, sha512)
        suffix = vector or b""
        round_no = 0
        last_byte_val = 0
        # At least 64 rounds, then continue until the last byte of the
        # round's ciphertext is no greater than round_no - 32.
        while round_no < 64 or last_byte_val > round_no - 32:
            k1 = (password + k + suffix) * 64
            e = self._aes_cbc_encrypt(key=k[:16], iv=k[16:32], data=k1)
            # compute the first 16 bytes of e,
            # interpreted as an unsigned integer mod 3
            next_hash = hashes[self._bytes_mod_3(e[:16])]
            k = next_hash(e).digest()
            last_byte_val = e[len(e) - 1]
            round_no += 1
        return k[:32]

    @staticmethod
    def _bytes_mod_3(input_bytes: bytes) -> int:
        # 256 is 1 mod 3, so we can just sum 'em
        return sum(b % 3 for b in input_bytes) % 3

    def _aes_cbc_encrypt(self, key: bytes, iv: bytes, data: bytes) -> bytes:
        cipher = Cipher(algorithms.AES(key), modes.CBC(iv))
        encryptor = cipher.encryptor()  # type: ignore
        return encryptor.update(data) + encryptor.finalize()  # type: ignore

    def decrypt_aes256(self, objid: int, genno: int, data: bytes) -> bytes:
        # The first 16 bytes of the data are the CBC initialization vector.
        initialization_vector, ciphertext = data[:16], data[16:]
        assert self.key is not None
        cipher = Cipher(
            algorithms.AES(self.key),
            modes.CBC(initialization_vector),
            backend=default_backend(),
        )  # type: ignore
        return cipher.decryptor().update(ciphertext)  # type: ignore

662 

663 

class PDFDocument:
    """PDFDocument object represents a PDF document.

    Since a PDF file can be very big, normally it is not loaded at
    once. So PDF document has to cooperate with a PDF parser in order to
    dynamically import the data as processing goes.

    Typical usage:
      doc = PDFDocument(parser, password)
      obj = doc.getobj(objid)

    """

    # Maps the /V entry of the encryption dictionary to a handler class.
    security_handler_registry: Dict[int, Type[PDFStandardSecurityHandler]] = {
        1: PDFStandardSecurityHandler,
        2: PDFStandardSecurityHandler,
        4: PDFStandardSecurityHandlerV4,
        5: PDFStandardSecurityHandlerV5,
    }

    def __init__(
        self,
        parser: PDFParser,
        password: str = "",
        caching: bool = True,
        fallback: bool = True,
    ) -> None:
        """Set the document to use a given PDFParser object.

        :param parser: parser positioned on the PDF data.
        :param password: password tried if the document is encrypted.
        :param caching: keep resolved objects in memory.
        :param fallback: rebuild the xref by scanning the file when no
            valid xref can be read.
        :raises PDFSyntaxError: if no trailer provides a /Root entry.
        """
        self.caching = caching
        self.xrefs: List[PDFBaseXRef] = []
        self.info = []
        self.catalog: Dict[str, Any] = {}
        self.encryption: Optional[Tuple[Any, Any]] = None
        self.decipher: Optional[DecipherCallable] = None
        self._cached_objs: Dict[int, Tuple[object, int]] = {}
        self._parsed_objs: Dict[int, Tuple[List[object], int]] = {}
        self._parser: Optional[PDFParser] = parser
        self._parser.set_document(self)
        self.is_printable = self.is_modifiable = self.is_extractable = True
        # Retrieve the information of each header that was appended
        # (maybe multiple times) at the end of the document.
        try:
            pos = self.find_xref(parser)
            self.read_xref_from(parser, pos, self.xrefs)
        except PDFNoValidXRef:
            if fallback:
                parser.fallback = True
                newxref = PDFXRefFallback()
                newxref.load(parser)
                self.xrefs.append(newxref)

        for xref in self.xrefs:
            trailer = xref.get_trailer()
            if not trailer:
                continue
            # If there's an encryption info, remember it.
            if "Encrypt" in trailer:
                if "ID" in trailer:
                    id_value = list_value(trailer["ID"])
                else:
                    # Some documents may not have a /ID, use two empty
                    # byte strings instead. Solves
                    # https://github.com/pdfminer/pdfminer.six/issues/594
                    id_value = (b"", b"")
                self.encryption = (id_value, dict_value(trailer["Encrypt"]))
                self._initialize_password(password)
            if "Info" in trailer:
                self.info.append(dict_value(trailer["Info"]))
            if "Root" in trailer:
                # Every PDF file must have exactly one /Root dictionary.
                self.catalog = dict_value(trailer["Root"])
                break
        else:
            raise PDFSyntaxError("No /Root object! - Is this really a PDF?")
        if self.catalog.get("Type") is not LITERAL_CATALOG:
            if settings.STRICT:
                raise PDFSyntaxError("Catalog not found!")

    KEYWORD_OBJ = KWD(b"obj")

    # _initialize_password(password=b'')
    #   Perform the initialization with a given password.
    def _initialize_password(self, password: str = "") -> None:
        """Create the security handler and copy its permission flags.

        :raises PDFEncryptionError: for a non-Standard filter or an
            unregistered /V value.
        """
        assert self.encryption is not None
        (docid, param) = self.encryption
        if literal_name(param.get("Filter")) != "Standard":
            raise PDFEncryptionError("Unknown filter: param=%r" % param)
        v = int_value(param.get("V", 0))
        factory = self.security_handler_registry.get(v)
        if factory is None:
            raise PDFEncryptionError("Unknown algorithm: param=%r" % param)
        handler = factory(docid, param, password)
        self.decipher = handler.decrypt
        self.is_printable = handler.is_printable()
        self.is_modifiable = handler.is_modifiable()
        self.is_extractable = handler.is_extractable()
        assert self._parser is not None
        self._parser.fallback = False  # need to read streams with exact length

    def _getobj_objstm(self, stream: PDFStream, index: int, objid: int) -> object:
        """Return the ``index``-th object stored in object stream ``stream``."""
        if stream.objid in self._parsed_objs:
            (objs, n) = self._parsed_objs[stream.objid]
        else:
            (objs, n) = self._get_objects(stream)
            if self.caching:
                assert stream.objid is not None
                self._parsed_objs[stream.objid] = (objs, n)
        # The first 2*n parsed items are the (objid, offset) header pairs.
        i = n * 2 + index
        try:
            obj = objs[i]
        except IndexError:
            raise PDFSyntaxError("index too big: %r" % index)
        return obj

    def _get_objects(self, stream: PDFStream) -> Tuple[List[object], int]:
        """Parse every item of an object stream; return ``(items, N)``."""
        if stream.get("Type") is not LITERAL_OBJSTM:
            if settings.STRICT:
                raise PDFSyntaxError("Not a stream object: %r" % stream)
        try:
            n = cast(int, stream["N"])
        except KeyError:
            if settings.STRICT:
                raise PDFSyntaxError("N is not defined: %r" % stream)
            n = 0
        parser = PDFStreamParser(stream.get_data())
        parser.set_document(self)
        objs: List[object] = []
        try:
            while 1:
                (_, obj) = parser.nextobject()
                objs.append(obj)
        except PSEOF:
            pass
        return (objs, n)

    def _getobj_parse(self, pos: int, objid: int) -> object:
        """Parse the indirect object ``objid`` located at file offset ``pos``.

        :raises PDFSyntaxError: on an object id mismatch or a missing
            ``obj`` keyword.
        """
        assert self._parser is not None
        self._parser.seek(pos)
        (_, objid1) = self._parser.nexttoken()  # objid
        (_, genno) = self._parser.nexttoken()  # genno
        (_, kwd) = self._parser.nexttoken()
        # hack around malformed pdf files
        # copied from https://github.com/jaepil/pdfminer3k/blob/master/
        # pdfminer/pdfparser.py#L399
        # to solve https://github.com/pdfminer/pdfminer.six/issues/56
        # assert objid1 == objid, str((objid1, objid))
        if objid1 != objid:
            x = []
            while kwd is not self.KEYWORD_OBJ:
                (_, kwd) = self._parser.nexttoken()
                x.append(kwd)
            if len(x) >= 2:
                objid1 = x[-2]
        # #### end hack around malformed pdf files
        if objid1 != objid:
            raise PDFSyntaxError(f"objid mismatch: {objid1!r}={objid!r}")

        if kwd != KWD(b"obj"):
            raise PDFSyntaxError("Invalid object spec: offset=%r" % pos)
        (_, obj) = self._parser.nextobject()
        return obj

    # can raise PDFObjectNotFound
    def getobj(self, objid: int) -> object:
        """Get object from PDF

        :raises PDFException if PDFDocument is not initialized
        :raises PDFObjectNotFound if objid does not exist in PDF
        """
        if not self.xrefs:
            raise PDFException("PDFDocument is not initialized")
        log.debug("getobj: objid=%r", objid)
        if objid in self._cached_objs:
            (obj, genno) = self._cached_objs[objid]
        else:
            # Ask each xref in turn; the first one that yields a parsable
            # object wins.
            for xref in self.xrefs:
                try:
                    (strmid, index, genno) = xref.get_pos(objid)
                except KeyError:
                    continue
                try:
                    if strmid is not None:
                        stream = stream_value(self.getobj(strmid))
                        obj = self._getobj_objstm(stream, index, objid)
                    else:
                        obj = self._getobj_parse(index, objid)
                        if self.decipher:
                            obj = decipher_all(self.decipher, objid, genno, obj)

                    if isinstance(obj, PDFStream):
                        obj.set_objid(objid, genno)
                    break
                except (PSEOF, PDFSyntaxError):
                    continue
            else:
                raise PDFObjectNotFound(objid)
            log.debug("register: objid=%r: %r", objid, obj)
            if self.caching:
                self._cached_objs[objid] = (obj, genno)
        return obj

    OutlineType = Tuple[Any, Any, Any, Any, Any]

    def get_outlines(self) -> Iterator[OutlineType]:
        """Iterate over outline entries as ``(level, title, dest, action, se)``.

        :raises PDFNoOutlines: if the catalog has no /Outlines entry.
        """
        if "Outlines" not in self.catalog:
            raise PDFNoOutlines

        def search(entry: object, level: int) -> Iterator[PDFDocument.OutlineType]:
            # Siblings (/Next) are walked in a loop so a long outline level
            # does not consume one recursion frame per sibling; only
            # children (/First) recurse.
            while True:
                entry = dict_value(entry)
                if "Title" in entry:
                    if "A" in entry or "Dest" in entry:
                        title = decode_text(str_value(entry["Title"]))
                        dest = entry.get("Dest")
                        action = entry.get("A")
                        se = entry.get("SE")
                        yield (level, title, dest, action, se)
                if "First" in entry and "Last" in entry:
                    yield from search(entry["First"], level + 1)
                if "Next" not in entry:
                    return
                entry = entry["Next"]

        return search(self.catalog["Outlines"], 0)

    def get_page_labels(self) -> Iterator[str]:
        """Generate page label strings for the PDF document.

        If the document includes page labels, generates strings, one per page.
        If not, raises PDFNoPageLabels.

        The resulting iteration is unbounded.
        """
        assert self.catalog is not None

        try:
            page_labels = PageLabels(self.catalog["PageLabels"])
        except (PDFTypeError, KeyError):
            raise PDFNoPageLabels

        return page_labels.labels

    def lookup_name(self, cat: str, key: Union[str, bytes]) -> Any:
        """Look up ``key`` in the name tree ``cat`` of the catalog's /Names.

        :raises PDFKeyError: if /Names is missing or no node holds ``key``.
        :raises KeyError: if /Names has no ``cat`` entry, or a leaf reached
            directly (not via /Kids) lacks ``key``.
        """
        try:
            names = dict_value(self.catalog["Names"])
        except (PDFTypeError, KeyError):
            raise PDFKeyError((cat, key))
        # may raise KeyError
        d0 = dict_value(names[cat])

        def lookup(d: Dict[str, Any]) -> Any:
            if "Limits" in d:
                (k1, k2) = list_value(d["Limits"])
                if key < k1 or k2 < key:
                    return None
            if "Names" in d:
                objs = list_value(d["Names"])
                names = dict(
                    cast(Iterator[Tuple[Union[str, bytes], Any]], choplist(2, objs)),
                )
                return names[key]
            if "Kids" in d:
                for c in list_value(d["Kids"]):
                    child = dict_value(c)
                    try:
                        v = lookup(child)
                    except KeyError:
                        # This subtree does not hold the key; keep trying
                        # the remaining kids instead of aborting.
                        continue
                    # None means "outside this kid's /Limits"; any other
                    # value, even a falsy one, is a real hit.
                    if v is not None:
                        return v
            raise PDFKeyError((cat, key))

        return lookup(d0)

    def get_dest(self, name: Union[str, bytes]) -> Any:
        """Resolve the named destination ``name``.

        :raises PDFDestinationNotFound: if neither the name tree nor the
            catalog's /Dests dictionary contains ``name``.
        """
        try:
            # PDF-1.2 or later
            obj = self.lookup_name("Dests", name)
        except KeyError:
            # PDF-1.1 or prior
            if "Dests" not in self.catalog:
                raise PDFDestinationNotFound(name)
            d0 = dict_value(self.catalog["Dests"])
            if name not in d0:
                raise PDFDestinationNotFound(name)
            obj = d0[name]
        return obj

    # find_xref
    def find_xref(self, parser: PDFParser) -> int:
        """Internal function used to locate the first XRef."""
        # search the last xref table by scanning the file backwards.
        prev = b""
        for line in parser.revreadlines():
            line = line.strip()
            log.debug("find_xref: %r", line)

            if line == b"startxref":
                log.debug("xref found: pos=%r", prev)

                if not prev.isdigit():
                    raise PDFNoValidXRef(f"Invalid xref position: {prev!r}")

                start = int(prev)

                if not start >= 0:
                    raise PDFNoValidXRef(f"Invalid negative xref position: {start}")

                return start

            if line:
                prev = line

        raise PDFNoValidXRef("Unexpected EOF")

    # read xref table
    def read_xref_from(
        self,
        parser: PDFParser,
        start: int,
        xrefs: List[PDFBaseXRef],
    ) -> None:
        """Reads XRefs from the given location.

        Follows /XRefStm and then /Prev of every trailer, appending each
        table to ``xrefs`` in the same depth-first order as a recursive
        walk.  Offsets already read are skipped, so a cyclic /Prev or
        /XRefStm chain terminates instead of recursing without bound.

        :raises PDFNoValidXRef: on EOF at an xref position or when a table
            fails to load.
        """
        visited = set()
        pending = [start]
        while pending:
            start = pending.pop()
            if start in visited:
                log.warning("Circular xref reference at offset %r ignored", start)
                continue
            visited.add(start)
            parser.seek(start)
            parser.reset()
            try:
                (pos, token) = parser.nexttoken()
            except PSEOF:
                raise PDFNoValidXRef("Unexpected EOF")
            log.debug("read_xref_from: start=%d, token=%r", start, token)
            if isinstance(token, int):
                # XRefStream: PDF-1.5
                parser.seek(pos)
                parser.reset()
                xref: PDFBaseXRef = PDFXRefStream()
                xref.load(parser)
            else:
                if token is parser.KEYWORD_XREF:
                    parser.nextline()
                xref = PDFXRef()
                xref.load(parser)
            xrefs.append(xref)
            trailer = xref.get_trailer()
            log.debug("trailer: %r", trailer)
            # The stack is LIFO: push /Prev first so that /XRefStm (and
            # everything reachable from it) is processed before /Prev.
            followups = []
            if "XRefStm" in trailer:
                followups.append(int_value(trailer["XRefStm"]))
            if "Prev" in trailer:
                # find previous xref
                followups.append(int_value(trailer["Prev"]))
            pending.extend(reversed(followups))

1010 

1011 

class PageLabels(NumberTree):
    """PageLabels from the document catalog.

    See Section 8.3.1 in the PDF Reference.
    """

    @property
    def labels(self) -> Iterator[str]:
        """Yield one label per page; the last range never ends."""
        ranges = self.values

        # The tree must begin with page index 0
        if not ranges or ranges[0][0] != 0:
            if settings.STRICT:
                raise PDFSyntaxError("PageLabels is missing page index 0")
            # Try to cope, by assuming empty labels for the initial pages
            ranges.insert(0, (0, {}))

        range_count = len(ranges)
        for next_index, (start, raw_label_dict) in enumerate(ranges, 1):
            label_dict = dict_value(raw_label_dict)
            style = label_dict.get("S")
            prefix = decode_text(str_value(label_dict.get("P", b"")))
            first_value = int_value(label_dict.get("St", 1))

            values: Iterable[int]
            if next_index == range_count:
                # This is the last specified range. It continues until the end
                # of the document.
                values = itertools.count(first_value)
            else:
                end, _ = ranges[next_index]
                values = range(first_value, first_value + (end - start))

            for value in values:
                yield prefix + self._format_page_label(value, style)

    @staticmethod
    def _format_page_label(value: int, style: Any) -> str:
        """Format page label value in a specific style"""
        if style is None:
            return ""
        if style is LIT("D"):  # Decimal arabic numerals
            return str(value)
        if style is LIT("R"):  # Uppercase roman numerals
            return format_int_roman(value).upper()
        if style is LIT("r"):  # Lowercase roman numerals
            return format_int_roman(value)
        if style is LIT("A"):  # Uppercase letters A-Z, AA-ZZ...
            return format_int_alpha(value).upper()
        if style is LIT("a"):  # Lowercase letters a-z, aa-zz...
            return format_int_alpha(value)
        log.warning("Unknown page label style: %r", style)
        return ""