Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/PdfParser.py: 19%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

679 statements  

1from __future__ import annotations 

2 

3import calendar 

4import codecs 

5import collections 

6import mmap 

7import os 

8import re 

9import time 

10import zlib 

11from typing import IO, Any, NamedTuple, Union 

12 

13 

14# see 7.9.2.2 Text String Type on page 86 and D.3 PDFDocEncoding Character Set 

15# on page 656 

def encode_text(s: str) -> bytes:
    """Encode *s* as UTF-16 big-endian bytes preceded by the UTF-16BE BOM."""
    payload = s.encode("utf_16_be")
    return codecs.BOM_UTF16_BE + payload

18 

19 

# Byte value => replacement character for the bytes that decode_text() must
# not map straight through chr().  Bytes missing from this table are decoded
# with chr(byte) by the caller.
PDFDocEncoding = {
    # NOTE(review): 0x16 maps to U+0017 (not U+0016) exactly as in the
    # original table - confirm against the PDF specification before changing.
    0x16: "\u0017",
    # spacing accents
    0x18: "\u02d8",  # breve
    0x19: "\u02c7",  # caron
    0x1A: "\u02c6",  # modifier circumflex
    0x1B: "\u02d9",  # dot above
    0x1C: "\u02dd",  # double acute
    0x1D: "\u02db",  # ogonek
    0x1E: "\u02da",  # ring above
    0x1F: "\u02dc",  # small tilde
    # punctuation and symbols
    0x80: "\u2022",  # bullet
    0x81: "\u2020",  # dagger
    0x82: "\u2021",  # double dagger
    0x83: "\u2026",  # horizontal ellipsis
    0x84: "\u2014",  # em dash
    0x85: "\u2013",  # en dash
    0x86: "\u0192",  # f with hook
    0x87: "\u2044",  # fraction slash
    0x88: "\u2039",  # single left angle quote
    0x89: "\u203a",  # single right angle quote
    0x8A: "\u2212",  # minus sign
    0x8B: "\u2030",  # per mille
    0x8C: "\u201e",  # double low-9 quote
    0x8D: "\u201c",  # left double quote
    0x8E: "\u201d",  # right double quote
    0x8F: "\u2018",  # left single quote
    0x90: "\u2019",  # right single quote
    0x91: "\u201a",  # single low-9 quote
    0x92: "\u2122",  # trade mark
    0x93: "\ufb01",  # fi ligature
    0x94: "\ufb02",  # fl ligature
    # letters
    0x95: "\u0141",  # L with stroke
    0x96: "\u0152",  # OE ligature
    0x97: "\u0160",  # S with caron
    0x98: "\u0178",  # Y with diaeresis
    0x99: "\u017d",  # Z with caron
    0x9A: "\u0131",  # dotless i
    0x9B: "\u0142",  # l with stroke
    0x9C: "\u0153",  # oe ligature
    0x9D: "\u0161",  # s with caron
    0x9E: "\u017e",  # z with caron
    0xA0: "\u20ac",  # euro sign
}

63 

64 

def decode_text(b: bytes) -> str:
    """Decode a PDF text string to ``str``.

    Data starting with the UTF-16BE byte order mark is decoded as UTF-16BE
    (the mark itself is dropped).  Anything else is decoded byte by byte:
    bytes listed in ``PDFDocEncoding`` use that table, all others ``chr()``.
    """
    bom = codecs.BOM_UTF16_BE
    if b[: len(bom)] != bom:
        return "".join(PDFDocEncoding.get(code, chr(code)) for code in b)
    return b[len(bom) :].decode("utf_16_be")

70 

71 

class PdfFormatError(RuntimeError):
    """An error that probably indicates a syntactic or semantic error in the
    PDF file structure"""

77 

78 

def check_format_condition(condition: bool, error_message: str) -> None:
    """Raise ``PdfFormatError(error_message)`` unless *condition* is truthy."""
    if condition:
        return
    raise PdfFormatError(error_message)

82 

83 

class IndirectReferenceTuple(NamedTuple):
    # Plain (object_id, generation) pair; IndirectReference adds behaviour.
    object_id: int
    generation: int


class IndirectReference(IndirectReferenceTuple):
    # Renders as "<object_id> <generation> R".  Equality is restricted to
    # objects of exactly the same class, so a subclass instance never
    # compares equal to a plain IndirectReference.

    def __str__(self) -> str:
        return f"{self.object_id} {self.generation} R"

    def __bytes__(self) -> bytes:
        text = self.__str__()
        return text.encode("us-ascii")

    def __eq__(self, other: object) -> bool:
        if self.__class__ is other.__class__:
            assert isinstance(other, IndirectReference)
            if other.object_id != self.object_id:
                return False
            return other.generation == self.generation
        return False

    def __ne__(self, other: object) -> bool:
        is_equal = self == other
        return not is_equal

    def __hash__(self) -> int:
        identity = (self.object_id, self.generation)
        return hash(identity)

107 

108 

class IndirectObjectDef(IndirectReference):
    # Same pair as IndirectReference, rendered as the opening line of an
    # indirect object definition: "<object_id> <generation> obj".

    def __str__(self) -> str:
        object_id, generation = self.object_id, self.generation
        return f"{object_id} {generation} obj"

112 

113 

