Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/PdfParser.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

692 statements  

1from __future__ import annotations 

2 

3import calendar 

4import codecs 

5import collections 

6import mmap 

7import os 

8import re 

9import time 

10import zlib 

11from typing import Any, NamedTuple 

12 

13from . import ImageFile 

14 

15TYPE_CHECKING = False 

16if TYPE_CHECKING: 

17 from typing import IO 

18 

19 _DictBase = collections.UserDict[str | bytes, Any] 

20else: 

21 _DictBase = collections.UserDict 

22 

23 

24# see 7.9.2.2 Text String Type on page 86 and D.3 PDFDocEncoding Character Set 

25# on page 656 

def encode_text(s: str) -> bytes:
    """Encode *s* as UTF-16BE bytes preceded by the UTF-16BE byte-order mark."""
    payload = s.encode("utf_16_be")
    return b"".join((codecs.BOM_UTF16_BE, payload))

28 

29 

# Byte values whose decoded character differs from chr(byte).
# decode_text() falls back to chr(byte) for every byte that is not listed.
PDFDocEncoding = {
    0x16: "\u0017",
    # 0x18-0x1F
    **dict(
        zip(
            range(0x18, 0x20),
            "\u02d8\u02c7\u02c6\u02d9\u02dd\u02db\u02da\u02dc",
        )
    ),
    # 0x80-0x9E (0x9F is intentionally not mapped)
    **dict(
        zip(
            range(0x80, 0x9F),
            "\u2022\u2020\u2021\u2026\u2014\u2013\u0192\u2044"  # 0x80-0x87
            "\u2039\u203a\u2212\u2030\u201e\u201c\u201d\u2018"  # 0x88-0x8F
            "\u2019\u201a\u2122\ufb01\ufb02\u0141\u0152\u0160"  # 0x90-0x97
            "\u0178\u017d\u0131\u0142\u0153\u0161\u017e",  # 0x98-0x9E
        )
    ),
    0xA0: "\u20ac",
}

73 

74 

def decode_text(b: bytes) -> str:
    """Decode a PDF text string.

    Data that starts with the UTF-16BE byte-order mark is decoded as
    UTF-16BE (the mark itself is dropped). Anything else is mapped byte by
    byte through ``PDFDocEncoding``, using ``chr(byte)`` for unlisted bytes.
    """
    bom = codecs.BOM_UTF16_BE
    if b[: len(bom)] != bom:
        return "".join(PDFDocEncoding.get(byte, chr(byte)) for byte in b)
    return b[len(bom) :].decode("utf_16_be")

80 

81 

class PdfFormatError(RuntimeError):
    """Signals what is probably a syntactic or semantic error in the
    structure of the PDF file being processed."""

87 

88 

def check_format_condition(condition: bool, error_message: str) -> None:
    """Raise ``PdfFormatError(error_message)`` unless *condition* is truthy."""
    if condition:
        return
    raise PdfFormatError(error_message)

92 

93 

class IndirectReferenceTuple(NamedTuple):
    """Plain ``(object_id, generation)`` pair used as the base of references."""

    # number identifying the object
    object_id: int
    # generation number paired with that object ID
    generation: int

97 

98 

class IndirectReference(IndirectReferenceTuple):
    """Reference to an indirect object, rendered as ``"<id> <generation> R"``."""

    def __str__(self) -> str:
        object_id, generation = self
        return f"{object_id} {generation} R"

    def __bytes__(self) -> bytes:
        # Goes through __str__ so subclasses only need to override that.
        return str(self).encode("us-ascii")

    def __eq__(self, other: object) -> bool:
        # Only instances of exactly the same class can be equal, so a
        # reference never equals a plain tuple or a subclass instance.
        if other.__class__ is not self.__class__:
            return False
        assert isinstance(other, IndirectReference)
        mine = (self.object_id, self.generation)
        theirs = (other.object_id, other.generation)
        return theirs == mine

    def __ne__(self, other: object) -> bool:
        return not self == other

    def __hash__(self) -> int:
        key = (self.object_id, self.generation)
        return hash(key)

117 

118 

class IndirectObjectDef(IndirectReference):
    """Header of an indirect object definition: ``"<id> <generation> obj"``."""

    def __str__(self) -> str:
        object_id, generation = self
        return f"{object_id} {generation} obj"

122 

123 

class XrefTable:
    """Cross-reference table split into entries read from a file, entries
    added afterwards, and deleted (free) entries.

    While ``reading_finished`` is false, assignments go to
    ``existing_entries``; afterwards they go to ``new_entries``.
    """

    def __init__(self) -> None:
        # object ID => (offset, generation)
        self.existing_entries: dict[int, tuple[int, int]] = {}
        # object ID => (offset, generation)
        self.new_entries: dict[int, tuple[int, int]] = {}
        # object ID => generation
        self.deleted_entries: dict[int, int] = {0: 65536}
        self.reading_finished = False

    def __setitem__(self, key: int, value: tuple[int, int]) -> None:
        """Record ``(offset, generation)`` for *key* and un-delete it."""
        target = self.new_entries if self.reading_finished else self.existing_entries
        target[key] = value
        self.deleted_entries.pop(key, None)

    def __getitem__(self, key: int) -> tuple[int, int]:
        """Return the entry for *key*; new entries shadow existing ones.

        Raises ``KeyError`` if *key* is in neither mapping.
        """
        try:
            return self.new_entries[key]
        except KeyError:
            return self.existing_entries[key]

    def __delitem__(self, key: int) -> None:
        """Mark *key* as deleted, storing the entry's generation plus one.

        An existing (file) entry stays in ``existing_entries``; a new entry
        is removed from ``new_entries``. Deleting an already-deleted key is
        a no-op. Raises ``IndexError`` for an unknown key.
        """
        if key in self.new_entries:
            self.deleted_entries[key] = self.new_entries.pop(key)[1] + 1
        elif key in self.existing_entries:
            self.deleted_entries[key] = self.existing_entries[key][1] + 1
        elif key in self.deleted_entries:
            # already recorded as deleted: nothing to update
            pass
        else:
            msg = f"object ID {key} cannot be deleted because it doesn't exist"
            raise IndexError(msg)

    def __contains__(self, key: int) -> bool:
        return key in self.existing_entries or key in self.new_entries

    def __len__(self) -> int:
        """Number of distinct object IDs across all three mappings."""
        return len(
            set(self.existing_entries)
            | set(self.new_entries)
            | set(self.deleted_entries)
        )

    def keys(self) -> set[int]:
        """IDs of live objects: undeleted existing entries plus new entries."""
        return (set(self.existing_entries) - set(self.deleted_entries)) | set(
            self.new_entries
        )

    def write(self, f: IO[bytes]) -> int:
        """Write the new and deleted entries as an ``xref`` section to *f*.

        Entries are grouped into subsections of consecutive object IDs. Each
        deleted entry stores the ID of the next deleted object (0 for the
        last one). Returns the offset at which the section starts.
        """
        all_ids = sorted(set(self.new_entries) | set(self.deleted_entries))
        # deque gives O(1) removal from the front (list.pop(0) is O(n))
        pending_deleted = collections.deque(sorted(self.deleted_entries))
        startxref = f.tell()
        f.write(b"xref\n")

        # split the sorted IDs into runs of consecutive numbers in one pass
        runs: list[list[int]] = []
        for object_id in all_ids:
            if runs and runs[-1][-1] + 1 == object_id:
                runs[-1].append(object_id)
            else:
                runs.append([object_id])

        for run in runs:
            f.write(b"%d %d\n" % (run[0], len(run)))
            for object_id in run:
                if object_id in self.new_entries:
                    f.write(b"%010d %05d n \n" % self.new_entries[object_id])
                    continue
                this_deleted_object_id = pending_deleted.popleft()
                check_format_condition(
                    object_id == this_deleted_object_id,
                    f"expected the next deleted object ID to be {object_id}, "
                    f"instead found {this_deleted_object_id}",
                )
                next_in_linked_list = pending_deleted[0] if pending_deleted else 0
                f.write(
                    b"%010d %05d f \n"
                    % (next_in_linked_list, self.deleted_entries[object_id])
                )
        return startxref

