Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/PdfParser.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

690 statements  

1from __future__ import annotations 

2 

3import calendar 

4import codecs 

5import collections 

6import mmap 

7import os 

8import re 

9import time 

10import zlib 

11from typing import Any, NamedTuple 

12 

13TYPE_CHECKING = False 

14if TYPE_CHECKING: 

15 from typing import IO 

16 

17 _DictBase = collections.UserDict[str | bytes, Any] 

18else: 

19 _DictBase = collections.UserDict 

20 

21 

22# see 7.9.2.2 Text String Type on page 86 and D.3 PDFDocEncoding Character Set 

23# on page 656 

24def encode_text(s: str) -> bytes: 

25 return codecs.BOM_UTF16_BE + s.encode("utf_16_be") 

26 

27 

28PDFDocEncoding = { 

29 0x16: "\u0017", 

30 0x18: "\u02d8", 

31 0x19: "\u02c7", 

32 0x1A: "\u02c6", 

33 0x1B: "\u02d9", 

34 0x1C: "\u02dd", 

35 0x1D: "\u02db", 

36 0x1E: "\u02da", 

37 0x1F: "\u02dc", 

38 0x80: "\u2022", 

39 0x81: "\u2020", 

40 0x82: "\u2021", 

41 0x83: "\u2026", 

42 0x84: "\u2014", 

43 0x85: "\u2013", 

44 0x86: "\u0192", 

45 0x87: "\u2044", 

46 0x88: "\u2039", 

47 0x89: "\u203a", 

48 0x8A: "\u2212", 

49 0x8B: "\u2030", 

50 0x8C: "\u201e", 

51 0x8D: "\u201c", 

52 0x8E: "\u201d", 

53 0x8F: "\u2018", 

54 0x90: "\u2019", 

55 0x91: "\u201a", 

56 0x92: "\u2122", 

57 0x93: "\ufb01", 

58 0x94: "\ufb02", 

59 0x95: "\u0141", 

60 0x96: "\u0152", 

61 0x97: "\u0160", 

62 0x98: "\u0178", 

63 0x99: "\u017d", 

64 0x9A: "\u0131", 

65 0x9B: "\u0142", 

66 0x9C: "\u0153", 

67 0x9D: "\u0161", 

68 0x9E: "\u017e", 

69 0xA0: "\u20ac", 

70} 

71 

72 

73def decode_text(b: bytes) -> str: 

74 if b[: len(codecs.BOM_UTF16_BE)] == codecs.BOM_UTF16_BE: 

75 return b[len(codecs.BOM_UTF16_BE) :].decode("utf_16_be") 

76 else: 

77 return "".join(PDFDocEncoding.get(byte, chr(byte)) for byte in b) 

78 

79 

80class PdfFormatError(RuntimeError): 

81 """An error that probably indicates a syntactic or semantic error in the 

82 PDF file structure""" 

83 

84 pass 

85 

86 

87def check_format_condition(condition: bool, error_message: str) -> None: 

88 if not condition: 

89 raise PdfFormatError(error_message) 

90 

91 

92class IndirectReferenceTuple(NamedTuple): 

93 object_id: int 

94 generation: int 

95 

96 

97class IndirectReference(IndirectReferenceTuple): 

98 def __str__(self) -> str: 

99 return f"{self.object_id} {self.generation} R" 

100 

101 def __bytes__(self) -> bytes: 

102 return self.__str__().encode("us-ascii") 

103 

104 def __eq__(self, other: object) -> bool: 

105 if self.__class__ is not other.__class__: 

106 return False 

107 assert isinstance(other, IndirectReference) 

108 return other.object_id == self.object_id and other.generation == self.generation 

109 

110 def __ne__(self, other: object) -> bool: 

111 return not (self == other) 

112 

113 def __hash__(self) -> int: 

114 return hash((self.object_id, self.generation)) 

115 

116 

117class IndirectObjectDef(IndirectReference): 

118 def __str__(self) -> str: 

119 return f"{self.object_id} {self.generation} obj" 

120 

121 

122class XrefTable: 

123 def __init__(self) -> None: 

124 self.existing_entries: dict[int, tuple[int, int]] = ( 

125 {} 

126 ) # object ID => (offset, generation) 

127 self.new_entries: dict[int, tuple[int, int]] = ( 

128 {} 

129 ) # object ID => (offset, generation) 

130 self.deleted_entries = {0: 65536} # object ID => generation 

131 self.reading_finished = False 

132 

133 def __setitem__(self, key: int, value: tuple[int, int]) -> None: 

134 if self.reading_finished: 

135 self.new_entries[key] = value 

136 else: 

137 self.existing_entries[key] = value 

138 if key in self.deleted_entries: 

139 del self.deleted_entries[key] 

140 

141 def __getitem__(self, key: int) -> tuple[int, int]: 

142 try: 

143 return self.new_entries[key] 

144 except KeyError: 

145 return self.existing_entries[key] 

146 

147 def __delitem__(self, key: int) -> None: 

148 if key in self.new_entries: 

149 generation = self.new_entries[key][1] + 1 

150 del self.new_entries[key] 

151 self.deleted_entries[key] = generation 

152 elif key in self.existing_entries: 

153 generation = self.existing_entries[key][1] + 1 

154 self.deleted_entries[key] = generation 

155 elif key in self.deleted_entries: 