class XrefTable:
    """Cross-reference bookkeeping split into existing, new and deleted entries.

    While ``reading_finished`` is false, assignments land in
    ``existing_entries``; afterwards they land in ``new_entries``.  Only new
    and deleted entries are emitted by :meth:`write`.
    """

    def __init__(self) -> None:
        # object ID => (offset, generation)
        self.existing_entries: dict[int, tuple[int, int]] = {}
        # object ID => (offset, generation)
        self.new_entries: dict[int, tuple[int, int]] = {}
        # object ID => generation
        # NOTE(review): object 0 starts out deleted with generation 65536;
        # confirm this value against the PDF specification.
        self.deleted_entries = {0: 65536}
        self.reading_finished = False

    def __setitem__(self, key: int, value: tuple[int, int]) -> None:
        """Record *value* for *key* and clear any deletion mark for it."""
        target = self.new_entries if self.reading_finished else self.existing_entries
        target[key] = value
        self.deleted_entries.pop(key, None)

    def __getitem__(self, key: int) -> tuple[int, int]:
        """Return the entry for *key*, preferring new entries over existing."""
        if key in self.new_entries:
            return self.new_entries[key]
        return self.existing_entries[key]

    def __delitem__(self, key: int) -> None:
        """Mark *key* deleted, bumping the generation of a live entry.

        Raises ``IndexError`` when *key* is unknown to all three tables.
        """
        if key in self.new_entries:
            self.deleted_entries[key] = self.new_entries.pop(key)[1] + 1
            return
        if key in self.existing_entries:
            # existing entries stay in place; only the deletion is recorded
            self.deleted_entries[key] = self.existing_entries[key][1] + 1
            return
        if key in self.deleted_entries:
            # already deleted: nothing to update
            return
        msg = f"object ID {key} cannot be deleted because it doesn't exist"
        raise IndexError(msg)

    def __contains__(self, key: int) -> bool:
        if key in self.existing_entries:
            return True
        return key in self.new_entries

    def __len__(self) -> int:
        every_id = (
            self.existing_entries.keys()
            | self.new_entries.keys()
            | self.deleted_entries.keys()
        )
        return len(every_id)

    def keys(self) -> set[int]:
        """Return existing IDs that are not deleted, plus all new IDs."""
        surviving = self.existing_entries.keys() - self.deleted_entries.keys()
        return surviving | self.new_entries.keys()

    def write(self, f: IO[bytes]) -> int:
        """Write an ``xref`` section for new and deleted entries to *f*.

        Entries are grouped into subsections of consecutive object IDs.
        Returns the position of *f* at which the section starts.
        """
        pending = sorted(self.new_entries.keys() | self.deleted_entries.keys())
        free_ids = sorted(self.deleted_entries)
        startxref = f.tell()
        f.write(b"xref\n")
        while pending:
            # measure the leading run of consecutive object IDs
            run_length = 1
            while (
                run_length < len(pending)
                and pending[run_length] == pending[run_length - 1] + 1
            ):
                run_length += 1
            run, pending = pending[:run_length], pending[run_length:]
            f.write(b"%d %d\n" % (run[0], len(run)))
            for object_id in run:
                if object_id in self.new_entries:
                    f.write(b"%010d %05d n \n" % self.new_entries[object_id])
                    continue
                this_deleted_object_id = free_ids.pop(0)
                check_format_condition(
                    object_id == this_deleted_object_id,
                    f"expected the next deleted object ID to be {object_id}, "
                    f"instead found {this_deleted_object_id}",
                )
                # free entries point at the next deleted ID; the last one
                # points back at 0
                next_in_linked_list = free_ids[0] if free_ids else 0
                f.write(
                    b"%010d %05d f \n"
                    % (next_in_linked_list, self.deleted_entries[object_id])
                )
        return startxref

206 

207 

class PdfName:
    """A PDF name object, stored as raw bytes without the leading slash."""

    name: bytes

    # Byte values written verbatim by __bytes__; everything else is
    # emitted as a "#XX" hex escape.
    allowed_chars = set(range(33, 127)) - set(b"#%/()<>[]{}")

    def __init__(self, name: PdfName | bytes | str) -> None:
        if isinstance(name, PdfName):
            raw = name.name
        elif isinstance(name, bytes):
            raw = name
        else:
            raw = name.encode("us-ascii")
        self.name = raw

    def name_as_str(self) -> str:
        """Return the name decoded as US-ASCII."""
        return self.name.decode("us-ascii")

    def __eq__(self, other: object) -> bool:
        # Equal to another PdfName with the same bytes, or to anything that
        # compares equal to the raw bytes themselves.
        if isinstance(other, PdfName) and other.name == self.name:
            return True
        return other == self.name

    def __hash__(self) -> int:
        # same hash as the raw bytes, so bytes keys and PdfName keys
        # address the same dictionary slot
        return hash(self.name)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({self.name!r})"

    @classmethod
    def from_pdf_stream(cls, data: bytes) -> PdfName:
        """Build a name from raw PDF data via ``PdfParser.interpret_name``."""
        interpreted = PdfParser.interpret_name(data)
        return cls(interpreted)

    def __bytes__(self) -> bytes:
        """Serialize as ``/name`` with disallowed bytes written as ``#XX``."""
        encoded = bytearray(b"/")
        for byte in self.name:
            if byte not in self.allowed_chars:
                encoded.extend(b"#%02X" % byte)
            else:
                encoded.append(byte)
        return bytes(encoded)

247 

248 

class PdfArray(list[Any]):
    # A list that serializes as "[ item item ... ]", each item rendered
    # through pdf_repr().

    def __bytes__(self) -> bytes:
        rendered_items = b" ".join(pdf_repr(item) for item in self)
        return b"[ " + rendered_items + b" ]"

252 

253 

TYPE_CHECKING = False
if TYPE_CHECKING:
    _DictBase = collections.UserDict[Union[str, bytes], Any]
else:
    _DictBase = collections.UserDict


