Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pypdf/_reader.py: 34%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

734 statements  

1# Copyright (c) 2006, Mathieu Fenniak 

2# Copyright (c) 2007, Ashish Kulkarni <kulkarni.ashish@gmail.com> 

3# 

4# All rights reserved. 

5# 

6# Redistribution and use in source and binary forms, with or without 

7# modification, are permitted provided that the following conditions are 

8# met: 

9# 

10# * Redistributions of source code must retain the above copyright notice, 

11# this list of conditions and the following disclaimer. 

12# * Redistributions in binary form must reproduce the above copyright notice, 

13# this list of conditions and the following disclaimer in the documentation 

14# and/or other materials provided with the distribution. 

15# * The name of the author may not be used to endorse or promote products 

16# derived from this software without specific prior written permission. 

17# 

18# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" 

19# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE 

20# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 

21# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE 

22# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR 

23# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF 

24# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 

25# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN 

26# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) 

27# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 

28# POSSIBILITY OF SUCH DAMAGE. 

29 

30import os 

31import re 

32import sys 

33from collections.abc import Iterable 

34from io import BytesIO, UnsupportedOperation 

35from pathlib import Path 

36from types import TracebackType 

37from typing import ( 

38 TYPE_CHECKING, 

39 Any, 

40 Callable, 

41 Optional, 

42 Union, 

43 cast, 

44) 

45 

46from ._doc_common import PdfDocCommon, convert_to_int 

47from ._encryption import Encryption, PasswordType 

48from ._utils import ( 

49 WHITESPACES_AS_BYTES, 

50 StrByteType, 

51 StreamType, 

52 logger_warning, 

53 read_non_whitespace, 

54 read_previous_line, 

55 read_until_whitespace, 

56 skip_over_comment, 

57 skip_over_whitespace, 

58) 

59from .constants import TrailerKeys as TK 

60from .errors import ( 

61 EmptyFileError, 

62 FileNotDecryptedError, 

63 LimitReachedError, 

64 PdfReadError, 

65 PdfStreamError, 

66 WrongPasswordError, 

67) 

68from .generic import ( 

69 ArrayObject, 

70 ContentStream, 

71 DecodedStreamObject, 

72 DictionaryObject, 

73 EncodedStreamObject, 

74 IndirectObject, 

75 NameObject, 

76 NullObject, 

77 NumberObject, 

78 PdfObject, 

79 StreamObject, 

80 TextStringObject, 

81 is_null_or_none, 

82 read_object, 

83) 

84from .xmp import XmpInformation 

85 

86if TYPE_CHECKING: 

87 from ._page import PageObject 

88 

89 

90class PdfReader(PdfDocCommon): 

91 """ 

92 Initialize a PdfReader object. 

93 

94 This operation can take some time, as the PDF stream's cross-reference 

95 tables are read into memory. 

96 

97 Args: 

98 stream: A File object or an object that supports the standard read 

99 and seek methods similar to a File object. Could also be a 

100 string representing a path to a PDF file. 

101 strict: Determines whether user should be warned of all 

102 problems and also causes some correctable problems to be fatal. 

103 Defaults to ``False``. 

104 password: Decrypt PDF file at initialization. If the 

105 password is None, the file will not be decrypted. 

106 Defaults to ``None``. 

107 root_object_recovery_limit: The maximum number of objects to query 

108 for recovering the Root object in non-strict mode. To disable 

109 this security measure, pass ``None``. 

110 

111 """ 

112 

def __init__(
    self,
    stream: Union[StrByteType, Path],
    strict: bool = False,
    password: Union[None, str, bytes] = None,
    *,
    root_object_recovery_limit: Optional[int] = 10_000,
) -> None:
    """
    Prepare the reader state, parse ``stream`` and handle encryption.

    Args:
        stream: Path or file-like object handed to ``_initialize_stream``.
        strict: Stored as ``self.strict``.
        password: Forwarded to ``_handle_encryption`` for encrypted files.
        root_object_recovery_limit: Upper bound on the objects scanned by
            ``root_object``; any non-``int`` value (such as ``None``) is
            replaced by ``sys.maxsize``.

    Raises:
        PdfReadError: If a password is supplied although the file is not
            encrypted.

    """
    # Security parameters.
    if isinstance(root_object_recovery_limit, int):
        self._root_object_recovery_limit = root_object_recovery_limit
    else:
        self._root_object_recovery_limit = sys.maxsize

    self.strict = strict
    self.flattened_pages: Optional[list[PageObject]] = None

    #: Storage of parsed PDF objects.
    self.resolved_objects: dict[tuple[Any, Any], Optional[PdfObject]] = {}

    # Cross-reference bookkeeping, filled while the stream is parsed.
    self._startxref: int = 0
    self.xref_index = 0
    self.xref: dict[int, dict[Any, Any]] = {}
    self.xref_free_entry: dict[int, dict[Any, Any]] = {}
    self.xref_objStm: dict[int, tuple[Any, Any]] = {}
    self.trailer = DictionaryObject()

    # Map page indirect_reference number to page number
    self._page_id2num: Optional[dict[Any, Any]] = None

    self._validated_root: Optional[DictionaryObject] = None

    self._initialize_stream(stream)
    self._known_objects: set[tuple[int, int]] = set()

    self._override_encryption = False
    self._encryption: Optional[Encryption] = None
    if not self.is_encrypted:
        if password is not None:
            raise PdfReadError("Not an encrypted file")
    else:
        self._handle_encryption(password)

153 

def _initialize_stream(self, stream: Union[StrByteType, Path]) -> None:
    """
    Parse ``stream`` and keep it as ``self.stream``.

    A ``str`` or ``Path`` is read completely into an in-memory ``BytesIO``;
    in that case ``self._stream_opened`` is set so ``close`` knows the
    buffer belongs to this reader.

    Args:
        stream: File path or file-like object containing the PDF.

    """
    # A stream without a ``mode`` attribute is treated like a binary one.
    if "b" not in getattr(stream, "mode", "b"):
        logger_warning(
            "PdfReader stream/file object is not in binary mode. "
            "It may not be read correctly.",
            __name__,
        )
    self._stream_opened = False
    given_as_path = isinstance(stream, (str, Path))
    if given_as_path:
        with open(stream, "rb") as fh:
            stream = BytesIO(fh.read())
        self._stream_opened = True
    self.read(stream)
    self.stream = stream

168 

def _handle_encryption(self, password: Optional[Union[str, bytes]]) -> None:
    """
    Build the ``Encryption`` handler from the trailer and verify a password.

    Args:
        password: Password to verify; when ``None`` the empty password is
            tried instead and a failure is not reported.

    Raises:
        WrongPasswordError: If a password was supplied and verification
            reports ``PasswordType.NOT_DECRYPTED``.

    """
    self._override_encryption = True
    # Some documents may not have a /ID, use two empty
    # byte strings instead. Solves
    # https://github.com/py-pdf/pypdf/issues/608
    trailer_id = self.trailer.get(TK.ID)
    if trailer_id:
        first_id = trailer_id[0].get_object().original_bytes
    else:
        first_id = b""
    encrypt_dict = cast(DictionaryObject, self.trailer[TK.ENCRYPT].get_object())
    self._encryption = Encryption.read(encrypt_dict, first_id)

    # try empty password if no password provided
    password_given = password is not None
    candidate = password if password_given else b""
    verdict = self._encryption.verify(candidate)
    if password_given and verdict == PasswordType.NOT_DECRYPTED:
        # raise if password provided
        raise WrongPasswordError("Wrong password")
    self._override_encryption = False