156 generation = self.deleted_entries[key] 

157 else: 

158 msg = f"object ID {key} cannot be deleted because it doesn't exist" 

159 raise IndexError(msg) 

160 

161 def __contains__(self, key: int) -> bool: 

162 return key in self.existing_entries or key in self.new_entries 

163 

164 def __len__(self) -> int: 

165 return len( 

166 set(self.existing_entries.keys()) 

167 | set(self.new_entries.keys()) 

168 | set(self.deleted_entries.keys()) 

169 ) 

170 

171 def keys(self) -> set[int]: 

172 return ( 

173 set(self.existing_entries.keys()) - set(self.deleted_entries.keys()) 

174 ) | set(self.new_entries.keys()) 

175 

176 def write(self, f: IO[bytes]) -> int: 

177 keys = sorted(set(self.new_entries.keys()) | set(self.deleted_entries.keys())) 

178 deleted_keys = sorted(set(self.deleted_entries.keys())) 

179 startxref = f.tell() 

180 f.write(b"xref\n") 

181 while keys: 

182 # find a contiguous sequence of object IDs 

183 prev: int | None = None 

184 for index, key in enumerate(keys): 

185 if prev is None or prev + 1 == key: 

186 prev = key 

187 else: 

188 contiguous_keys = keys[:index] 

189 keys = keys[index:] 

190 break 

191 else: 

192 contiguous_keys = keys 

193 keys = [] 

194 f.write(b"%d %d\n" % (contiguous_keys[0], len(contiguous_keys))) 

195 for object_id in contiguous_keys: 

196 if object_id in self.new_entries: 

197 f.write(b"%010d %05d n \n" % self.new_entries[object_id]) 

198 else: 

199 this_deleted_object_id = deleted_keys.pop(0) 

200 check_format_condition( 

201 object_id == this_deleted_object_id, 

202 f"expected the next deleted object ID to be {object_id}, " 

203 f"instead found {this_deleted_object_id}", 

204 ) 

205 try: 

206 next_in_linked_list = deleted_keys[0] 

207 except IndexError: 

208 next_in_linked_list = 0 

209 f.write( 

210 b"%010d %05d f \n" 

211 % (next_in_linked_list, self.deleted_entries[object_id]) 

212 ) 

213 return startxref 

214 

215 

216class PdfName: 

217 name: bytes 

218 

219 def __init__(self, name: PdfName | bytes | str) -> None: 

220 if isinstance(name, PdfName): 

221 self.name = name.name 

222 elif isinstance(name, bytes): 

223 self.name = name 

224 else: 

225 self.name = name.encode("us-ascii") 

226 

227 def name_as_str(self) -> str: 

228 return self.name.decode("us-ascii") 

229 

230 def __eq__(self, other: object) -> bool: 

231 return ( 

232 isinstance(other, PdfName) and other.name == self.name 

233 ) or other == self.name 

234 

235 def __hash__(self) -> int: 

236 return hash(self.name) 

237 

238 def __repr__(self) -> str: 

239 return f"{self.__class__.__name__}({repr(self.name)})" 

240 

241 @classmethod 

242 def from_pdf_stream(cls, data: bytes) -> PdfName: 

243 return cls(PdfParser.interpret_name(data)) 

244 

245 allowed_chars = set(range(33, 127)) - {ord(c) for c in "#%/()<>[]{}"} 

246 

247 def __bytes__(self) -> bytes: 

248 result = bytearray(b"/") 

249 for b in self.name: 

250 if b in self.allowed_chars: 

251 result.append(b) 

252 else: 

253 result.extend(b"#%02X" % b) 

254 return bytes(result) 

255 

256 

257class PdfArray(list[Any]): 

258 def __bytes__(self) -> bytes: 

259 return b"[ " + b" ".join(pdf_repr(x) for x in self) + b" ]" 

260 

261 

262class PdfDict(_DictBase): 

263 def __setattr__(self, key: str, value: Any) -> None: 

264 if key == "data": 

265 collections.UserDict.__setattr__(self, key, value) 

266 else: 

267 self[key.encode("us-ascii")] = value 

268 

269 def __getattr__(self, key: str) -> str | time.struct_time: 

270 try: 

271 value = self[key.encode("us-ascii")] 

272 except KeyError as e: 

273 raise AttributeError(key) from e 

274 if isinstance(value, bytes): 

275 value = decode_text(value) 

276 if key.endswith("Date"): 

277 if value.startswith("D:"): 

278 value = value[2:] 

279 

280 relationship = "Z" 

281 if len(value) > 17: 

282 relationship = value[14] 

283 offset = int(value[15:17]) * 60 

284 if len(value) > 20: 

285 offset += int(value[18:20]) 

286 

287 format = "%Y%m%d%H%M%S"[: len(value) - 2] 

288 value = time.strptime(value[: len(format) + 2], format) 

289 if relationship in ["+", "-"]: 

290 offset *= 60 

291 if relationship == "+": 

292 offset *= -1 

293 value = time.gmtime(calendar.timegm(value) + offset) 

294 return value 

295 

296 def __bytes__(self) -> bytes: 

297 out = bytearray(b"<<") 

298 for key, value in self.items(): 

299 if value is None: 

300 continue 

301 value = pdf_repr(value) 

302 out.extend(b"\n") 

303 out.extend(bytes(PdfName(key))) 