216 

217 

class PdfName:
    """A PDF name, stored as raw bytes without the leading ``/``."""

    name: bytes

    # Bytes written verbatim by __bytes__: codes 33..126 except the
    # characters in "#%/()<>[]{}". Everything else is written as #XX.
    allowed_chars = set(range(33, 127)) - {ord(c) for c in "#%/()<>[]{}"}

    def __init__(self, name: PdfName | bytes | str) -> None:
        """Accept another ``PdfName``, raw bytes, or a US-ASCII string."""
        if isinstance(name, PdfName):
            raw = name.name
        elif isinstance(name, bytes):
            raw = name
        else:
            raw = name.encode("us-ascii")
        self.name = raw

    def name_as_str(self) -> str:
        """Return the name decoded as US-ASCII."""
        return self.name.decode("us-ascii")

    def __eq__(self, other: object) -> bool:
        # Equal to another PdfName with the same bytes, or to anything that
        # compares equal to the raw bytes.
        if isinstance(other, PdfName) and other.name == self.name:
            return True
        return other == self.name

    def __hash__(self) -> int:
        # Same hash as the raw bytes, consistent with __eq__ above.
        return hash(self.name)

    def __repr__(self) -> str:
        return f"{type(self).__name__}({self.name!r})"

    @classmethod
    def from_pdf_stream(cls, data: bytes) -> PdfName:
        """Build a name from its raw form by resolving ``#XX`` escapes."""
        return cls(PdfParser.interpret_name(data))

    def __bytes__(self) -> bytes:
        """Serialize as ``/name``, writing disallowed bytes as ``#XX``."""
        encoded = b"".join(
            bytes((byte,)) if byte in self.allowed_chars else b"#%02X" % byte
            for byte in self.name
        )
        return b"/" + encoded

257 

258 

class PdfArray(list[Any]):
    """A list that serializes to a PDF array via ``bytes()``."""

    def __bytes__(self) -> bytes:
        rendered = (pdf_repr(item) for item in self)
        return b"[ %s ]" % b" ".join(rendered)

262 

263 

class PdfDict(_DictBase):
    """Dictionary of PDF objects whose entries are also reachable as
    attributes.

    Attribute names are encoded to US-ASCII bytes and used as keys, so
    ``d.Title`` reads ``d[b"Title"]``. The one exception is ``data``, which
    is the real ``UserDict`` storage attribute.
    """

    def __setattr__(self, key: str, value: Any) -> None:
        if key == "data":
            collections.UserDict.__setattr__(self, key, value)
        else:
            self[key.encode("us-ascii")] = value

    def __getattr__(self, key: str) -> str | time.struct_time:
        """Return the entry named *key*, decoding bytes to text.

        For keys ending in ``Date`` the decoded text is parsed into a
        ``time.struct_time``; when a ``+``/``-`` UTC offset is present the
        result is converted to UTC.

        Raises ``AttributeError`` if there is no such entry.
        """
        if key == "data":
            # __getattr__ only runs when normal lookup failed, i.e. the
            # UserDict storage has not been created yet (copy.deepcopy and
            # unpickling probe attributes on such instances). Looking the
            # key up would read self.data again and recurse without end.
            raise AttributeError(key)
        try:
            value = self[key.encode("us-ascii")]
        except KeyError as e:
            raise AttributeError(key) from e
        if isinstance(value, bytes):
            value = decode_text(value)
        if key.endswith("Date"):
            if value.startswith("D:"):
                value = value[2:]

            # Character 14 is the UTC relationship, 15-16 the offset hours
            # and 18-19 the offset minutes.
            relationship = "Z"
            if len(value) > 17:
                relationship = value[14]
                offset = int(value[15:17]) * 60
                if len(value) > 20:
                    offset += int(value[18:20])

            # Use only as many format fields as the value has digits for.
            pattern = "%Y%m%d%H%M%S"[: len(value) - 2]
            value = time.strptime(value[: len(pattern) + 2], pattern)
            if relationship in ["+", "-"]:
                offset *= 60  # minutes -> seconds
                if relationship == "+":
                    # local time is ahead of UTC, so subtract the offset
                    offset *= -1
                value = time.gmtime(calendar.timegm(value) + offset)
        return value

    def __bytes__(self) -> bytes:
        """Serialize as ``<<`` ... ``>>``, skipping entries whose value is None."""
        out = bytearray(b"<<")
        for key, value in self.items():
            if value is None:
                continue
            value = pdf_repr(value)
            out.extend(b"\n")
            out.extend(bytes(PdfName(key)))
            out.extend(b" ")
            out.extend(value)
        out.extend(b"\n>>")
        return bytes(out)