188 

189 def __enter__(self) -> "PdfReader": 

190 return self 

191 

192 def __exit__( 

193 self, 

194 exc_type: Optional[type[BaseException]], 

195 exc_val: Optional[BaseException], 

196 exc_tb: Optional[TracebackType], 

197 ) -> None: 

198 self.close() 

199 

def close(self) -> None:
    """Close the stream if opened in __init__ and clear memory."""
    if self._stream_opened:
        self.stream.close()
    # Replace every cache with a fresh empty container.
    self.flattened_pages = []
    self.resolved_objects = {}
    self.trailer = DictionaryObject()
    self.xref, self.xref_free_entry, self.xref_objStm = {}, {}, {}

210 

@property
def root_object(self) -> DictionaryObject:
    """
    Provide access to "/Root". Standardized with PdfWriter.

    The trailer's ``/Root`` is accepted when its ``/Type`` is ``/Catalog``.
    Otherwise objects ``1 .. /Size`` are scanned for a catalog, bounded by
    the recovery limit. As a last resort a trailer root containing
    ``/Pages`` is accepted. The result is remembered for later calls.

    Raises:
        LimitReachedError: If the scan passes the recovery limit.
        PdfReadError: If no usable root object is found.

    """
    if self._validated_root:
        return self._validated_root

    root = self.trailer.get(TK.ROOT)
    if is_null_or_none(root):
        logger_warning('Cannot find "/Root" key in trailer', __name__)
    elif (
        cast(DictionaryObject, cast(PdfObject, root).get_object()).get("/Type")
        == "/Catalog"
    ):
        self._validated_root = cast(
            DictionaryObject, cast(PdfObject, root).get_object()
        )
    else:
        logger_warning("Invalid Root object in trailer", __name__)

    if self._validated_root is None:
        logger_warning('Searching object with "/Catalog" key', __name__)
        object_count = cast(int, self.trailer.get("/Size", 0))
        for object_number in range(1, object_count + 1):
            if object_number > self._root_object_recovery_limit:
                raise LimitReachedError("Maximum Root object recovery limit reached.")
            try:
                candidate = self.get_object(object_number)
            except Exception:  # to be sure to capture all errors
                candidate = None
            if isinstance(candidate, DictionaryObject) and candidate.get("/Type") == "/Catalog":
                self._validated_root = candidate
                logger_warning(f"Root found at {candidate.indirect_reference!r}", __name__)
                break

    if self._validated_root is not None:
        return self._validated_root

    # Last resort: a trailer root that at least owns a page tree.
    if is_null_or_none(root) or "/Pages" not in cast(
        DictionaryObject, cast(PdfObject, root).get_object()
    ):
        raise PdfReadError("Cannot find Root object in pdf")
    logger_warning(
        f"Possible root found at {cast(PdfObject, root).indirect_reference!r}, but missing /Catalog key",
        __name__
    )
    self._validated_root = cast(
        DictionaryObject, cast(PdfObject, root).get_object()
    )
    return self._validated_root

254 

@property
def _info(self) -> Optional[DictionaryObject]:
    """
    Provide access to "/Info". Standardized with PdfWriter.

    Returns:
        /Info Dictionary; None if the entry does not exist

    Raises:
        PdfReadError: If the entry resolves to something other than a
            dictionary.

    """
    entry = self.trailer.get(TK.INFO, None)
    if is_null_or_none(entry):
        return None
    assert entry is not None, "mypy"
    resolved = entry.get_object()
    if isinstance(resolved, DictionaryObject):
        return resolved
    raise PdfReadError(
        "Trailer not found or does not point to a document information dictionary"
    )

274 

@property
def _ID(self) -> Optional[ArrayObject]:
    """
    Provide access to "/ID". Standardized with PdfWriter.

    Returns:
        /ID array; None if the entry does not exist

    """
    id_entry = self.trailer.get(TK.ID, None)
    if is_null_or_none(id_entry):
        return None
    assert id_entry is not None, "mypy"
    resolved = id_entry.get_object()
    return cast(ArrayObject, resolved)

289 

290 @property 

291 def pdf_header(self) -> str: 

292 """ 

293 The first 8 bytes of the file. 

294 

295 This is typically something like ``'%PDF-1.6'`` and can be used to 

296 detect if the file is actually a PDF file and which version it is. 

297 """ 

298 # TODO: Make this return a bytes object for consistency 

299 # but that needs a deprecation 

300 loc = self.stream.tell() 

301 self.stream.seek(0, 0) 

302 pdf_file_version = self.stream.read(8).decode("utf-8", "backslashreplace") 

303 self.stream.seek(loc, 0) # return to where it was 

304 return pdf_file_version 

305 

@property
def xmp_metadata(self) -> Optional[XmpInformation]:
    """XMP (Extensible Metadata Platform) data."""
    # Encryption handling is bypassed while the metadata is fetched and
    # switched back on afterwards, even if the lookup fails.
    self._override_encryption = True
    try:
        metadata = self.root_object.xmp_metadata
        return cast(XmpInformation, metadata)
    finally:
        self._override_encryption = False

314 

def _get_page_number_by_indirect(
    self, indirect_reference: Union[None, int, NullObject, IndirectObject]
) -> Optional[int]:
    """
    Retrieve the page number from an indirect reference.

    The object-number-to-page-index map is built from ``self.pages`` on
    first use and kept in ``self._page_id2num``.

    Args:
        indirect_reference: The indirect reference to locate.

    Returns:
        Page number or None.

    """
    if self._page_id2num is None:
        id_to_page: dict[Any, Any] = {}
        for page_number, page in enumerate(self.pages):
            id_to_page[page.indirect_reference.idnum] = page_number  # type: ignore
        self._page_id2num = id_to_page

    if is_null_or_none(indirect_reference):
        return None
    assert isinstance(indirect_reference, (int, IndirectObject)), "mypy"
    idnum = (
        indirect_reference
        if isinstance(indirect_reference, int)
        else indirect_reference.idnum
    )
    assert self._page_id2num is not None, "hint for mypy"
    return self._page_id2num.get(idnum, None)

342 