304 out.extend(b" ") 

305 out.extend(value) 

306 out.extend(b"\n>>") 

307 return bytes(out) 

308 

309 

310class PdfBinary: 

311 def __init__(self, data: list[int] | bytes) -> None: 

312 self.data = data 

313 

314 def __bytes__(self) -> bytes: 

315 return b"<%s>" % b"".join(b"%02X" % b for b in self.data) 

316 

317 

318class PdfStream: 

319 def __init__(self, dictionary: PdfDict, buf: bytes) -> None: 

320 self.dictionary = dictionary 

321 self.buf = buf 

322 

323 def decode(self) -> bytes: 

324 try: 

325 filter = self.dictionary[b"Filter"] 

326 except KeyError: 

327 return self.buf 

328 if filter == b"FlateDecode": 

329 try: 

330 expected_length = self.dictionary[b"DL"] 

331 except KeyError: 

332 expected_length = self.dictionary[b"Length"] 

333 return zlib.decompress(self.buf, bufsize=int(expected_length)) 

334 else: 

335 msg = f"stream filter {repr(filter)} unknown/unsupported" 

336 raise NotImplementedError(msg) 

337 

338 

339def pdf_repr(x: Any) -> bytes: 

340 if x is True: 

341 return b"true" 

342 elif x is False: 

343 return b"false" 

344 elif x is None: 

345 return b"null" 

346 elif isinstance(x, (PdfName, PdfDict, PdfArray, PdfBinary)): 

347 return bytes(x) 

348 elif isinstance(x, (int, float)): 

349 return str(x).encode("us-ascii") 

350 elif isinstance(x, time.struct_time): 

351 return b"(D:" + time.strftime("%Y%m%d%H%M%SZ", x).encode("us-ascii") + b")" 

352 elif isinstance(x, dict): 

353 return bytes(PdfDict(x)) 

354 elif isinstance(x, list): 

355 return bytes(PdfArray(x)) 

356 elif isinstance(x, str): 

357 return pdf_repr(encode_text(x)) 

358 elif isinstance(x, bytes): 

359 # XXX escape more chars? handle binary garbage 

360 x = x.replace(b"\\", b"\\\\") 

361 x = x.replace(b"(", b"\\(") 

362 x = x.replace(b")", b"\\)") 

363 return b"(" + x + b")" 

364 else: 

365 return bytes(x) 

366 

367 

368class PdfParser: 

369 """Based on 

370 https://www.adobe.com/content/dam/acom/en/devnet/acrobat/pdfs/PDF32000_2008.pdf 

371 Supports PDF up to 1.4 

372 """ 

373 

374 def __init__( 

375 self, 

376 filename: str | None = None, 

377 f: IO[bytes] | None = None, 

378 buf: bytes | bytearray | None = None, 

379 start_offset: int = 0, 

380 mode: str = "rb", 

381 ) -> None: 

382 if buf and f: 

383 msg = "specify buf or f or filename, but not both buf and f" 

384 raise RuntimeError(msg) 

385 self.filename = filename 

386 self.buf: bytes | bytearray | memoryview | mmap.mmap | None = buf 

387 self.f = f 

388 self.start_offset = start_offset 

389 self.should_close_buf = False 

390 self.should_close_file = False 

391 if filename is not None and f is None: 

392 self.f = f = open(filename, mode) 

393 self.should_close_file = True 

394 if f is not None: 

395 self.buf = self.get_buf_from_file(f) 

396 self.should_close_buf = True 

397 if not filename and hasattr(f, "name"): 

398 self.filename = f.name 

399 self.cached_objects: dict[IndirectReference, Any] = {} 

400 self.root_ref: IndirectReference | None 

401 self.info_ref: IndirectReference | None 

402 self.pages_ref: IndirectReference | None 

403 self.last_xref_section_offset: int | None 

404 if self.buf: 

405 try: 

406 self.read_pdf_info() 

407 except PdfFormatError: 

408 self.close() 

409 raise 

410 else: 

411 self.file_size_total = self.file_size_this = 0 

412 self.root = PdfDict() 

413 self.root_ref = None 

414 self.info = PdfDict() 

415 self.info_ref = None 

416 self.page_tree_root = PdfDict() 

417 self.pages: list[IndirectReference] = [] 

418 self.orig_pages: list[IndirectReference] = [] 

419 self.pages_ref = None 

420 self.last_xref_section_offset = None 

421 self.trailer_dict: dict[bytes, Any] = {} 

422 self.xref_table = XrefTable() 

423 self.xref_table.reading_finished = True 

424 if f: 

425 self.seek_end() 

426 

427 def __enter__(self) -> PdfParser: 

428 return self 

429 

430 def __exit__(self, *args: object) -> None: 

431 self.close() 

432 

433 def start_writing(self) -> None: 

434 self.close_buf() 

435 self.seek_end() 

436 

437 def close_buf(self) -> None: 

438 if isinstance(self.buf, memoryview): 

439 self.buf.release() 

440 elif isinstance(self.buf, mmap.mmap): 

441 self.buf.close() 

442 self.buf = None 

443 

444 def close(self) -> None: 

445 if self.should_close_buf: 

446 self.close_buf() 

447 if self.f is not None and self.should_close_file: 

448 self.f.close() 

449 self.f = None 

450 

451 def seek_end(self) -> None: 