310 

311 

class PdfBinary:
    """Byte values that serialize as a PDF hexadecimal string ``<..>``."""

    def __init__(self, data: list[int] | bytes) -> None:
        self.data = data

    def __bytes__(self) -> bytes:
        hex_digits = b"".join(b"%02X" % byte for byte in self.data)
        return b"<" + hex_digits + b">"

318 

319 

class PdfStream:
    """A stream object: its dictionary plus the raw, still-encoded bytes."""

    def __init__(self, dictionary: PdfDict, buf: bytes) -> None:
        self.dictionary, self.buf = dictionary, buf

    def decode(self, max_length: int = ImageFile.SAFEBLOCK) -> bytes:
        """Return the stream data with its filter undone.

        Without a ``Filter`` entry the raw bytes are returned unchanged.
        ``FlateDecode`` data is inflated, producing at most *max_length*
        bytes.

        Raises ``ValueError`` if input is left over once *max_length* bytes
        were produced, and ``NotImplementedError`` for any other filter.
        """
        try:
            stream_filter = self.dictionary[b"Filter"]
        except KeyError:
            return self.buf

        if stream_filter != b"FlateDecode":
            msg = f"stream filter {repr(stream_filter)} unknown/unsupported"
            raise NotImplementedError(msg)

        inflater = zlib.decompressobj()
        plaintext = inflater.decompress(self.buf, max_length)
        if inflater.unconsumed_tail:
            msg = "Decompressed data too large"
            raise ValueError(msg)
        return plaintext

340 

341 

def pdf_repr(x: Any) -> bytes:
    """Serialize a Python value to its PDF syntax.

    Handles booleans, None, numbers, ``time.struct_time`` (written as a
    ``(D:...Z)`` date string), dicts, lists, str (encoded with
    ``encode_text``) and bytes (written as a literal string). Anything else
    is converted with ``bytes(x)``.
    """
    if x is True:
        return b"true"
    if x is False:
        return b"false"
    if x is None:
        return b"null"
    if isinstance(x, (int, float)):
        text = str(x)
        if isinstance(x, float) and "e" in text:
            # str() switches to exponent notation for very small or very
            # large floats, which PDF numeric syntax does not allow.
            # Re-render the same shortest-repr digits positionally.
            import decimal

            text = format(decimal.Decimal(text), "f")
        return text.encode("us-ascii")
    if isinstance(x, (PdfName, PdfDict, PdfArray, PdfBinary)):
        return bytes(x)
    if isinstance(x, time.struct_time):
        return b"(D:" + time.strftime("%Y%m%d%H%M%SZ", x).encode("us-ascii") + b")"
    if isinstance(x, dict):
        return bytes(PdfDict(x))
    if isinstance(x, list):
        return bytes(PdfArray(x))
    if isinstance(x, str):
        return pdf_repr(encode_text(x))
    if isinstance(x, bytes):
        # XXX escape more chars? handle binary garbage
        x = x.replace(b"\\", b"\\\\")
        x = x.replace(b"(", b"\\(")
        x = x.replace(b")", b"\\)")
        return b"(" + x + b")"
    return bytes(x)

369 

370 

371class PdfParser: 

372 """Based on 

373 https://www.adobe.com/content/dam/acom/en/devnet/acrobat/pdfs/PDF32000_2008.pdf 

374 Supports PDF up to 1.4 

375 """ 

376 

def __init__(
    self,
    filename: str | None = None,
    f: IO[bytes] | None = None,
    buf: bytes | bytearray | None = None,
    start_offset: int = 0,
    mode: str = "rb",
) -> None:
    """Set up a parser over a file name, an open binary file, or a buffer.

    With only *filename*, the file is opened with *mode* and closed again
    by ``close()``. When a file is in use, the buffer is obtained from it.
    A non-empty buffer is parsed immediately; otherwise the parser starts
    with empty document structures, ready for writing.

    Raises ``RuntimeError`` if both *buf* and *f* are given, and re-raises
    ``PdfFormatError`` (after closing) if parsing fails.
    """
    if buf and f:
        msg = "specify buf or f or filename, but not both buf and f"
        raise RuntimeError(msg)

    self.filename = filename
    self.buf: bytes | bytearray | memoryview | mmap.mmap | None = buf
    self.f = f
    self.start_offset = start_offset
    self.should_close_buf = self.should_close_file = False

    if f is None and filename is not None:
        # we open the file ourselves, so we are responsible for closing it
        f = open(filename, mode)
        self.f = f
        self.should_close_file = True
    if f is not None:
        self.buf = self.get_buf_from_file(f)
        self.should_close_buf = True
        if not filename and hasattr(f, "name"):
            self.filename = f.name

    self.cached_objects: dict[IndirectReference, Any] = {}
    self.root_ref: IndirectReference | None
    self.info_ref: IndirectReference | None
    self.pages_ref: IndirectReference | None
    self.last_xref_section_offset: int | None

    if not self.buf:
        # nothing to parse: start from an empty document
        self.file_size_total = self.file_size_this = 0
        self.root = PdfDict()
        self.root_ref = None
        self.info = PdfDict()
        self.info_ref = None
        self.page_tree_root = PdfDict()
        self.pages: list[IndirectReference] = []
        self.orig_pages: list[IndirectReference] = []
        self.pages_ref = None
        self.last_xref_section_offset = None
        self.trailer_dict: dict[bytes, Any] = {}
        self.xref_table = XrefTable()
        self.xref_table.reading_finished = True
        if f:
            self.seek_end()
        return

    try:
        self.read_pdf_info()
    except PdfFormatError:
        self.close()
        raise

429 

def __enter__(self) -> PdfParser:
    """Context-manager entry; the parser itself is the managed object."""
    return self

432 

def __exit__(self, *args: object) -> None:
    """Context-manager exit: close the parser; exception details are ignored."""
    self.close()

435 

def start_writing(self) -> None:
    """Drop the read buffer, then move the file position to the end."""
    for step in (self.close_buf, self.seek_end):
        step()

439 