def _get_object_from_stream(
    self, indirect_reference: IndirectObject
) -> Union[int, PdfObject, str]:
    """
    Read an object that is stored inside an object stream.

    The stream's header of ``/N`` (object number, offset) pairs is walked
    until the requested object number is found. The object is then parsed
    at ``/First`` plus its offset.

    Args:
        indirect_reference: Reference whose ``idnum`` is a key of
            ``self.xref_objStm``.

    Returns:
        The parsed object. In non-strict mode a ``NullObject`` is returned
        when the object is unreadable or not listed in the stream.

    Raises:
        PdfReadError: In strict mode, if the object sits at a different
            index than recorded, cannot be read, or is not listed.

    """
    # indirect reference to object in object stream
    # read the entire object stream into memory
    stmnum, expected_index = self.xref_objStm[indirect_reference.idnum]
    obj_stm: EncodedStreamObject = IndirectObject(stmnum, 0, self).get_object()  # type: ignore
    # This is an xref to a stream, so its type better be a stream
    assert cast(str, obj_stm["/Type"]) == "/ObjStm"
    data = BytesIO(obj_stm.get_data())

    def align_on_token() -> None:
        # Read the next non-whitespace byte, then step back onto it.
        read_non_whitespace(data)
        data.seek(-1, 1)

    for index in range(obj_stm["/N"]):  # type: ignore
        align_on_token()
        objnum = NumberObject.read_from_stream(data)
        align_on_token()
        offset = NumberObject.read_from_stream(data)
        align_on_token()
        if objnum != indirect_reference.idnum:
            # We're only interested in one object
            continue
        if self.strict and expected_index != index:
            raise PdfReadError("Object is in wrong index.")
        data.seek(int(obj_stm["/First"] + offset), 0)  # type: ignore

        # To cope with case where the 'pointer' is on a white space
        align_on_token()

        try:
            return read_object(data, self)
        except PdfStreamError as exc:
            # Stream object cannot be read. Normally, a critical error, but
            # Adobe Reader doesn't complain, so continue (in strict mode?)
            logger_warning(
                f"Invalid stream (index {index}) within object "
                f"{indirect_reference.idnum} {indirect_reference.generation}: "
                f"{exc}",
                __name__,
            )

            if self.strict:  # pragma: no cover
                raise PdfReadError(
                    f"Cannot read object stream: {exc}"
                )  # pragma: no cover
            # Replace with null. Hopefully it's nothing important.
            return NullObject()  # pragma: no cover

    if self.strict:  # pragma: no cover
        raise PdfReadError(
            "This is a fatal error in strict mode."
        )  # pragma: no cover
    return NullObject()  # pragma: no cover

398 

def get_object(
    self, indirect_reference: Union[int, IndirectObject]
) -> Optional[PdfObject]:
    """
    Resolve an indirect reference to the object it points to.

    Lookup order: the resolved-object cache, then object streams, then the
    cross-reference table, and finally a scan of the whole file for
    ``<idnum> <generation> obj``. The result is stored in the cache before
    it is returned.

    Args:
        indirect_reference: The reference to resolve; an ``int`` is taken
            as an object number with generation 0.

    Returns:
        The object, a ``NullObject`` for an entry flagged free, or ``None``
        when the object cannot be found in non-strict mode.

    Raises:
        PdfReadError: If a self-referencing loop is detected, or in strict
            mode if the header does not match or the object is missing.
        FileNotDecryptedError: If the file is encrypted and has not been
            decrypted.

    """
    if isinstance(indirect_reference, int):
        indirect_reference = IndirectObject(indirect_reference, 0, self)
    wanted_id = indirect_reference.idnum
    wanted_gen = indirect_reference.generation

    retval = self.cache_get_indirect_object(wanted_gen, wanted_id)
    if retval is not None:
        return retval

    def find_header_in_file() -> Optional[re.Match]:
        # Scan the complete file content for the object header; the stream
        # position is left unchanged.
        if hasattr(self.stream, "getbuffer"):
            buf = bytes(self.stream.getbuffer())
        else:
            p = self.stream.tell()
            self.stream.seek(0, 0)
            buf = self.stream.read(-1)
            self.stream.seek(p, 0)
        return re.search(
            rf"\s{wanted_id}\s+{wanted_gen}\s+obj".encode(),
            buf,
        )

    def decrypt_if_required(obj: Optional[PdfObject]) -> Optional[PdfObject]:
        # override encryption is used for the /Encrypt dictionary
        if self._override_encryption or self._encryption is None:
            return obj
        # if we don't have the encryption key:
        if not self._encryption.is_decrypted():
            raise FileNotDecryptedError("File has not been decrypted")
        # otherwise, decrypt here...
        return self._encryption.decrypt_object(
            cast(PdfObject, obj), wanted_id, wanted_gen
        )

    if wanted_gen == 0 and wanted_id in self.xref_objStm:
        retval = self._get_object_from_stream(indirect_reference)  # type: ignore
    elif wanted_gen in self.xref and wanted_id in self.xref[wanted_gen]:
        if self.xref_free_entry.get(wanted_gen, {}).get(wanted_id, False):
            return NullObject()
        start = self.xref[wanted_gen][wanted_id]
        self.stream.seek(start, 0)
        try:
            idnum, generation = self.read_object_header(self.stream)
            if idnum != wanted_id or generation != wanted_gen:
                raise PdfReadError("Not matching, we parse the file for it")
        except Exception:
            # The xref offset is unusable: look for the header in the file
            # and repair the xref entry when it is found.
            m = find_header_in_file()
            if m is not None:
                logger_warning(
                    f"Object ID {wanted_id},{wanted_gen} ref repaired",
                    __name__,
                )
                # +1 skips the whitespace byte matched by the leading \s
                self.xref[wanted_gen][wanted_id] = m.start(0) + 1
                self.stream.seek(m.start(0) + 1)
                idnum, generation = self.read_object_header(self.stream)
            else:
                idnum = -1
                generation = -1  # exception will be raised below
        if idnum != wanted_id and self.xref_index:
            # xref table probably had bad indexes due to not being zero-indexed
            if self.strict:
                raise PdfReadError(
                    f"Expected object ID ({wanted_id} {wanted_gen}) "
                    f"does not match actual ({idnum} {generation}); "
                    "xref table not zero-indexed."
                )
            # xref table is corrected in non-strict mode
        elif idnum != wanted_id and self.strict:
            # some other problem
            raise PdfReadError(
                f"Expected object ID ({wanted_id} {wanted_gen}) "
                f"does not match actual ({idnum} {generation})."
            )
        if self.strict:
            assert generation == wanted_gen

        current_object = (wanted_id, wanted_gen)
        if current_object in self._known_objects:
            raise PdfReadError(f"Detected loop with self reference for {indirect_reference!r}.")
        self._known_objects.add(current_object)
        try:
            retval = read_object(self.stream, self)  # type: ignore
        finally:
            # Always drop the marker: a failed read must not make a later
            # lookup of the same object look like a reference loop.
            self._known_objects.discard(current_object)

        retval = decrypt_if_required(retval)
    else:
        m = find_header_in_file()
        if m is not None:
            logger_warning(
                f"Object {wanted_id} {wanted_gen} found",
                __name__,
            )
            if wanted_gen not in self.xref:
                self.xref[wanted_gen] = {}
            self.xref[wanted_gen][wanted_id] = m.start(0) + 1
            self.stream.seek(m.end(0) + 1)
            skip_over_whitespace(self.stream)
            self.stream.seek(-1, 1)
            retval = read_object(self.stream, self)  # type: ignore

            retval = decrypt_if_required(retval)
        else:
            logger_warning(
                f"Object {wanted_id} {wanted_gen} not defined.",
                __name__,
            )
            if self.strict:
                raise PdfReadError("Could not find object.")
    self.cache_indirect_object(wanted_gen, wanted_id, retval)
    return retval

539 