452 assert self.f is not None 

453 self.f.seek(0, os.SEEK_END) 

454 

455 def write_header(self) -> None: 

456 assert self.f is not None 

457 self.f.write(b"%PDF-1.4\n") 

458 

459 def write_comment(self, s: str) -> None: 

460 assert self.f is not None 

461 self.f.write(f"% {s}\n".encode()) 

462 

463 def write_catalog(self) -> IndirectReference: 

464 assert self.f is not None 

465 self.del_root() 

466 self.root_ref = self.next_object_id(self.f.tell()) 

467 self.pages_ref = self.next_object_id(0) 

468 self.rewrite_pages() 

469 self.write_obj(self.root_ref, Type=PdfName(b"Catalog"), Pages=self.pages_ref) 

470 self.write_obj( 

471 self.pages_ref, 

472 Type=PdfName(b"Pages"), 

473 Count=len(self.pages), 

474 Kids=self.pages, 

475 ) 

476 return self.root_ref 

477 

478 def rewrite_pages(self) -> None: 

479 pages_tree_nodes_to_delete = [] 

480 for i, page_ref in enumerate(self.orig_pages): 

481 page_info = self.cached_objects[page_ref] 

482 del self.xref_table[page_ref.object_id] 

483 pages_tree_nodes_to_delete.append(page_info[PdfName(b"Parent")]) 

484 if page_ref not in self.pages: 

485 # the page has been deleted 

486 continue 

487 # make dict keys into strings for passing to write_page 

488 stringified_page_info = {} 

489 for key, value in page_info.items(): 

490 # key should be a PdfName 

491 stringified_page_info[key.name_as_str()] = value 

492 stringified_page_info["Parent"] = self.pages_ref 

493 new_page_ref = self.write_page(None, **stringified_page_info) 

494 for j, cur_page_ref in enumerate(self.pages): 

495 if cur_page_ref == page_ref: 

496 # replace the page reference with the new one 

497 self.pages[j] = new_page_ref 

498 # delete redundant Pages tree nodes from xref table 

499 for pages_tree_node_ref in pages_tree_nodes_to_delete: 

500 while pages_tree_node_ref: 

501 pages_tree_node = self.cached_objects[pages_tree_node_ref] 

502 if pages_tree_node_ref.object_id in self.xref_table: 

503 del self.xref_table[pages_tree_node_ref.object_id] 

504 pages_tree_node_ref = pages_tree_node.get(b"Parent", None) 

505 self.orig_pages = [] 

506 

507 def write_xref_and_trailer( 

508 self, new_root_ref: IndirectReference | None = None 

509 ) -> None: 

510 assert self.f is not None 

511 if new_root_ref: 

512 self.del_root() 

513 self.root_ref = new_root_ref 

514 if self.info: 

515 self.info_ref = self.write_obj(None, self.info) 

516 start_xref = self.xref_table.write(self.f) 

517 num_entries = len(self.xref_table) 

518 trailer_dict: dict[str | bytes, Any] = { 

519 b"Root": self.root_ref, 

520 b"Size": num_entries, 

521 } 

522 if self.last_xref_section_offset is not None: 

523 trailer_dict[b"Prev"] = self.last_xref_section_offset 

524 if self.info: 

525 trailer_dict[b"Info"] = self.info_ref 

526 self.last_xref_section_offset = start_xref 

527 self.f.write( 

528 b"trailer\n" 

529 + bytes(PdfDict(trailer_dict)) 

530 + b"\nstartxref\n%d\n%%%%EOF" % start_xref 

531 ) 

532 

533 def write_page( 

534 self, ref: int | IndirectReference | None, *objs: Any, **dict_obj: Any 

535 ) -> IndirectReference: 

536 obj_ref = self.pages[ref] if isinstance(ref, int) else ref 

537 if "Type" not in dict_obj: 

538 dict_obj["Type"] = PdfName(b"Page") 

539 if "Parent" not in dict_obj: 

540 dict_obj["Parent"] = self.pages_ref 

541 return self.write_obj(obj_ref, *objs, **dict_obj) 

542 

543 def write_obj( 

544 self, ref: IndirectReference | None, *objs: Any, **dict_obj: Any 

545 ) -> IndirectReference: 

546 assert self.f is not None 

547 f = self.f 

548 if ref is None: 

549 ref = self.next_object_id(f.tell()) 

550 else: 

551 self.xref_table[ref.object_id] = (f.tell(), ref.generation) 

552 f.write(bytes(IndirectObjectDef(*ref))) 

553 stream = dict_obj.pop("stream", None) 

554 if stream is not None: 

555 dict_obj["Length"] = len(stream) 

556 if dict_obj: 

557 f.write(pdf_repr(dict_obj)) 

558 for obj in objs: 

559 f.write(pdf_repr(obj)) 

560 if stream is not None: 

561 f.write(b"stream\n") 

562 f.write(stream) 

563 f.write(b"\nendstream\n") 

564 f.write(b"endobj\n") 

565 return ref 

566 

567 def del_root(self) -> None: 

568 if self.root_ref is None: 

569 return 

570 del self.xref_table[self.root_ref.object_id] 

571 del self.xref_table[self.root[b"Pages"].object_id] 

572 

573 @staticmethod 

574 def get_buf_from_file(f: IO[bytes]) -> bytes | mmap.mmap: 