def close_buf(self) -> None:
    """Release the buffer (memoryview or mmap) and forget it."""
    current = self.buf
    if isinstance(current, memoryview):
        current.release()
    elif isinstance(current, mmap.mmap):
        current.close()
    self.buf = None

446 

def close(self) -> None:
    """Close the buffer and the file, each only if this parser owns it."""
    if self.should_close_buf:
        self.close_buf()
    owns_open_file = self.f is not None and self.should_close_file
    if owns_open_file:
        self.f.close()
        self.f = None

453 

def seek_end(self) -> None:
    """Move the file position to the end of the file."""
    stream = self.f
    assert stream is not None
    stream.seek(0, os.SEEK_END)

457 

def write_header(self) -> None:
    """Write the ``%PDF-1.4`` header line to the file."""
    stream = self.f
    assert stream is not None
    stream.write(b"%PDF-1.4\n")

461 

def write_comment(self, s: str) -> None:
    """Write *s* to the file as a one-line ``% ...`` comment."""
    stream = self.f
    assert stream is not None
    line = f"% {s}\n"
    stream.write(line.encode())

465 

def write_catalog(self) -> IndirectReference:
    """Write a new catalog and page-tree root, returning the catalog's
    reference.

    The previous root is deleted first, fresh object IDs are reserved for
    the catalog and the Pages node, and the pages are rewritten before
    either object is written.
    """
    assert self.f is not None
    self.del_root()
    catalog_ref = self.root_ref = self.next_object_id(self.f.tell())
    # offset 0 is a placeholder; write_obj records the real offset below
    pages_ref = self.pages_ref = self.next_object_id(0)
    self.rewrite_pages()
    self.write_obj(catalog_ref, Type=PdfName(b"Catalog"), Pages=pages_ref)
    self.write_obj(
        pages_ref,
        Type=PdfName(b"Pages"),
        Count=len(self.pages),
        Kids=self.pages,
    )
    return catalog_ref

480 

def rewrite_pages(self) -> None:
    """Re-emit every surviving original page under the current Pages node
    and delete the old page objects and their ancestor tree nodes."""
    stale_tree_nodes = []
    for old_ref in self.orig_pages:
        page_info = self.cached_objects[old_ref]
        del self.xref_table[old_ref.object_id]
        stale_tree_nodes.append(page_info[PdfName(b"Parent")])
        if old_ref not in self.pages:
            # the page has been deleted
            continue
        # write_page takes string keywords; keys are expected to be PdfName
        page_kwargs = {key.name_as_str(): value for key, value in page_info.items()}
        page_kwargs["Parent"] = self.pages_ref
        new_ref = self.write_page(None, **page_kwargs)
        # swap every occurrence of the old reference for the new one
        for position, current_ref in enumerate(self.pages):
            if current_ref == old_ref:
                self.pages[position] = new_ref

    # delete redundant Pages tree nodes from the xref table, walking up
    # through each node's Parent until there is none
    for node_ref in stale_tree_nodes:
        while node_ref:
            node = self.cached_objects[node_ref]
            if node_ref.object_id in self.xref_table:
                del self.xref_table[node_ref.object_id]
            node_ref = node.get(b"Parent", None)
    self.orig_pages = []

509 

def write_xref_and_trailer(
    self, new_root_ref: IndirectReference | None = None
) -> None:
    """Write the info dictionary (if non-empty), the xref section and the
    trailer, then remember this xref section's offset.

    If *new_root_ref* is given, the old root is deleted and replaced by it
    first.
    """
    assert self.f is not None
    if new_root_ref:
        self.del_root()
        self.root_ref = new_root_ref
    has_info = bool(self.info)
    if has_info:
        self.info_ref = self.write_obj(None, self.info)

    start_xref = self.xref_table.write(self.f)
    trailer_dict: dict[str | bytes, Any] = {
        b"Root": self.root_ref,
        b"Size": len(self.xref_table),
    }
    # link back to the previously written/read xref section, if any
    if self.last_xref_section_offset is not None:
        trailer_dict[b"Prev"] = self.last_xref_section_offset
    if has_info:
        trailer_dict[b"Info"] = self.info_ref
    self.last_xref_section_offset = start_xref

    footer = b"\nstartxref\n%d\n%%%%EOF" % start_xref
    self.f.write(b"".join((b"trailer\n", bytes(PdfDict(trailer_dict)), footer)))

535 

def write_page(
    self, ref: int | IndirectReference | None, *objs: Any, **dict_obj: Any
) -> IndirectReference:
    """Write a page object and return its reference.

    An integer *ref* is an index into ``self.pages``. ``Type`` defaults to
    ``/Page`` and ``Parent`` to the current Pages reference when the caller
    does not supply them.
    """
    if isinstance(ref, int):
        obj_ref = self.pages[ref]
    else:
        obj_ref = ref
    if "Type" not in dict_obj:
        dict_obj["Type"] = PdfName(b"Page")
    if "Parent" not in dict_obj:
        dict_obj["Parent"] = self.pages_ref
    return self.write_obj(obj_ref, *objs, **dict_obj)

545 

def write_obj(
    self, ref: IndirectReference | None, *objs: Any, **dict_obj: Any
) -> IndirectReference:
    """Write one indirect object at the current file position.

    With *ref* of None a new object ID is allocated; otherwise the xref
    entry for *ref* is pointed at the current position. Keyword arguments
    form the object's dictionary; the special keyword ``stream`` is written
    as the stream body and its length stored under ``Length``. Positional
    *objs* are serialized after the dictionary. Returns the reference used.
    """
    assert self.f is not None
    out = self.f
    position = out.tell()
    if ref is None:
        ref = self.next_object_id(position)
    else:
        self.xref_table[ref.object_id] = (position, ref.generation)
    out.write(bytes(IndirectObjectDef(*ref)))

    stream = dict_obj.pop("stream", None)
    has_stream = stream is not None
    if has_stream:
        dict_obj["Length"] = len(stream)
    if dict_obj:
        out.write(pdf_repr(dict_obj))
    for extra in objs:
        out.write(pdf_repr(extra))
    if has_stream:
        out.write(b"stream\n")
        out.write(stream)
        out.write(b"\nendstream\n")
    out.write(b"endobj\n")
    return ref

569 

def del_root(self) -> None:
    """Delete the xref entries of the current root and of its Pages node;
    does nothing when there is no root."""
    root_ref = self.root_ref
    if root_ref is not None:
        del self.xref_table[root_ref.object_id]
        del self.xref_table[self.root[b"Pages"].object_id]