def read_object_header(self, stream: StreamType) -> tuple[int, int]:
    """
    Read an ``<idnum> <generation> obj`` header from ``stream``.

    Args:
        stream: Stream positioned at (or shortly before) the header.

    Returns:
        The object number and the generation number as integers.

    Raises:
        ValueError: If either number cannot be converted by ``int``.

    """
    # Should never be necessary to read out whitespace, since the
    # cross-reference table should put us in the right spot to read the
    # object header. In reality some files have stupid cross-reference
    # tables that are off by whitespace bytes.
    def skip_whitespace_and_step_back() -> bool:
        skipped = skip_over_whitespace(stream)
        stream.seek(-1, 1)
        return skipped

    skip_over_comment(stream)
    extra = skip_whitespace_and_step_back()
    idnum = read_until_whitespace(stream)
    extra |= skip_whitespace_and_step_back()
    generation = read_until_whitespace(stream)
    extra |= skip_whitespace_and_step_back()

    # although it's not used, it might still be necessary to read
    stream.read(3)

    read_non_whitespace(stream)
    stream.seek(-1, 1)
    if self.strict and extra:
        logger_warning(
            f"Superfluous whitespace found in object header {idnum} {generation}",  # type: ignore
            __name__,
        )
    return int(idnum), int(generation)

566 

567 def cache_get_indirect_object( 

568 self, generation: int, idnum: int 

569 ) -> Optional[PdfObject]: 

570 try: 

571 return self.resolved_objects.get((generation, idnum)) 

572 except RecursionError: 

573 raise PdfReadError("Maximum recursion depth reached.") 

574 

575 def cache_indirect_object( 

576 self, generation: int, idnum: int, obj: Optional[PdfObject] 

577 ) -> Optional[PdfObject]: 

578 if (generation, idnum) in self.resolved_objects: 

579 msg = f"Overwriting cache for {generation} {idnum}" 

580 if self.strict: 

581 raise PdfReadError(msg) 

582 logger_warning(msg, __name__) 

583 self.resolved_objects[(generation, idnum)] = obj 

584 if obj is not None: 

585 obj.indirect_reference = IndirectObject(idnum, generation, self) 

586 return obj 

587 

588 def _replace_object(self, indirect: IndirectObject, obj: PdfObject) -> PdfObject: 

589 # function reserved for future development 

590 if indirect.pdf != self: 

591 raise ValueError("Cannot update PdfReader with external object") 

592 if (indirect.generation, indirect.idnum) not in self.resolved_objects: 

593 raise ValueError("Cannot find referenced object") 

594 self.resolved_objects[(indirect.generation, indirect.idnum)] = obj 

595 obj.indirect_reference = indirect 

596 return obj 

597 

def read(self, stream: StreamType) -> None:
    """
    Read and process the PDF stream, extracting necessary data.

    The header is validated, the ``%%EOF`` marker and ``startxref`` offset
    are located, and all cross-reference tables and trailers are loaded.
    In non-strict mode the loaded table is then checked against the
    object headers found in the file.

    Args:
        stream: The PDF file stream.

    Raises:
        PdfReadError: In strict mode, if the xref table is reported broken.

    """
    self._basic_validation(stream)
    self._find_eof_marker(stream)
    self._startxref = startxref = self._find_startxref_pos(stream)

    # check and eventually correct the startxref only if not strict
    xref_issue_nr = self._get_xref_issues(stream, startxref)
    if xref_issue_nr != 0:
        if self.strict:
            raise PdfReadError("Broken xref table")
        logger_warning(f"incorrect startxref pointer({xref_issue_nr})", __name__)

    # read all cross-reference tables and their trailers
    self._read_xref_tables_and_trailers(stream, startxref, xref_issue_nr)

    # Everything below is repair work that strict mode never performs.
    if self.strict:
        return

    # if not zero-indexed, verify that the table is correct; change it if necessary
    if self.xref_index:
        saved_position = stream.tell()
        for gen, xref_entry in self.xref.items():
            if gen == 65535:
                continue
            # ensure ascending to prevent damage
            for obj_id in sorted(xref_entry):
                stream.seek(xref_entry[obj_id], 0)
                try:
                    pid, _pgen = self.read_object_header(stream)
                except ValueError:
                    self._rebuild_xref_table(stream)
                    break
                if pid == obj_id - self.xref_index:
                    # fixing index item per item is required for revised PDF.
                    self.xref[gen][pid] = self.xref[gen][obj_id]
                    del self.xref[gen][obj_id]
                # if not, then either it's just plain wrong, or the
                # non-zero-index is actually correct
        stream.seek(saved_position, 0)  # return to where it was

    # remove wrong objects (not pointing to correct structures) - cf #2326
    saved_position = stream.tell()
    for gen, xref_entry in self.xref.items():
        if gen == 65535:
            continue
        # iterate over a snapshot of the keys, entries are deleted on the way
        for obj_id in list(xref_entry):
            stream.seek(xref_entry[obj_id], 0)
            try:
                self.read_object_header(stream)
            except ValueError:
                logger_warning(
                    f"Ignoring wrong pointing object {obj_id} {gen} (offset {xref_entry[obj_id]})",
                    __name__,
                )
                del xref_entry[obj_id]
    stream.seek(saved_position, 0)  # return to where it was

663 

def _basic_validation(self, stream: StreamType) -> None:
    """
    Ensure the stream is valid and not empty.

    The first five bytes are compared with ``%PDF-``. On return the stream
    is positioned at its end.

    Args:
        stream: The PDF file stream.

    Raises:
        UnsupportedOperation: If reading the header raises a
            ``UnicodeDecodeError``.
        EmptyFileError: If the stream yields no data.
        PdfReadError: In strict mode, if the header is not ``%PDF-``
            (non-strict mode only warns).

    """
    stream.seek(0, os.SEEK_SET)
    try:
        header_byte = stream.read(5)
    except UnicodeDecodeError:
        raise UnsupportedOperation("cannot read header")
    if header_byte == b"":
        raise EmptyFileError("Cannot read an empty file")
    if header_byte != b"%PDF-":
        if self.strict:
            # A wrong header is often arbitrary binary data. Escape
            # undecodable bytes so the PdfReadError is raised instead of
            # a UnicodeDecodeError.
            shown_header = header_byte.decode("utf8", "backslashreplace")
            raise PdfReadError(
                f"PDF starts with '{shown_header}', "
                "but '%PDF-' expected"
            )
        logger_warning(f"invalid pdf header: {header_byte}", __name__)
    stream.seek(0, os.SEEK_END)

681 

def _find_eof_marker(self, stream: StreamType) -> None:
    """
    Jump to the %%EOF marker.

    According to the specs, the %%EOF marker should be at the very end of
    the file. Hence for standard-compliant PDF documents this function will
    read only the last part (DEFAULT_BUFFER_SIZE).

    Raises:
        PdfReadError: In strict mode, if the search reaches the header
            area without finding the marker.
    """
    HEADER_SIZE = 8  # to parse whole file, Header is e.g. '%PDF-1.6'
    truncated_markers = (b"%%EO", b"%%E", b"%%", b"%")
    line = b""
    checking_last_line = True
    while not line.startswith(b"%%EOF"):
        if checking_last_line and line != b"":
            if line.strip().endswith(truncated_markers):
                # Consider the file as truncated while
                # having enough confidence to carry on.
                logger_warning("EOF marker seems truncated", __name__)
                break
            checking_last_line = False
            if b"startxref" in line:
                logger_warning(
                    "CAUTION: startxref found while searching for %%EOF. "
                    "The file might be truncated and some data might not be read.",
                    __name__,
                )
        if stream.tell() < HEADER_SIZE:
            if self.strict:
                raise PdfReadError("EOF marker not found")
            logger_warning("EOF marker not found", __name__)
        line = read_previous_line(stream)