575 if hasattr(f, "getbuffer"): 

576 return f.getbuffer() 

577 elif hasattr(f, "getvalue"): 

578 return f.getvalue() 

579 else: 

580 try: 

581 return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) 

582 except ValueError: # cannot mmap an empty file 

583 return b"" 

584 

585 def read_pdf_info(self) -> None: 

586 assert self.buf is not None 

587 self.file_size_total = len(self.buf) 

588 self.file_size_this = self.file_size_total - self.start_offset 

589 self.read_trailer() 

590 check_format_condition( 

591 self.trailer_dict.get(b"Root") is not None, "Root is missing" 

592 ) 

593 self.root_ref = self.trailer_dict[b"Root"] 

594 assert self.root_ref is not None 

595 self.info_ref = self.trailer_dict.get(b"Info", None) 

596 self.root = PdfDict(self.read_indirect(self.root_ref)) 

597 if self.info_ref is None: 

598 self.info = PdfDict() 

599 else: 

600 self.info = PdfDict(self.read_indirect(self.info_ref)) 

601 check_format_condition(b"Type" in self.root, "/Type missing in Root") 

602 check_format_condition( 

603 self.root[b"Type"] == b"Catalog", "/Type in Root is not /Catalog" 

604 ) 

605 check_format_condition( 

606 self.root.get(b"Pages") is not None, "/Pages missing in Root" 

607 ) 

608 check_format_condition( 

609 isinstance(self.root[b"Pages"], IndirectReference), 

610 "/Pages in Root is not an indirect reference", 

611 ) 

612 self.pages_ref = self.root[b"Pages"] 

613 assert self.pages_ref is not None 

614 self.page_tree_root = self.read_indirect(self.pages_ref) 

615 self.pages = self.linearize_page_tree(self.page_tree_root) 

616 # save the original list of page references 

617 # in case the user modifies, adds or deletes some pages 

618 # and we need to rewrite the pages and their list 

619 self.orig_pages = self.pages[:] 

620 

621 def next_object_id(self, offset: int | None = None) -> IndirectReference: 

622 try: 

623 # TODO: support reuse of deleted objects 

624 reference = IndirectReference(max(self.xref_table.keys()) + 1, 0) 

625 except ValueError: 

626 reference = IndirectReference(1, 0) 

627 if offset is not None: 

628 self.xref_table[reference.object_id] = (offset, 0) 

629 return reference 

630 

631 delimiter = rb"[][()<>{}/%]" 

632 delimiter_or_ws = rb"[][()<>{}/%\000\011\012\014\015\040]" 

633 whitespace = rb"[\000\011\012\014\015\040]" 

634 whitespace_or_hex = rb"[\000\011\012\014\015\0400-9a-fA-F]" 

635 whitespace_optional = whitespace + b"*" 

636 whitespace_mandatory = whitespace + b"+" 

637 # No "\012" aka "\n" or "\015" aka "\r": 

638 whitespace_optional_no_nl = rb"[\000\011\014\040]*" 

639 newline_only = rb"[\r\n]+" 

640 newline = whitespace_optional_no_nl + newline_only + whitespace_optional_no_nl 

641 re_trailer_end = re.compile( 

642 whitespace_mandatory 

643 + rb"trailer" 

644 + whitespace_optional 

645 + rb"<<(.*>>)" 

646 + newline 

647 + rb"startxref" 

648 + newline 

649 + rb"([0-9]+)" 

650 + newline 

651 + rb"%%EOF" 

652 + whitespace_optional 

653 + rb"$", 

654 re.DOTALL, 

655 ) 

656 re_trailer_prev = re.compile( 

657 whitespace_optional 

658 + rb"trailer" 

659 + whitespace_optional 

660 + rb"<<(.*?>>)" 

661 + newline 

662 + rb"startxref" 

663 + newline 

664 + rb"([0-9]+)" 

665 + newline 

666 + rb"%%EOF" 

667 + whitespace_optional, 

668 re.DOTALL, 

669 ) 

670 

671 def read_trailer(self) -> None: 

672 assert self.buf is not None 

673 search_start_offset = len(self.buf) - 16384 

674 if search_start_offset < self.start_offset: 

675 search_start_offset = self.start_offset 

676 m = self.re_trailer_end.search(self.buf, search_start_offset) 

677 check_format_condition(m is not None, "trailer end not found") 

678 # make sure we found the LAST trailer 

679 last_match = m 

680 while m: 

681 last_match = m 

682 m = self.re_trailer_end.search(self.buf, m.start() + 16) 

683 if not m: 

684 m = last_match 

685 assert m is not None 

686 trailer_data = m.group(1) 

687 self.last_xref_section_offset = int(m.group(2)) 

688 self.trailer_dict = self.interpret_trailer(trailer_data) 

689 self.xref_table = XrefTable() 

690 self.read_xref_table(xref_section_offset=self.last_xref_section_offset) 

691 if b"Prev" in self.trailer_dict: 

692 self.read_prev_trailer(self.trailer_dict[b"Prev"]) 

693 

694 def read_prev_trailer( 

695 self, xref_section_offset: int, processed_offsets: list[int] | None = None 

696 ) -> None: 

697 assert self.buf is not None 

698 trailer_offset = self.read_xref_table(xref_section_offset=xref_section_offset) 