575 

576 @staticmethod 

577 def get_buf_from_file(f: IO[bytes]) -> bytes | mmap.mmap: 

578 if hasattr(f, "getbuffer"): 

579 return f.getbuffer() 

580 elif hasattr(f, "getvalue"): 

581 return f.getvalue() 

582 else: 

583 try: 

584 return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) 

585 except ValueError: # cannot mmap an empty file 

586 return b"" 

587 

def read_pdf_info(self) -> None:
    """Read the trailer, root, info dictionary and page list from the buffer.

    Raises ``PdfFormatError`` when the root is missing, is not a
    ``/Catalog``, or has no indirect ``/Pages`` reference.
    """
    assert self.buf is not None
    total = len(self.buf)
    self.file_size_total = total
    self.file_size_this = total - self.start_offset
    self.read_trailer()

    trailer = self.trailer_dict
    check_format_condition(trailer.get(b"Root") is not None, "Root is missing")
    self.root_ref = trailer[b"Root"]
    assert self.root_ref is not None
    self.info_ref = trailer.get(b"Info", None)
    self.root = PdfDict(self.read_indirect(self.root_ref))
    if self.info_ref is not None:
        self.info = PdfDict(self.read_indirect(self.info_ref))
    else:
        self.info = PdfDict()

    check_format_condition(b"Type" in self.root, "/Type missing in Root")
    check_format_condition(
        self.root[b"Type"] == b"Catalog", "/Type in Root is not /Catalog"
    )
    check_format_condition(
        self.root.get(b"Pages") is not None, "/Pages missing in Root"
    )
    check_format_condition(
        isinstance(self.root[b"Pages"], IndirectReference),
        "/Pages in Root is not an indirect reference",
    )
    self.pages_ref = self.root[b"Pages"]
    assert self.pages_ref is not None
    self.page_tree_root = self.read_indirect(self.pages_ref)
    self.pages = self.linearize_page_tree(self.page_tree_root)
    # Keep a copy of the original page references: if the user modifies,
    # adds or deletes pages, the pages and their list must be rewritten.
    self.orig_pages = self.pages[:]

623 

def next_object_id(self, offset: int | None = None) -> IndirectReference:
    """Return a generation-0 reference whose ID is one above the highest
    ID in use (1 when the table has no live entries).

    When *offset* is given, the new ID is also registered in the xref
    table at that offset.
    """
    # TODO: support reuse of deleted objects
    next_id = max(self.xref_table.keys(), default=0) + 1
    reference = IndirectReference(next_id, 0)
    if offset is not None:
        self.xref_table[reference.object_id] = (offset, 0)
    return reference

633 

# Regex building blocks. The octal escapes are NUL (\000), TAB (\011),
# LF (\012), FF (\014), CR (\015) and space (\040).
delimiter = rb"[][()<>{}/%]"
delimiter_or_ws = rb"[][()<>{}/%\000\011\012\014\015\040]"
whitespace = rb"[\000\011\012\014\015\040]"
whitespace_or_hex = rb"[\000\011\012\014\015\0400-9a-fA-F]"
whitespace_optional = b"".join((whitespace, b"*"))
whitespace_mandatory = b"".join((whitespace, b"+"))
# No "\012" aka "\n" or "\015" aka "\r":
whitespace_optional_no_nl = rb"[\000\011\014\040]*"
newline_only = rb"[\r\n]+"
# one or more line breaks, with optional non-newline whitespace around them
newline = b"".join((whitespace_optional_no_nl, newline_only, whitespace_optional_no_nl))

# Final trailer: anchored at the end of the data, greedy dictionary body.
re_trailer_end = re.compile(
    b"".join(
        (
            whitespace_mandatory,
            rb"trailer",
            whitespace_optional,
            rb"<<(.*>>)",
            newline,
            rb"startxref",
            newline,
            rb"([0-9]+)",
            newline,
            rb"%%EOF",
            whitespace_optional,
            rb"$",
        )
    ),
    re.DOTALL,
)
# Earlier trailer: not anchored, non-greedy dictionary body.
re_trailer_prev = re.compile(
    b"".join(
        (
            whitespace_optional,
            rb"trailer",
            whitespace_optional,
            rb"<<(.*?>>)",
            newline,
            rb"startxref",
            newline,
            rb"([0-9]+)",
            newline,
            rb"%%EOF",
            whitespace_optional,
        )
    ),
    re.DOTALL,
)

673 

def read_trailer(self) -> None:
    """Locate the last trailer in the buffer and load it and its xref table.

    Only the final 16384 bytes (never before ``start_offset``) are
    searched. If the trailer has a ``Prev`` entry, earlier trailers are
    read too.

    Raises ``PdfFormatError`` if no trailer is found.
    """
    assert self.buf is not None
    search_start_offset = max(len(self.buf) - 16384, self.start_offset)
    first = self.re_trailer_end.search(self.buf, search_start_offset)
    check_format_condition(first is not None, "trailer end not found")

    # make sure we found the LAST trailer: keep searching past each hit
    found = candidate = first
    while candidate:
        found = candidate
        candidate = self.re_trailer_end.search(self.buf, candidate.start() + 16)
    assert found is not None

    trailer_data = found.group(1)
    self.last_xref_section_offset = int(found.group(2))
    self.trailer_dict = self.interpret_trailer(trailer_data)
    self.xref_table = XrefTable()
    self.read_xref_table(xref_section_offset=self.last_xref_section_offset)
    if b"Prev" in self.trailer_dict:
        self.read_prev_trailer(self.trailer_dict[b"Prev"])

696 