714 

def _find_startxref_pos(self, stream: StreamType) -> int:
    """
    Find startxref entry - the location of the xref table.

    Args:
        stream: Stream positioned after the line holding the offset; it is
            read backwards line by line.

    Returns:
        The bytes offset

    Raises:
        PdfReadError: If no ``startxref`` keyword accompanies the offset.

    """
    offset_line = read_previous_line(stream)
    try:
        position = int(offset_line)
    except ValueError:
        # 'startxref' may be on the same line as the location
        if not offset_line.startswith(b"startxref"):
            raise PdfReadError("startxref not found")
        position = int(offset_line[9:].strip())
        logger_warning("startxref on same line as offset", __name__)
        return position

    # The offset stood alone, so the keyword must be on the line before it.
    keyword_line = read_previous_line(stream)
    if not keyword_line.startswith(b"startxref"):
        raise PdfReadError("startxref not found")
    return position

740 

def _read_standard_xref_table(self, stream: StreamType) -> None:
    """
    Read a classic (non-stream) cross-reference table.

    The stream is expected right after the ``x`` of ``xref``. Every
    subsection is read and merged into ``self.xref`` and
    ``self.xref_free_entry`` without overriding existing entries. Reading
    stops after the ``trailer`` keyword.

    Args:
        stream: The PDF file stream.

    Raises:
        PdfReadError: If the ``xref`` keyword is incomplete or an entry
            line is empty.

    """
    # standard cross-reference table
    ref = stream.read(3)
    if ref != b"ref":
        raise PdfReadError("xref table read error")
    read_non_whitespace(stream)
    stream.seek(-1, 1)
    first_time = True  # check if the first time looking at the xref table
    while True:
        num = cast(int, read_object(stream, self))
        if first_time and num != 0:
            self.xref_index = num
            if self.strict:
                logger_warning(
                    "Xref table not zero-indexed. ID numbers for objects will be corrected.",
                    __name__,
                )
                # if table not zero indexed, could be due to error from when PDF was created
                # which will lead to mismatched indices later on, only warned and corrected if self.strict==True
        first_time = False
        read_non_whitespace(stream)
        stream.seek(-1, 1)
        size = cast(int, read_object(stream, self))
        if not isinstance(size, int):
            logger_warning(
                "Invalid/Truncated xref table. Rebuilding it.",
                __name__,
            )
            self._rebuild_xref_table(stream)
            stream.read()
            return
        read_non_whitespace(stream)
        stream.seek(-1, 1)
        cnt = 0
        while cnt < size:
            line = stream.read(20)
            if not line:
                raise PdfReadError("Unexpected empty line in Xref table.")

            # It's very clear in section 3.4.3 of the PDF spec
            # that all cross-reference table lines are a fixed
            # 20 bytes (as of PDF 1.7). However, some files have
            # 21-byte entries (or more) due to the use of \r\n
            # (CRLF) EOL's. Detect that case, and adjust the line
            # until it does not begin with a \r (CR) or \n (LF).
            while line[0] in b"\x0D\x0A":
                stream.seek(-20 + 1, 1)
                line = stream.read(20)

            # On the other hand, some malformed PDF files
            # use a single character EOL without a preceding
            # space. Detect that case, and seek the stream
            # back one character (0-9 means we've bled into
            # the next xref entry, t means we've bled into the
            # text "trailer"):
            if line[-1] in b"0123456789t":
                stream.seek(-1, 1)

            # Taken before parsing so it always belongs to the current
            # line, even when the split below fails (slicing never raises).
            entry_type_b = line[17:18]
            try:
                offset_b, generation_b = line[:16].split(b" ")

                offset, generation = int(offset_b), int(generation_b)
            except Exception:
                # Unparsable entry: look for the object header in the file.
                if hasattr(stream, "getbuffer"):
                    buf = bytes(stream.getbuffer())
                else:
                    p = stream.tell()
                    stream.seek(0, 0)
                    buf = stream.read(-1)
                    stream.seek(p)

                f = re.search(rf"{num}\s+(\d+)\s+obj".encode(), buf)
                if f is None:
                    logger_warning(
                        f"entry {num} in Xref table invalid; object not found",
                        __name__,
                    )
                    generation = 65535
                    offset = -1
                    entry_type_b = b"f"
                else:
                    logger_warning(
                        f"entry {num} in Xref table invalid but object found",
                        __name__,
                    )
                    generation = int(f.group(1))
                    offset = f.start()

            if generation not in self.xref:
                self.xref[generation] = {}
                self.xref_free_entry[generation] = {}
            if num in self.xref[generation]:
                # It really seems like we should allow the last
                # xref table in the file to override previous
                # ones. Since we read the file backwards, assume
                # any existing key is already set correctly.
                pass
            else:
                if entry_type_b == b"n":
                    self.xref[generation][num] = offset
                # Best effort: the per-generation or the 65535 map may
                # not exist; such failures are deliberately ignored.
                try:
                    self.xref_free_entry[generation][num] = entry_type_b == b"f"
                except Exception:
                    pass
                try:
                    self.xref_free_entry[65535][num] = entry_type_b == b"f"
                except Exception:
                    pass
            cnt += 1
            num += 1
        read_non_whitespace(stream)
        stream.seek(-1, 1)
        trailer_tag = stream.read(7)
        if trailer_tag != b"trailer":
            # more xrefs!
            stream.seek(-7, 1)
        else:
            break

860 

def _read_xref_tables_and_trailers(
    self, stream: StreamType, startxref: Optional[int], xref_issue_nr: int
) -> None:
    """
    Read the cross-reference tables and trailers in the PDF stream.

    Starting at ``startxref``, each section is read and the chain of
    previous sections is followed until none is left. The xref maps and
    the trailer are reset before reading.

    Args:
        stream: The PDF file stream.
        startxref: Offset of the first section; ``None`` reads nothing.
        xref_issue_nr: Non-zero when the ``startxref`` pointer was found
            to be incorrect; a rebuild of the table is then attempted.

    Raises:
        PdfReadError: If an xref stream cannot be read and no ``/Root``
            has been collected yet.

    """
    self.xref = {}
    self.xref_free_entry = {}
    self.xref_objStm = {}
    self.trailer = DictionaryObject()
    while startxref is not None:
        # load the xref table
        stream.seek(startxref, 0)
        marker = stream.read(1)
        if marker in b"\r\n":
            marker = stream.read(1)

        if marker == b"x":
            startxref = self._read_xref(stream)
            continue

        if xref_issue_nr:
            try:
                self._rebuild_xref_table(stream)
                break
            except Exception:
                # Rebuilding failed: retry this offset without the flag.
                xref_issue_nr = 0
            continue

        if not marker.isdigit():
            startxref = self._read_xref_other_error(stream, startxref)
            continue

        try:
            xrefstream = self._read_pdf15_xref_stream(stream)
        except Exception as e:
            if TK.ROOT in self.trailer:
                logger_warning(
                    f"Previous trailer cannot be read: {e.args}", __name__
                )
                break
            raise PdfReadError(f"Trailer cannot be read: {e!s}")
        self._process_xref_stream(xrefstream)
        if "/Prev" not in xrefstream:
            break
        startxref = cast(int, xrefstream["/Prev"])