class PdfDict(_DictBase):
    """Dictionary whose entries are also reachable as attributes.

    Attribute access maps the attribute name to its US-ASCII ``bytes`` key:
    ``d.Title`` reads ``d[b"Title"]`` and ``d.Title = x`` writes it.  The
    attribute ``data`` is the underlying ``UserDict`` storage and is stored
    as a real attribute.
    """

    def __setattr__(self, key: str, value: Any) -> None:
        if key == "data":
            # UserDict keeps its mapping in ``self.data``; store it normally
            collections.UserDict.__setattr__(self, key, value)
        else:
            self[key.encode("us-ascii")] = value

    def __getattr__(self, key: str) -> str | time.struct_time:
        """Return the entry stored under ``key`` encoded as US-ASCII bytes.

        ``bytes`` values are decoded with ``decode_text``.  When *key* ends in
        ``"Date"`` the value is parsed as a PDF date
        (``D:YYYYMMDDHHmmSS`` optionally followed by ``+HH``/``-HH``, an
        apostrophe and ``mm``) and returned as ``time.struct_time``; if a
        ``+``/``-`` offset is present the result is converted to UTC.

        Raises:
            AttributeError: if the key is not present.
        """
        try:
            value = self[key.encode("us-ascii")]
        except KeyError as e:
            raise AttributeError(key) from e
        if isinstance(value, bytes):
            value = decode_text(value)
        if key.endswith("Date"):
            if value.startswith("D:"):
                value = value[2:]

            # Layout after the "D:" prefix: 14 digits of date/time, then the
            # relationship character at index 14, hours at 15:17, an
            # apostrophe at 17 and minutes at 18:20.  The hour field is
            # complete from length 17 and the minute field from length 20, so
            # both forms "+05" and "+05'30" (no trailing apostrophe) count.
            has_offset = len(value) >= 17 and value[14] in ("+", "-")
            offset_seconds = 0
            if has_offset:
                offset_minutes = int(value[15:17]) * 60
                if len(value) >= 20:
                    offset_minutes += int(value[18:20])
                offset_seconds = offset_minutes * 60
                if value[14] == "+":
                    # local time is ahead of UTC: subtract to get UTC
                    offset_seconds = -offset_seconds

            # Use only as many strptime directives as the value has fields
            # ("%Y" covers four characters, every other directive two).
            date_format = "%Y%m%d%H%M%S"[: len(value) - 2]
            parsed = time.strptime(value[: len(date_format) + 2], date_format)
            if has_offset:
                value = time.gmtime(calendar.timegm(parsed) + offset_seconds)
            else:
                value = parsed
        return value

    def __bytes__(self) -> bytes:
        """Serialize as a PDF dictionary; entries whose value is None are skipped."""
        out = bytearray(b"<<")
        for key, value in self.items():
            if value is None:
                continue
            value = pdf_repr(value)
            out.extend(b"\n")
            out.extend(bytes(PdfName(key)))
            out.extend(b" ")
            out.extend(value)
        out.extend(b"\n>>")
        return bytes(out)

307 

308 

class PdfBinary:
    # Wraps a sequence of byte values that serializes as a PDF hex string,
    # e.g. <01AB>.

    def __init__(self, data: list[int] | bytes) -> None:
        self.data = data

    def __bytes__(self) -> bytes:
        hex_digits = b"".join(b"%02X" % value for value in self.data)
        return b"<%s>" % hex_digits

315 

316 

class PdfStream:
    """A stream object: its dictionary plus the raw, still encoded bytes."""

    def __init__(self, dictionary: PdfDict, buf: bytes) -> None:
        self.dictionary = dictionary
        self.buf = buf

    def decode(self) -> bytes:
        """Return the stream data with its filter undone.

        Without a ``Filter`` entry the raw buffer is returned.  Only
        ``FlateDecode`` is handled; any other filter raises
        ``NotImplementedError``.
        """
        try:
            stream_filter = self.dictionary[b"Filter"]
        except KeyError:
            return self.buf
        if stream_filter == b"FlateDecode":
            # prefer the DL entry, falling back to Length, as the initial
            # size of zlib's output buffer
            try:
                expected_length = self.dictionary[b"DL"]
            except KeyError:
                expected_length = self.dictionary[b"Length"]
            initial_size = int(expected_length)
            return zlib.decompress(self.buf, bufsize=initial_size)
        msg = f"stream filter {repr(stream_filter)} unknown/unsupported"
        raise NotImplementedError(msg)

336 

337 

def pdf_repr(x: Any) -> bytes:
    """Serialize a Python value to PDF syntax.

    Checks run in a fixed order; the first match wins:

    * ``True`` / ``False`` / ``None`` become ``true`` / ``false`` / ``null``
    * PdfName, PdfDict, PdfArray and PdfBinary use their own ``__bytes__``
    * ints and floats use ``str()``
    * ``time.struct_time`` becomes a ``(D:YYYYMMDDHHMMSSZ)`` string
    * plain dicts and lists are wrapped in PdfDict / PdfArray
    * ``str`` is encoded with ``encode_text`` and serialized as bytes
    * ``bytes`` becomes a literal string with ``\\``, ``(`` and ``)`` escaped
    * anything else is passed to ``bytes()``
    """
    if x is True:
        return b"true"
    if x is False:
        return b"false"
    if x is None:
        return b"null"
    if isinstance(x, (PdfName, PdfDict, PdfArray, PdfBinary)):
        return bytes(x)
    if isinstance(x, (int, float)):
        return str(x).encode("us-ascii")
    if isinstance(x, time.struct_time):
        stamp = time.strftime("%Y%m%d%H%M%SZ", x)
        return b"(D:" + stamp.encode("us-ascii") + b")"
    if isinstance(x, dict):
        return bytes(PdfDict(x))
    if isinstance(x, list):
        return bytes(PdfArray(x))
    if isinstance(x, str):
        return pdf_repr(encode_text(x))
    if isinstance(x, bytes):
        # XXX escape more chars? handle binary garbage
        escaped = x
        # the backslash must be escaped first so the backslashes added for
        # the parentheses are not doubled
        for special in (b"\\", b"(", b")"):
            escaped = escaped.replace(special, b"\\" + special)
        return b"(" + escaped + b")"
    return bytes(x)