def read_prev_trailer(
    self, xref_section_offset: int, processed_offsets: list[int] | None = None
) -> None:
    """Read the xref section at *xref_section_offset*, the trailer after
    it, and recursively any trailer that one points to through ``Prev``.

    *processed_offsets* collects the offsets already followed so that a
    cycle of ``Prev`` links is reported instead of followed forever.

    Raises ``PdfFormatError`` if the trailer is missing, its ``startxref``
    value differs from *xref_section_offset*, or a loop is detected.
    """
    assert self.buf is not None
    trailer_offset = self.read_xref_table(xref_section_offset=xref_section_offset)
    window = self.buf[trailer_offset : trailer_offset + 16384]
    found = self.re_trailer_prev.search(window)
    check_format_condition(found is not None, "previous trailer not found")
    assert found is not None
    trailer_data = found.group(1)
    check_format_condition(
        int(found.group(2)) == xref_section_offset,
        "xref section offset in previous trailer doesn't match what was expected",
    )
    trailer_dict = self.interpret_trailer(trailer_data)
    if b"Prev" not in trailer_dict:
        return
    if processed_offsets is None:
        processed_offsets = []
    processed_offsets.append(xref_section_offset)
    previous = trailer_dict[b"Prev"]
    check_format_condition(previous not in processed_offsets, "trailer loop found")
    self.read_prev_trailer(previous, processed_offsets)

721 

re_whitespace_optional = re.compile(whitespace_optional)
# a name: "/" plus name characters, followed by a delimiter or whitespace
re_name = re.compile(
    b"".join(
        (
            whitespace_optional,
            rb"/([!-$&'*-.0-;=?-Z\\^-z|~]+)(?=",
            delimiter_or_ws,
            rb")",
        )
    )
)
re_dict_start = re.compile(b"".join((whitespace_optional, rb"<<")))
re_dict_end = re.compile(b"".join((whitespace_optional, rb">>", whitespace_optional)))

731 

@classmethod
def interpret_trailer(cls, trailer_data: bytes) -> dict[bytes, Any]:
    """Parse the body of a trailer dictionary into ``{name: value}``.

    *trailer_data* is the text after ``<<`` up to and including ``>>``.
    Parsing stops early if a value reports no end offset.

    Raises ``PdfFormatError`` on unparseable content, or when ``/Size`` is
    not an integer or ``/Root`` is not an indirect reference.
    """
    trailer = {}
    position = 0
    while True:
        name_match = cls.re_name.match(trailer_data, position)
        if not name_match:
            # no further name: only the closing ">>" may remain
            closing = cls.re_dict_end.match(trailer_data, position)
            check_format_condition(
                closing is not None and closing.end() == len(trailer_data),
                "name not found in trailer, remaining data: "
                + repr(trailer_data[position:]),
            )
            break
        key = cls.interpret_name(name_match.group(1))
        assert isinstance(key, bytes)
        value, value_offset = cls.get_value(trailer_data, name_match.end())
        trailer[key] = value
        if value_offset is None:
            break
        position = value_offset

    check_format_condition(
        isinstance(trailer.get(b"Size"), int),
        "/Size not in trailer or not an integer",
    )
    check_format_condition(
        isinstance(trailer.get(b"Root"), IndirectReference),
        "/Root not in trailer or not an indirect reference",
    )
    return trailer

762 

# a run of non-"#" bytes, optionally followed by one "#XX" hex escape
re_hashes_in_name = re.compile(rb"([^#]*)(#([0-9a-fA-F]{2}))?")

@classmethod
def interpret_name(cls, raw: bytes, as_text: bool = False) -> str | bytes:
    """Resolve ``#XX`` escapes in a raw PDF name.

    Returns bytes by default, or the UTF-8 decoded text when *as_text* is
    true.
    """
    pieces = []
    for match in cls.re_hashes_in_name.finditer(raw):
        pieces.append(match.group(1))
        hex_pair = match.group(3)
        if hex_pair:
            pieces.append(bytes.fromhex(hex_pair.decode("us-ascii")))
    name = b"".join(pieces)
    return name.decode("utf-8") if as_text else name

777 

# Keywords and numbers must be followed by a delimiter or whitespace.
re_null = re.compile(
    b"".join((whitespace_optional, rb"null(?=", delimiter_or_ws, rb")"))
)
re_true = re.compile(
    b"".join((whitespace_optional, rb"true(?=", delimiter_or_ws, rb")"))
)
re_false = re.compile(
    b"".join((whitespace_optional, rb"false(?=", delimiter_or_ws, rb")"))
)
re_int = re.compile(
    b"".join((whitespace_optional, rb"([-+]?[0-9]+)(?=", delimiter_or_ws, rb")"))
)
re_real = re.compile(
    b"".join(
        (
            whitespace_optional,
            rb"([-+]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+))(?=",
            delimiter_or_ws,
            rb")",
        )
    )
)
re_array_start = re.compile(b"".join((whitespace_optional, rb"\[")))
re_array_end = re.compile(b"".join((whitespace_optional, rb"]")))
re_string_hex = re.compile(
    b"".join((whitespace_optional, rb"<(", whitespace_or_hex, rb"*)>"))
)
re_string_lit = re.compile(b"".join((whitespace_optional, rb"\(")))
# "<id> <generation> R"
re_indirect_reference = re.compile(
    b"".join(
        (
            whitespace_optional,
            rb"([-+]?[0-9]+)",
            whitespace_mandatory,
            rb"([-+]?[0-9]+)",
            whitespace_mandatory,
            rb"R(?=",
            delimiter_or_ws,
            rb")",
        )
    )
)
# "<id> <generation> obj"
re_indirect_def_start = re.compile(
    b"".join(
        (
            whitespace_optional,
            rb"([-+]?[0-9]+)",
            whitespace_mandatory,
            rb"([-+]?[0-9]+)",
            whitespace_mandatory,
            rb"obj(?=",
            delimiter_or_ws,
            rb")",
        )
    )
)
re_indirect_def_end = re.compile(
    b"".join((whitespace_optional, rb"endobj(?=", delimiter_or_ws, rb")"))
)
# zero or more "%"-comments, each running to the end of its line
re_comment = re.compile(
    b"".join((rb"(", whitespace_optional, rb"%[^\r\n]*", newline, rb")*"))
)
re_stream_start = re.compile(b"".join((whitespace_optional, rb"stream\r?\n")))
re_stream_end = re.compile(
    b"".join((whitespace_optional, rb"endstream(?=", delimiter_or_ws, rb")"))
)

826 