900 

def _process_xref_stream(self, xrefstream: DictionaryObject) -> None:
    """
    Merge the trailer entries carried by a cross-reference stream.

    Only keys that are not yet present in ``self.trailer`` are copied, so
    values collected earlier are never replaced. If the stream names an
    ``/XRefStm``, that additional stream is read as well and the position
    of ``self.stream`` is put back afterwards.

    Args:
        xrefstream: The cross-reference stream dictionary to take over.

    """
    for trailer_key in (TK.ROOT, TK.ENCRYPT, TK.INFO, TK.ID, TK.SIZE):
        if trailer_key not in xrefstream or trailer_key in self.trailer:
            continue
        self.trailer[NameObject(trailer_key)] = xrefstream.raw_get(trailer_key)

    if "/XRefStm" not in xrefstream:
        return

    saved_position = self.stream.tell()
    # The +1 compensates the one-byte step back that
    # _read_pdf15_xref_stream performs before parsing the object header.
    self.stream.seek(cast(int, xrefstream["/XRefStm"]) + 1, 0)
    self._read_pdf15_xref_stream(self.stream)
    self.stream.seek(saved_position, 0)

912 

def _read_xref(self, stream: StreamType) -> Optional[int]:
    """
    Read a classic xref table together with the trailer that follows it.

    Trailer entries are only added to ``self.trailer`` when the key is not
    present yet. A ``/XRefStm`` entry triggers reading of the referenced
    cross-reference stream; a failure there is logged and otherwise ignored.

    Args:
        stream: The PDF stream, positioned for ``_read_standard_xref_table``.

    Returns:
        The ``/Prev`` value of the trailer, or ``None`` if the stream ends
        directly after the table or the trailer has no ``/Prev`` entry.

    """
    self._read_standard_xref_table(stream)

    probe = stream.read(1)
    if probe == b"":
        # Nothing follows the table, hence there is no trailer to read.
        return None
    stream.seek(-1, 1)

    read_non_whitespace(stream)
    stream.seek(-1, 1)  # un-read the first non-whitespace byte
    new_trailer = cast(dict[str, Any], read_object(stream, self))
    for key, value in new_trailer.items():
        if key in self.trailer:
            continue
        self.trailer[key] = value

    if "/XRefStm" in new_trailer:
        resume_at = stream.tell()
        stream.seek(cast(int, new_trailer["/XRefStm"]) + 1, 0)
        try:
            self._read_pdf15_xref_stream(stream)
        except Exception:
            logger_warning(
                f"XRef object at {new_trailer['/XRefStm']} can not be read, some object may be missing",
                __name__,
            )
        stream.seek(resume_at, 0)

    return new_trailer["/Prev"] if "/Prev" in new_trailer else None

938 

def _read_xref_other_error(
    self, stream: StreamType, startxref: int
) -> Optional[int]:
    """
    Recover when ``startxref`` points neither at ``xref`` nor at a digit.

    The strategies are tried in this order: treat ``/Prev=0`` as "no
    previous table", look for the ``xref`` keyword close to the current
    stream position, look for a digit within the next 25 bytes, and finally
    rebuild the whole table (lenient mode with a known ``/Root`` only).

    Args:
        stream: The PDF stream; its current position is used for the
            nearby ``xref`` search.
        startxref: The offset that could not be interpreted.

    Returns:
        A corrected offset to retry with, or ``None`` if there is nothing
        further to read.

    Raises:
        PdfReadError: If ``/Prev=0`` is met in strict mode, if the rebuild
            fails, or if no cross-reference data can be located.

    """
    if startxref == 0:
        # some PDFs have /Prev=0 in the trailer, instead of no /Prev
        if self.strict:
            raise PdfReadError(
                "/Prev=0 in the trailer (try opening with strict=False)"
            )
        logger_warning(
            "/Prev=0 in the trailer - assuming there is no previous xref table",
            __name__,
        )
        return None

    # Bad character at startxref: off-by-one errors have been observed, so
    # check a small window around the current position for the keyword.
    stream.seek(-11, 1)
    nearby = stream.read(20)
    keyword_at = nearby.find(b"xref")
    if keyword_at != -1:
        return startxref - (10 - keyword_at)

    # No explicit xref table, try finding a cross-reference stream.
    stream.seek(startxref, 0)
    for skipped in range(25):  # value extended to cope with more linearized files
        if stream.read(1).isdigit():
            # This is not a standard PDF, consider adding a warning
            return startxref + skipped

    # no xref table found at specified location
    if "/Root" not in self.trailer or self.strict:
        raise PdfReadError("Could not find xref table at specified location")

    # Root has already been found, so only warn and try a rebuild.
    logger_warning("Invalid parent xref., rebuild xref", __name__)
    try:
        self._rebuild_xref_table(stream)
    except Exception:
        raise PdfReadError("Cannot rebuild xref")
    return None

979 

def _read_pdf15_xref_stream(
    self, stream: StreamType
) -> Union[ContentStream, EncodedStreamObject, DecodedStreamObject]:
    """
    Read the cross-reference stream for PDF 1.5+.

    The stream object is parsed, cached as an indirect object and its
    entries are handed to ``_read_xref_subsections``.

    Args:
        stream: The PDF stream, positioned one byte past the start of the
            object header (the method steps back one byte itself).

    Returns:
        The parsed cross-reference stream object.

    Raises:
        PdfReadError: If the object is not of type ``/XRef``, if ``/Size``
            is missing, if ``/W`` is missing or has fewer than three
            elements, or (strict mode only) if ``/W`` has more than three
            elements.

    """
    stream.seek(-1, 1)
    idnum, generation = self.read_object_header(stream)
    xrefstream = cast(ContentStream, read_object(stream, self))
    if cast(str, xrefstream["/Type"]) != "/XRef":
        raise PdfReadError(f"Unexpected type {xrefstream['/Type']!r}")
    self.cache_indirect_object(generation, idnum, xrefstream)

    # Index pairs specify the subsections in the dictionary.
    # If none, create one subsection that spans everything.
    if "/Size" not in xrefstream:
        # According to table 17 of the PDF 2.0 specification, this key is required.
        raise PdfReadError(f"Size missing from XRef stream {xrefstream!r}!")
    idx_pairs = xrefstream.get("/Index", [0, xrefstream["/Size"]])

    entry_sizes = cast(dict[Any, Any], xrefstream.get("/W"))
    # Validate explicitly instead of using ``assert``: assertions are
    # stripped under ``python -O`` and a missing /W would otherwise surface
    # as an unrelated TypeError from ``len(None)``.
    if entry_sizes is None or len(entry_sizes) < 3:
        raise PdfReadError(
            f"Invalid or missing /W in XRef stream: {entry_sizes!r}"
        )
    if self.strict and len(entry_sizes) > 3:
        raise PdfReadError(f"Too many entry sizes: {entry_sizes}")

    stream_data = BytesIO(xrefstream.get_data())

    def get_entry(i: int) -> Union[int, tuple[int, ...]]:
        # Reads the correct number of bytes for each entry. See the
        # discussion of the W parameter in PDF spec table 17.
        if entry_sizes[i] > 0:
            d = stream_data.read(entry_sizes[i])
            return convert_to_int(d, entry_sizes[i])

        # PDF Spec Table 17: A value of zero for an element in the
        # W array indicates...the default value shall be used
        if i == 0:
            return 1  # First value defaults to 1
        return 0

    def used_before(num: int, generation: Union[int, tuple[int, ...]]) -> bool:
        # We move backwards through the xrefs, don't replace any.
        return num in self.xref.get(generation, []) or num in self.xref_objStm  # type: ignore

    # Iterate through each subsection
    self._read_xref_subsections(idx_pairs, get_entry, used_before)
    return xrefstream