365 

366 

367class PdfParser: 

368 """Based on 

369 https://www.adobe.com/content/dam/acom/en/devnet/acrobat/pdfs/PDF32000_2008.pdf 

370 Supports PDF up to 1.4 

371 """ 

372 

373 def __init__( 

374 self, 

375 filename: str | None = None, 

376 f: IO[bytes] | None = None, 

377 buf: bytes | bytearray | None = None, 

378 start_offset: int = 0, 

379 mode: str = "rb", 

380 ) -> None: 

381 if buf and f: 

382 msg = "specify buf or f or filename, but not both buf and f" 

383 raise RuntimeError(msg) 

384 self.filename = filename 

385 self.buf: bytes | bytearray | mmap.mmap | None = buf 

386 self.f = f 

387 self.start_offset = start_offset 

388 self.should_close_buf = False 

389 self.should_close_file = False 

390 if filename is not None and f is None: 

391 self.f = f = open(filename, mode) 

392 self.should_close_file = True 

393 if f is not None: 

394 self.buf = self.get_buf_from_file(f) 

395 self.should_close_buf = True 

396 if not filename and hasattr(f, "name"): 

397 self.filename = f.name 

398 self.cached_objects: dict[IndirectReference, Any] = {} 

399 self.root_ref: IndirectReference | None 

400 self.info_ref: IndirectReference | None 

401 self.pages_ref: IndirectReference | None 

402 self.last_xref_section_offset: int | None 

403 if self.buf: 

404 self.read_pdf_info() 

405 else: 

406 self.file_size_total = self.file_size_this = 0 

407 self.root = PdfDict() 

408 self.root_ref = None 

409 self.info = PdfDict() 

410 self.info_ref = None 

411 self.page_tree_root = PdfDict() 

412 self.pages: list[IndirectReference] = [] 

413 self.orig_pages: list[IndirectReference] = [] 

414 self.pages_ref = None 

415 self.last_xref_section_offset = None 

416 self.trailer_dict: dict[bytes, Any] = {} 

417 self.xref_table = XrefTable() 

418 self.xref_table.reading_finished = True 

419 if f: 

420 self.seek_end() 

421 

422 def __enter__(self) -> PdfParser: 

423 return self 

424 

425 def __exit__(self, *args: object) -> None: 

426 self.close() 

427 

428 def start_writing(self) -> None: 

429 self.close_buf() 

430 self.seek_end() 

431 

432 def close_buf(self) -> None: 

433 if isinstance(self.buf, mmap.mmap): 

434 self.buf.close() 

435 self.buf = None 

436 

437 def close(self) -> None: 

438 if self.should_close_buf: 

439 self.close_buf() 

440 if self.f is not None and self.should_close_file: 

441 self.f.close() 

442 self.f = None 

443 

444 def seek_end(self) -> None: 

445 assert self.f is not None 

446 self.f.seek(0, os.SEEK_END) 

447 

448 def write_header(self) -> None: 

449 assert self.f is not None 

450 self.f.write(b"%PDF-1.4\n") 

451 

452 def write_comment(self, s: str) -> None: 

453 assert self.f is not None 

454 self.f.write(f"% {s}\n".encode()) 

455 

456 def write_catalog(self) -> IndirectReference: 

457 assert self.f is not None 

458 self.del_root() 

459 self.root_ref = self.next_object_id(self.f.tell()) 

460 self.pages_ref = self.next_object_id(0) 

461 self.rewrite_pages() 

462 self.write_obj(self.root_ref, Type=PdfName(b"Catalog"), Pages=self.pages_ref) 

463 self.write_obj( 

464 self.pages_ref, 

465 Type=PdfName(b"Pages"), 

466 Count=len(self.pages), 

467 Kids=self.pages, 

468 ) 

469 return self.root_ref 

470 

471 def rewrite_pages(self) -> None: 

472 pages_tree_nodes_to_delete = [] 

473 for i, page_ref in enumerate(self.orig_pages): 

474 page_info = self.cached_objects[page_ref] 

475 del self.xref_table[page_ref.object_id] 

476 pages_tree_nodes_to_delete.append(page_info[PdfName(b"Parent")]) 

477 if page_ref not in self.pages: 

478 # the page has been deleted 

479 continue 

480 # make dict keys into strings for passing to write_page 

481 stringified_page_info = {} 

482 for key, value in page_info.items(): 

483 # key should be a PdfName 

484 stringified_page_info[key.name_as_str()] = value 

485 stringified_page_info["Parent"] = self.pages_ref 

486 new_page_ref = self.write_page(None, **stringified_page_info) 

487 for j, cur_page_ref in enumerate(self.pages): 

488 if cur_page_ref == page_ref: 

489 # replace the page reference with the new one 

490 self.pages[j] = new_page_ref 

491 # delete redundant Pages tree nodes from xref table 

492 for pages_tree_node_ref in pages_tree_nodes_to_delete: 

493 while pages_tree_node_ref: 

494 pages_tree_node = self.cached_objects[pages_tree_node_ref] 

495 if pages_tree_node_ref.object_id in self.xref_table: 

496 del self.xref_table[pages_tree_node_ref.object_id] 

497 pages_tree_node_ref = pages_tree_node.get(b"Parent", None) 

498 self.orig_pages = [] 

499 

500 def write_xref_and_trailer( 

501 self, new_root_ref: IndirectReference | None = None 

502 ) -> None: 

503 assert self.f is not None 

504 if new_root_ref: 

505 self.del_root() 

506 self.root_ref = new_root_ref 

507 if self.info: 

508 self.info_ref = self.write_obj(None, self.info) 

509 start_xref = self.xref_table.write(self.f) 

510 num_entries = len(self.xref_table) 

