Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_reader.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

742 statements  

1# Copyright (c) 2006, Mathieu Fenniak 

2# Copyright (c) 2007, Ashish Kulkarni <kulkarni.ashish@gmail.com> 

3# 

4# All rights reserved. 

5# 

6# Redistribution and use in source and binary forms, with or without 

7# modification, are permitted provided that the following conditions are 

8# met: 

9# 

10# * Redistributions of source code must retain the above copyright notice, 

11# this list of conditions and the following disclaimer. 

12# * Redistributions in binary form must reproduce the above copyright notice, 

13# this list of conditions and the following disclaimer in the documentation 

14# and/or other materials provided with the distribution. 

15# * The name of the author may not be used to endorse or promote products 

16# derived from this software without specific prior written permission. 

17# 

18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 

21# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 

22# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 

23# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 

24# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 

25# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 

26# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 

27# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 

28# POSSIBILITY OF SUCH DAMAGE. 

29 

30import os 

31import re 

32import sys 

33from collections.abc import Iterable 

34from io import BytesIO, UnsupportedOperation 

35from pathlib import Path 

36from types import TracebackType 

37from typing import ( 

38 TYPE_CHECKING, 

39 Any, 

40 Callable, 

41 Optional, 

42 Union, 

43 cast, 

44) 

45 

46if sys.version_info >= (3, 11): 

47 from typing import Self 

48else: 

49 from typing_extensions import Self 

50 

51from ._doc_common import PdfDocCommon, convert_to_int 

52from ._encryption import Encryption, PasswordType 

53from ._utils import ( 

54 WHITESPACES_AS_BYTES, 

55 StrByteType, 

56 StreamType, 

57 logger_warning, 

58 read_non_whitespace, 

59 read_previous_line, 

60 read_until_whitespace, 

61 skip_over_comment, 

62 skip_over_whitespace, 

63) 

64from .constants import TrailerKeys as TK 

65from .errors import ( 

66 EmptyFileError, 

67 FileNotDecryptedError, 

68 LimitReachedError, 

69 PdfReadError, 

70 PdfStreamError, 

71 WrongPasswordError, 

72) 

73from .generic import ( 

74 ArrayObject, 

75 ContentStream, 

76 DecodedStreamObject, 

77 DictionaryObject, 

78 EncodedStreamObject, 

79 IndirectObject, 

80 NameObject, 

81 NullObject, 

82 NumberObject, 

83 PdfObject, 

84 StreamObject, 

85 TextStringObject, 

86 is_null_or_none, 

87 read_object, 

88) 

89from .xmp import XmpInformation 

90 

91if TYPE_CHECKING: 

92 from ._page import PageObject 

93 

94 

95class PdfReader(PdfDocCommon): 

96 """ 

97 Initialize a PdfReader object. 

98 

99 This operation can take some time, as the PDF stream's cross-reference 

100 tables are read into memory. 

101 

102 Args: 

103 stream: A File object or an object that supports the standard read 

104 and seek methods similar to a File object. Could also be a 

105 string representing a path to a PDF file. 

106 strict: Determines whether user should be warned of all 

107 problems and also causes some correctable problems to be fatal. 

108 Defaults to ``False``. 

109 password: Decrypt PDF file at initialization. If the 

110 password is None, the file will not be decrypted. 

111 Defaults to ``None``. 

112 root_object_recovery_limit: The maximum number of objects to query 

113 for recovering the Root object in non-strict mode. To disable 

114 this security measure, pass ``None``. 

115 

116 """ 

117 

def __init__(
    self,
    stream: Union[StrByteType, Path],
    strict: bool = False,
    password: Union[None, str, bytes] = None,
    *,
    root_object_recovery_limit: Optional[int] = 10_000,
) -> None:
    """
    Set up the reader state, parse the document and handle encryption.

    Args:
        stream: File-like object or path handed to ``_initialize_stream``.
        strict: Stored as ``self.strict``; consulted by the parsing methods.
        password: Passed to ``_handle_encryption`` when the file is
            encrypted.
        root_object_recovery_limit: Upper bound used by ``root_object``
            when searching for the catalog. Any non-int value (including
            ``None``) disables the bound by using ``sys.maxsize``.

    Raises:
        PdfReadError: If a password is given but the file is not encrypted.

    """
    self.strict = strict
    self.flattened_pages: Optional[list[PageObject]] = None

    #: Storage of parsed PDF objects.
    self.resolved_objects: dict[tuple[Any, Any], Optional[PdfObject]] = {}

    self._startxref: int = 0
    self.xref_index = 0
    self.xref: dict[int, dict[Any, Any]] = {}
    self.xref_free_entry: dict[int, dict[Any, Any]] = {}
    self.xref_objStm: dict[int, tuple[Any, Any]] = {}
    self.trailer = DictionaryObject()

    # Security parameters.
    self._root_object_recovery_limit = (
        root_object_recovery_limit if isinstance(root_object_recovery_limit, int) else sys.maxsize
    )

    # Map page indirect_reference number to page number
    self._page_id2num: Optional[dict[Any, Any]] = None

    self._validated_root: Optional[DictionaryObject] = None

    # get_object() reads these three attributes. They are initialised
    # *before* the stream is parsed so that an object lookup triggered
    # while parsing cannot hit a missing attribute.
    # NOTE(review): whether parsing actually reaches get_object() depends
    # on read_object(), which is defined outside this file.
    self._known_objects: set[tuple[int, int]] = set()
    self._override_encryption = False
    self._encryption: Optional[Encryption] = None

    self._initialize_stream(stream)

    if self.is_encrypted:
        self._handle_encryption(password)
    elif password is not None:
        raise PdfReadError("Not an encrypted file")

158 

def _initialize_stream(self, stream: Union[StrByteType, Path]) -> None:
    """
    Prepare the input stream and parse it.

    A ``str`` or ``Path`` is opened in binary mode and read completely into
    an in-memory ``BytesIO``; in that case ``self._stream_opened`` is set so
    that ``close()`` closes the buffer. Any other object is used as given.

    Args:
        stream: Path or file-like object containing the PDF data.

    """
    if hasattr(stream, "mode") and "b" not in stream.mode:
        logger_warning(
            "PdfReader stream/file object is not in binary mode. "
            "It may not be read correctly.",
            __name__,
        )
    self._stream_opened = False
    if isinstance(stream, (str, Path)):
        with open(stream, "rb") as fh:
            stream = BytesIO(fh.read())
        self._stream_opened = True
    # Publish the stream before parsing: get_object() and
    # _process_xref_stream() access self.stream and can be reached while
    # read() is still running.
    self.stream = stream
    self.read(stream)

173 

def _handle_encryption(self, password: Optional[Union[str, bytes]]) -> None:
    """
    Build the encryption handler from the trailer and verify the password.

    ``_override_encryption`` is switched on while the /Encrypt dictionary is
    read and switched off again once verification succeeded.

    Args:
        password: Password to verify. When ``None``, an empty password is
            tried and a failed verification is not treated as an error.

    Raises:
        WrongPasswordError: If a password was supplied and verification
            reports ``PasswordType.NOT_DECRYPTED``.

    """
    self._override_encryption = True

    # Some documents may not have a /ID, use an empty byte string
    # instead. Solves https://github.com/py-pdf/pypdf/issues/608
    trailer_id = self.trailer.get(TK.ID)
    if trailer_id:
        first_id = trailer_id[0].get_object().original_bytes
    else:
        first_id = b""

    encrypt_dict = cast(DictionaryObject, self.trailer[TK.ENCRYPT].get_object())
    self._encryption = Encryption.read(encrypt_dict, first_id)

    # Fall back to the empty password when the caller gave none.
    password_given = password is not None
    candidate = password if password_given else b""
    outcome = self._encryption.verify(candidate)
    if outcome == PasswordType.NOT_DECRYPTED and password_given:
        # Only an explicitly supplied password can be "wrong".
        raise WrongPasswordError("Wrong password")

    self._override_encryption = False