699 m = self.re_trailer_prev.search( 

700 self.buf[trailer_offset : trailer_offset + 16384] 

701 ) 

702 check_format_condition(m is not None, "previous trailer not found") 

703 assert m is not None 

704 trailer_data = m.group(1) 

705 check_format_condition( 

706 int(m.group(2)) == xref_section_offset, 

707 "xref section offset in previous trailer doesn't match what was expected", 

708 ) 

709 trailer_dict = self.interpret_trailer(trailer_data) 

710 if b"Prev" in trailer_dict: 

711 if processed_offsets is None: 

712 processed_offsets = [] 

713 processed_offsets.append(xref_section_offset) 

714 check_format_condition( 

715 trailer_dict[b"Prev"] not in processed_offsets, "trailer loop found" 

716 ) 

717 self.read_prev_trailer(trailer_dict[b"Prev"], processed_offsets) 

718 

719 re_whitespace_optional = re.compile(whitespace_optional) 

720 re_name = re.compile( 

721 whitespace_optional 

722 + rb"/([!-$&'*-.0-;=?-Z\\^-z|~]+)(?=" 

723 + delimiter_or_ws 

724 + rb")" 

725 ) 

726 re_dict_start = re.compile(whitespace_optional + rb"<<") 

727 re_dict_end = re.compile(whitespace_optional + rb">>" + whitespace_optional) 

728 

729 @classmethod 

730 def interpret_trailer(cls, trailer_data: bytes) -> dict[bytes, Any]: 

731 trailer = {} 

732 offset = 0 

733 while True: 

734 m = cls.re_name.match(trailer_data, offset) 

735 if not m: 

736 m = cls.re_dict_end.match(trailer_data, offset) 

737 check_format_condition( 

738 m is not None and m.end() == len(trailer_data), 

739 "name not found in trailer, remaining data: " 

740 + repr(trailer_data[offset:]), 

741 ) 

742 break 

743 key = cls.interpret_name(m.group(1)) 

744 assert isinstance(key, bytes) 

745 value, value_offset = cls.get_value(trailer_data, m.end()) 

746 trailer[key] = value 

747 if value_offset is None: 

748 break 

749 offset = value_offset 

750 check_format_condition( 

751 b"Size" in trailer and isinstance(trailer[b"Size"], int), 

752 "/Size not in trailer or not an integer", 

753 ) 

754 check_format_condition( 

755 b"Root" in trailer and isinstance(trailer[b"Root"], IndirectReference), 

756 "/Root not in trailer or not an indirect reference", 

757 ) 

758 return trailer 

759 

760 re_hashes_in_name = re.compile(rb"([^#]*)(#([0-9a-fA-F]{2}))?") 

761 

762 @classmethod 

763 def interpret_name(cls, raw: bytes, as_text: bool = False) -> str | bytes: 

764 name = b"" 

765 for m in cls.re_hashes_in_name.finditer(raw): 

766 if m.group(3): 

767 name += m.group(1) + bytearray.fromhex(m.group(3).decode("us-ascii")) 

768 else: 

769 name += m.group(1) 

770 if as_text: 

771 return name.decode("utf-8") 

772 else: 

773 return bytes(name) 

774 

775 re_null = re.compile(whitespace_optional + rb"null(?=" + delimiter_or_ws + rb")") 

776 re_true = re.compile(whitespace_optional + rb"true(?=" + delimiter_or_ws + rb")") 

777 re_false = re.compile(whitespace_optional + rb"false(?=" + delimiter_or_ws + rb")") 

778 re_int = re.compile( 

779 whitespace_optional + rb"([-+]?[0-9]+)(?=" + delimiter_or_ws + rb")" 

780 ) 

781 re_real = re.compile( 

782 whitespace_optional 

783 + rb"([-+]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+))(?=" 

784 + delimiter_or_ws 

785 + rb")" 

786 ) 

787 re_array_start = re.compile(whitespace_optional + rb"\[") 

788 re_array_end = re.compile(whitespace_optional + rb"]") 

789 re_string_hex = re.compile( 

790 whitespace_optional + rb"<(" + whitespace_or_hex + rb"*)>" 

791 ) 

792 re_string_lit = re.compile(whitespace_optional + rb"\(") 

793 re_indirect_reference = re.compile( 

794 whitespace_optional 

795 + rb"([-+]?[0-9]+)" 

796 + whitespace_mandatory 

797 + rb"([-+]?[0-9]+)" 

798 + whitespace_mandatory 

799 + rb"R(?=" 

800 + delimiter_or_ws 

801 + rb")" 

802 ) 

803 re_indirect_def_start = re.compile( 

804 whitespace_optional 

805 + rb"([-+]?[0-9]+)" 

806 + whitespace_mandatory 

807 + rb"([-+]?[0-9]+)" 

808 + whitespace_mandatory 

809 + rb"obj(?=" 

810 + delimiter_or_ws 

811 + rb")" 

812 ) 

813 re_indirect_def_end = re.compile( 

814 whitespace_optional + rb"endobj(?=" + delimiter_or_ws + rb")" 

815 ) 

816 re_comment = re.compile( 

817 rb"(" + whitespace_optional + rb"%[^\r\n]*" + newline + rb")*" 

818 ) 

819 re_stream_start = re.compile(whitespace_optional + rb"stream\r?\n") 