511 trailer_dict: dict[str | bytes, Any] = { 

512 b"Root": self.root_ref, 

513 b"Size": num_entries, 

514 } 

515 if self.last_xref_section_offset is not None: 

516 trailer_dict[b"Prev"] = self.last_xref_section_offset 

517 if self.info: 

518 trailer_dict[b"Info"] = self.info_ref 

519 self.last_xref_section_offset = start_xref 

520 self.f.write( 

521 b"trailer\n" 

522 + bytes(PdfDict(trailer_dict)) 

523 + b"\nstartxref\n%d\n%%%%EOF" % start_xref 

524 ) 

525 

526 def write_page( 

527 self, ref: int | IndirectReference | None, *objs: Any, **dict_obj: Any 

528 ) -> IndirectReference: 

529 obj_ref = self.pages[ref] if isinstance(ref, int) else ref 

530 if "Type" not in dict_obj: 

531 dict_obj["Type"] = PdfName(b"Page") 

532 if "Parent" not in dict_obj: 

533 dict_obj["Parent"] = self.pages_ref 

534 return self.write_obj(obj_ref, *objs, **dict_obj) 

535 

536 def write_obj( 

537 self, ref: IndirectReference | None, *objs: Any, **dict_obj: Any 

538 ) -> IndirectReference: 

539 assert self.f is not None 

540 f = self.f 

541 if ref is None: 

542 ref = self.next_object_id(f.tell()) 

543 else: 

544 self.xref_table[ref.object_id] = (f.tell(), ref.generation) 

545 f.write(bytes(IndirectObjectDef(*ref))) 

546 stream = dict_obj.pop("stream", None) 

547 if stream is not None: 

548 dict_obj["Length"] = len(stream) 

549 if dict_obj: 

550 f.write(pdf_repr(dict_obj)) 

551 for obj in objs: 

552 f.write(pdf_repr(obj)) 

553 if stream is not None: 

554 f.write(b"stream\n") 

555 f.write(stream) 

556 f.write(b"\nendstream\n") 

557 f.write(b"endobj\n") 

558 return ref 

559 

560 def del_root(self) -> None: 

561 if self.root_ref is None: 

562 return 

563 del self.xref_table[self.root_ref.object_id] 

564 del self.xref_table[self.root[b"Pages"].object_id] 

565 

566 @staticmethod 

567 def get_buf_from_file(f: IO[bytes]) -> bytes | mmap.mmap: 

568 if hasattr(f, "getbuffer"): 

569 return f.getbuffer() 

570 elif hasattr(f, "getvalue"): 

571 return f.getvalue() 

572 else: 

573 try: 

574 return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ) 

575 except ValueError: # cannot mmap an empty file 

576 return b"" 

577 

578 def read_pdf_info(self) -> None: 

579 assert self.buf is not None 

580 self.file_size_total = len(self.buf) 

581 self.file_size_this = self.file_size_total - self.start_offset 

582 self.read_trailer() 

583 check_format_condition( 

584 self.trailer_dict.get(b"Root") is not None, "Root is missing" 

585 ) 

586 self.root_ref = self.trailer_dict[b"Root"] 

587 assert self.root_ref is not None 

588 self.info_ref = self.trailer_dict.get(b"Info", None) 

589 self.root = PdfDict(self.read_indirect(self.root_ref)) 

590 if self.info_ref is None: 

591 self.info = PdfDict() 

592 else: 

593 self.info = PdfDict(self.read_indirect(self.info_ref)) 

594 check_format_condition(b"Type" in self.root, "/Type missing in Root") 

595 check_format_condition( 

596 self.root[b"Type"] == b"Catalog", "/Type in Root is not /Catalog" 

597 ) 

598 check_format_condition( 

599 self.root.get(b"Pages") is not None, "/Pages missing in Root" 

600 ) 

601 check_format_condition( 

602 isinstance(self.root[b"Pages"], IndirectReference), 

603 "/Pages in Root is not an indirect reference", 

604 ) 

605 self.pages_ref = self.root[b"Pages"] 

606 assert self.pages_ref is not None 

607 self.page_tree_root = self.read_indirect(self.pages_ref) 

608 self.pages = self.linearize_page_tree(self.page_tree_root) 

609 # save the original list of page references 

610 # in case the user modifies, adds or deletes some pages 

611 # and we need to rewrite the pages and their list 

612 self.orig_pages = self.pages[:] 

613 

614 def next_object_id(self, offset: int | None = None) -> IndirectReference: 

615 try: 

616 # TODO: support reuse of deleted objects 

617 reference = IndirectReference(max(self.xref_table.keys()) + 1, 0) 

618 except ValueError: 

619 reference = IndirectReference(1, 0) 

620 if offset is not None: 

621 self.xref_table[reference.object_id] = (offset, 0) 

622 return reference 

623 

624 delimiter = rb"[][()<>{}/%]" 

625 delimiter_or_ws = rb"[][()<>{}/%\000\011\012\014\015\040]" 

626 whitespace = rb"[\000\011\012\014\015\040]" 

627 whitespace_or_hex = rb"[\000\011\012\014\015\0400-9a-fA-F]" 

628 whitespace_optional = whitespace + b"*" 

629 whitespace_mandatory = whitespace + b"+" 

630 # No "\012" aka "\n" or "\015" aka "\r": 

631 whitespace_optional_no_nl = rb"[\000\011\014\040]*" 

632 newline_only = rb"[\r\n]+" 

633 newline = whitespace_optional_no_nl + newline_only + whitespace_optional_no_nl 

634 re_trailer_end = re.compile( 

635 whitespace_mandatory 

636 + rb"trailer" 

637 + whitespace_optional 

638 + rb"<<(.*>>)" 

639 + newline 

640 + rb"startxref" 

641 + newline 

642 + rb"([0-9]+)" 

643 + newline 

644 + rb"%%EOF" 

645 + whitespace_optional 

646 + rb"$", 

647 re.DOTALL, 

648 ) 