193 

def __enter__(self) -> Self:
    """Enter the ``with`` block; the reader itself is the context value."""
    return self

196 

def __exit__(
    self,
    exc_type: Optional[type[BaseException]],
    exc_val: Optional[BaseException],
    exc_tb: Optional[TracebackType],
) -> None:
    """
    Leave the ``with`` block by calling ``close()``.

    The exception details are ignored and ``None`` is returned, so an
    exception raised inside the block is not suppressed.
    """
    self.close()

204 

def close(self) -> None:
    """
    Close the stream if opened in __init__ and clear memory.

    The stream is only closed when this reader created it itself
    (``_stream_opened``); the page list, object cache, trailer and
    cross-reference tables are replaced by empty containers in any case.
    """
    owns_stream = self._stream_opened
    if owns_stream:
        self.stream.close()

    # Drop the parsed state.
    self.flattened_pages = []
    self.resolved_objects = {}
    self.trailer = DictionaryObject()
    self.xref, self.xref_free_entry, self.xref_objStm = {}, {}, {}

215 

@property
def root_object(self) -> DictionaryObject:
    """
    Provide access to "/Root". Standardized with PdfWriter.

    The result is cached in ``_validated_root``. Lookup proceeds in three
    steps:

    1. Use the trailer's /Root entry if it resolves to a dictionary whose
       /Type is /Catalog.
    2. Otherwise scan object numbers ``1 .. /Size`` for a dictionary whose
       /Type is /Catalog, querying at most ``_root_object_recovery_limit``
       objects.
    3. Otherwise accept the trailer's /Root dictionary if it at least
       contains /Pages.

    Returns:
        The catalog dictionary.

    Raises:
        LimitReachedError: If the scan in step 2 exceeds the recovery limit.
        PdfReadError: If no usable root object is found.

    """
    if self._validated_root:
        return self._validated_root

    root = self.trailer.get(TK.ROOT)
    # Resolve the trailer entry once and keep it only when it really is a
    # dictionary; a root resolving to any other object type is handled as
    # "invalid" instead of failing on a dictionary-only operation.
    trailer_root: Optional[DictionaryObject] = None
    if is_null_or_none(root):
        logger_warning('Cannot find "/Root" key in trailer', __name__)
    else:
        resolved = cast(PdfObject, root).get_object()
        if isinstance(resolved, DictionaryObject):
            trailer_root = resolved
        if trailer_root is not None and trailer_root.get("/Type") == "/Catalog":
            self._validated_root = trailer_root
        else:
            logger_warning("Invalid Root object in trailer", __name__)

    if self._validated_root is None:
        logger_warning('Searching object with "/Catalog" key', __name__)
        number_of_objects = cast(int, self.trailer.get("/Size", 0))
        for i in range(number_of_objects):
            if i >= self._root_object_recovery_limit:
                raise LimitReachedError("Maximum Root object recovery limit reached.")
            try:
                obj = self.get_object(i + 1)
            except Exception:  # to be sure to capture all errors
                obj = None
            if isinstance(obj, DictionaryObject) and obj.get("/Type") == "/Catalog":
                self._validated_root = obj
                logger_warning(f"Root found at {obj.indirect_reference!r}", __name__)
                break

    if self._validated_root is None:
        # Last resort: a trailer root without /Type but with /Pages.
        if trailer_root is not None and "/Pages" in trailer_root:
            logger_warning(
                f"Possible root found at {cast(PdfObject, root).indirect_reference!r}, but missing /Catalog key",
                __name__
            )
            self._validated_root = trailer_root
        else:
            raise PdfReadError("Cannot find Root object in pdf")
    return self._validated_root

259 

@property
def _info(self) -> Optional[DictionaryObject]:
    """
    Provide access to "/Info". Standardized with PdfWriter.

    Returns:
        /Info Dictionary; None if the entry does not exist

    Raises:
        PdfReadError: If the entry resolves to something that is not a
            dictionary.

    """
    entry = self.trailer.get(TK.INFO, None)
    if is_null_or_none(entry):
        return None
    assert entry is not None, "mypy"

    resolved = entry.get_object()
    if isinstance(resolved, DictionaryObject):
        return resolved
    raise PdfReadError(
        "Trailer not found or does not point to a document information dictionary"
    )

279 

@property
def _ID(self) -> Optional[ArrayObject]:
    """
    Provide access to "/ID". Standardized with PdfWriter.

    Returns:
        /ID array; None if the entry does not exist

    """
    id_entry = self.trailer.get(TK.ID, None)
    if is_null_or_none(id_entry):
        return None
    assert id_entry is not None, "mypy"
    resolved = id_entry.get_object()
    return cast(ArrayObject, resolved)

294 

@property
def pdf_header(self) -> str:
    """
    The first 8 bytes of the file.

    This is typically something like ``'%PDF-1.6'`` and can be used to
    detect if the file is actually a PDF file and which version it is.

    Bytes that are not valid UTF-8 are rendered as backslash escapes. The
    stream position is restored before returning.
    """
    # TODO: Make this return a bytes object for consistency
    #       but that needs a deprecation
    source = self.stream
    saved_position = source.tell()
    source.seek(0, 0)
    raw_header = source.read(8)
    header_text = raw_header.decode("utf-8", "backslashreplace")
    source.seek(saved_position, 0)  # put the cursor back
    return header_text

310 

@property
def xmp_metadata(self) -> Optional[XmpInformation]:
    """
    XMP (Extensible Metadata Platform) data.

    ``_override_encryption`` is set while the metadata is fetched from the
    root object and is always reset to ``False`` afterwards.
    """
    try:
        self._override_encryption = True
        metadata = self.root_object.xmp_metadata
        return cast(XmpInformation, metadata)
    finally:
        self._override_encryption = False

319 

def _get_page_number_by_indirect(
    self, indirect_reference: Union[None, int, NullObject, IndirectObject]
) -> Optional[int]:
    """
    Retrieve the page number from an indirect reference.

    The lookup table (object number -> page index) is built from
    ``self.pages`` on first use and kept in ``_page_id2num``.

    Args:
        indirect_reference: The indirect reference (or plain object
            number) to locate.

    Returns:
        Page number or None.

    """
    if self._page_id2num is None:
        lookup = {}
        for page_index, page in enumerate(self.pages):
            lookup[page.indirect_reference.idnum] = page_index  # type: ignore
        self._page_id2num = lookup

    if is_null_or_none(indirect_reference):
        return None
    assert isinstance(indirect_reference, (int, IndirectObject)), "mypy"
    idnum = (
        indirect_reference
        if isinstance(indirect_reference, int)
        else indirect_reference.idnum
    )
    assert self._page_id2num is not None, "hint for mypy"
    return self._page_id2num.get(idnum, None)

347 