@classmethod
def get_value(
    cls,
    data: bytes | bytearray | memoryview | mmap.mmap,
    offset: int,
    expect_indirect: IndirectReference | None = None,
    max_nesting: int = -1,
) -> tuple[Any, int | None]:
    """Parse one PDF object from *data* starting at *offset*.

    Returns ``(value, end_offset)``. ``end_offset`` is None when parsing
    stopped because *max_nesting* reached 0 somewhere inside the value; the
    value is then whatever had been collected so far. If *expect_indirect*
    is given, the data must be the definition of exactly that indirect
    object.

    Raises ``PdfFormatError`` for malformed or unrecognized input.
    """
    if max_nesting == 0:
        return None, None
    child_nesting = max_nesting - 1

    comment = cls.re_comment.match(data, offset)
    if comment:
        offset = comment.end()

    # --- indirect object definition: "<id> <gen> obj ... endobj" ---
    definition = cls.re_indirect_def_start.match(data, offset)
    if definition:
        object_id = int(definition.group(1))
        check_format_condition(
            object_id > 0,
            "indirect object definition: object ID must be greater than 0",
        )
        generation = int(definition.group(2))
        check_format_condition(
            generation >= 0,
            "indirect object definition: generation must be non-negative",
        )
        check_format_condition(
            expect_indirect is None
            or expect_indirect == IndirectReference(object_id, generation),
            "indirect object definition different than expected",
        )
        inner, inner_end = cls.get_value(
            data, definition.end(), max_nesting=child_nesting
        )
        if inner_end is None:
            return inner, None
        definition_end = cls.re_indirect_def_end.match(data, inner_end)
        check_format_condition(
            definition_end is not None, "indirect object definition end not found"
        )
        assert definition_end is not None
        return inner, definition_end.end()
    check_format_condition(
        not expect_indirect, "indirect object definition not found"
    )

    # --- indirect reference: "<id> <gen> R" ---
    reference = cls.re_indirect_reference.match(data, offset)
    if reference:
        object_id = int(reference.group(1))
        check_format_condition(
            object_id > 0,
            "indirect object reference: object ID must be greater than 0",
        )
        generation = int(reference.group(2))
        check_format_condition(
            generation >= 0,
            "indirect object reference: generation must be non-negative",
        )
        return IndirectReference(object_id, generation), reference.end()

    # --- dictionary, optionally followed by a stream ---
    dict_start = cls.re_dict_start.match(data, offset)
    if dict_start:
        entries: dict[Any, Any] = {}
        cursor: int | None = dict_start.end()
        dict_end = cls.re_dict_end.match(data, cursor)
        while not dict_end:
            assert cursor is not None
            key, cursor = cls.get_value(data, cursor, max_nesting=child_nesting)
            if cursor is None:
                return entries, None
            value, cursor = cls.get_value(data, cursor, max_nesting=child_nesting)
            entries[key] = value
            if cursor is None:
                return entries, None
            dict_end = cls.re_dict_end.match(data, cursor)
        cursor = dict_end.end()

        stream_start = cls.re_stream_start.match(data, cursor)
        if not stream_start:
            return PdfDict(entries), cursor
        stream_len = entries.get(b"Length")
        if not isinstance(stream_len, int):
            msg = f"bad or missing Length in stream dict ({stream_len})"
            raise PdfFormatError(msg)
        body_start = stream_start.end()
        body_end = body_start + stream_len
        stream_data = bytes(data[body_start:body_end])
        stream_end = cls.re_stream_end.match(data, body_end)
        check_format_condition(stream_end is not None, "stream end not found")
        assert stream_end is not None
        return PdfStream(PdfDict(entries), stream_data), stream_end.end()

    # --- array ---
    array_start = cls.re_array_start.match(data, offset)
    if array_start:
        items = []
        cursor = array_start.end()
        array_end = cls.re_array_end.match(data, cursor)
        while not array_end:
            assert cursor is not None
            value, cursor = cls.get_value(data, cursor, max_nesting=child_nesting)
            items.append(value)
            if cursor is None:
                return items, None
            array_end = cls.re_array_end.match(data, cursor)
        return items, array_end.end()

    # --- keywords with a fixed value ---
    for pattern, constant in (
        (cls.re_null, None),
        (cls.re_true, True),
        (cls.re_false, False),
    ):
        keyword = pattern.match(data, offset)
        if keyword:
            return constant, keyword.end()

    # --- name, then numbers (integer is tried before real) ---
    name = cls.re_name.match(data, offset)
    if name:
        return PdfName(cls.interpret_name(name.group(1))), name.end()
    integer = cls.re_int.match(data, offset)
    if integer:
        return int(integer.group(1)), integer.end()
    real = cls.re_real.match(data, offset)
    if real:
        # XXX Decimal instead of float???
        return float(real.group(1)), real.end()

    # --- strings ---
    hex_match = cls.re_string_hex.match(data, offset)
    if hex_match:
        # filter out whitespace
        hex_string = bytearray(
            b for b in hex_match.group(1) if b in b"0123456789abcdefABCDEF"
        )
        if len(hex_string) % 2 == 1:
            # append a 0 if the length is not even - yes, at the end
            hex_string.append(ord(b"0"))
        return bytearray.fromhex(hex_string.decode("us-ascii")), hex_match.end()
    literal = cls.re_string_lit.match(data, offset)
    if literal:
        return cls.get_literal_string(data, literal.end())

    # return None, offset  # fallback (only for debugging)
    msg = f"unrecognized object: {repr(data[offset : offset + 32])}"
    raise PdfFormatError(msg)

966 

# Tokens that need handling inside a literal string. Groups:
#   1: backslash + escapable character      2: backslash + 1-3 digits
#   3: backslash + line break (4 is its inner line-break group)
#   5: bare line break    6: "("    7: ")"
re_lit_str_token = re.compile(
    rb"(\\[nrtbf()\\])|(\\[0-9]{1,3})|(\\(\r\n|\r|\n))|(\r\n|\r|\n)|(\()|(\))"
)
# Keyed both by one-byte bytes and by integer byte value, because indexing
# a bytes object yields an int.
escaped_chars = {
    b"n": b"\n",
    b"r": b"\r",
    b"t": b"\t",
    b"b": b"\b",
    b"f": b"\f",
    b"(": b"(",
    b")": b")",
    b"\\": b"\\",
    ord(b"n"): b"\n",
    ord(b"r"): b"\r",
    ord(b"t"): b"\t",
    ord(b"b"): b"\b",
    ord(b"f"): b"\f",
    ord(b"("): b"(",
    ord(b")"): b")",
    ord(b"\\"): b"\\",
}