1025 

@staticmethod
def _get_xref_issues(stream: StreamType, startxref: int) -> int:
    """
    Return an int which indicates an issue. 0 means there is no issue.

    Args:
        stream: The PDF stream to inspect; its position is changed.
        startxref: The offset at which cross-reference data is expected.

    Returns:
        0 means no issue, other values represent specific issues:
        1 if the byte inspected in front of the data is not one of
        CR, LF, space or tab; 2 if the stream ends while digits and blanks
        are skipped; 3 if the token after them is not ``obj``
        (case-insensitive); 4 if ``startxref`` is 0.

    """
    if startxref == 0:
        return 4

    stream.seek(startxref - 1, 0)  # -1 to check character before
    preceding = stream.read(1)
    if preceding == b"j":
        # When that byte is "j", the following byte is examined instead.
        preceding = stream.read(1)
    if preceding not in b"\r\n \t":
        return 1

    if stream.read(4) == b"xref":
        return 0

    # not a xref so check if it is an XREF object
    token = b""
    while token in b"0123456789 \t":
        token = stream.read(1)
        if token == b"":
            return 2
    token += stream.read(2)  # 1 char already read, +2 to check "obj"
    return 0 if token.lower() == b"obj" else 3

1060 

@classmethod
def _find_pdf_objects(cls, data: bytes) -> Iterable[tuple[int, int, int]]:
    """
    Locate ``<number> <generation> obj`` headers in raw PDF bytes.

    Every occurrence of ``b" obj"`` is examined by scanning backwards over
    whitespace, the generation digits, whitespace again and the object
    number digits. Occurrences lacking either number are skipped.

    Args:
        data: The complete PDF content.

    Yields:
        Tuples of object number, generation number and the index of the
        first digit of the object number.

    """
    marker = b" obj"
    digit_low, digit_high = ord("0"), ord("9")

    def rewind_whitespace(position: int) -> int:
        # Move left while the byte at ``position`` is whitespace.
        while position >= 0 and data[position] in WHITESPACES_AS_BYTES:
            position -= 1
        return position

    def rewind_digits(position: int) -> int:
        # Move left while the byte at ``position`` is an ASCII digit.
        while position >= 0 and digit_low <= data[position] <= digit_high:
            position -= 1
        return position

    search_from = 0
    while (hit := data.find(marker, search_from)) != -1:
        cursor = rewind_whitespace(hit - 1)

        # Generation number
        generation_end = cursor + 1
        cursor = rewind_digits(cursor)
        generation_start = cursor + 1

        cursor = rewind_whitespace(cursor)

        # Object number
        object_end = cursor + 1
        cursor = rewind_digits(cursor)
        object_start = cursor + 1

        # Both numbers need at least one digit to form a valid header.
        if object_start < object_end and generation_start < generation_end:
            yield (
                int(data[object_start:object_end]),
                int(data[generation_start:generation_end]),
                object_start,
            )

        search_from = hit + 4  # len(b" obj")

1101 

@classmethod
def _find_pdf_trailers(cls, data: bytes) -> Iterable[int]:
    """
    Locate trailer dictionaries in raw PDF bytes.

    An occurrence of ``b"trailer"`` counts only if, after optional
    whitespace, it is followed by ``<<``.

    Args:
        data: The complete PDF content.

    Yields:
        The index of the ``<<`` that opens each trailer dictionary.

    """
    keyword = b"trailer"
    keyword_length = 7  # len(b"trailer")
    total = len(data)

    search_from = 0
    while (hit := data.find(keyword, search_from)) != -1:
        cursor = hit + keyword_length

        # Skip whitespace
        while cursor < total and data[cursor] in WHITESPACES_AS_BYTES:
            cursor += 1

        # Must be dictionary start
        if data.startswith(b"<<", cursor):
            yield cursor  # offset of '<<'

        search_from = hit + keyword_length

1122 

def _rebuild_xref_table(self, stream: StreamType) -> None:
    """
    Rebuild the cross-reference data by scanning the whole file.

    Three passes are made over the content: object headers found by
    ``_find_pdf_objects`` populate ``self.xref``; every object found is
    then parsed and, if it is an ``/ObjStm``, the number pairs at the start
    of its data are recorded in ``self.xref_objStm``; finally every trailer
    found by ``_find_pdf_trailers`` is merged into ``self.trailer``.

    Args:
        stream: The PDF stream; it is read completely.

    """
    self.xref = {}
    stream.seek(0, 0)
    raw_pdf = stream.read(-1)

    for found_number, found_generation, found_offset in self._find_pdf_objects(raw_pdf):
        self.xref.setdefault(found_generation, {})[found_number] = found_offset

    logger_warning("parsing for Object Streams", __name__)
    for generation_number, offsets in self.xref.items():
        for object_number, offset in offsets.items():
            # get_object in manual
            stream.seek(offset, 0)
            try:
                _ = self.read_object_header(stream)
                candidate = cast(StreamObject, read_object(stream, self))
                if candidate.get("/Type", "") != "/ObjStm":
                    continue
                header = BytesIO(candidate.get_data())
                pairs_found = 0
                while True:
                    token = read_until_whitespace(header)
                    if not token.isdigit():
                        break
                    inner_object_number = int(token)
                    skip_over_whitespace(header)
                    header.seek(-1, 1)
                    token = read_until_whitespace(header)
                    if not token.isdigit():  # pragma: no cover
                        break  # pragma: no cover
                    # The second number of the pair is stored as read.
                    self.xref_objStm[inner_object_number] = (object_number, int(token))
                    pairs_found += 1
                if pairs_found != candidate.get("/N"):  # pragma: no cover
                    logger_warning(  # pragma: no cover
                        f"found {pairs_found} objects within Object({object_number},{generation_number})"
                        f" whereas {candidate.get('/N')} expected",
                        __name__,
                    )
            except Exception:  # could be multiple causes
                # Best effort: an unreadable object is simply skipped.
                pass

    stream.seek(0, 0)
    for trailer_offset in self._find_pdf_trailers(raw_pdf):
        stream.seek(trailer_offset, 0)
        found_trailer = cast(dict[Any, Any], read_object(stream, self))
        # Here, we are parsing the file from start to end, the new data have to erase the existing.
        for key, value in found_trailer.items():
            self.trailer[key] = value