def _get_object_from_stream(
    self, indirect_reference: IndirectObject
) -> Union[int, PdfObject, str]:
    """
    Read an object that is stored inside an object stream.

    The containing stream number and the expected index are taken from
    ``self.xref_objStm``. The stream's header pairs (object number, offset)
    are scanned until the requested object number is found; the object is
    then read at ``/First + offset``.

    Args:
        indirect_reference: Reference to the object; its ``idnum`` must be
            a key of ``self.xref_objStm``.

    Returns:
        The parsed object. In non-strict mode a ``NullObject`` is returned
        when the object cannot be read or is not listed in the stream.

    Raises:
        PdfReadError: In strict mode, if the object is at an unexpected
            index, cannot be read, or is not listed in the stream.

    """
    # indirect reference to object in object stream
    # read the entire object stream into memory
    stmnum, idx = self.xref_objStm[indirect_reference.idnum]
    obj_stm: EncodedStreamObject = IndirectObject(stmnum, 0, self).get_object()  # type: ignore
    # This is an xref to a stream, so its type better be a stream
    assert cast(str, obj_stm["/Type"]) == "/ObjStm"
    stream_data = BytesIO(obj_stm.get_data())
    for i in range(obj_stm["/N"]):  # type: ignore
        # Each header entry is a pair "<object number> <offset>"; the
        # read_non_whitespace/seek(-1) pairs position the cursor on the
        # first non-whitespace byte without consuming it.
        read_non_whitespace(stream_data)
        stream_data.seek(-1, 1)
        objnum = NumberObject.read_from_stream(stream_data)
        read_non_whitespace(stream_data)
        stream_data.seek(-1, 1)
        offset = NumberObject.read_from_stream(stream_data)
        read_non_whitespace(stream_data)
        stream_data.seek(-1, 1)
        if objnum != indirect_reference.idnum:
            # We're only interested in one object
            continue
        if self.strict and idx != i:
            raise PdfReadError("Object is in wrong index.")
        # Offsets in the header are relative to /First.
        stream_data.seek(int(obj_stm["/First"] + offset), 0)  # type: ignore

        # To cope with case where the 'pointer' is on a white space
        read_non_whitespace(stream_data)
        stream_data.seek(-1, 1)

        try:
            obj = read_object(stream_data, self)
        except PdfStreamError as exc:
            # Stream object cannot be read. Normally, a critical error, but
            # Adobe Reader doesn't complain, so continue (in strict mode?)
            logger_warning(
                f"Invalid stream (index {i}) within object "
                f"{indirect_reference.idnum} {indirect_reference.generation}: "
                f"{exc}",
                __name__,
            )

            if self.strict:  # pragma: no cover
                raise PdfReadError(
                    f"Cannot read object stream: {exc}"
                )  # pragma: no cover
            # Replace with null. Hopefully it's nothing important.
            obj = NullObject()  # pragma: no cover
        return obj

    # Reached only when no header entry matched the requested object number.
    if self.strict:  # pragma: no cover
        raise PdfReadError(
            "This is a fatal error in strict mode."
        )  # pragma: no cover
    return NullObject()  # pragma: no cover

403 

def get_object(
    self, indirect_reference: Union[int, IndirectObject]
) -> Optional[PdfObject]:
    """
    Resolve an indirect reference to the object it points to.

    Lookup order:

    1. the cache of already resolved objects;
    2. object streams (generation 0 objects listed in ``xref_objStm``);
    3. the cross-reference table; if the recorded offset does not hold the
       expected object header, the whole file is searched for
       ``<idnum> <generation> obj`` and the table entry is repaired;
    4. for objects missing from the table, a search of the whole file.

    Objects read in steps 3 and 4 are decrypted unless
    ``_override_encryption`` is set. The result is stored in the cache,
    except for free entries, which return a fresh ``NullObject`` directly.

    Args:
        indirect_reference: The reference, or a plain object number (which
            is treated as generation 0).

    Returns:
        The resolved object, or ``None`` if it cannot be found in
        non-strict mode.

    Raises:
        PdfReadError: On a self-referencing object, or in strict mode on
            mismatching object headers / missing objects.
        FileNotDecryptedError: If the file is encrypted and has not been
            decrypted yet.

    """
    if isinstance(indirect_reference, int):
        indirect_reference = IndirectObject(indirect_reference, 0, self)
    retval = self.cache_get_indirect_object(
        indirect_reference.generation, indirect_reference.idnum
    )
    if retval is not None:
        return retval
    if (
        indirect_reference.generation == 0
        and indirect_reference.idnum in self.xref_objStm
    ):
        retval = self._get_object_from_stream(indirect_reference)  # type: ignore
    elif (
        indirect_reference.generation in self.xref
        and indirect_reference.idnum in self.xref[indirect_reference.generation]
    ):
        if self.xref_free_entry.get(indirect_reference.generation, {}).get(
            indirect_reference.idnum, False
        ):
            return NullObject()
        start = self.xref[indirect_reference.generation][indirect_reference.idnum]
        self.stream.seek(start, 0)
        try:
            idnum, generation = self.read_object_header(self.stream)
            if (
                idnum != indirect_reference.idnum
                or generation != indirect_reference.generation
            ):
                raise PdfReadError("Not matching, we parse the file for it")
        except Exception:
            # The offset is wrong or unreadable: search the raw file
            # content for the object header instead.
            if hasattr(self.stream, "getbuffer"):
                buf = bytes(self.stream.getbuffer())
            else:
                p = self.stream.tell()
                self.stream.seek(0, 0)
                buf = self.stream.read(-1)
                self.stream.seek(p, 0)
            m = re.search(
                rf"\s{indirect_reference.idnum}\s+{indirect_reference.generation}\s+obj".encode(),
                buf,
            )
            if m is not None:
                logger_warning(
                    f"Object ID {indirect_reference.idnum},{indirect_reference.generation} ref repaired",
                    __name__,
                )
                # +1 skips the leading whitespace matched by the pattern.
                self.xref[indirect_reference.generation][
                    indirect_reference.idnum
                ] = (m.start(0) + 1)
                self.stream.seek(m.start(0) + 1)
                idnum, generation = self.read_object_header(self.stream)
            else:
                idnum = -1
                generation = -1  # exception will be raised below
        if idnum != indirect_reference.idnum and self.xref_index:
            # xref table probably had bad indexes due to not being zero-indexed
            if self.strict:
                raise PdfReadError(
                    f"Expected object ID ({indirect_reference.idnum} {indirect_reference.generation}) "
                    f"does not match actual ({idnum} {generation}); "
                    "xref table not zero-indexed."
                )
            # xref table is corrected in non-strict mode
        elif idnum != indirect_reference.idnum and self.strict:
            # some other problem
            raise PdfReadError(
                f"Expected object ID ({indirect_reference.idnum} {indirect_reference.generation}) "
                f"does not match actual ({idnum} {generation})."
            )
        if self.strict:
            assert generation == indirect_reference.generation

        current_object = (indirect_reference.idnum, indirect_reference.generation)
        if current_object in self._known_objects:
            raise PdfReadError(f"Detected loop with self reference for {indirect_reference!r}.")
        self._known_objects.add(current_object)
        try:
            retval = read_object(self.stream, self)  # type: ignore
        finally:
            # Always drop the loop guard, even when parsing fails.
            # Otherwise a later lookup of the same object would be
            # reported as a reference loop although none exists.
            self._known_objects.discard(current_object)

        # override encryption is used for the /Encrypt dictionary
        if not self._override_encryption and self._encryption is not None:
            # if we don't have the encryption key:
            if not self._encryption.is_decrypted():
                raise FileNotDecryptedError("File has not been decrypted")
            # otherwise, decrypt here...
            retval = cast(PdfObject, retval)
            retval = self._encryption.decrypt_object(
                retval, indirect_reference.idnum, indirect_reference.generation
            )
    else:
        # Not listed in any cross-reference data: search the raw file.
        if hasattr(self.stream, "getbuffer"):
            buf = bytes(self.stream.getbuffer())
        else:
            p = self.stream.tell()
            self.stream.seek(0, 0)
            buf = self.stream.read(-1)
            self.stream.seek(p, 0)
        m = re.search(
            rf"\s{indirect_reference.idnum}\s+{indirect_reference.generation}\s+obj".encode(),
            buf,
        )
        if m is not None:
            logger_warning(
                f"Object {indirect_reference.idnum} {indirect_reference.generation} found",
                __name__,
            )
            if indirect_reference.generation not in self.xref:
                self.xref[indirect_reference.generation] = {}
            self.xref[indirect_reference.generation][indirect_reference.idnum] = (
                m.start(0) + 1
            )
            self.stream.seek(m.end(0) + 1)
            skip_over_whitespace(self.stream)
            self.stream.seek(-1, 1)
            retval = read_object(self.stream, self)  # type: ignore

            # override encryption is used for the /Encrypt dictionary
            if not self._override_encryption and self._encryption is not None:
                # if we don't have the encryption key:
                if not self._encryption.is_decrypted():
                    raise FileNotDecryptedError("File has not been decrypted")
                # otherwise, decrypt here...
                retval = cast(PdfObject, retval)
                retval = self._encryption.decrypt_object(
                    retval, indirect_reference.idnum, indirect_reference.generation
                )
        else:
            logger_warning(
                f"Object {indirect_reference.idnum} {indirect_reference.generation} not defined.",
                __name__,
            )
            if self.strict:
                raise PdfReadError("Could not find object.")
    self.cache_indirect_object(
        indirect_reference.generation, indirect_reference.idnum, retval
    )
    return retval