@classmethod
def get_literal_string(
    cls, data: bytes | bytearray | memoryview | mmap.mmap, offset: int
) -> tuple[bytes, int]:
    """Parse a literal string whose opening ``(`` ends just before *offset*.

    Returns ``(decoded_bytes, offset_after_closing_parenthesis)``. Escapes
    are resolved, a backslash followed by a line break is dropped, bare
    line breaks become ``\\n`` and balanced nested parentheses are kept.

    Raises ``PdfFormatError`` if the closing ``)`` is never found.
    """
    nesting_depth = 0
    result = bytearray()
    for m in cls.re_lit_str_token.finditer(data, offset):
        # copy the plain text between the previous token and this one
        result.extend(data[offset : m.start()])
        if m.group(1):
            result.extend(cls.escaped_chars[m.group(1)[1]])
        elif m.group(2):
            # The token pattern accepts any decimal digits, but only
            # 0-7 form an octal escape: use the leading octal digits
            # as the character code and keep the remaining digits as
            # literal text. Overflow above one byte is discarded
            # rather than raising ValueError on e.g. "\777".
            digits = m.group(2)[1:]
            octal_len = 0
            while octal_len < len(digits) and digits[octal_len] in b"01234567":
                octal_len += 1
            if octal_len:
                result.append(int(digits[:octal_len], 8) & 0xFF)
            result.extend(digits[octal_len:])
        elif m.group(3):
            # backslash + line break: line continuation, contributes nothing
            pass
        elif m.group(5):
            result.extend(b"\n")
        elif m.group(6):
            result.extend(b"(")
            nesting_depth += 1
        elif m.group(7):
            if nesting_depth == 0:
                return bytes(result), m.end()
            result.extend(b")")
            nesting_depth -= 1
        offset = m.end()
    msg = "unfinished literal string"
    raise PdfFormatError(msg)

1016 

# Line holding the "xref" keyword that opens a cross-reference section.
re_xref_section_start = re.compile(
    b"".join((whitespace_optional, rb"xref", newline))
)
# Subsection header: two unsigned integers on one line. read_xref_table()
# reads group 1 as the first object number and group 2 as the entry count.
re_xref_subsection_start = re.compile(
    b"".join(
        (
            whitespace_optional,
            rb"([0-9]+)",
            whitespace_mandatory,
            rb"([0-9]+)",
            whitespace_optional,
            newline_only,
        )
    )
)
# One fixed-width entry: 10 digits (group 1, stored as the object's offset),
# 5 digits (group 2, generation), "f" or "n" (group 3, free / in use) and a
# two-byte line terminator.
re_xref_entry = re.compile(rb"([0-9]{10}) ([0-9]{5}) ([fn])( \r| \n|\r\n)")

1027 

def read_xref_table(self, xref_section_offset: int) -> int:
    """Parse the cross-reference section at ``xref_section_offset``.

    Every in-use entry whose object number is not yet present in
    ``self.xref_table`` is recorded as ``(offset, generation)``; free
    entries and object numbers that already have an entry are left alone.

    :param xref_section_offset: Offset of the section, relative to
        ``self.start_offset``.
    :returns: The buffer offset just past the last subsection that was read.
    """
    assert self.buf is not None
    buf = self.buf
    section = self.re_xref_section_start.match(
        buf, xref_section_offset + self.start_offset
    )
    check_format_condition(section is not None, "xref section start not found")
    assert section is not None
    pos = section.end()

    seen_subsection = False
    # Keep consuming subsections until the data no longer looks like a
    # subsection header.
    while (header := self.re_xref_subsection_start.match(buf, pos)) is not None:
        seen_subsection = True
        pos = header.end()
        first_object = int(header.group(1))
        num_objects = int(header.group(2))
        for object_id in range(first_object, first_object + num_objects):
            entry = self.re_xref_entry.match(buf, pos)
            check_format_condition(entry is not None, "xref entry not found")
            assert entry is not None
            pos = entry.end()
            if entry.group(3) == b"f":
                # Free entry: nothing to record.
                continue
            generation = int(entry.group(2))
            new_entry = (int(entry.group(1)), generation)
            # Never overwrite an entry that is already known.
            if object_id not in self.xref_table:
                self.xref_table[object_id] = new_entry

    # A section must contain at least one subsection.
    check_format_condition(seen_subsection, "xref subsection start not found")
    return pos

1060 

def read_indirect(self, ref: IndirectReference, max_nesting: int = -1) -> Any:
    """Parse the indirect object ``ref`` refers to and cache the result.

    The object's position is looked up in ``self.xref_table``; the
    generation stored there must match the one in ``ref``.

    :param ref: Reference whose item 0 is the object ID and item 1 the
        generation.
    :param max_nesting: Passed through unchanged to ``get_value``.
    :returns: The parsed value, which is also stored in
        ``self.cached_objects[ref]``.
    """
    entry = self.xref_table[ref[0]]
    offset, generation = entry
    generation_matches = generation == ref[1]
    check_format_condition(
        generation_matches,
        f"expected to find generation {ref[1]} for object ID {ref[0]} in xref "
        f"table, instead found generation {generation} at offset {offset}",
    )
    assert self.buf is not None
    # get_value returns a pair; only its first item (the value) is needed.
    parsed = self.get_value(
        self.buf,
        offset + self.start_offset,
        expect_indirect=IndirectReference(*ref),
        max_nesting=max_nesting,
    )
    value = parsed[0]
    self.cached_objects[ref] = value
    return value

1077 

def linearize_page_tree(
    self, node: PdfDict | None = None
) -> list[IndirectReference]:
    """Flatten a page tree into the list of references to its pages.

    :param node: Page tree node to start from; ``self.page_tree_root`` is
        used when omitted.
    :returns: References to every kid whose ``/Type`` is ``/Page``, in the
        order they are met while walking ``/Kids`` depth-first.
    """
    page_node = self.page_tree_root if node is None else node
    check_format_condition(
        page_node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages"
    )
    pages: list[IndirectReference] = []
    for kid in page_node[b"Kids"]:
        kid_object = self.read_indirect(kid)
        if kid_object[b"Type"] == b"Page":
            # Leaf: keep the reference itself, not the parsed object.
            pages.append(kid)
            continue
        # Anything else is treated as an intermediate node and descended
        # into (its /Type is validated by the recursive call).
        pages.extend(self.linearize_page_tree(node=kid_object))
    return pages