649 re_trailer_prev = re.compile( 

650 whitespace_optional 

651 + rb"trailer" 

652 + whitespace_optional 

653 + rb"<<(.*?>>)" 

654 + newline 

655 + rb"startxref" 

656 + newline 

657 + rb"([0-9]+)" 

658 + newline 

659 + rb"%%EOF" 

660 + whitespace_optional, 

661 re.DOTALL, 

662 ) 

663 

664 def read_trailer(self) -> None: 

665 assert self.buf is not None 

666 search_start_offset = len(self.buf) - 16384 

667 if search_start_offset < self.start_offset: 

668 search_start_offset = self.start_offset 

669 m = self.re_trailer_end.search(self.buf, search_start_offset) 

670 check_format_condition(m is not None, "trailer end not found") 

671 # make sure we found the LAST trailer 

672 last_match = m 

673 while m: 

674 last_match = m 

675 m = self.re_trailer_end.search(self.buf, m.start() + 16) 

676 if not m: 

677 m = last_match 

678 assert m is not None 

679 trailer_data = m.group(1) 

680 self.last_xref_section_offset = int(m.group(2)) 

681 self.trailer_dict = self.interpret_trailer(trailer_data) 

682 self.xref_table = XrefTable() 

683 self.read_xref_table(xref_section_offset=self.last_xref_section_offset) 

684 if b"Prev" in self.trailer_dict: 

685 self.read_prev_trailer(self.trailer_dict[b"Prev"]) 

686 

687 def read_prev_trailer(self, xref_section_offset: int) -> None: 

688 assert self.buf is not None 

689 trailer_offset = self.read_xref_table(xref_section_offset=xref_section_offset) 

690 m = self.re_trailer_prev.search( 

691 self.buf[trailer_offset : trailer_offset + 16384] 

692 ) 

693 check_format_condition(m is not None, "previous trailer not found") 

694 assert m is not None 

695 trailer_data = m.group(1) 

696 check_format_condition( 

697 int(m.group(2)) == xref_section_offset, 

698 "xref section offset in previous trailer doesn't match what was expected", 

699 ) 

700 trailer_dict = self.interpret_trailer(trailer_data) 

701 if b"Prev" in trailer_dict: 

702 self.read_prev_trailer(trailer_dict[b"Prev"]) 

703 

704 re_whitespace_optional = re.compile(whitespace_optional) 

705 re_name = re.compile( 

706 whitespace_optional 

707 + rb"/([!-$&'*-.0-;=?-Z\\^-z|~]+)(?=" 

708 + delimiter_or_ws 

709 + rb")" 

710 ) 

711 re_dict_start = re.compile(whitespace_optional + rb"<<") 

712 re_dict_end = re.compile(whitespace_optional + rb">>" + whitespace_optional) 

713 

714 @classmethod 

715 def interpret_trailer(cls, trailer_data: bytes) -> dict[bytes, Any]: 

716 trailer = {} 

717 offset = 0 

718 while True: 

719 m = cls.re_name.match(trailer_data, offset) 

720 if not m: 

721 m = cls.re_dict_end.match(trailer_data, offset) 

722 check_format_condition( 

723 m is not None and m.end() == len(trailer_data), 

724 "name not found in trailer, remaining data: " 

725 + repr(trailer_data[offset:]), 

726 ) 

727 break 

728 key = cls.interpret_name(m.group(1)) 

729 assert isinstance(key, bytes) 

730 value, value_offset = cls.get_value(trailer_data, m.end()) 

731 trailer[key] = value 

732 if value_offset is None: 

733 break 

734 offset = value_offset 

735 check_format_condition( 

736 b"Size" in trailer and isinstance(trailer[b"Size"], int), 

737 "/Size not in trailer or not an integer", 

738 ) 

739 check_format_condition( 

740 b"Root" in trailer and isinstance(trailer[b"Root"], IndirectReference), 

741 "/Root not in trailer or not an indirect reference", 

742 ) 

743 return trailer 

744 

745 re_hashes_in_name = re.compile(rb"([^#]*)(#([0-9a-fA-F]{2}))?") 

746 

747 @classmethod 

748 def interpret_name(cls, raw: bytes, as_text: bool = False) -> str | bytes: 

749 name = b"" 

750 for m in cls.re_hashes_in_name.finditer(raw): 

751 if m.group(3): 

752 name += m.group(1) + bytearray.fromhex(m.group(3).decode("us-ascii")) 

753 else: 

754 name += m.group(1) 

755 if as_text: 

756 return name.decode("utf-8") 

757 else: 

758 return bytes(name) 

759 

760 re_null = re.compile(whitespace_optional + rb"null(?=" + delimiter_or_ws + rb")") 

761 re_true = re.compile(whitespace_optional + rb"true(?=" + delimiter_or_ws + rb")") 

762 re_false = re.compile(whitespace_optional + rb"false(?=" + delimiter_or_ws + rb")") 

763 re_int = re.compile( 

764 whitespace_optional + rb"([-+]?[0-9]+)(?=" + delimiter_or_ws + rb")" 

765 ) 

766 re_real = re.compile( 

767 whitespace_optional 

768 + rb"([-+]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+))(?=" 

769 + delimiter_or_ws 

770 + rb")" 

771 ) 

772 re_array_start = re.compile(whitespace_optional + rb"\[") 

773 re_array_end = re.compile(whitespace_optional + rb"]") 

774 re_string_hex = re.compile( 

775 whitespace_optional + rb"<(" + whitespace_or_hex + rb"*)>" 

776 ) 

777 re_string_lit = re.compile(whitespace_optional + rb"\(") 

