Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/pillow-11.0.0-py3.10-linux-x86_64.egg/PIL/PdfParser.py: 19%

678 statements  

1from __future__ import annotations 

2 

3import calendar 

4import codecs 

5import collections 

6import mmap 

7import os 

8import re 

9import time 

10import zlib 

11from typing import IO, TYPE_CHECKING, Any, NamedTuple, Union 

12 

13 

14# see 7.9.2.2 Text String Type on page 86 and D.3 PDFDocEncoding Character Set 

15# on page 656 

16def encode_text(s: str) -> bytes: 

17 return codecs.BOM_UTF16_BE + s.encode("utf_16_be") 

18 

19 

20PDFDocEncoding = { 

21 0x16: "\u0017", 

22 0x18: "\u02D8", 

23 0x19: "\u02C7", 

24 0x1A: "\u02C6", 

25 0x1B: "\u02D9", 

26 0x1C: "\u02DD", 

27 0x1D: "\u02DB", 

28 0x1E: "\u02DA", 

29 0x1F: "\u02DC", 

30 0x80: "\u2022", 

31 0x81: "\u2020", 

32 0x82: "\u2021", 

33 0x83: "\u2026", 

34 0x84: "\u2014", 

35 0x85: "\u2013", 

36 0x86: "\u0192", 

37 0x87: "\u2044", 

38 0x88: "\u2039", 

39 0x89: "\u203A", 

40 0x8A: "\u2212", 

41 0x8B: "\u2030", 

42 0x8C: "\u201E", 

43 0x8D: "\u201C", 

44 0x8E: "\u201D", 

45 0x8F: "\u2018", 

46 0x90: "\u2019", 

47 0x91: "\u201A", 

48 0x92: "\u2122", 

49 0x93: "\uFB01", 

50 0x94: "\uFB02", 

51 0x95: "\u0141", 

52 0x96: "\u0152", 

53 0x97: "\u0160", 

54 0x98: "\u0178", 

55 0x99: "\u017D", 

56 0x9A: "\u0131", 

57 0x9B: "\u0142", 

58 0x9C: "\u0153", 

59 0x9D: "\u0161", 

60 0x9E: "\u017E", 

61 0xA0: "\u20AC", 

62} 

63 

64 

65def decode_text(b: bytes) -> str: 

66 if b[: len(codecs.BOM_UTF16_BE)] == codecs.BOM_UTF16_BE: 

67 return b[len(codecs.BOM_UTF16_BE) :].decode("utf_16_be") 

68 else: 

69 return "".join(PDFDocEncoding.get(byte, chr(byte)) for byte in b) 

70 

71 
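As an aside (not part of the module), a minimal sketch of how the two text helpers above relate; the sample strings are arbitrary:

    # encode_text produces BOM-prefixed UTF-16BE, which decode_text recognizes
    assert decode_text(encode_text("Tëst ✓")) == "Tëst ✓"
    # bytes without a BOM fall back to PDFDocEncoding, e.g. 0xA0 is the euro sign
    assert decode_text(b"\xa0 price") == "\u20ac price"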

72class PdfFormatError(RuntimeError): 

73 """An error that probably indicates a syntactic or semantic error in the 

74 PDF file structure""" 

75 

76 pass 

77 

78 

79def check_format_condition(condition: bool, error_message: str) -> None: 

80 if not condition: 

81 raise PdfFormatError(error_message) 

82 

83 

84class IndirectReferenceTuple(NamedTuple): 

85 object_id: int 

86 generation: int 

87 

88 

89class IndirectReference(IndirectReferenceTuple): 

90 def __str__(self) -> str: 

91 return f"{self.object_id} {self.generation} R" 

92 

93 def __bytes__(self) -> bytes: 

94 return self.__str__().encode("us-ascii") 

95 

96 def __eq__(self, other: object) -> bool: 

97 if self.__class__ is not other.__class__: 

98 return False 

99 assert isinstance(other, IndirectReference) 

100 return other.object_id == self.object_id and other.generation == self.generation 

101 

102 def __ne__(self, other: object) -> bool: 

103 return not (self == other) 

104 

105 def __hash__(self) -> int: 

106 return hash((self.object_id, self.generation)) 

107 

108 

109class IndirectObjectDef(IndirectReference): 

110 def __str__(self) -> str: 

111 return f"{self.object_id} {self.generation} obj" 

112 

113 
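Illustrative values (chosen for this sketch, not taken from a real file) show how the two reference types render:

    ref = IndirectReference(7, 0)
    assert str(ref) == "7 0 R"                            # form used when referencing an object
    assert bytes(IndirectObjectDef(*ref)) == b"7 0 obj"   # form used when defining it
    assert ref != IndirectObjectDef(7, 0)                 # equality is deliberately class-sensitive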

114class XrefTable: 

115 def __init__(self) -> None: 

116 self.existing_entries: dict[int, tuple[int, int]] = ( 

117 {} 

118 ) # object ID => (offset, generation) 

119 self.new_entries: dict[int, tuple[int, int]] = ( 

120 {} 

121 ) # object ID => (offset, generation) 

122 self.deleted_entries = {0: 65536} # object ID => generation 

123 self.reading_finished = False 

124 

125 def __setitem__(self, key: int, value: tuple[int, int]) -> None: 

126 if self.reading_finished: 

127 self.new_entries[key] = value 

128 else: 

129 self.existing_entries[key] = value 

130 if key in self.deleted_entries: 

131 del self.deleted_entries[key] 

132 

133 def __getitem__(self, key: int) -> tuple[int, int]: 

134 try: 

135 return self.new_entries[key] 

136 except KeyError: 

137 return self.existing_entries[key] 

138 

139 def __delitem__(self, key: int) -> None: 

140 if key in self.new_entries: 

141 generation = self.new_entries[key][1] + 1 

142 del self.new_entries[key] 

143 self.deleted_entries[key] = generation 

144 elif key in self.existing_entries: 

145 generation = self.existing_entries[key][1] + 1 

146 self.deleted_entries[key] = generation 

147 elif key in self.deleted_entries: 

148 generation = self.deleted_entries[key] 

149 else: 

150 msg = f"object ID {key} cannot be deleted because it doesn't exist" 

151 raise IndexError(msg) 

152 

153 def __contains__(self, key: int) -> bool: 

154 return key in self.existing_entries or key in self.new_entries 

155 

156 def __len__(self) -> int: 

157 return len( 

158 set(self.existing_entries.keys()) 

159 | set(self.new_entries.keys()) 

160 | set(self.deleted_entries.keys()) 

161 ) 

162 

163 def keys(self) -> set[int]: 

164 return ( 

165 set(self.existing_entries.keys()) - set(self.deleted_entries.keys()) 

166 ) | set(self.new_entries.keys()) 

167 

168 def write(self, f: IO[bytes]) -> int: 

169 keys = sorted(set(self.new_entries.keys()) | set(self.deleted_entries.keys())) 

170 deleted_keys = sorted(set(self.deleted_entries.keys())) 

171 startxref = f.tell() 

172 f.write(b"xref\n") 

173 while keys: 

174 # find a contiguous sequence of object IDs 

175 prev: int | None = None 

176 for index, key in enumerate(keys): 

177 if prev is None or prev + 1 == key: 

178 prev = key 

179 else: 

180 contiguous_keys = keys[:index] 

181 keys = keys[index:] 

182 break 

183 else: 

184 contiguous_keys = keys 

185 keys = [] 

186 f.write(b"%d %d\n" % (contiguous_keys[0], len(contiguous_keys))) 

187 for object_id in contiguous_keys: 

188 if object_id in self.new_entries: 

189 f.write(b"%010d %05d n \n" % self.new_entries[object_id]) 

190 else: 

191 this_deleted_object_id = deleted_keys.pop(0) 

192 check_format_condition( 

193 object_id == this_deleted_object_id, 

194 f"expected the next deleted object ID to be {object_id}, " 

195 f"instead found {this_deleted_object_id}", 

196 ) 

197 try: 

198 next_in_linked_list = deleted_keys[0] 

199 except IndexError: 

200 next_in_linked_list = 0 

201 f.write( 

202 b"%010d %05d f \n" 

203 % (next_in_linked_list, self.deleted_entries[object_id]) 

204 ) 

205 return startxref 

206 
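A minimal usage sketch of XrefTable with two made-up byte offsets; the free entry for object 0 comes from the constructor:

    import io

    xref = XrefTable()
    xref.reading_finished = True   # route assignments into new_entries
    xref[1] = (15, 0)              # object 1 at byte offset 15, generation 0
    xref[2] = (78, 0)
    buf = io.BytesIO()
    startxref = xref.write(buf)    # returns the offset of the "xref" keyword (0 here)
    # buf now holds "xref", the subsection header "0 3", one free entry and two in-use entries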

207 

208class PdfName: 

209 name: bytes 

210 

211 def __init__(self, name: PdfName | bytes | str) -> None: 

212 if isinstance(name, PdfName): 

213 self.name = name.name 

214 elif isinstance(name, bytes): 

215 self.name = name 

216 else: 

217 self.name = name.encode("us-ascii") 

218 

219 def name_as_str(self) -> str: 

220 return self.name.decode("us-ascii") 

221 

222 def __eq__(self, other: object) -> bool: 

223 return ( 

224 isinstance(other, PdfName) and other.name == self.name 

225 ) or other == self.name 

226 

227 def __hash__(self) -> int: 

228 return hash(self.name) 

229 

230 def __repr__(self) -> str: 

231 return f"{self.__class__.__name__}({repr(self.name)})" 

232 

233 @classmethod 

234 def from_pdf_stream(cls, data: bytes) -> PdfName: 

235 return cls(PdfParser.interpret_name(data)) 

236 

237 allowed_chars = set(range(33, 127)) - {ord(c) for c in "#%/()<>[]{}"} 

238 

239 def __bytes__(self) -> bytes: 

240 result = bytearray(b"/") 

241 for b in self.name: 

242 if b in self.allowed_chars: 

243 result.append(b) 

244 else: 

245 result.extend(b"#%02X" % b) 

246 return bytes(result) 

247 
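Name escaping in both directions, with an arbitrary example name:

    n = PdfName("Font Size#1")
    assert bytes(n) == b"/Font#20Size#231"                    # space and "#" are outside allowed_chars
    assert PdfName.from_pdf_stream(b"Font#20Size#231") == n   # interpret_name expands the #XX escapes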

248 

249class PdfArray(list[Any]): 

250 def __bytes__(self) -> bytes: 

251 return b"[ " + b" ".join(pdf_repr(x) for x in self) + b" ]" 

252 

253 

254if TYPE_CHECKING: 

255 _DictBase = collections.UserDict[Union[str, bytes], Any] 

256else: 

257 _DictBase = collections.UserDict 

258 

259 

260class PdfDict(_DictBase): 

261 def __setattr__(self, key: str, value: Any) -> None: 

262 if key == "data": 

263 collections.UserDict.__setattr__(self, key, value) 

264 else: 

265 self[key.encode("us-ascii")] = value 

266 

267 def __getattr__(self, key: str) -> str | time.struct_time: 

268 try: 

269 value = self[key.encode("us-ascii")] 

270 except KeyError as e: 

271 raise AttributeError(key) from e 

272 if isinstance(value, bytes): 

273 value = decode_text(value) 

274 if key.endswith("Date"): 

275 if value.startswith("D:"): 

276 value = value[2:] 

277 

278 relationship = "Z" 

279 if len(value) > 17: 

280 relationship = value[14] 

281 offset = int(value[15:17]) * 60 

282 if len(value) > 20: 

283 offset += int(value[18:20]) 

284 

285 format = "%Y%m%d%H%M%S"[: len(value) - 2] 

286 value = time.strptime(value[: len(format) + 2], format) 

287 if relationship in ["+", "-"]: 

288 offset *= 60 

289 if relationship == "+": 

290 offset *= -1 

291 value = time.gmtime(calendar.timegm(value) + offset) 

292 return value 

293 

294 def __bytes__(self) -> bytes: 

295 out = bytearray(b"<<") 

296 for key, value in self.items(): 

297 if value is None: 

298 continue 

299 value = pdf_repr(value) 

300 out.extend(b"\n") 

301 out.extend(bytes(PdfName(key))) 

302 out.extend(b" ") 

303 out.extend(value) 

304 out.extend(b"\n>>") 

305 return bytes(out) 

306 
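A small sketch of the attribute interface, with a made-up creation date; bytes values are decoded and keys ending in "Date" are parsed into time.struct_time:

    d = PdfDict()
    d.Title = "Example"                        # attribute writes land under the key b"Title"
    d[b"CreationDate"] = b"D:20240102030405Z"
    assert d.Title == "Example"
    assert d.CreationDate.tm_year == 2024 and d.CreationDate.tm_mday == 2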

307 

308class PdfBinary: 

309 def __init__(self, data: list[int] | bytes) -> None: 

310 self.data = data 

311 

312 def __bytes__(self) -> bytes: 

313 return b"<%s>" % b"".join(b"%02X" % b for b in self.data) 

314 

315 

316class PdfStream: 

317 def __init__(self, dictionary: PdfDict, buf: bytes) -> None: 

318 self.dictionary = dictionary 

319 self.buf = buf 

320 

321 def decode(self) -> bytes: 

322 try: 

323 filter = self.dictionary[b"Filter"] 

324 except KeyError: 

325 return self.buf 

326 if filter == b"FlateDecode": 

327 try: 

328 expected_length = self.dictionary[b"DL"] 

329 except KeyError: 

330 expected_length = self.dictionary[b"Length"] 

331 return zlib.decompress(self.buf, bufsize=int(expected_length)) 

332 else: 

333 msg = f"stream filter {repr(filter)} unknown/unsupported" 

334 raise NotImplementedError(msg) 

335 
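Decoding a Flate-compressed stream, using made-up content bytes:

    import zlib

    content = b"BT /F1 12 Tf (Hi) Tj ET"
    compressed = zlib.compress(content)
    stream = PdfStream(
        PdfDict({b"Filter": PdfName(b"FlateDecode"), b"Length": len(compressed)}),
        compressed,
    )
    assert stream.decode() == content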

336 

337def pdf_repr(x: Any) -> bytes: 

338 if x is True: 

339 return b"true" 

340 elif x is False: 

341 return b"false" 

342 elif x is None: 

343 return b"null" 

344 elif isinstance(x, (PdfName, PdfDict, PdfArray, PdfBinary)): 

345 return bytes(x) 

346 elif isinstance(x, (int, float)): 

347 return str(x).encode("us-ascii") 

348 elif isinstance(x, time.struct_time): 

349 return b"(D:" + time.strftime("%Y%m%d%H%M%SZ", x).encode("us-ascii") + b")" 

350 elif isinstance(x, dict): 

351 return bytes(PdfDict(x)) 

352 elif isinstance(x, list): 

353 return bytes(PdfArray(x)) 

354 elif isinstance(x, str): 

355 return pdf_repr(encode_text(x)) 

356 elif isinstance(x, bytes): 

357 # XXX escape more chars? handle binary garbage 

358 x = x.replace(b"\\", b"\\\\") 

359 x = x.replace(b"(", b"\\(") 

360 x = x.replace(b")", b"\\)") 

361 return b"(" + x + b")" 

362 else: 

363 return bytes(x) 

364 
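One illustrative call shows how nested Python values serialize; the dictionary content is arbitrary:

    serialized = pdf_repr(
        {
            "Type": PdfName(b"Page"),
            "MediaBox": [0, 0, 612, 792],
            "Contents": IndirectReference(4, 0),
            "Rotate": None,                    # None values are dropped from the output
        }
    )
    # b'<<\n/Type /Page\n/MediaBox [ 0 0 612 792 ]\n/Contents 4 0 R\n>>'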

365 

366class PdfParser: 

367 """Based on 

368 https://www.adobe.com/content/dam/acom/en/devnet/acrobat/pdfs/PDF32000_2008.pdf 

369 Supports PDF up to 1.4 

370 """ 

371 

372 def __init__( 

373 self, 

374 filename: str | None = None, 

375 f: IO[bytes] | None = None, 

376 buf: bytes | bytearray | None = None, 

377 start_offset: int = 0, 

378 mode: str = "rb", 

379 ) -> None: 

380 if buf and f: 

381 msg = "specify buf or f or filename, but not both buf and f" 

382 raise RuntimeError(msg) 

383 self.filename = filename 

384 self.buf: bytes | bytearray | mmap.mmap | None = buf 

385 self.f = f 

386 self.start_offset = start_offset 

387 self.should_close_buf = False 

388 self.should_close_file = False 

389 if filename is not None and f is None: 

390 self.f = f = open(filename, mode) 

391 self.should_close_file = True 

392 if f is not None: 

393 self.buf = self.get_buf_from_file(f) 

394 self.should_close_buf = True 

395 if not filename and hasattr(f, "name"): 

396 self.filename = f.name 

397 self.cached_objects: dict[IndirectReference, Any] = {} 

398 self.root_ref: IndirectReference | None 

399 self.info_ref: IndirectReference | None 

400 self.pages_ref: IndirectReference | None 

401 self.last_xref_section_offset: int | None 

402 if self.buf: 

403 self.read_pdf_info() 

404 else: 

405 self.file_size_total = self.file_size_this = 0 

406 self.root = PdfDict() 

407 self.root_ref = None 

408 self.info = PdfDict() 

409 self.info_ref = None 

410 self.page_tree_root = PdfDict() 

411 self.pages: list[IndirectReference] = [] 

412 self.orig_pages: list[IndirectReference] = [] 

413 self.pages_ref = None 

414 self.last_xref_section_offset = None 

415 self.trailer_dict: dict[bytes, Any] = {} 

416 self.xref_table = XrefTable() 

417 self.xref_table.reading_finished = True 

418 if f: 

419 self.seek_end() 

420 

421 def __enter__(self) -> PdfParser: 

422 return self 

423 

424 def __exit__(self, *args: object) -> None: 

425 self.close() 

426 

427 def start_writing(self) -> None: 

428 self.close_buf() 

429 self.seek_end() 

430 

431 def close_buf(self) -> None: 

432 if isinstance(self.buf, mmap.mmap): 

433 self.buf.close() 

434 self.buf = None 

435 

436 def close(self) -> None: 

437 if self.should_close_buf: 

438 self.close_buf() 

439 if self.f is not None and self.should_close_file: 

440 self.f.close() 

441 self.f = None 

442 

443 def seek_end(self) -> None: 

444 assert self.f is not None 

445 self.f.seek(0, os.SEEK_END) 

446 

447 def write_header(self) -> None: 

448 assert self.f is not None 

449 self.f.write(b"%PDF-1.4\n") 

450 

451 def write_comment(self, s: str) -> None: 

452 assert self.f is not None 

453 self.f.write(f"% {s}\n".encode()) 

454 

455 def write_catalog(self) -> IndirectReference: 

456 assert self.f is not None 

457 self.del_root() 

458 self.root_ref = self.next_object_id(self.f.tell()) 

459 self.pages_ref = self.next_object_id(0) 

460 self.rewrite_pages() 

461 self.write_obj(self.root_ref, Type=PdfName(b"Catalog"), Pages=self.pages_ref) 

462 self.write_obj( 

463 self.pages_ref, 

464 Type=PdfName(b"Pages"), 

465 Count=len(self.pages), 

466 Kids=self.pages, 

467 ) 

468 return self.root_ref 

469 

470 def rewrite_pages(self) -> None: 

471 pages_tree_nodes_to_delete = [] 

472 for i, page_ref in enumerate(self.orig_pages): 

473 page_info = self.cached_objects[page_ref] 

474 del self.xref_table[page_ref.object_id] 

475 pages_tree_nodes_to_delete.append(page_info[PdfName(b"Parent")]) 

476 if page_ref not in self.pages: 

477 # the page has been deleted 

478 continue 

479 # make dict keys into strings for passing to write_page 

480 stringified_page_info = {} 

481 for key, value in page_info.items(): 

482 # key should be a PdfName 

483 stringified_page_info[key.name_as_str()] = value 

484 stringified_page_info["Parent"] = self.pages_ref 

485 new_page_ref = self.write_page(None, **stringified_page_info) 

486 for j, cur_page_ref in enumerate(self.pages): 

487 if cur_page_ref == page_ref: 

488 # replace the page reference with the new one 

489 self.pages[j] = new_page_ref 

490 # delete redundant Pages tree nodes from xref table 

491 for pages_tree_node_ref in pages_tree_nodes_to_delete: 

492 while pages_tree_node_ref: 

493 pages_tree_node = self.cached_objects[pages_tree_node_ref] 

494 if pages_tree_node_ref.object_id in self.xref_table: 

495 del self.xref_table[pages_tree_node_ref.object_id] 

496 pages_tree_node_ref = pages_tree_node.get(b"Parent", None) 

497 self.orig_pages = [] 

498 

499 def write_xref_and_trailer( 

500 self, new_root_ref: IndirectReference | None = None 

501 ) -> None: 

502 assert self.f is not None 

503 if new_root_ref: 

504 self.del_root() 

505 self.root_ref = new_root_ref 

506 if self.info: 

507 self.info_ref = self.write_obj(None, self.info) 

508 start_xref = self.xref_table.write(self.f) 

509 num_entries = len(self.xref_table) 

510 trailer_dict: dict[str | bytes, Any] = { 

511 b"Root": self.root_ref, 

512 b"Size": num_entries, 

513 } 

514 if self.last_xref_section_offset is not None: 

515 trailer_dict[b"Prev"] = self.last_xref_section_offset 

516 if self.info: 

517 trailer_dict[b"Info"] = self.info_ref 

518 self.last_xref_section_offset = start_xref 

519 self.f.write( 

520 b"trailer\n" 

521 + bytes(PdfDict(trailer_dict)) 

522 + b"\nstartxref\n%d\n%%%%EOF" % start_xref 

523 ) 

524 

525 def write_page( 

526 self, ref: int | IndirectReference | None, *objs: Any, **dict_obj: Any 

527 ) -> IndirectReference: 

528 obj_ref = self.pages[ref] if isinstance(ref, int) else ref 

529 if "Type" not in dict_obj: 

530 dict_obj["Type"] = PdfName(b"Page") 

531 if "Parent" not in dict_obj: 

532 dict_obj["Parent"] = self.pages_ref 

533 return self.write_obj(obj_ref, *objs, **dict_obj) 

534 

535 def write_obj( 

536 self, ref: IndirectReference | None, *objs: Any, **dict_obj: Any 

537 ) -> IndirectReference: 

538 assert self.f is not None 

539 f = self.f 

540 if ref is None: 

541 ref = self.next_object_id(f.tell()) 

542 else: 

543 self.xref_table[ref.object_id] = (f.tell(), ref.generation) 

544 f.write(bytes(IndirectObjectDef(*ref))) 

545 stream = dict_obj.pop("stream", None) 

546 if stream is not None: 

547 dict_obj["Length"] = len(stream) 

548 if dict_obj: 

549 f.write(pdf_repr(dict_obj)) 

550 for obj in objs: 

551 f.write(pdf_repr(obj)) 

552 if stream is not None: 

553 f.write(b"stream\n") 

554 f.write(stream) 

555 f.write(b"\nendstream\n") 

556 f.write(b"endobj\n") 

557 return ref 

558 

559 def del_root(self) -> None: 

560 if self.root_ref is None: 

561 return 

562 del self.xref_table[self.root_ref.object_id] 

563 del self.xref_table[self.root[b"Pages"].object_id] 

564 

565 @staticmethod 

566 def get_buf_from_file(f: IO[bytes]) -> bytes | mmap.mmap: 

567 if hasattr(f, "getbuffer"): 

568 return f.getbuffer() 

569 elif hasattr(f, "getvalue"): 

570 return f.getvalue() 

571 else: 

572 try: 

573 return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) 

574 except ValueError: # cannot mmap an empty file 

575 return b"" 

576 

577 def read_pdf_info(self) -> None: 

578 assert self.buf is not None 

579 self.file_size_total = len(self.buf) 

580 self.file_size_this = self.file_size_total - self.start_offset 

581 self.read_trailer() 

582 check_format_condition( 

583 self.trailer_dict.get(b"Root") is not None, "Root is missing" 

584 ) 

585 self.root_ref = self.trailer_dict[b"Root"] 

586 assert self.root_ref is not None 

587 self.info_ref = self.trailer_dict.get(b"Info", None) 

588 self.root = PdfDict(self.read_indirect(self.root_ref)) 

589 if self.info_ref is None: 

590 self.info = PdfDict() 

591 else: 

592 self.info = PdfDict(self.read_indirect(self.info_ref)) 

593 check_format_condition(b"Type" in self.root, "/Type missing in Root") 

594 check_format_condition( 

595 self.root[b"Type"] == b"Catalog", "/Type in Root is not /Catalog" 

596 ) 

597 check_format_condition( 

598 self.root.get(b"Pages") is not None, "/Pages missing in Root" 

599 ) 

600 check_format_condition( 

601 isinstance(self.root[b"Pages"], IndirectReference), 

602 "/Pages in Root is not an indirect reference", 

603 ) 

604 self.pages_ref = self.root[b"Pages"] 

605 assert self.pages_ref is not None 

606 self.page_tree_root = self.read_indirect(self.pages_ref) 

607 self.pages = self.linearize_page_tree(self.page_tree_root) 

608 # save the original list of page references 

609 # in case the user modifies, adds or deletes some pages 

610 # and we need to rewrite the pages and their list 

611 self.orig_pages = self.pages[:] 

612 

613 def next_object_id(self, offset: int | None = None) -> IndirectReference: 

614 try: 

615 # TODO: support reuse of deleted objects 

616 reference = IndirectReference(max(self.xref_table.keys()) + 1, 0) 

617 except ValueError: 

618 reference = IndirectReference(1, 0) 

619 if offset is not None: 

620 self.xref_table[reference.object_id] = (offset, 0) 

621 return reference 

622 

623 delimiter = rb"[][()<>{}/%]" 

624 delimiter_or_ws = rb"[][()<>{}/%\000\011\012\014\015\040]" 

625 whitespace = rb"[\000\011\012\014\015\040]" 

626 whitespace_or_hex = rb"[\000\011\012\014\015\0400-9a-fA-F]" 

627 whitespace_optional = whitespace + b"*" 

628 whitespace_mandatory = whitespace + b"+" 

629 # No "\012" aka "\n" or "\015" aka "\r": 

630 whitespace_optional_no_nl = rb"[\000\011\014\040]*" 

631 newline_only = rb"[\r\n]+" 

632 newline = whitespace_optional_no_nl + newline_only + whitespace_optional_no_nl 

633 re_trailer_end = re.compile( 

634 whitespace_mandatory 

635 + rb"trailer" 

636 + whitespace_optional 

637 + rb"<<(.*>>)" 

638 + newline 

639 + rb"startxref" 

640 + newline 

641 + rb"([0-9]+)" 

642 + newline 

643 + rb"%%EOF" 

644 + whitespace_optional 

645 + rb"$", 

646 re.DOTALL, 

647 ) 

648 re_trailer_prev = re.compile( 

649 whitespace_optional 

650 + rb"trailer" 

651 + whitespace_optional 

652 + rb"<<(.*?>>)" 

653 + newline 

654 + rb"startxref" 

655 + newline 

656 + rb"([0-9]+)" 

657 + newline 

658 + rb"%%EOF" 

659 + whitespace_optional, 

660 re.DOTALL, 

661 ) 

662 

663 def read_trailer(self) -> None: 

664 assert self.buf is not None 

665 search_start_offset = len(self.buf) - 16384 

666 if search_start_offset < self.start_offset: 

667 search_start_offset = self.start_offset 

668 m = self.re_trailer_end.search(self.buf, search_start_offset) 

669 check_format_condition(m is not None, "trailer end not found") 

670 # make sure we found the LAST trailer 

671 last_match = m 

672 while m: 

673 last_match = m 

674 m = self.re_trailer_end.search(self.buf, m.start() + 16) 

675 if not m: 

676 m = last_match 

677 assert m is not None 

678 trailer_data = m.group(1) 

679 self.last_xref_section_offset = int(m.group(2)) 

680 self.trailer_dict = self.interpret_trailer(trailer_data) 

681 self.xref_table = XrefTable() 

682 self.read_xref_table(xref_section_offset=self.last_xref_section_offset) 

683 if b"Prev" in self.trailer_dict: 

684 self.read_prev_trailer(self.trailer_dict[b"Prev"]) 

685 

686 def read_prev_trailer(self, xref_section_offset: int) -> None: 

687 assert self.buf is not None 

688 trailer_offset = self.read_xref_table(xref_section_offset=xref_section_offset) 

689 m = self.re_trailer_prev.search( 

690 self.buf[trailer_offset : trailer_offset + 16384] 

691 ) 

692 check_format_condition(m is not None, "previous trailer not found") 

693 assert m is not None 

694 trailer_data = m.group(1) 

695 check_format_condition( 

696 int(m.group(2)) == xref_section_offset, 

697 "xref section offset in previous trailer doesn't match what was expected", 

698 ) 

699 trailer_dict = self.interpret_trailer(trailer_data) 

700 if b"Prev" in trailer_dict: 

701 self.read_prev_trailer(trailer_dict[b"Prev"]) 

702 

703 re_whitespace_optional = re.compile(whitespace_optional) 

704 re_name = re.compile( 

705 whitespace_optional 

706 + rb"/([!-$&'*-.0-;=?-Z\\^-z|~]+)(?=" 

707 + delimiter_or_ws 

708 + rb")" 

709 ) 

710 re_dict_start = re.compile(whitespace_optional + rb"<<") 

711 re_dict_end = re.compile(whitespace_optional + rb">>" + whitespace_optional) 

712 

713 @classmethod 

714 def interpret_trailer(cls, trailer_data: bytes) -> dict[bytes, Any]: 

715 trailer = {} 

716 offset = 0 

717 while True: 

718 m = cls.re_name.match(trailer_data, offset) 

719 if not m: 

720 m = cls.re_dict_end.match(trailer_data, offset) 

721 check_format_condition( 

722 m is not None and m.end() == len(trailer_data), 

723 "name not found in trailer, remaining data: " 

724 + repr(trailer_data[offset:]), 

725 ) 

726 break 

727 key = cls.interpret_name(m.group(1)) 

728 assert isinstance(key, bytes) 

729 value, value_offset = cls.get_value(trailer_data, m.end()) 

730 trailer[key] = value 

731 if value_offset is None: 

732 break 

733 offset = value_offset 

734 check_format_condition( 

735 b"Size" in trailer and isinstance(trailer[b"Size"], int), 

736 "/Size not in trailer or not an integer", 

737 ) 

738 check_format_condition( 

739 b"Root" in trailer and isinstance(trailer[b"Root"], IndirectReference), 

740 "/Root not in trailer or not an indirect reference", 

741 ) 

742 return trailer 

743 
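A sketch of interpret_trailer on hand-written trailer bytes; the body mirrors what the capture group of re_trailer_end yields, i.e. everything after the opening "<<":

    trailer = PdfParser.interpret_trailer(b"/Size 3 /Root 1 0 R >>")
    assert trailer[b"Size"] == 3
    assert trailer[b"Root"] == IndirectReference(1, 0)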

744 re_hashes_in_name = re.compile(rb"([^#]*)(#([0-9a-fA-F]{2}))?") 

745 

746 @classmethod 

747 def interpret_name(cls, raw: bytes, as_text: bool = False) -> str | bytes: 

748 name = b"" 

749 for m in cls.re_hashes_in_name.finditer(raw): 

750 if m.group(3): 

751 name += m.group(1) + bytearray.fromhex(m.group(3).decode("us-ascii")) 

752 else: 

753 name += m.group(1) 

754 if as_text: 

755 return name.decode("utf-8") 

756 else: 

757 return bytes(name) 

758 

759 re_null = re.compile(whitespace_optional + rb"null(?=" + delimiter_or_ws + rb")") 

760 re_true = re.compile(whitespace_optional + rb"true(?=" + delimiter_or_ws + rb")") 

761 re_false = re.compile(whitespace_optional + rb"false(?=" + delimiter_or_ws + rb")") 

762 re_int = re.compile( 

763 whitespace_optional + rb"([-+]?[0-9]+)(?=" + delimiter_or_ws + rb")" 

764 ) 

765 re_real = re.compile( 

766 whitespace_optional 

767 + rb"([-+]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+))(?=" 

768 + delimiter_or_ws 

769 + rb")" 

770 ) 

771 re_array_start = re.compile(whitespace_optional + rb"\[") 

772 re_array_end = re.compile(whitespace_optional + rb"]") 

773 re_string_hex = re.compile( 

774 whitespace_optional + rb"<(" + whitespace_or_hex + rb"*)>" 

775 ) 

776 re_string_lit = re.compile(whitespace_optional + rb"\(") 

777 re_indirect_reference = re.compile( 

778 whitespace_optional 

779 + rb"([-+]?[0-9]+)" 

780 + whitespace_mandatory 

781 + rb"([-+]?[0-9]+)" 

782 + whitespace_mandatory 

783 + rb"R(?=" 

784 + delimiter_or_ws 

785 + rb")" 

786 ) 

787 re_indirect_def_start = re.compile( 

788 whitespace_optional 

789 + rb"([-+]?[0-9]+)" 

790 + whitespace_mandatory 

791 + rb"([-+]?[0-9]+)" 

792 + whitespace_mandatory 

793 + rb"obj(?=" 

794 + delimiter_or_ws 

795 + rb")" 

796 ) 

797 re_indirect_def_end = re.compile( 

798 whitespace_optional + rb"endobj(?=" + delimiter_or_ws + rb")" 

799 ) 

800 re_comment = re.compile( 

801 rb"(" + whitespace_optional + rb"%[^\r\n]*" + newline + rb")*" 

802 ) 

803 re_stream_start = re.compile(whitespace_optional + rb"stream\r?\n") 

804 re_stream_end = re.compile( 

805 whitespace_optional + rb"endstream(?=" + delimiter_or_ws + rb")" 

806 ) 

807 

808 @classmethod 

809 def get_value( 

810 cls, 

811 data: bytes | bytearray | mmap.mmap, 

812 offset: int, 

813 expect_indirect: IndirectReference | None = None, 

814 max_nesting: int = -1, 

815 ) -> tuple[Any, int | None]: 

816 if max_nesting == 0: 

817 return None, None 

818 m = cls.re_comment.match(data, offset) 

819 if m: 

820 offset = m.end() 

821 m = cls.re_indirect_def_start.match(data, offset) 

822 if m: 

823 check_format_condition( 

824 int(m.group(1)) > 0, 

825 "indirect object definition: object ID must be greater than 0", 

826 ) 

827 check_format_condition( 

828 int(m.group(2)) >= 0, 

829 "indirect object definition: generation must be non-negative", 

830 ) 

831 check_format_condition( 

832 expect_indirect is None 

833 or expect_indirect 

834 == IndirectReference(int(m.group(1)), int(m.group(2))), 

835 "indirect object definition different than expected", 

836 ) 

837 object, object_offset = cls.get_value( 

838 data, m.end(), max_nesting=max_nesting - 1 

839 ) 

840 if object_offset is None: 

841 return object, None 

842 m = cls.re_indirect_def_end.match(data, object_offset) 

843 check_format_condition( 

844 m is not None, "indirect object definition end not found" 

845 ) 

846 assert m is not None 

847 return object, m.end() 

848 check_format_condition( 

849 not expect_indirect, "indirect object definition not found" 

850 ) 

851 m = cls.re_indirect_reference.match(data, offset) 

852 if m: 

853 check_format_condition( 

854 int(m.group(1)) > 0, 

855 "indirect object reference: object ID must be greater than 0", 

856 ) 

857 check_format_condition( 

858 int(m.group(2)) >= 0, 

859 "indirect object reference: generation must be non-negative", 

860 ) 

861 return IndirectReference(int(m.group(1)), int(m.group(2))), m.end() 

862 m = cls.re_dict_start.match(data, offset) 

863 if m: 

864 offset = m.end() 

865 result: dict[Any, Any] = {} 

866 m = cls.re_dict_end.match(data, offset) 

867 current_offset: int | None = offset 

868 while not m: 

869 assert current_offset is not None 

870 key, current_offset = cls.get_value( 

871 data, current_offset, max_nesting=max_nesting - 1 

872 ) 

873 if current_offset is None: 

874 return result, None 

875 value, current_offset = cls.get_value( 

876 data, current_offset, max_nesting=max_nesting - 1 

877 ) 

878 result[key] = value 

879 if current_offset is None: 

880 return result, None 

881 m = cls.re_dict_end.match(data, current_offset) 

882 current_offset = m.end() 

883 m = cls.re_stream_start.match(data, current_offset) 

884 if m: 

885 stream_len = result.get(b"Length") 

886 if stream_len is None or not isinstance(stream_len, int): 

887 msg = f"bad or missing Length in stream dict ({stream_len})" 

888 raise PdfFormatError(msg) 

889 stream_data = data[m.end() : m.end() + stream_len] 

890 m = cls.re_stream_end.match(data, m.end() + stream_len) 

891 check_format_condition(m is not None, "stream end not found") 

892 assert m is not None 

893 current_offset = m.end() 

894 return PdfStream(PdfDict(result), stream_data), current_offset 

895 return PdfDict(result), current_offset 

896 m = cls.re_array_start.match(data, offset) 

897 if m: 

898 offset = m.end() 

899 results = [] 

900 m = cls.re_array_end.match(data, offset) 

901 current_offset = offset 

902 while not m: 

903 assert current_offset is not None 

904 value, current_offset = cls.get_value( 

905 data, current_offset, max_nesting=max_nesting - 1 

906 ) 

907 results.append(value) 

908 if current_offset is None: 

909 return results, None 

910 m = cls.re_array_end.match(data, current_offset) 

911 return results, m.end() 

912 m = cls.re_null.match(data, offset) 

913 if m: 

914 return None, m.end() 

915 m = cls.re_true.match(data, offset) 

916 if m: 

917 return True, m.end() 

918 m = cls.re_false.match(data, offset) 

919 if m: 

920 return False, m.end() 

921 m = cls.re_name.match(data, offset) 

922 if m: 

923 return PdfName(cls.interpret_name(m.group(1))), m.end() 

924 m = cls.re_int.match(data, offset) 

925 if m: 

926 return int(m.group(1)), m.end() 

927 m = cls.re_real.match(data, offset) 

928 if m: 

929 # XXX Decimal instead of float??? 

930 return float(m.group(1)), m.end() 

931 m = cls.re_string_hex.match(data, offset) 

932 if m: 

933 # filter out whitespace 

934 hex_string = bytearray( 

935 b for b in m.group(1) if b in b"0123456789abcdefABCDEF" 

936 ) 

937 if len(hex_string) % 2 == 1: 

938 # append a 0 if the length is not even - yes, at the end 

939 hex_string.append(ord(b"0")) 

940 return bytearray.fromhex(hex_string.decode("us-ascii")), m.end() 

941 m = cls.re_string_lit.match(data, offset) 

942 if m: 

943 return cls.get_literal_string(data, m.end()) 

944 # return None, offset # fallback (only for debugging) 

945 msg = f"unrecognized object: {repr(data[offset : offset + 32])}" 

946 raise PdfFormatError(msg) 

947 
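get_value is the general token parser; two hand-made inputs illustrate the (value, next_offset) return shape:

    value, end = PdfParser.get_value(b"<< /Type /Page /Count 3 >>", 0)
    assert value[b"Type"] == b"Page" and value[b"Count"] == 3

    text, end = PdfParser.get_value(b"(Hello \\(PDF\\))", 0)
    assert text == b"Hello (PDF)"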

948 re_lit_str_token = re.compile( 

949 rb"(\\[nrtbf()\\])|(\\[0-9]{1,3})|(\\(\r\n|\r|\n))|(\r\n|\r|\n)|(\()|(\))" 

950 ) 

951 escaped_chars = { 

952 b"n": b"\n", 

953 b"r": b"\r", 

954 b"t": b"\t", 

955 b"b": b"\b", 

956 b"f": b"\f", 

957 b"(": b"(", 

958 b")": b")", 

959 b"\\": b"\\", 

960 ord(b"n"): b"\n", 

961 ord(b"r"): b"\r", 

962 ord(b"t"): b"\t", 

963 ord(b"b"): b"\b", 

964 ord(b"f"): b"\f", 

965 ord(b"("): b"(", 

966 ord(b")"): b")", 

967 ord(b"\\"): b"\\", 

968 } 

969 

970 @classmethod 

971 def get_literal_string( 

972 cls, data: bytes | bytearray | mmap.mmap, offset: int 

973 ) -> tuple[bytes, int]: 

974 nesting_depth = 0 

975 result = bytearray() 

976 for m in cls.re_lit_str_token.finditer(data, offset): 

977 result.extend(data[offset : m.start()]) 

978 if m.group(1): 

979 result.extend(cls.escaped_chars[m.group(1)[1]]) 

980 elif m.group(2): 

981 result.append(int(m.group(2)[1:], 8)) 

982 elif m.group(3): 

983 pass 

984 elif m.group(5): 

985 result.extend(b"\n") 

986 elif m.group(6): 

987 result.extend(b"(") 

988 nesting_depth += 1 

989 elif m.group(7): 

990 if nesting_depth == 0: 

991 return bytes(result), m.end() 

992 result.extend(b")") 

993 nesting_depth -= 1 

994 offset = m.end() 

995 msg = "unfinished literal string" 

996 raise PdfFormatError(msg) 

997 
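Called directly, the offset argument points just past the opening parenthesis, mirroring how get_value hands over control; the input is made up:

    s, end = PdfParser.get_literal_string(b"(A\\tB\\101)", 1)
    assert s == b"A\tBA"       # \t is a named escape, \101 is octal for "A"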

998 re_xref_section_start = re.compile(whitespace_optional + rb"xref" + newline) 

999 re_xref_subsection_start = re.compile( 

1000 whitespace_optional 

1001 + rb"([0-9]+)" 

1002 + whitespace_mandatory 

1003 + rb"([0-9]+)" 

1004 + whitespace_optional 

1005 + newline_only 

1006 ) 

1007 re_xref_entry = re.compile(rb"([0-9]{10}) ([0-9]{5}) ([fn])( \r| \n|\r\n)") 

1008 

1009 def read_xref_table(self, xref_section_offset: int) -> int: 

1010 assert self.buf is not None 

1011 subsection_found = False 

1012 m = self.re_xref_section_start.match( 

1013 self.buf, xref_section_offset + self.start_offset 

1014 ) 

1015 check_format_condition(m is not None, "xref section start not found") 

1016 assert m is not None 

1017 offset = m.end() 

1018 while True: 

1019 m = self.re_xref_subsection_start.match(self.buf, offset) 

1020 if not m: 

1021 check_format_condition( 

1022 subsection_found, "xref subsection start not found" 

1023 ) 

1024 break 

1025 subsection_found = True 

1026 offset = m.end() 

1027 first_object = int(m.group(1)) 

1028 num_objects = int(m.group(2)) 

1029 for i in range(first_object, first_object + num_objects): 

1030 m = self.re_xref_entry.match(self.buf, offset) 

1031 check_format_condition(m is not None, "xref entry not found") 

1032 assert m is not None 

1033 offset = m.end() 

1034 is_free = m.group(3) == b"f" 

1035 if not is_free: 

1036 generation = int(m.group(2)) 

1037 new_entry = (int(m.group(1)), generation) 

1038 if i not in self.xref_table: 

1039 self.xref_table[i] = new_entry 

1040 return offset 

1041 

1042 def read_indirect(self, ref: IndirectReference, max_nesting: int = -1) -> Any: 

1043 offset, generation = self.xref_table[ref[0]] 

1044 check_format_condition( 

1045 generation == ref[1], 

1046 f"expected to find generation {ref[1]} for object ID {ref[0]} in xref " 

1047 f"table, instead found generation {generation} at offset {offset}", 

1048 ) 

1049 assert self.buf is not None 

1050 value = self.get_value( 

1051 self.buf, 

1052 offset + self.start_offset, 

1053 expect_indirect=IndirectReference(*ref), 

1054 max_nesting=max_nesting, 

1055 )[0] 

1056 self.cached_objects[ref] = value 

1057 return value 

1058 

1059 def linearize_page_tree( 

1060 self, node: PdfDict | None = None 

1061 ) -> list[IndirectReference]: 

1062 page_node = node if node is not None else self.page_tree_root 

1063 check_format_condition( 

1064 page_node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages" 

1065 ) 

1066 pages = [] 

1067 for kid in page_node[b"Kids"]: 

1068 kid_object = self.read_indirect(kid) 

1069 if kid_object[b"Type"] == b"Page": 

1070 pages.append(kid) 

1071 else: 

1072 pages.extend(self.linearize_page_tree(node=kid_object)) 

1073 return pages
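To tie the pieces together, a hedged end-to-end sketch: write a minimal, page-less document through the writer API and parse the bytes back. The path "minimal.pdf" is hypothetical, and with an empty page tree the result only demonstrates this parser's round trip, not a viewer-ready file:

    # --- writing ---
    out = open("minimal.pdf", "w+b")         # any empty, seekable binary file works
    with PdfParser(f=out) as pdf:
        pdf.write_header()                   # %PDF-1.4
        pdf.write_comment("written by the PdfParser sketch")
        pdf.write_catalog()                  # Catalog object plus an empty Pages tree
        pdf.write_xref_and_trailer()         # xref section, trailer dict, startxref
    out.seek(0)
    data = out.read()
    out.close()

    # --- reading back ---
    with PdfParser(buf=data) as pdf:
        assert pdf.root[b"Type"] == b"Catalog"
        assert pdf.pages == []               # no pages were added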