544 

def read_object_header(self, stream: StreamType) -> tuple[int, int]:
    """
    Parse an object header (``<idnum> <generation> obj``) from the stream.

    Leading comments and whitespace are skipped, because some files have
    cross-reference offsets that are off by a few whitespace bytes. After
    the header, the cursor is left on the first non-whitespace byte.

    Args:
        stream: Stream positioned at (or slightly before) the header.

    Returns:
        ``(idnum, generation)`` as integers.

    Raises:
        ValueError: If either token cannot be converted by ``int``.

    """
    skip_over_comment(stream)
    superfluous = skip_over_whitespace(stream)
    stream.seek(-1, 1)
    idnum_token = read_until_whitespace(stream)
    superfluous |= skip_over_whitespace(stream)
    stream.seek(-1, 1)
    generation_token = read_until_whitespace(stream)
    superfluous |= skip_over_whitespace(stream)
    stream.seek(-1, 1)

    # Three bytes are consumed here (presumably the "obj" keyword); the
    # value itself is not checked.
    stream.read(3)

    read_non_whitespace(stream)
    stream.seek(-1, 1)
    if self.strict and superfluous:
        logger_warning(
            f"Superfluous whitespace found in object header {idnum_token} {generation_token}",  # type: ignore
            __name__,
        )
    return int(idnum_token), int(generation_token)

571 

def cache_get_indirect_object(
    self, generation: int, idnum: int
) -> Optional[PdfObject]:
    """
    Look up an already resolved object in ``resolved_objects``.

    Args:
        generation: Generation number of the object.
        idnum: Object number.

    Returns:
        The cached object, or ``None`` when nothing is cached for the key.

    Raises:
        PdfReadError: If the lookup itself hits a ``RecursionError``.

    """
    try:
        cached = self.resolved_objects.get((generation, idnum))
    except RecursionError:
        raise PdfReadError("Maximum recursion depth reached.")
    else:
        return cached

579 

def cache_indirect_object(
    self, generation: int, idnum: int, obj: Optional[PdfObject]
) -> Optional[PdfObject]:
    """
    Store a resolved object in ``resolved_objects``.

    An existing cache entry is overwritten with a warning; in strict mode
    this is an error instead. A non-``None`` object gets its
    ``indirect_reference`` set to a reference built from the given numbers.

    Args:
        generation: Generation number of the object.
        idnum: Object number.
        obj: The object to cache (may be ``None``).

    Returns:
        ``obj``, unchanged apart from its ``indirect_reference``.

    Raises:
        PdfReadError: In strict mode, if the key is already cached.

    """
    cache_key = (generation, idnum)
    already_cached = cache_key in self.resolved_objects
    if already_cached:
        msg = f"Overwriting cache for {generation} {idnum}"
        if self.strict:
            raise PdfReadError(msg)
        logger_warning(msg, __name__)

    self.resolved_objects[cache_key] = obj
    if obj is None:
        return obj
    obj.indirect_reference = IndirectObject(idnum, generation, self)
    return obj

592 

def _replace_object(self, indirect_reference: IndirectObject, obj: PdfObject) -> PdfObject:
    """
    Replace an already cached object with ``obj``.

    Function reserved for future development.

    Args:
        indirect_reference: Reference of the cached object; must belong to
            this reader.
        obj: Replacement object; its ``indirect_reference`` is updated.

    Returns:
        ``obj``.

    Raises:
        ValueError: If the reference belongs to another document or is not
            present in the cache.

    """
    if indirect_reference.pdf != self:
        raise ValueError("Cannot update PdfReader with external object")

    cache_key = (indirect_reference.generation, indirect_reference.idnum)
    if cache_key not in self.resolved_objects:
        raise ValueError("Cannot find referenced object")

    self.resolved_objects[cache_key] = obj
    obj.indirect_reference = indirect_reference
    return obj

602 

def read(self, stream: StreamType) -> None:
    """
    Read and process the PDF stream, extracting necessary data.

    Steps: validate the header, locate the %%EOF marker and the startxref
    offset, read all cross-reference tables and trailers, and - in
    non-strict mode only - repair cross-reference entries that do not
    point to a readable object header.

    Args:
        stream: The PDF file stream.

    Raises:
        PdfReadError: In strict mode, if ``_get_xref_issues`` reports a
            problem with the startxref pointer.

    """
    self._basic_validation(stream)
    self._find_eof_marker(stream)
    startxref = self._find_startxref_pos(stream)
    self._startxref = startxref

    # Check the startxref pointer; a non-zero issue number is fatal in
    # strict mode and only reported otherwise.
    xref_issue_nr = self._get_xref_issues(stream, startxref)
    if xref_issue_nr != 0:
        if self.strict and xref_issue_nr:
            raise PdfReadError("Broken xref table")
        logger_warning(f"incorrect startxref pointer({xref_issue_nr})", __name__)

    # read all cross-reference tables and their trailers
    self._read_xref_tables_and_trailers(stream, startxref, xref_issue_nr)

    # if not zero-indexed, verify that the table is correct; change it if necessary
    if self.xref_index and not self.strict:
        loc = stream.tell()
        for gen, xref_entry in self.xref.items():
            # generation 65535 entries are skipped
            if gen == 65535:
                continue
            xref_k = sorted(
                xref_entry.keys()
            )  # ensure ascending to prevent damage
            for id in xref_k:
                stream.seek(xref_entry[id], 0)
                try:
                    pid, _pgen = self.read_object_header(stream)
                except ValueError:
                    # header not parseable: rebuild the table and stop
                    # processing this generation
                    self._rebuild_xref_table(stream)
                    break
                if pid == id - self.xref_index:
                    # fixing index item per item is required for revised PDF.
                    self.xref[gen][pid] = self.xref[gen][id]
                    del self.xref[gen][id]
                # if not, then either it's just plain wrong, or the
                # non-zero-index is actually correct
        stream.seek(loc, 0)  # return to where it was

    # remove wrong objects (not pointing to correct structures) - cf #2326
    if not self.strict:
        loc = stream.tell()
        for gen, xref_entry in self.xref.items():
            if gen == 65535:
                continue
            # iterate over a copy of the keys because entries are deleted
            ids = list(xref_entry.keys())
            for id in ids:
                stream.seek(xref_entry[id], 0)
                try:
                    self.read_object_header(stream)
                except ValueError:
                    logger_warning(
                        f"Ignoring wrong pointing object {id} {gen} (offset {xref_entry[id]})",
                        __name__,
                    )
                    del xref_entry[id]  # we can delete the id, we are parsing ids
        stream.seek(loc, 0)  # return to where it was

668 