820 re_stream_end = re.compile( 

821 whitespace_optional + rb"endstream(?=" + delimiter_or_ws + rb")" 

822 ) 

823 

824 @classmethod 

825 def get_value( 

826 cls, 

827 data: bytes | bytearray | mmap.mmap, 

828 offset: int, 

829 expect_indirect: IndirectReference | None = None, 

830 max_nesting: int = -1, 

831 ) -> tuple[Any, int | None]: 

832 if max_nesting == 0: 

833 return None, None 

834 m = cls.re_comment.match(data, offset) 

835 if m: 

836 offset = m.end() 

837 m = cls.re_indirect_def_start.match(data, offset) 

838 if m: 

839 check_format_condition( 

840 int(m.group(1)) > 0, 

841 "indirect object definition: object ID must be greater than 0", 

842 ) 

843 check_format_condition( 

844 int(m.group(2)) >= 0, 

845 "indirect object definition: generation must be non-negative", 

846 ) 

847 check_format_condition( 

848 expect_indirect is None 

849 or expect_indirect 

850 == IndirectReference(int(m.group(1)), int(m.group(2))), 

851 "indirect object definition different than expected", 

852 ) 

853 object, object_offset = cls.get_value( 

854 data, m.end(), max_nesting=max_nesting - 1 

855 ) 

856 if object_offset is None: 

857 return object, None 

858 m = cls.re_indirect_def_end.match(data, object_offset) 

859 check_format_condition( 

860 m is not None, "indirect object definition end not found" 

861 ) 

862 assert m is not None 

863 return object, m.end() 

864 check_format_condition( 

865 not expect_indirect, "indirect object definition not found" 

866 ) 

867 m = cls.re_indirect_reference.match(data, offset) 

868 if m: 

869 check_format_condition( 

870 int(m.group(1)) > 0, 

871 "indirect object reference: object ID must be greater than 0", 

872 ) 

873 check_format_condition( 

874 int(m.group(2)) >= 0, 

875 "indirect object reference: generation must be non-negative", 

876 ) 

877 return IndirectReference(int(m.group(1)), int(m.group(2))), m.end() 

878 m = cls.re_dict_start.match(data, offset) 

879 if m: 

880 offset = m.end() 

881 result: dict[Any, Any] = {} 

882 m = cls.re_dict_end.match(data, offset) 

883 current_offset: int | None = offset 

884 while not m: 

885 assert current_offset is not None 

886 key, current_offset = cls.get_value( 

887 data, current_offset, max_nesting=max_nesting - 1 

888 ) 

889 if current_offset is None: 

890 return result, None 

891 value, current_offset = cls.get_value( 

892 data, current_offset, max_nesting=max_nesting - 1 

893 ) 

894 result[key] = value 

895 if current_offset is None: 

896 return result, None 

897 m = cls.re_dict_end.match(data, current_offset) 

898 current_offset = m.end() 

899 m = cls.re_stream_start.match(data, current_offset) 

900 if m: 

901 stream_len = result.get(b"Length") 

902 if stream_len is None or not isinstance(stream_len, int): 

903 msg = f"bad or missing Length in stream dict ({stream_len})" 

904 raise PdfFormatError(msg) 

905 stream_data = data[m.end() : m.end() + stream_len] 

906 m = cls.re_stream_end.match(data, m.end() + stream_len) 

907 check_format_condition(m is not None, "stream end not found") 

908 assert m is not None 

909 current_offset = m.end() 

910 return PdfStream(PdfDict(result), stream_data), current_offset 

911 return PdfDict(result), current_offset 

912 m = cls.re_array_start.match(data, offset) 

913 if m: 

914 offset = m.end() 

915 results = [] 

916 m = cls.re_array_end.match(data, offset) 

917 current_offset = offset 

918 while not m: 

919 assert current_offset is not None 

920 value, current_offset = cls.get_value( 

921 data, current_offset, max_nesting=max_nesting - 1 

922 ) 

923 results.append(value) 

924 if current_offset is None: 

925 return results, None 

926 m = cls.re_array_end.match(data, current_offset) 

927 return results, m.end() 

928 m = cls.re_null.match(data, offset) 

929 if m: 

930 return None, m.end() 

931 m = cls.re_true.match(data, offset) 

932 if m: 

933 return True, m.end() 

934 m = cls.re_false.match(data, offset) 

935 if m: 

936 return False, m.end() 

937 m = cls.re_name.match(data, offset) 

938 if m: 

939 return PdfName(cls.interpret_name(m.group(1))), m.end() 

940 m = cls.re_int.match(data, offset) 

941 if m: 

942 return int(m.group(1)), m.end() 

943 m = cls.re_real.match(data, offset) 

944 if m: 

945 # XXX Decimal instead of float??? 

946 return float(m.group(1)), m.end() 

947 m = cls.re_string_hex.match(data, offset) 

948 if m: 

949 # filter out whitespace 

950 hex_string = bytearray( 

951 b for b in m.group(1) if b in b"0123456789abcdefABCDEF" 

952 ) 

953 if len(hex_string) % 2 == 1: 

954 # append a 0 if the length is not even - yes, at the end 

955 hex_string.append(ord(b"0")) 

956 return bytearray.fromhex(hex_string.decode("us-ascii")), m.end() 

957 m = cls.re_string_lit.match(data, offset) 

958 if m: 

959 return cls.get_literal_string(data, m.end()) 