778 re_indirect_reference = re.compile( 

779 whitespace_optional 

780 + rb"([-+]?[0-9]+)" 

781 + whitespace_mandatory 

782 + rb"([-+]?[0-9]+)" 

783 + whitespace_mandatory 

784 + rb"R(?=" 

785 + delimiter_or_ws 

786 + rb")" 

787 ) 

788 re_indirect_def_start = re.compile( 

789 whitespace_optional 

790 + rb"([-+]?[0-9]+)" 

791 + whitespace_mandatory 

792 + rb"([-+]?[0-9]+)" 

793 + whitespace_mandatory 

794 + rb"obj(?=" 

795 + delimiter_or_ws 

796 + rb")" 

797 ) 

798 re_indirect_def_end = re.compile( 

799 whitespace_optional + rb"endobj(?=" + delimiter_or_ws + rb")" 

800 ) 

801 re_comment = re.compile( 

802 rb"(" + whitespace_optional + rb"%[^\r\n]*" + newline + rb")*" 

803 ) 

804 re_stream_start = re.compile(whitespace_optional + rb"stream\r?\n") 

805 re_stream_end = re.compile( 

806 whitespace_optional + rb"endstream(?=" + delimiter_or_ws + rb")" 

807 ) 

808 

809 @classmethod 

810 def get_value( 

811 cls, 

812 data: bytes | bytearray | mmap.mmap, 

813 offset: int, 

814 expect_indirect: IndirectReference | None = None, 

815 max_nesting: int = -1, 

816 ) -> tuple[Any, int | None]: 

817 if max_nesting == 0: 

818 return None, None 

819 m = cls.re_comment.match(data, offset) 

820 if m: 

821 offset = m.end() 

822 m = cls.re_indirect_def_start.match(data, offset) 

823 if m: 

824 check_format_condition( 

825 int(m.group(1)) > 0, 

826 "indirect object definition: object ID must be greater than 0", 

827 ) 

828 check_format_condition( 

829 int(m.group(2)) >= 0, 

830 "indirect object definition: generation must be non-negative", 

831 ) 

832 check_format_condition( 

833 expect_indirect is None 

834 or expect_indirect 

835 == IndirectReference(int(m.group(1)), int(m.group(2))), 

836 "indirect object definition different than expected", 

837 ) 

838 object, object_offset = cls.get_value( 

839 data, m.end(), max_nesting=max_nesting - 1 

840 ) 

841 if object_offset is None: 

842 return object, None 

843 m = cls.re_indirect_def_end.match(data, object_offset) 

844 check_format_condition( 

845 m is not None, "indirect object definition end not found" 

846 ) 

847 assert m is not None 

848 return object, m.end() 

849 check_format_condition( 

850 not expect_indirect, "indirect object definition not found" 

851 ) 

852 m = cls.re_indirect_reference.match(data, offset) 

853 if m: 

854 check_format_condition( 

855 int(m.group(1)) > 0, 

856 "indirect object reference: object ID must be greater than 0", 

857 ) 

858 check_format_condition( 

859 int(m.group(2)) >= 0, 

860 "indirect object reference: generation must be non-negative", 

861 ) 

862 return IndirectReference(int(m.group(1)), int(m.group(2))), m.end() 

863 m = cls.re_dict_start.match(data, offset) 

864 if m: 

865 offset = m.end() 

866 result: dict[Any, Any] = {} 

867 m = cls.re_dict_end.match(data, offset) 

868 current_offset: int | None = offset 

869 while not m: 

870 assert current_offset is not None 

871 key, current_offset = cls.get_value( 

872 data, current_offset, max_nesting=max_nesting - 1 

873 ) 

874 if current_offset is None: 

875 return result, None 

876 value, current_offset = cls.get_value( 

877 data, current_offset, max_nesting=max_nesting - 1 

878 ) 

879 result[key] = value 

880 if current_offset is None: 

881 return result, None 

882 m = cls.re_dict_end.match(data, current_offset) 

883 current_offset = m.end() 

884 m = cls.re_stream_start.match(data, current_offset) 

885 if m: 

886 stream_len = result.get(b"Length") 

887 if stream_len is None or not isinstance(stream_len, int): 

888 msg = f"bad or missing Length in stream dict ({stream_len})" 

889 raise PdfFormatError(msg) 

890 stream_data = data[m.end() : m.end() + stream_len] 

891 m = cls.re_stream_end.match(data, m.end() + stream_len) 

892 check_format_condition(m is not None, "stream end not found") 

893 assert m is not None 

894 current_offset = m.end() 

895 return PdfStream(PdfDict(result), stream_data), current_offset 

896 return PdfDict(result), current_offset 

897 m = cls.re_array_start.match(data, offset) 

898 if m: 

899 offset = m.end() 

900 results = [] 

901 m = cls.re_array_end.match(data, offset) 

902 current_offset = offset 

903 while not m: 

904 assert current_offset is not None 

905 value, current_offset = cls.get_value( 

906 data, current_offset, max_nesting=max_nesting - 1 

907 ) 

908 results.append(value) 

909 if current_offset is None: 

910 return results, None 

911 m = cls.re_array_end.match(data, current_offset) 

912 return results, m.end() 

913 m = cls.re_null.match(data, offset) 

914 if m: 

915 return None, m.end() 

916 m = cls.re_true.match(data, offset) 

917 if m: 

918 return True, m.end() 

919 m = cls.re_false.match(data, offset) 

920 if m: 

921 return False, m.end() 

922 m = cls.re_name.match(data, offset) 

923 if m: 

924 return PdfName(cls.interpret_name(m.group(1))), m.end() 

925 m = cls.re_int.match(data, offset) 

926 if m: 

927 return int(m.group(1)), m.end() 

928 m = cls.re_real.match(data, offset) 

929 if m: 

930 # XXX Decimal instead of float??? 

931 return float(m.group(1)), m.end() 

932 m = cls.re_string_hex.match(data, offset) 