def _basic_validation(self, stream: StreamType) -> None:
    """
    Ensure the stream is valid and not empty.

    The first five bytes are compared with ``%PDF-``. A mismatch is an
    error in strict mode and a warning otherwise. On return the stream is
    positioned at its end.

    Args:
        stream: The PDF file stream.

    Raises:
        UnsupportedOperation: If reading the header raises
            ``UnicodeDecodeError``.
        EmptyFileError: If nothing can be read from the stream.
        PdfReadError: In strict mode, if the header is not ``%PDF-``.

    """
    stream.seek(0, os.SEEK_SET)
    try:
        header_byte = stream.read(5)
    except UnicodeDecodeError:
        raise UnsupportedOperation("cannot read header")
    if header_byte == b"":
        raise EmptyFileError("Cannot read an empty file")
    if header_byte != b"%PDF-":
        if self.strict:
            # Arbitrary binary data must not turn the intended PdfReadError
            # into a UnicodeDecodeError, so undecodable bytes are escaped.
            raise PdfReadError(
                f"PDF starts with '{header_byte.decode('utf8', 'backslashreplace')}', "
                "but '%PDF-' expected"
            )
        logger_warning(f"invalid pdf header: {header_byte}", __name__)
    stream.seek(0, os.SEEK_END)

686 

def _find_eof_marker(self, stream: StreamType) -> None:
    """
    Jump to the %%EOF marker.

    According to the specs, the %%EOF marker should be at the very end of
    the file. Hence for standard-compliant PDF documents this function will
    read only the last part (DEFAULT_BUFFER_SIZE).

    The file is walked backwards line by line until a line starting with
    ``%%EOF`` is found. If the first non-empty line merely ends with a
    prefix of the marker, the file is treated as truncated and the search
    stops there.

    Raises:
        PdfReadError: In strict mode, if the search reaches the header
            area without finding the marker.

    """
    HEADER_SIZE = 8  # to parse whole file, Header is e.g. '%PDF-1.6'
    truncated_endings = (b"%%EO", b"%%E", b"%%", b"%")
    current = b""
    awaiting_first_line = True
    while not current.startswith(b"%%EOF"):
        if awaiting_first_line and current != b"":
            if current.strip().endswith(truncated_endings):
                # Consider the file as truncated while
                # having enough confidence to carry on.
                logger_warning("EOF marker seems truncated", __name__)
                break
            awaiting_first_line = False
            if b"startxref" in current:
                logger_warning(
                    "CAUTION: startxref found while searching for %%EOF. "
                    "The file might be truncated and some data might not be read.",
                    __name__,
                )
        if stream.tell() < HEADER_SIZE:
            if self.strict:
                raise PdfReadError("EOF marker not found")
            logger_warning("EOF marker not found", __name__)
        current = read_previous_line(stream)

719 

def _find_startxref_pos(self, stream: StreamType) -> int:
    """
    Find startxref entry - the location of the xref table.

    The previous line is expected to hold the offset, preceded by a line
    starting with ``startxref``. A single line of the form
    ``startxref <offset>`` is accepted with a warning.

    Args:
        stream: The PDF file stream, positioned after the offset line.

    Returns:
        The bytes offset

    Raises:
        PdfReadError: If no ``startxref`` keyword is found.

    """
    keyword = b"startxref"
    offset_line = read_previous_line(stream)
    try:
        position = int(offset_line)
    except ValueError:
        # 'startxref' may be on the same line as the location
        if not offset_line.startswith(keyword):
            raise PdfReadError("startxref not found")
        position = int(offset_line[9:].strip())
        logger_warning("startxref on same line as offset", __name__)
    else:
        keyword_line = read_previous_line(stream)
        if not keyword_line.startswith(keyword):
            raise PdfReadError("startxref not found")
    return position

745 

def _read_standard_xref_table(self, stream: StreamType) -> None:
    """
    Read a standard (non-stream) cross-reference table.

    The stream must be positioned right after the leading ``x`` of the
    ``xref`` keyword. Every subsection (``<first object number> <count>``
    followed by ``count`` entries) is read until the ``trailer`` keyword is
    reached. Entries are stored in ``self.xref`` / ``self.xref_free_entry``;
    an entry that is already known is never overwritten.

    An entry that cannot be parsed is recovered by searching the whole
    file for ``<num> <generation> obj``.

    Args:
        stream: The PDF file stream.

    Raises:
        PdfReadError: If the keyword is not ``xref`` or the table ends
            unexpectedly.

    """
    # standard cross-reference table
    ref = stream.read(3)
    if ref != b"ref":
        raise PdfReadError("xref table read error")
    read_non_whitespace(stream)
    stream.seek(-1, 1)
    first_time = True  # check if the first time looking at the xref table
    while True:
        num = cast(int, read_object(stream, self))
        if first_time and num != 0:
            self.xref_index = num
            if self.strict:
                logger_warning(
                    "Xref table not zero-indexed. ID numbers for objects will be corrected.",
                    __name__,
                )
                # if table not zero indexed, could be due to error from when PDF was created
                # which will lead to mismatched indices later on, only warned and corrected if self.strict==True
        first_time = False
        read_non_whitespace(stream)
        stream.seek(-1, 1)
        size = cast(int, read_object(stream, self))
        if not isinstance(size, int):
            logger_warning(
                "Invalid/Truncated xref table. Rebuilding it.",
                __name__,
            )
            self._rebuild_xref_table(stream)
            stream.read()
            return
        read_non_whitespace(stream)
        stream.seek(-1, 1)
        cnt = 0
        while cnt < size:
            line = stream.read(20)
            if not line:
                raise PdfReadError("Unexpected empty line in Xref table.")

            # It's very clear in section 3.4.3 of the PDF spec
            # that all cross-reference table lines are a fixed
            # 20 bytes (as of PDF 1.7). However, some files have
            # 21-byte entries (or more) due to the use of \r\n
            # (CRLF) EOL's. Detect that case, and adjust the line
            # until it does not begin with a \r (CR) or \n (LF).
            while line[0] in b"\x0D\x0A":
                stream.seek(-20 + 1, 1)
                line = stream.read(20)

            # On the other hand, some malformed PDF files
            # use a single character EOL without a preceding
            # space. Detect that case, and seek the stream
            # back one character (0-9 means we've bled into
            # the next xref entry, t means we've bled into the
            # text "trailer"):
            if line[-1] in b"0123456789t":
                stream.seek(-1, 1)

            # Reset per entry so that a failed parse can never reuse the
            # type of the previous entry (or hit an unbound name).
            entry_type_b = b""
            try:
                offset_b, generation_b = line[:16].split(b" ")
                entry_type_b = line[17:18]

                offset, generation = int(offset_b), int(generation_b)
            except Exception:
                if hasattr(stream, "getbuffer"):
                    buf = bytes(stream.getbuffer())
                else:
                    p = stream.tell()
                    stream.seek(0, 0)
                    buf = stream.read(-1)
                    stream.seek(p)

                # The lookbehind keeps e.g. object 2 from matching inside
                # the header "12 0 obj".
                f = re.search(rf"(?<!\d){num}\s+(\d+)\s+obj".encode(), buf)
                if f is None:
                    logger_warning(
                        f"entry {num} in Xref table invalid; object not found",
                        __name__,
                    )
                    generation = 65535
                    offset = -1
                    entry_type_b = b"f"
                else:
                    logger_warning(
                        f"entry {num} in Xref table invalid but object found",
                        __name__,
                    )
                    generation = int(f.group(1))
                    offset = f.start()
                    if entry_type_b not in (b"n", b"f"):
                        # The entry gave no usable type; the object exists
                        # in the file, so register it as in use.
                        entry_type_b = b"n"

            if generation not in self.xref:
                self.xref[generation] = {}
                self.xref_free_entry[generation] = {}
            if num in self.xref[generation]:
                # It really seems like we should allow the last
                # xref table in the file to override previous
                # ones. Since we read the file backwards, assume
                # any existing key is already set correctly.
                pass
            else:
                if entry_type_b == b"n":
                    self.xref[generation][num] = offset
                # Best effort: the free-entry maps may lack the
                # generation key, which is deliberately ignored.
                try:
                    self.xref_free_entry[generation][num] = entry_type_b == b"f"
                except Exception:
                    pass
                try:
                    self.xref_free_entry[65535][num] = entry_type_b == b"f"
                except Exception:
                    pass
            cnt += 1
            num += 1
        read_non_whitespace(stream)
        stream.seek(-1, 1)
        trailer_tag = stream.read(7)
        if trailer_tag != b"trailer":
            # more xrefs!
            stream.seek(-7, 1)
        else:
            break