1174 

def _read_xref_subsections(
    self,
    idx_pairs: list[int],
    get_entry: Callable[[int], Union[int, tuple[int, ...]]],
    used_before: Callable[[int, Union[int, tuple[int, ...]]], bool],
) -> None:
    """
    Read and process the subsections of the xref.

    Every entry consists of three fields which ``get_entry`` returns one
    after the other. All three are fetched for every entry, including
    entries of an unknown type, so that the following entries are read
    from the right position.

    Args:
        idx_pairs: Flat list of ``start, size`` pairs describing the
            subsections.
        get_entry: Callable returning the value of field ``i`` (0, 1 or 2)
            of the entry currently being read.
        used_before: Callable telling whether the object number is already
            registered for the given generation; such entries are kept.

    Raises:
        PdfReadError: In strict mode, if an entry has an unknown type.

    """
    for start, size in self._pairs(idx_pairs):
        # The subsections must increase
        for num in range(start, start + size):
            # The first entry is the type
            xref_type = get_entry(0)
            # The rest of the elements depend on the xref_type
            if xref_type == 0:
                # linked list of free objects: next free object number and
                # next generation are consumed but not stored
                get_entry(1)
                get_entry(2)
            elif xref_type == 1:
                # objects that are in use but are not compressed
                byte_offset = get_entry(1)
                generation = get_entry(2)
                if generation not in self.xref:
                    self.xref[generation] = {}  # type: ignore
                if not used_before(num, generation):
                    self.xref[generation][num] = byte_offset  # type: ignore
            elif xref_type == 2:
                # compressed objects
                objstr_num = get_entry(1)
                obstr_idx = get_entry(2)
                generation = 0  # PDF spec table 18, generation is 0
                if not used_before(num, generation):
                    self.xref_objStm[num] = (objstr_num, obstr_idx)
            else:
                if self.strict:
                    raise PdfReadError(f"Unknown xref type: {xref_type}")
                # Unknown type in lenient mode: the entry is ignored, but
                # its two remaining fields still have to be consumed,
                # otherwise every following entry would be misaligned.
                get_entry(1)
                get_entry(2)

1209 

def _pairs(self, array: list[int]) -> Iterable[tuple[int, int]]:
    """
    Iterate over pairs in the array.

    Args:
        array: Flat sequence whose elements are grouped two by two.

    Yields:
        Consecutive, non-overlapping pairs; a trailing element without a
        partner is dropped.

    """
    for first in range(0, len(array) - 1, 2):
        yield array[first], array[first + 1]

1216 

def decrypt(self, password: Union[str, bytes]) -> PasswordType:
    """
    Decrypt a file secured with the PDF Standard encryption handler.

    The given password is checked against the document's user password and
    owner password; if either one matches, the resulting decryption key is
    stored.

    It does not matter which password was matched. Both passwords provide
    the correct decryption key that will allow the document to be used with
    this library.

    Args:
        password: The password to match.

    Returns:
        An indicator if the document was decrypted and whether it was the
        owner password or the user password.

    Raises:
        PdfReadError: If the file is not encrypted.

    """
    if self._encryption:
        # TODO: raise Exception for wrong password
        return self._encryption.verify(password)
    raise PdfReadError("Not encrypted file")

1241 

@property
def is_encrypted(self) -> bool:
    """
    Whether this PDF file is encrypted (read-only).

    The value reflects the presence of the encryption entry in the trailer.
    Note that this property, if true, will remain true even after the
    :meth:`decrypt()<pypdf.PdfReader.decrypt>` method is called.
    """
    trailer = self.trailer
    return TK.ENCRYPT in trailer

1251 

def add_form_topname(self, name: str) -> Optional[DictionaryObject]:
    """
    Add a top level form that groups all form fields below it.

    The existing ``/Fields`` become the ``/Kids`` of a new field, the new
    field becomes the only entry of ``/Fields`` and every former top level
    field gets the new field as its ``/Parent``.

    Args:
        name: text string of the "/T" Attribute of the created object

    Returns:
        The created object. ``None`` means no object was created.

    """
    catalog = self.root_object

    has_acroform = "/AcroForm" in catalog and isinstance(
        catalog["/AcroForm"], DictionaryObject
    )
    if not has_acroform:
        return None
    acroform = cast(DictionaryObject, catalog[NameObject("/AcroForm")])
    if "/Fields" not in acroform:
        # TODO: No error but this may be extended for XFA Forms
        return None

    interim = DictionaryObject()
    interim[NameObject("/T")] = TextStringObject(name)
    interim[NameObject("/Kids")] = acroform[NameObject("/Fields")]

    # Register the new field under the number following the highest
    # generation-0 object number among the resolved objects.
    next_number = max(i for (g, i) in self.resolved_objects if g == 0) + 1
    self.cache_indirect_object(0, next_number, interim)

    top_level = ArrayObject()
    top_level.append(interim.indirect_reference)
    acroform[NameObject("/Fields")] = top_level

    for kid_reference in cast(ArrayObject, interim["/Kids"]):
        kid = kid_reference.get_object()
        if "/Parent" in kid:
            logger_warning(
                f"Top Level Form Field {kid.indirect_reference} have a non-expected parent",
                __name__,
            )
        kid[NameObject("/Parent")] = interim.indirect_reference
    return interim

1294 

def rename_form_topname(self, name: str) -> Optional[DictionaryObject]:
    """
    Rename the top level form field that groups all form fields below it.

    The first entry of the AcroForm's ``/Fields`` is taken as the top level
    field and its "/T" value is replaced.

    Args:
        name: text string of the "/T" field of the modified object

    Returns:
        The modified object. ``None`` means no object was modified.

    """
    catalog = self.root_object

    has_acroform = "/AcroForm" in catalog and isinstance(
        catalog["/AcroForm"], DictionaryObject
    )
    if not has_acroform:
        return None
    acroform = cast(DictionaryObject, catalog[NameObject("/AcroForm")])
    if "/Fields" not in acroform:
        return None

    fields = cast(ArrayObject, acroform[NameObject("/Fields")])
    top_field = cast(DictionaryObject, fields[0].get_object())
    top_field[NameObject("/T")] = TextStringObject(name)
    return top_field

1322 

def _repr_mimebundle_(
    self,
    include: Union[None, Iterable[str]] = None,
    exclude: Union[None, Iterable[str]] = None,
) -> dict[str, Any]:
    """
    Integration into Jupyter Notebooks.

    This method returns a dictionary that maps a mime-type to its
    representation. The only representation offered is the complete
    content of the underlying stream as ``application/pdf``.

    Args:
        include: If given, only mime-types contained in it are returned.
        exclude: If given, mime-types contained in it are removed.

    Returns:
        The mapping of mime-type to representation after filtering.

    .. seealso::

        https://ipython.readthedocs.io/en/stable/config/integrating.html
    """
    source = self.stream
    source.seek(0)
    bundle = {"application/pdf": source.read()}

    if include is not None:
        # Keep only the representations that were asked for.
        bundle = {
            mime: payload for mime, payload in bundle.items() if mime in include
        }

    if exclude is not None:
        # Drop the representations that were ruled out.
        bundle = {
            mime: payload for mime, payload in bundle.items() if mime not in exclude
        }

    return bundle