933 if m: 

934 # filter out whitespace 

935 hex_string = bytearray( 

936 b for b in m.group(1) if b in b"0123456789abcdefABCDEF" 

937 ) 

938 if len(hex_string) % 2 == 1: 

939 # append a 0 if the length is not even - yes, at the end 

940 hex_string.append(ord(b"0")) 

941 return bytearray.fromhex(hex_string.decode("us-ascii")), m.end() 

942 m = cls.re_string_lit.match(data, offset) 

943 if m: 

944 return cls.get_literal_string(data, m.end()) 

945 # return None, offset # fallback (only for debugging) 

946 msg = f"unrecognized object: {repr(data[offset : offset + 32])}" 

947 raise PdfFormatError(msg) 

948 

949 re_lit_str_token = re.compile( 

950 rb"(\\[nrtbf()\\])|(\\[0-9]{1,3})|(\\(\r\n|\r|\n))|(\r\n|\r|\n)|(\()|(\))" 

951 ) 

952 escaped_chars = { 

953 b"n": b"\n", 

954 b"r": b"\r", 

955 b"t": b"\t", 

956 b"b": b"\b", 

957 b"f": b"\f", 

958 b"(": b"(", 

959 b")": b")", 

960 b"\\": b"\\", 

961 ord(b"n"): b"\n", 

962 ord(b"r"): b"\r", 

963 ord(b"t"): b"\t", 

964 ord(b"b"): b"\b", 

965 ord(b"f"): b"\f", 

966 ord(b"("): b"(", 

967 ord(b")"): b")", 

968 ord(b"\\"): b"\\", 

969 } 

970 

971 @classmethod 

972 def get_literal_string( 

973 cls, data: bytes | bytearray | mmap.mmap, offset: int 

974 ) -> tuple[bytes, int]: 

975 nesting_depth = 0 

976 result = bytearray() 

977 for m in cls.re_lit_str_token.finditer(data, offset): 

978 result.extend(data[offset : m.start()]) 

979 if m.group(1): 

980 result.extend(cls.escaped_chars[m.group(1)[1]]) 

981 elif m.group(2): 

982 result.append(int(m.group(2)[1:], 8)) 

983 elif m.group(3): 

984 pass 

985 elif m.group(5): 

986 result.extend(b"\n") 

987 elif m.group(6): 

988 result.extend(b"(") 

989 nesting_depth += 1 

990 elif m.group(7): 

991 if nesting_depth == 0: 

992 return bytes(result), m.end() 

993 result.extend(b")") 

994 nesting_depth -= 1 

995 offset = m.end() 

996 msg = "unfinished literal string" 

997 raise PdfFormatError(msg) 

998 

999 re_xref_section_start = re.compile(whitespace_optional + rb"xref" + newline) 

1000 re_xref_subsection_start = re.compile( 

1001 whitespace_optional 

1002 + rb"([0-9]+)" 

1003 + whitespace_mandatory 

1004 + rb"([0-9]+)" 

1005 + whitespace_optional 

1006 + newline_only 

1007 ) 

1008 re_xref_entry = re.compile(rb"([0-9]{10}) ([0-9]{5}) ([fn])( \r| \n|\r\n)") 

1009 

1010 def read_xref_table(self, xref_section_offset: int) -> int: 

1011 assert self.buf is not None 

1012 subsection_found = False 

1013 m = self.re_xref_section_start.match( 

1014 self.buf, xref_section_offset + self.start_offset 

1015 ) 

1016 check_format_condition(m is not None, "xref section start not found") 

1017 assert m is not None 

1018 offset = m.end() 

1019 while True: 

1020 m = self.re_xref_subsection_start.match(self.buf, offset) 

1021 if not m: 

1022 check_format_condition( 

1023 subsection_found, "xref subsection start not found" 

1024 ) 

1025 break 

1026 subsection_found = True 

1027 offset = m.end() 

1028 first_object = int(m.group(1)) 

1029 num_objects = int(m.group(2)) 

1030 for i in range(first_object, first_object + num_objects): 

1031 m = self.re_xref_entry.match(self.buf, offset) 

1032 check_format_condition(m is not None, "xref entry not found") 

1033 assert m is not None 

1034 offset = m.end() 

1035 is_free = m.group(3) == b"f" 

1036 if not is_free: 

1037 generation = int(m.group(2)) 

1038 new_entry = (int(m.group(1)), generation) 

1039 if i not in self.xref_table: 

1040 self.xref_table[i] = new_entry 

1041 return offset 

1042 

1043 def read_indirect(self, ref: IndirectReference, max_nesting: int = -1) -> Any: 

1044 offset, generation = self.xref_table[ref[0]] 

1045 check_format_condition( 

1046 generation == ref[1], 

1047 f"expected to find generation {ref[1]} for object ID {ref[0]} in xref " 

1048 f"table, instead found generation {generation} at offset {offset}", 

1049 ) 

1050 assert self.buf is not None 

1051 value = self.get_value( 

1052 self.buf, 

1053 offset + self.start_offset, 

1054 expect_indirect=IndirectReference(*ref), 

1055 max_nesting=max_nesting, 

1056 )[0] 

1057 self.cached_objects[ref] = value 

1058 return value 

1059 

1060 def linearize_page_tree( 

1061 self, node: PdfDict | None = None 

1062 ) -> list[IndirectReference]: 

1063 page_node = node if node is not None else self.page_tree_root 

1064 check_format_condition( 

1065 page_node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages" 

1066 ) 

1067 pages = [] 

1068 for kid in page_node[b"Kids"]: 

1069 kid_object = self.read_indirect(kid) 

1070 if kid_object[b"Type"] == b"Page": 

1071 pages.append(kid) 

1072 else: 

1073 pages.extend(self.linearize_page_tree(node=kid_object)) 

1074 return pages