865 

def _read_xref_tables_and_trailers(
    self, stream: StreamType, startxref: Optional[int], xref_issue_nr: int
) -> None:
    """
    Read the cross-reference tables and trailers in the PDF stream.

    Starting at ``startxref``, each cross-reference section is read and the
    chain of previous sections is followed until it ends. Offsets already
    visited are remembered so that a circular chain terminates.

    Args:
        stream: The PDF file stream.
        startxref: Offset of the first cross-reference section to read.
        xref_issue_nr: Non-zero when the startxref pointer was found to be
            faulty; triggers one attempt to rebuild the table.

    Raises:
        PdfReadError: If a cross-reference stream cannot be read and no
            /Root has been collected yet.

    """
    self.xref = {}
    self.xref_free_entry = {}
    self.xref_objStm = {}
    self.trailer = DictionaryObject()
    visited_xref_offsets: set[int] = set()
    while startxref is not None:
        # Detect circular /Prev references in the xref chain
        if startxref in visited_xref_offsets:
            logger_warning(
                f"Circular xref chain detected at offset {startxref}, stopping",
                __name__,
            )
            break
        visited_xref_offsets.add(startxref)
        # load the xref table
        stream.seek(startxref, 0)
        x = stream.read(1)
        if x in b"\r\n":
            x = stream.read(1)
        if x == b"x":
            startxref = self._read_xref(stream)
        elif xref_issue_nr:
            try:
                self._rebuild_xref_table(stream)
                break
            except Exception:
                # Rebuilding failed: retry this offset through the regular
                # branches below. The offset must be forgotten again,
                # otherwise the retry would be mistaken for a circular
                # chain and silently abort. Clearing the issue flag
                # guarantees this branch is not taken a second time.
                xref_issue_nr = 0
                visited_xref_offsets.discard(startxref)
        elif x.isdigit():
            try:
                xrefstream = self._read_pdf15_xref_stream(stream)
            except Exception as e:
                if TK.ROOT in self.trailer:
                    # A newer trailer already provided /Root; an
                    # unreadable older section is tolerated.
                    logger_warning(
                        f"Previous trailer cannot be read: {e.args}", __name__
                    )
                    break
                raise PdfReadError(f"Trailer cannot be read: {e!s}")
            self._process_xref_stream(xrefstream)
            if "/Prev" in xrefstream:
                startxref = cast(int, xrefstream["/Prev"])
            else:
                break
        else:
            startxref = self._read_xref_other_error(stream, startxref)

914 

def _process_xref_stream(self, xrefstream: DictionaryObject) -> None:
    """
    Merge the trailer information of a cross-reference stream.

    The root, encrypt, info, ID and size entries are copied (unresolved,
    through ``raw_get``) into ``self.trailer`` unless the trailer already
    holds them. If the stream dictionary has an ``/XRefStm`` entry, the
    cross-reference stream at that offset is read as well and the position
    of ``self.stream`` is restored afterwards.

    Args:
        xrefstream: The cross-reference stream dictionary to process.

    """
    for key in (TK.ROOT, TK.ENCRYPT, TK.INFO, TK.ID, TK.SIZE):
        if key not in xrefstream or key in self.trailer:
            continue
        self.trailer[NameObject(key)] = xrefstream.raw_get(key)

    if "/XRefStm" not in xrefstream:
        return

    saved_position = self.stream.tell()
    hybrid_offset = cast(int, xrefstream["/XRefStm"])
    # +1 because _read_pdf15_xref_stream starts by stepping back one byte.
    self.stream.seek(hybrid_offset + 1, 0)
    self._read_pdf15_xref_stream(self.stream)
    self.stream.seek(saved_position, 0)

926 

def _read_xref(self, stream: StreamType) -> Optional[int]:
    """
    Read a standard xref table together with the trailer that follows it.

    Trailer entries are only added to ``self.trailer`` when the key is not
    present yet. An ``/XRefStm`` entry in the trailer triggers reading of
    the referenced cross-reference stream; a failure there only produces
    a warning.

    Args:
        stream: The PDF stream, positioned for the standard xref table
            reader.

    Returns:
        The value of the ``/Prev`` entry of the trailer, or ``None`` when
        the stream ends right after the table or no ``/Prev`` entry exists.

    """
    self._read_standard_xref_table(stream)
    reached_end = stream.read(1) == b""
    if reached_end:
        return None
    stream.seek(-1, 1)
    read_non_whitespace(stream)
    stream.seek(-1, 1)
    new_trailer = cast(dict[str, Any], read_object(stream, self))
    for key, value in new_trailer.items():
        if key in self.trailer:
            continue
        self.trailer[key] = value
    if "/XRefStm" in new_trailer:
        resume_at = stream.tell()
        # +1 because _read_pdf15_xref_stream starts by stepping back one byte.
        stream.seek(cast(int, new_trailer["/XRefStm"]) + 1, 0)
        try:
            self._read_pdf15_xref_stream(stream)
        except Exception:
            logger_warning(
                f"XRef object at {new_trailer['/XRefStm']} can not be read, some object may be missing",
                __name__,
            )
        stream.seek(resume_at, 0)
    return new_trailer["/Prev"] if "/Prev" in new_trailer else None

952 

def _read_xref_other_error(
    self, stream: StreamType, startxref: int
) -> Optional[int]:
    """
    Recover from a ``startxref`` offset that points at unexpected content.

    Args:
        stream: The PDF stream; its current position is used for the
            nearby search of the ``xref`` keyword.
        startxref: The offset that did not lead to a readable section.

    Returns:
        A corrected offset to try next, or ``None`` when there is nothing
        further to read (``/Prev`` of 0, or the xref table was rebuilt).

    Raises:
        PdfReadError: In strict mode for an offset of 0, when no section
            can be located, or when rebuilding the xref table fails.

    """
    # some PDFs have /Prev=0 in the trailer, instead of no /Prev
    if startxref == 0:
        if not self.strict:
            logger_warning(
                "/Prev=0 in the trailer - assuming there is no previous xref table",
                __name__,
            )
            return None
        raise PdfReadError(
            "/Prev=0 in the trailer (try opening with strict=False)"
        )

    # bad xref character at startxref. Look for the keyword in a small
    # window around the current position, as an off-by-one has been
    # observed before.
    stream.seek(-11, 1)
    window = stream.read(20)
    keyword_at = window.find(b"xref")
    if keyword_at != -1:
        return startxref - (10 - keyword_at)

    # No explicit xref table, try finding a cross-reference stream.
    stream.seek(startxref, 0)
    for skipped in range(25):  # value extended to cope with more linearized files
        if stream.read(1).isdigit():
            # This is not a standard PDF, consider adding a warning
            return startxref + skipped

    # no xref table found at specified location
    if "/Root" not in self.trailer or self.strict:
        raise PdfReadError("Could not find xref table at specified location")

    # if Root has been already found, just raise warning
    logger_warning("Invalid parent xref., rebuild xref", __name__)
    try:
        self._rebuild_xref_table(stream)
    except Exception:
        raise PdfReadError("Cannot rebuild xref")
    return None