960 # return None, offset # fallback (only for debugging) 

961 msg = f"unrecognized object: {repr(data[offset : offset + 32])}" 

962 raise PdfFormatError(msg) 

963 

964 re_lit_str_token = re.compile( 

965 rb"(\\[nrtbf()\\])|(\\[0-9]{1,3})|(\\(\r\n|\r|\n))|(\r\n|\r|\n)|(\()|(\))" 

966 ) 

967 escaped_chars = { 

968 b"n": b"\n", 

969 b"r": b"\r", 

970 b"t": b"\t", 

971 b"b": b"\b", 

972 b"f": b"\f", 

973 b"(": b"(", 

974 b")": b")", 

975 b"\\": b"\\", 

976 ord(b"n"): b"\n", 

977 ord(b"r"): b"\r", 

978 ord(b"t"): b"\t", 

979 ord(b"b"): b"\b", 

980 ord(b"f"): b"\f", 

981 ord(b"("): b"(", 

982 ord(b")"): b")", 

983 ord(b"\\"): b"\\", 

984 } 

985 

986 @classmethod 

987 def get_literal_string( 

988 cls, data: bytes | bytearray | mmap.mmap, offset: int 

989 ) -> tuple[bytes, int]: 

990 nesting_depth = 0 

991 result = bytearray() 

992 for m in cls.re_lit_str_token.finditer(data, offset): 

993 result.extend(data[offset : m.start()]) 

994 if m.group(1): 

995 result.extend(cls.escaped_chars[m.group(1)[1]]) 

996 elif m.group(2): 

997 result.append(int(m.group(2)[1:], 8)) 

998 elif m.group(3): 

999 pass 

1000 elif m.group(5): 

1001 result.extend(b"\n") 

1002 elif m.group(6): 

1003 result.extend(b"(") 

1004 nesting_depth += 1 

1005 elif m.group(7): 

1006 if nesting_depth == 0: 

1007 return bytes(result), m.end() 

1008 result.extend(b")") 

1009 nesting_depth -= 1 

1010 offset = m.end() 

1011 msg = "unfinished literal string" 

1012 raise PdfFormatError(msg) 

1013 

1014 re_xref_section_start = re.compile(whitespace_optional + rb"xref" + newline) 

1015 re_xref_subsection_start = re.compile( 

1016 whitespace_optional 

1017 + rb"([0-9]+)" 

1018 + whitespace_mandatory 

1019 + rb"([0-9]+)" 

1020 + whitespace_optional 

1021 + newline_only 

1022 ) 

1023 re_xref_entry = re.compile(rb"([0-9]{10}) ([0-9]{5}) ([fn])( \r| \n|\r\n)") 

1024 

1025 def read_xref_table(self, xref_section_offset: int) -> int: 

1026 assert self.buf is not None 

1027 subsection_found = False 

1028 m = self.re_xref_section_start.match( 

1029 self.buf, xref_section_offset + self.start_offset 

1030 ) 

1031 check_format_condition(m is not None, "xref section start not found") 

1032 assert m is not None 

1033 offset = m.end() 

1034 while True: 

1035 m = self.re_xref_subsection_start.match(self.buf, offset) 

1036 if not m: 

1037 check_format_condition( 

1038 subsection_found, "xref subsection start not found" 

1039 ) 

1040 break 

1041 subsection_found = True 

1042 offset = m.end() 

1043 first_object = int(m.group(1)) 

1044 num_objects = int(m.group(2)) 

1045 for i in range(first_object, first_object + num_objects): 

1046 m = self.re_xref_entry.match(self.buf, offset) 

1047 check_format_condition(m is not None, "xref entry not found") 

1048 assert m is not None 

1049 offset = m.end() 

1050 is_free = m.group(3) == b"f" 

1051 if not is_free: 

1052 generation = int(m.group(2)) 

1053 new_entry = (int(m.group(1)), generation) 

1054 if i not in self.xref_table: 

1055 self.xref_table[i] = new_entry 

1056 return offset 

1057 

1058 def read_indirect(self, ref: IndirectReference, max_nesting: int = -1) -> Any: 

1059 offset, generation = self.xref_table[ref[0]] 

1060 check_format_condition( 

1061 generation == ref[1], 

1062 f"expected to find generation {ref[1]} for object ID {ref[0]} in xref " 

1063 f"table, instead found generation {generation} at offset {offset}", 

1064 ) 

1065 assert self.buf is not None 

1066 value = self.get_value( 

1067 self.buf, 

1068 offset + self.start_offset, 

1069 expect_indirect=IndirectReference(*ref), 

1070 max_nesting=max_nesting, 

1071 )[0] 

1072 self.cached_objects[ref] = value 

1073 return value 

1074 

1075 def linearize_page_tree( 

1076 self, node: PdfDict | None = None 

1077 ) -> list[IndirectReference]: 

1078 page_node = node if node is not None else self.page_tree_root 

1079 check_format_condition( 

1080 page_node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages" 

1081 ) 

1082 pages = [] 

1083 for kid in page_node[b"Kids"]: 

1084 kid_object = self.read_indirect(kid) 

1085 if kid_object[b"Type"] == b"Page": 

1086 pages.append(kid) 

1087 else: 

1088 pages.extend(self.linearize_page_tree(node=kid_object)) 

1089 return pages