993 

def _read_pdf15_xref_stream(
    self, stream: StreamType
) -> Union[ContentStream, EncodedStreamObject, DecodedStreamObject]:
    """
    Read the cross-reference stream for PDF 1.5+.

    Args:
        stream: The PDF stream, positioned one byte after the start of
            the object header of the cross-reference stream.

    Returns:
        The cross-reference stream object that has been read.

    Raises:
        PdfReadError: If the object is not of type ``/XRef``, if ``/Size``
            is missing, or (strict mode) if ``/W`` has more than three
            entries.
        AssertionError: If ``/W`` has fewer than three entries.

    """
    # Step back onto the first byte of the object header.
    stream.seek(-1, 1)
    object_id, object_generation = self.read_object_header(stream)
    xrefstream = cast(ContentStream, read_object(stream, self))
    if cast(str, xrefstream["/Type"]) != "/XRef":
        raise PdfReadError(f"Unexpected type {xrefstream['/Type']!r}")
    self.cache_indirect_object(object_generation, object_id, xrefstream)

    if "/Size" not in xrefstream:
        # According to table 17 of the PDF 2.0 specification, this key is required.
        raise PdfReadError(f"Size missing from XRef stream {xrefstream!r}!")
    # Index pairs specify the subsections in the dictionary.
    # Without them, a single subsection spans everything.
    idx_pairs = xrefstream.get("/Index", [0, xrefstream["/Size"]])

    entry_sizes = cast(dict[Any, Any], xrefstream.get("/W"))
    assert len(entry_sizes) >= 3
    if self.strict and len(entry_sizes) > 3:
        raise PdfReadError(f"Too many entry sizes: {entry_sizes}")

    stream_data = BytesIO(xrefstream.get_data())

    def get_entry(i: int) -> Union[int, tuple[int, ...]]:
        # Reads the correct number of bytes for each entry. See the
        # discussion of the W parameter in PDF spec table 17.
        width = entry_sizes[i]
        if width > 0:
            raw = stream_data.read(width)
            return convert_to_int(raw, width)

        # PDF Spec Table 17: A value of zero for an element in the
        # W array indicates...the default value shall be used.
        # The first field defaults to 1, the other ones to 0.
        return 1 if i == 0 else 0

    def used_before(num: int, generation: Union[int, tuple[int, ...]]) -> bool:
        # We move backwards through the xrefs, don't replace any.
        if num in self.xref.get(generation, []):  # type: ignore
            return True
        return num in self.xref_objStm

    # Iterate through each subsection
    self._read_xref_subsections(idx_pairs, get_entry, used_before)
    return xrefstream

1039 

1040 @staticmethod 

1041 def _get_xref_issues(stream: StreamType, startxref: int) -> int: 

1042 """ 

1043 Return an int which indicates an issue. 0 means there is no issue. 

1044 

1045 Args: 

1046 stream: 

1047 startxref: 

1048 

1049 Returns: 

1050 0 means no issue, other values represent specific issues. 

1051 

1052 """ 

1053 if startxref == 0: 

1054 return 4 

1055 

1056 stream.seek(startxref - 1, 0) # -1 to check character before 

1057 line = stream.read(1) 

1058 if line == b"j": 

1059 line = stream.read(1) 

1060 if line not in b"\r\n \t": 

1061 return 1 

1062 line = stream.read(4) 

1063 if line != b"xref": 

1064 # not a xref so check if it is an XREF object 

1065 line = b"" 

1066 while line in b"0123456789 \t": 

1067 line = stream.read(1) 

1068 if line == b"": 

1069 return 2 

1070 line += stream.read(2) # 1 char already read, +2 to check "obj" 

1071 if line.lower() != b"obj": 

1072 return 3 

1073 return 0 

1074 

@classmethod
def _find_pdf_objects(cls, data: bytes) -> Iterable[tuple[int, int, int]]:
    """
    Locate object headers of the form ``<number> <generation> obj``.

    The data is searched for ``b" obj"``; from each hit, the generation
    and the object number are read backwards.

    Args:
        data: The raw bytes to scan.

    Yields:
        Tuples of object number, generation number and the index at which
        the object number starts.

    """
    marker = b" obj"
    digits = b"0123456789"

    def rewind(position: int, accepted) -> int:
        # Move left while the byte at ``position`` belongs to ``accepted``.
        while position >= 0 and data[position] in accepted:
            position -= 1
        return position

    search_from = 0
    while True:
        marker_at = data.find(marker, search_from)
        if marker_at == -1:
            return

        # Skip whitespace backwards, then read the generation number.
        cursor = rewind(marker_at - 1, WHITESPACES_AS_BYTES)
        generation_end = cursor + 1
        cursor = rewind(cursor, digits)
        generation_start = cursor + 1

        # Skip whitespace again, then read the object number.
        cursor = rewind(cursor, WHITESPACES_AS_BYTES)
        object_end = cursor + 1
        cursor = rewind(cursor, digits)
        object_start = cursor + 1

        # Only report the hit when both numbers have at least one digit.
        if object_start < object_end and generation_start < generation_end:
            object_number = int(data[object_start:object_end])
            generation_number = int(data[generation_start:generation_end])
            yield object_number, generation_number, object_start

        search_from = marker_at + len(marker)

1115 

@classmethod
def _find_pdf_trailers(cls, data: bytes) -> Iterable[int]:
    """
    Locate trailer dictionaries in the raw data.

    Every occurrence of ``b"trailer"`` that is followed, after optional
    whitespace, by ``<<`` is reported.

    Args:
        data: The raw bytes to scan.

    Yields:
        The index of the ``<<`` that opens each trailer dictionary.

    """
    keyword = b"trailer"
    total = len(data)
    search_from = 0
    while True:
        keyword_at = data.find(keyword, search_from)
        if keyword_at == -1:
            return

        # Skip the whitespace between the keyword and the dictionary.
        cursor = keyword_at + len(keyword)
        while cursor < total and data[cursor] in WHITESPACES_AS_BYTES:
            cursor += 1

        # Must be dictionary start
        if data.startswith(b"<<", cursor):
            yield cursor  # offset of '<<'

        search_from = keyword_at + len(keyword)

1136 

def _rebuild_xref_table(self, stream: StreamType) -> None:
    """
    Rebuild the cross-reference data by scanning the whole stream.

    Three passes are made over the content:

    1. every object header found is registered in ``self.xref`` (which is
       reset first);
    2. each registered object is read; for objects of type ``/ObjStm`` the
       number pairs at the start of the stream data are registered in
       ``self.xref_objStm``. Any error while handling one object is
       ignored;
    3. every trailer dictionary found is read and merged into
       ``self.trailer``, later trailers overriding earlier ones.

    Args:
        stream: The seekable PDF stream to scan.

    """
    self.xref = {}
    stream.seek(0, 0)
    raw_content = stream.read(-1)

    for found_number, found_generation, found_at in self._find_pdf_objects(raw_content):
        self.xref.setdefault(found_generation, {})[found_number] = found_at

    logger_warning("parsing for Object Streams", __name__)
    for generation_number, offsets in self.xref.items():
        for object_number, offset in offsets.items():
            # get_object in manual
            stream.seek(offset, 0)
            try:
                self.read_object_header(stream)
                obj = cast(StreamObject, read_object(stream, self))
                if obj.get("/Type", "") != "/ObjStm":
                    continue
                object_stream = BytesIO(obj.get_data())
                actual_count = 0
                while True:
                    # first number of the pair
                    token = read_until_whitespace(object_stream)
                    if not token.isdigit():
                        break
                    inner_object_number = int(token)
                    skip_over_whitespace(object_stream)
                    object_stream.seek(-1, 1)
                    # second number of the pair
                    token = read_until_whitespace(object_stream)
                    if not token.isdigit():  # pragma: no cover
                        break  # pragma: no cover
                    self.xref_objStm[inner_object_number] = (object_number, int(token))
                    actual_count += 1
                if actual_count != obj.get("/N"):  # pragma: no cover
                    logger_warning(  # pragma: no cover
                        f"found {actual_count} objects within Object({object_number},{generation_number})"
                        f" whereas {obj.get('/N')} expected",
                        __name__,
                    )
            except Exception:  # could be multiple causes
                pass

    stream.seek(0, 0)
    for trailer_at in self._find_pdf_trailers(raw_content):
        stream.seek(trailer_at, 0)
        new_trailer = cast(dict[Any, Any], read_object(stream, self))
        # The file is parsed from start to end here, so newer data has to
        # replace what is already stored.
        for key, value in new_trailer.items():
            self.trailer[key] = value

1188 

1189 def _read_xref_subsections( 

1190 self, 

1191 idx_pairs: list[int], 

1192 get_entry: Callable[[int], Union[int, tuple[int, ...]]], 

1193 used_before: Callable[[int, Union[int, tuple[int, ...]]], bool], 

1194 ) -> None: 

1195 """Read and process the subsections of the xref.""" 

1196 for start, size in self._pairs(idx_pairs): 

1197 # The subsections must increase 

1198 for num in range(start, start + size): 

1199 # The first entry is the type 

1200 xref_type = get_entry(0) 

1201 # The rest of the elements depend on the xref_type 

1202 if xref_type == 0: 

1203 # linked list of free objects 

1204 next_free_object = get_entry(1) # noqa: F841 

1205 next_generation = get_entry(2) # noqa: F841 

1206 elif xref_type == 1: 

1207 # objects that are in use but are not compressed 

1208 byte_offset = get_entry(1) 

1209 generation = get_entry(2) 

1210 if generation not in self.xref: 

1211 self.xref[generation] = {} # type: ignore 

1212 if not used_before(num, generation): 

1213 self.xref[generation][num] = byte_offset # type: ignore 

1214 elif xref_type == 2: 

1215 # compressed objects 

1216 objstr_num = get_entry(1) 

1217 obstr_idx = get_entry(2) 

1218 generation = 0 # PDF spec table 18, generation is 0 

1219 if not used_before(num, generation): 

1220 self.xref_objStm[num] = (objstr_num, obstr_idx) 

1221 elif self.strict: 

1222 raise PdfReadError(f"Unknown xref type: {xref_type}") 

1223 

1224 def _pairs(self, array: list[int]) -> Iterable[tuple[int, int]]: 

1225 """Iterate over pairs in the array.""" 

1226 i = 0 

1227 while i + 1 < len(array): 

1228 yield array[i], array[i + 1] 

1229 i += 2 

1230 

def decrypt(self, password: Union[str, bytes]) -> PasswordType:
    """
    Decrypt an encrypted / secured PDF file that uses the PDF Standard
    encryption handler.

    The given password is checked against the user password and the owner
    password of the document. If either one matches, the resulting
    decryption key is stored.

    Which of the two passwords matched is irrelevant for further use:
    both provide the decryption key required to work with the document
    in this library.

    Args:
        password: The password to match.

    Returns:
        An indicator if the document was decrypted and whether it was the
        owner password or the user password.

    Raises:
        PdfReadError: If the file is not encrypted.

    """
    encryption = self._encryption
    if not encryption:
        raise PdfReadError("Not encrypted file")
    # TODO: raise Exception for wrong password
    return encryption.verify(password)

1255 

@property
def is_encrypted(self) -> bool:
    """
    Whether this PDF file is encrypted (read-only).

    The value is derived from the presence of the encryption entry in the
    trailer, so it stays true even after the
    :meth:`decrypt()<pypdf.PdfReader.decrypt>` method has been called.
    """
    encrypt_key = TK.ENCRYPT
    return encrypt_key in self.trailer

1265 

def add_form_topname(self, name: str) -> Optional[DictionaryObject]:
    """
    Insert a new top level form field above all existing form fields.

    The new field receives the former ``/Fields`` array as its ``/Kids``
    and becomes the only element of ``/Fields``; every kid gets the new
    field as ``/Parent``.

    Args:
        name: text string of the "/T" Attribute of the created object

    Returns:
        The created object. ``None`` means no object was created.

    """
    catalog = self.root_object

    has_form_dictionary = "/AcroForm" in catalog and isinstance(
        catalog["/AcroForm"], DictionaryObject
    )
    if not has_form_dictionary:
        return None
    acroform = cast(DictionaryObject, catalog[NameObject("/AcroForm")])
    if "/Fields" not in acroform:
        # TODO: No error but this may be extended for XFA Forms
        return None

    interim = DictionaryObject()
    interim[NameObject("/T")] = TextStringObject(name)
    interim[NameObject("/Kids")] = acroform[NameObject("/Fields")]

    # Register the new field right after the highest object number of
    # generation 0 that has been resolved so far.
    highest_resolved = max(i for (g, i) in self.resolved_objects if g == 0)
    self.cache_indirect_object(0, highest_resolved + 1, interim)

    top_level_fields = ArrayObject()
    top_level_fields.append(interim.indirect_reference)
    acroform[NameObject("/Fields")] = top_level_fields

    for kid in cast(ArrayObject, interim["/Kids"]):
        obj = kid.get_object()
        if "/Parent" in obj:
            logger_warning(
                f"Top Level Form Field {obj.indirect_reference} have a non-expected parent",
                __name__,
            )
        obj[NameObject("/Parent")] = interim.indirect_reference
    return interim

1308 

def rename_form_topname(self, name: str) -> Optional[DictionaryObject]:
    """
    Rename the top level form field that groups all form fields below it.

    The first element of the ``/Fields`` array of the form dictionary is
    taken as the top level field.

    Args:
        name: text string of the "/T" field of the created object

    Returns:
        The modified object. ``None`` means no object was modified, which
        is the case when the catalog has no form dictionary, the form has
        no ``/Fields`` entry, or ``/Fields`` is empty.

    """
    catalog = self.root_object

    if "/AcroForm" not in catalog or not isinstance(
        catalog["/AcroForm"], DictionaryObject
    ):
        return None
    acroform = cast(DictionaryObject, catalog[NameObject("/AcroForm")])
    if "/Fields" not in acroform:
        return None

    fields = cast(ArrayObject, acroform[NameObject("/Fields")])
    if not fields:
        # An empty /Fields array has no top level field to rename;
        # report "nothing modified" instead of failing on the index.
        return None

    interim = cast(DictionaryObject, fields[0].get_object())
    interim[NameObject("/T")] = TextStringObject(name)
    return interim

1336 

1337 def _repr_mimebundle_( 

1338 self, 

1339 include: Union[None, Iterable[str]] = None, 

1340 exclude: Union[None, Iterable[str]] = None, 

1341 ) -> dict[str, Any]: 

1342 """ 

1343 Integration into Jupyter Notebooks. 

1344 

1345 This method returns a dictionary that maps a mime-type to its 

1346 representation. 

1347 

1348 .. seealso:: 

1349 

1350 https://ipython.readthedocs.io/en/stable/config/integrating.html 

1351 """ 

1352 self.stream.seek(0) 

1353 pdf_data = self.stream.read() 

1354 data = { 

1355 "application/pdf": pdf_data, 

1356 } 

1357 

1358 if include is not None: 

1359 # Filter representations based on include list 

1360 data = {k: v for k, v in data.items() if k in include} 

1361 

1362 if exclude is not None: 

1363 # Remove representations based on exclude list 

1364 data = {k: v for k, v in data.items() if k not in exclude} 

1365 

1366 return data