Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pillow-10.4.0-py3.8-linux-x86_64.egg/PIL/PdfParser.py: 20%


650 statements  

from __future__ import annotations

import calendar
import codecs
import collections
import mmap
import os
import re
import time
import zlib
from typing import TYPE_CHECKING, Any, List, NamedTuple, Union


# see 7.9.2.2 Text String Type on page 86 and D.3 PDFDocEncoding Character Set
# on page 656
def encode_text(s: str) -> bytes:
    return codecs.BOM_UTF16_BE + s.encode("utf_16_be")


PDFDocEncoding = {
    0x16: "\u0017",
    0x18: "\u02D8",
    0x19: "\u02C7",
    0x1A: "\u02C6",
    0x1B: "\u02D9",
    0x1C: "\u02DD",
    0x1D: "\u02DB",
    0x1E: "\u02DA",
    0x1F: "\u02DC",
    0x80: "\u2022",
    0x81: "\u2020",
    0x82: "\u2021",
    0x83: "\u2026",
    0x84: "\u2014",
    0x85: "\u2013",
    0x86: "\u0192",
    0x87: "\u2044",
    0x88: "\u2039",
    0x89: "\u203A",
    0x8A: "\u2212",
    0x8B: "\u2030",
    0x8C: "\u201E",
    0x8D: "\u201C",
    0x8E: "\u201D",
    0x8F: "\u2018",
    0x90: "\u2019",
    0x91: "\u201A",
    0x92: "\u2122",
    0x93: "\uFB01",
    0x94: "\uFB02",
    0x95: "\u0141",
    0x96: "\u0152",
    0x97: "\u0160",
    0x98: "\u0178",
    0x99: "\u017D",
    0x9A: "\u0131",
    0x9B: "\u0142",
    0x9C: "\u0153",
    0x9D: "\u0161",
    0x9E: "\u017E",
    0xA0: "\u20AC",
}


def decode_text(b):
    if b[: len(codecs.BOM_UTF16_BE)] == codecs.BOM_UTF16_BE:
        return b[len(codecs.BOM_UTF16_BE) :].decode("utf_16_be")
    else:
        return "".join(PDFDocEncoding.get(byte, chr(byte)) for byte in b)

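# Illustrative sketch, not part of the original module: how the two helpers
# above round-trip text.  encode_text() always emits UTF-16BE with a BOM,
# while decode_text() falls back to PDFDocEncoding for plain bytes.
#
#     >>> encode_text("Ä")
#     b'\xfe\xff\x00\xc4'
#     >>> decode_text(encode_text("Ä"))
#     'Ä'
#     >>> decode_text(b"\x80")  # 0x80 maps to a bullet in PDFDocEncoding
#     '\u2022'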

class PdfFormatError(RuntimeError):
    """An error that probably indicates a syntactic or semantic error in the
    PDF file structure"""

    pass


def check_format_condition(condition: bool, error_message: str) -> None:
    if not condition:
        raise PdfFormatError(error_message)


class IndirectReferenceTuple(NamedTuple):
    object_id: int
    generation: int


class IndirectReference(IndirectReferenceTuple):
    def __str__(self) -> str:
        return f"{self.object_id} {self.generation} R"

    def __bytes__(self) -> bytes:
        return self.__str__().encode("us-ascii")

    def __eq__(self, other: object) -> bool:
        if self.__class__ is not other.__class__:
            return False
        assert isinstance(other, IndirectReference)
        return other.object_id == self.object_id and other.generation == self.generation

    def __ne__(self, other):
        return not (self == other)

    def __hash__(self) -> int:
        return hash((self.object_id, self.generation))


class IndirectObjectDef(IndirectReference):
    def __str__(self) -> str:
        return f"{self.object_id} {self.generation} obj"


class XrefTable:
    def __init__(self):
        self.existing_entries = {}  # object ID => (offset, generation)
        self.new_entries = {}  # object ID => (offset, generation)
        self.deleted_entries = {0: 65536}  # object ID => generation
        self.reading_finished = False

    def __setitem__(self, key, value):
        if self.reading_finished:
            self.new_entries[key] = value
        else:
            self.existing_entries[key] = value
        if key in self.deleted_entries:
            del self.deleted_entries[key]

    def __getitem__(self, key):
        try:
            return self.new_entries[key]
        except KeyError:
            return self.existing_entries[key]

    def __delitem__(self, key):
        if key in self.new_entries:
            generation = self.new_entries[key][1] + 1
            del self.new_entries[key]
            self.deleted_entries[key] = generation
        elif key in self.existing_entries:
            generation = self.existing_entries[key][1] + 1
            self.deleted_entries[key] = generation
        elif key in self.deleted_entries:
            generation = self.deleted_entries[key]
        else:
            msg = f"object ID {key} cannot be deleted because it doesn't exist"
            raise IndexError(msg)

    def __contains__(self, key):
        return key in self.existing_entries or key in self.new_entries

    def __len__(self) -> int:
        return len(
            set(self.existing_entries.keys())
            | set(self.new_entries.keys())
            | set(self.deleted_entries.keys())
        )

    def keys(self):
        return (
            set(self.existing_entries.keys()) - set(self.deleted_entries.keys())
        ) | set(self.new_entries.keys())

    def write(self, f):
        keys = sorted(set(self.new_entries.keys()) | set(self.deleted_entries.keys()))
        deleted_keys = sorted(set(self.deleted_entries.keys()))
        startxref = f.tell()
        f.write(b"xref\n")
        while keys:
            # find a contiguous sequence of object IDs
            prev = None
            for index, key in enumerate(keys):
                if prev is None or prev + 1 == key:
                    prev = key
                else:
                    contiguous_keys = keys[:index]
                    keys = keys[index:]
                    break
            else:
                contiguous_keys = keys
                keys = None
            f.write(b"%d %d\n" % (contiguous_keys[0], len(contiguous_keys)))
            for object_id in contiguous_keys:
                if object_id in self.new_entries:
                    f.write(b"%010d %05d n \n" % self.new_entries[object_id])
                else:
                    this_deleted_object_id = deleted_keys.pop(0)
                    check_format_condition(
                        object_id == this_deleted_object_id,
                        f"expected the next deleted object ID to be {object_id}, "
                        f"instead found {this_deleted_object_id}",
                    )
                    try:
                        next_in_linked_list = deleted_keys[0]
                    except IndexError:
                        next_in_linked_list = 0
                    f.write(
                        b"%010d %05d f \n"
                        % (next_in_linked_list, self.deleted_entries[object_id])
                    )
        return startxref

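# Illustrative sketch, not part of the original module: the cross-reference
# section XrefTable.write() emits for one new object plus the mandatory free
# entry for object 0 (the offset 15 below is made up for the example):
#
#     xref
#     0 2
#     0000000000 65536 f 
#     0000000015 00000 n 
#
# In-use ("n") entries carry a 10-digit byte offset and a 5-digit generation;
# free ("f") entries store the next free object ID in place of the offset.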

class PdfName:
    def __init__(self, name):
        if isinstance(name, PdfName):
            self.name = name.name
        elif isinstance(name, bytes):
            self.name = name
        else:
            self.name = name.encode("us-ascii")

    def name_as_str(self) -> str:
        return self.name.decode("us-ascii")

    def __eq__(self, other):
        return (
            isinstance(other, PdfName) and other.name == self.name
        ) or other == self.name

    def __hash__(self) -> int:
        return hash(self.name)

    def __repr__(self) -> str:
        return f"{self.__class__.__name__}({repr(self.name)})"

    @classmethod
    def from_pdf_stream(cls, data):
        return cls(PdfParser.interpret_name(data))

    allowed_chars = set(range(33, 127)) - {ord(c) for c in "#%/()<>[]{}"}

    def __bytes__(self) -> bytes:
        result = bytearray(b"/")
        for b in self.name:
            if b in self.allowed_chars:
                result.append(b)
            else:
                result.extend(b"#%02X" % b)
        return bytes(result)


class PdfArray(List[Any]):
    def __bytes__(self) -> bytes:
        return b"[ " + b" ".join(pdf_repr(x) for x in self) + b" ]"


if TYPE_CHECKING:
    _DictBase = collections.UserDict[Union[str, bytes], Any]
else:
    _DictBase = collections.UserDict


class PdfDict(_DictBase):
    def __setattr__(self, key, value):
        if key == "data":
            collections.UserDict.__setattr__(self, key, value)
        else:
            self[key.encode("us-ascii")] = value

    def __getattr__(self, key):
        try:
            value = self[key.encode("us-ascii")]
        except KeyError as e:
            raise AttributeError(key) from e
        if isinstance(value, bytes):
            value = decode_text(value)
        if key.endswith("Date"):
            if value.startswith("D:"):
                value = value[2:]

            relationship = "Z"
            if len(value) > 17:
                relationship = value[14]
                offset = int(value[15:17]) * 60
                if len(value) > 20:
                    offset += int(value[18:20])

            format = "%Y%m%d%H%M%S"[: len(value) - 2]
            value = time.strptime(value[: len(format) + 2], format)
            if relationship in ["+", "-"]:
                offset *= 60
                if relationship == "+":
                    offset *= -1
                value = time.gmtime(calendar.timegm(value) + offset)
        return value

    def __bytes__(self) -> bytes:
        out = bytearray(b"<<")
        for key, value in self.items():
            if value is None:
                continue
            value = pdf_repr(value)
            out.extend(b"\n")
            out.extend(bytes(PdfName(key)))
            out.extend(b" ")
            out.extend(value)
        out.extend(b"\n>>")
        return bytes(out)

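# Illustrative sketch, not part of the original module: PdfDict keeps its keys
# as bytes but also exposes them as attributes, decoding byte values on access
# and parsing PDF date strings ("D:YYYYMMDDHHmmSS" plus an optional timezone)
# for keys ending in "Date".
#
#     >>> d = PdfDict()
#     >>> d.CreationDate = "D:20240101120000Z"  # stored under b"CreationDate"
#     >>> d.CreationDate.tm_year                # parsed into a time.struct_time
#     2024
#     >>> bytes(PdfDict({b"Type": PdfName(b"Catalog")}))
#     b'<<\n/Type /Catalog\n>>'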

class PdfBinary:
    def __init__(self, data):
        self.data = data

    def __bytes__(self) -> bytes:
        return b"<%s>" % b"".join(b"%02X" % b for b in self.data)


class PdfStream:
    def __init__(self, dictionary, buf):
        self.dictionary = dictionary
        self.buf = buf

    def decode(self):
        try:
            filter = self.dictionary.Filter
        except AttributeError:
            return self.buf
        if filter == b"FlateDecode":
            try:
                expected_length = self.dictionary.DL
            except AttributeError:
                expected_length = self.dictionary.Length
            return zlib.decompress(self.buf, bufsize=int(expected_length))
        else:
            msg = f"stream filter {repr(self.dictionary.Filter)} unknown/unsupported"
            raise NotImplementedError(msg)


def pdf_repr(x):
    if x is True:
        return b"true"
    elif x is False:
        return b"false"
    elif x is None:
        return b"null"
    elif isinstance(x, (PdfName, PdfDict, PdfArray, PdfBinary)):
        return bytes(x)
    elif isinstance(x, (int, float)):
        return str(x).encode("us-ascii")
    elif isinstance(x, time.struct_time):
        return b"(D:" + time.strftime("%Y%m%d%H%M%SZ", x).encode("us-ascii") + b")"
    elif isinstance(x, dict):
        return bytes(PdfDict(x))
    elif isinstance(x, list):
        return bytes(PdfArray(x))
    elif isinstance(x, str):
        return pdf_repr(encode_text(x))
    elif isinstance(x, bytes):
        # XXX escape more chars? handle binary garbage
        x = x.replace(b"\\", b"\\\\")
        x = x.replace(b"(", b"\\(")
        x = x.replace(b")", b"\\)")
        return b"(" + x + b")"
    else:
        return bytes(x)

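# Illustrative sketch, not part of the original module: pdf_repr() serializes
# Python values into PDF syntax and is what PdfDict/PdfArray use for nested
# values; str values go through encode_text() and come out as literal strings.
#
#     >>> pdf_repr(42)
#     b'42'
#     >>> pdf_repr([True, None, PdfName(b"Type")])
#     b'[ true null /Type ]'
#     >>> pdf_repr(b"a(b)c")  # parentheses and backslashes are escaped
#     b'(a\\(b\\)c)'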

class PdfParser:
    """Based on
    https://www.adobe.com/content/dam/acom/en/devnet/acrobat/pdfs/PDF32000_2008.pdf
    Supports PDF up to 1.4
    """

    def __init__(self, filename=None, f=None, buf=None, start_offset=0, mode="rb"):
        if buf and f:
            msg = "specify buf or f or filename, but not both buf and f"
            raise RuntimeError(msg)
        self.filename = filename
        self.buf = buf
        self.f = f
        self.start_offset = start_offset
        self.should_close_buf = False
        self.should_close_file = False
        if filename is not None and f is None:
            self.f = f = open(filename, mode)
            self.should_close_file = True
        if f is not None:
            self.buf = buf = self.get_buf_from_file(f)
            self.should_close_buf = True
            if not filename and hasattr(f, "name"):
                self.filename = f.name
        self.cached_objects = {}
        if buf:
            self.read_pdf_info()
        else:
            self.file_size_total = self.file_size_this = 0
            self.root = PdfDict()
            self.root_ref = None
            self.info = PdfDict()
            self.info_ref = None
            self.page_tree_root = {}
            self.pages = []
            self.orig_pages = []
            self.pages_ref = None
            self.last_xref_section_offset = None
            self.trailer_dict = {}
            self.xref_table = XrefTable()
        self.xref_table.reading_finished = True
        if f:
            self.seek_end()

    def __enter__(self) -> PdfParser:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

    def start_writing(self) -> None:
        self.close_buf()
        self.seek_end()

    def close_buf(self) -> None:
        try:
            self.buf.close()
        except AttributeError:
            pass
        self.buf = None

    def close(self) -> None:
        if self.should_close_buf:
            self.close_buf()
        if self.f is not None and self.should_close_file:
            self.f.close()
            self.f = None

    def seek_end(self) -> None:
        self.f.seek(0, os.SEEK_END)

    def write_header(self) -> None:
        self.f.write(b"%PDF-1.4\n")

    def write_comment(self, s):
        self.f.write(f"% {s}\n".encode())

    def write_catalog(self) -> IndirectReference:
        self.del_root()
        self.root_ref = self.next_object_id(self.f.tell())
        self.pages_ref = self.next_object_id(0)
        self.rewrite_pages()
        self.write_obj(self.root_ref, Type=PdfName(b"Catalog"), Pages=self.pages_ref)
        self.write_obj(
            self.pages_ref,
            Type=PdfName(b"Pages"),
            Count=len(self.pages),
            Kids=self.pages,
        )
        return self.root_ref

    def rewrite_pages(self) -> None:
        pages_tree_nodes_to_delete = []
        for i, page_ref in enumerate(self.orig_pages):
            page_info = self.cached_objects[page_ref]
            del self.xref_table[page_ref.object_id]
            pages_tree_nodes_to_delete.append(page_info[PdfName(b"Parent")])
            if page_ref not in self.pages:
                # the page has been deleted
                continue
            # make dict keys into strings for passing to write_page
            stringified_page_info = {}
            for key, value in page_info.items():
                # key should be a PdfName
                stringified_page_info[key.name_as_str()] = value
            stringified_page_info["Parent"] = self.pages_ref
            new_page_ref = self.write_page(None, **stringified_page_info)
            for j, cur_page_ref in enumerate(self.pages):
                if cur_page_ref == page_ref:
                    # replace the page reference with the new one
                    self.pages[j] = new_page_ref
        # delete redundant Pages tree nodes from xref table
        for pages_tree_node_ref in pages_tree_nodes_to_delete:
            while pages_tree_node_ref:
                pages_tree_node = self.cached_objects[pages_tree_node_ref]
                if pages_tree_node_ref.object_id in self.xref_table:
                    del self.xref_table[pages_tree_node_ref.object_id]
                pages_tree_node_ref = pages_tree_node.get(b"Parent", None)
        self.orig_pages = []

    def write_xref_and_trailer(self, new_root_ref=None):
        if new_root_ref:
            self.del_root()
            self.root_ref = new_root_ref
        if self.info:
            self.info_ref = self.write_obj(None, self.info)
        start_xref = self.xref_table.write(self.f)
        num_entries = len(self.xref_table)
        trailer_dict = {b"Root": self.root_ref, b"Size": num_entries}
        if self.last_xref_section_offset is not None:
            trailer_dict[b"Prev"] = self.last_xref_section_offset
        if self.info:
            trailer_dict[b"Info"] = self.info_ref
        self.last_xref_section_offset = start_xref
        self.f.write(
            b"trailer\n"
            + bytes(PdfDict(trailer_dict))
            + b"\nstartxref\n%d\n%%%%EOF" % start_xref
        )

    def write_page(self, ref, *objs, **dict_obj):
        if isinstance(ref, int):
            ref = self.pages[ref]
        if "Type" not in dict_obj:
            dict_obj["Type"] = PdfName(b"Page")
        if "Parent" not in dict_obj:
            dict_obj["Parent"] = self.pages_ref
        return self.write_obj(ref, *objs, **dict_obj)

    def write_obj(self, ref, *objs, **dict_obj):
        f = self.f
        if ref is None:
            ref = self.next_object_id(f.tell())
        else:
            self.xref_table[ref.object_id] = (f.tell(), ref.generation)
        f.write(bytes(IndirectObjectDef(*ref)))
        stream = dict_obj.pop("stream", None)
        if stream is not None:
            dict_obj["Length"] = len(stream)
        if dict_obj:
            f.write(pdf_repr(dict_obj))
        for obj in objs:
            f.write(pdf_repr(obj))
        if stream is not None:
            f.write(b"stream\n")
            f.write(stream)
            f.write(b"\nendstream\n")
        f.write(b"endobj\n")
        return ref

    def del_root(self) -> None:
        if self.root_ref is None:
            return
        del self.xref_table[self.root_ref.object_id]
        del self.xref_table[self.root[b"Pages"].object_id]

    @staticmethod
    def get_buf_from_file(f):
        if hasattr(f, "getbuffer"):
            return f.getbuffer()
        elif hasattr(f, "getvalue"):
            return f.getvalue()
        else:
            try:
                return mmap.mmap(f.fileno(), 0, access=mmap.ACCESS_READ)
            except ValueError:  # cannot mmap an empty file
                return b""

    def read_pdf_info(self) -> None:
        self.file_size_total = len(self.buf)
        self.file_size_this = self.file_size_total - self.start_offset
        self.read_trailer()
        self.root_ref = self.trailer_dict[b"Root"]
        self.info_ref = self.trailer_dict.get(b"Info", None)
        self.root = PdfDict(self.read_indirect(self.root_ref))
        if self.info_ref is None:
            self.info = PdfDict()
        else:
            self.info = PdfDict(self.read_indirect(self.info_ref))
        check_format_condition(b"Type" in self.root, "/Type missing in Root")
        check_format_condition(
            self.root[b"Type"] == b"Catalog", "/Type in Root is not /Catalog"
        )
        check_format_condition(b"Pages" in self.root, "/Pages missing in Root")
        check_format_condition(
            isinstance(self.root[b"Pages"], IndirectReference),
            "/Pages in Root is not an indirect reference",
        )
        self.pages_ref = self.root[b"Pages"]
        self.page_tree_root = self.read_indirect(self.pages_ref)
        self.pages = self.linearize_page_tree(self.page_tree_root)
        # save the original list of page references
        # in case the user modifies, adds or deletes some pages
        # and we need to rewrite the pages and their list
        self.orig_pages = self.pages[:]

    def next_object_id(self, offset=None):
        try:
            # TODO: support reuse of deleted objects
            reference = IndirectReference(max(self.xref_table.keys()) + 1, 0)
        except ValueError:
            reference = IndirectReference(1, 0)
        if offset is not None:
            self.xref_table[reference.object_id] = (offset, 0)
        return reference

    delimiter = rb"[][()<>{}/%]"
    delimiter_or_ws = rb"[][()<>{}/%\000\011\012\014\015\040]"
    whitespace = rb"[\000\011\012\014\015\040]"
    whitespace_or_hex = rb"[\000\011\012\014\015\0400-9a-fA-F]"
    whitespace_optional = whitespace + b"*"
    whitespace_mandatory = whitespace + b"+"
    # No "\012" aka "\n" or "\015" aka "\r":
    whitespace_optional_no_nl = rb"[\000\011\014\040]*"
    newline_only = rb"[\r\n]+"
    newline = whitespace_optional_no_nl + newline_only + whitespace_optional_no_nl
    re_trailer_end = re.compile(
        whitespace_mandatory
        + rb"trailer"
        + whitespace_optional
        + rb"<<(.*>>)"
        + newline
        + rb"startxref"
        + newline
        + rb"([0-9]+)"
        + newline
        + rb"%%EOF"
        + whitespace_optional
        + rb"$",
        re.DOTALL,
    )
    re_trailer_prev = re.compile(
        whitespace_optional
        + rb"trailer"
        + whitespace_optional
        + rb"<<(.*?>>)"
        + newline
        + rb"startxref"
        + newline
        + rb"([0-9]+)"
        + newline
        + rb"%%EOF"
        + whitespace_optional,
        re.DOTALL,
    )

    def read_trailer(self):
        search_start_offset = len(self.buf) - 16384
        if search_start_offset < self.start_offset:
            search_start_offset = self.start_offset
        m = self.re_trailer_end.search(self.buf, search_start_offset)
        check_format_condition(m, "trailer end not found")
        # make sure we found the LAST trailer
        last_match = m
        while m:
            last_match = m
            m = self.re_trailer_end.search(self.buf, m.start() + 16)
        if not m:
            m = last_match
        trailer_data = m.group(1)
        self.last_xref_section_offset = int(m.group(2))
        self.trailer_dict = self.interpret_trailer(trailer_data)
        self.xref_table = XrefTable()
        self.read_xref_table(xref_section_offset=self.last_xref_section_offset)
        if b"Prev" in self.trailer_dict:
            self.read_prev_trailer(self.trailer_dict[b"Prev"])

    def read_prev_trailer(self, xref_section_offset):
        trailer_offset = self.read_xref_table(xref_section_offset=xref_section_offset)
        m = self.re_trailer_prev.search(
            self.buf[trailer_offset : trailer_offset + 16384]
        )
        check_format_condition(m, "previous trailer not found")
        trailer_data = m.group(1)
        check_format_condition(
            int(m.group(2)) == xref_section_offset,
            "xref section offset in previous trailer doesn't match what was expected",
        )
        trailer_dict = self.interpret_trailer(trailer_data)
        if b"Prev" in trailer_dict:
            self.read_prev_trailer(trailer_dict[b"Prev"])

    re_whitespace_optional = re.compile(whitespace_optional)
    re_name = re.compile(
        whitespace_optional
        + rb"/([!-$&'*-.0-;=?-Z\\^-z|~]+)(?="
        + delimiter_or_ws
        + rb")"
    )
    re_dict_start = re.compile(whitespace_optional + rb"<<")
    re_dict_end = re.compile(whitespace_optional + rb">>" + whitespace_optional)

    @classmethod
    def interpret_trailer(cls, trailer_data):
        trailer = {}
        offset = 0
        while True:
            m = cls.re_name.match(trailer_data, offset)
            if not m:
                m = cls.re_dict_end.match(trailer_data, offset)
                check_format_condition(
                    m and m.end() == len(trailer_data),
                    "name not found in trailer, remaining data: "
                    + repr(trailer_data[offset:]),
                )
                break
            key = cls.interpret_name(m.group(1))
            value, offset = cls.get_value(trailer_data, m.end())
            trailer[key] = value
        check_format_condition(
            b"Size" in trailer and isinstance(trailer[b"Size"], int),
            "/Size not in trailer or not an integer",
        )
        check_format_condition(
            b"Root" in trailer and isinstance(trailer[b"Root"], IndirectReference),
            "/Root not in trailer or not an indirect reference",
        )
        return trailer

    re_hashes_in_name = re.compile(rb"([^#]*)(#([0-9a-fA-F]{2}))?")

    @classmethod
    def interpret_name(cls, raw, as_text=False):
        name = b""
        for m in cls.re_hashes_in_name.finditer(raw):
            if m.group(3):
                name += m.group(1) + bytearray.fromhex(m.group(3).decode("us-ascii"))
            else:
                name += m.group(1)
        if as_text:
            return name.decode("utf-8")
        else:
            return bytes(name)

    re_null = re.compile(whitespace_optional + rb"null(?=" + delimiter_or_ws + rb")")
    re_true = re.compile(whitespace_optional + rb"true(?=" + delimiter_or_ws + rb")")
    re_false = re.compile(whitespace_optional + rb"false(?=" + delimiter_or_ws + rb")")
    re_int = re.compile(
        whitespace_optional + rb"([-+]?[0-9]+)(?=" + delimiter_or_ws + rb")"
    )
    re_real = re.compile(
        whitespace_optional
        + rb"([-+]?([0-9]+\.[0-9]*|[0-9]*\.[0-9]+))(?="
        + delimiter_or_ws
        + rb")"
    )
    re_array_start = re.compile(whitespace_optional + rb"\[")
    re_array_end = re.compile(whitespace_optional + rb"]")
    re_string_hex = re.compile(
        whitespace_optional + rb"<(" + whitespace_or_hex + rb"*)>"
    )
    re_string_lit = re.compile(whitespace_optional + rb"\(")
    re_indirect_reference = re.compile(
        whitespace_optional
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"R(?="
        + delimiter_or_ws
        + rb")"
    )
    re_indirect_def_start = re.compile(
        whitespace_optional
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"([-+]?[0-9]+)"
        + whitespace_mandatory
        + rb"obj(?="
        + delimiter_or_ws
        + rb")"
    )
    re_indirect_def_end = re.compile(
        whitespace_optional + rb"endobj(?=" + delimiter_or_ws + rb")"
    )
    re_comment = re.compile(
        rb"(" + whitespace_optional + rb"%[^\r\n]*" + newline + rb")*"
    )
    re_stream_start = re.compile(whitespace_optional + rb"stream\r?\n")
    re_stream_end = re.compile(
        whitespace_optional + rb"endstream(?=" + delimiter_or_ws + rb")"
    )

    @classmethod
    def get_value(cls, data, offset, expect_indirect=None, max_nesting=-1):
        if max_nesting == 0:
            return None, None
        m = cls.re_comment.match(data, offset)
        if m:
            offset = m.end()
        m = cls.re_indirect_def_start.match(data, offset)
        if m:
            check_format_condition(
                int(m.group(1)) > 0,
                "indirect object definition: object ID must be greater than 0",
            )
            check_format_condition(
                int(m.group(2)) >= 0,
                "indirect object definition: generation must be non-negative",
            )
            check_format_condition(
                expect_indirect is None
                or expect_indirect
                == IndirectReference(int(m.group(1)), int(m.group(2))),
                "indirect object definition different than expected",
            )
            object, offset = cls.get_value(data, m.end(), max_nesting=max_nesting - 1)
            if offset is None:
                return object, None
            m = cls.re_indirect_def_end.match(data, offset)
            check_format_condition(m, "indirect object definition end not found")
            return object, m.end()
        check_format_condition(
            not expect_indirect, "indirect object definition not found"
        )
        m = cls.re_indirect_reference.match(data, offset)
        if m:
            check_format_condition(
                int(m.group(1)) > 0,
                "indirect object reference: object ID must be greater than 0",
            )
            check_format_condition(
                int(m.group(2)) >= 0,
                "indirect object reference: generation must be non-negative",
            )
            return IndirectReference(int(m.group(1)), int(m.group(2))), m.end()
        m = cls.re_dict_start.match(data, offset)
        if m:
            offset = m.end()
            result = {}
            m = cls.re_dict_end.match(data, offset)
            while not m:
                key, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1)
                if offset is None:
                    return result, None
                value, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1)
                result[key] = value
                if offset is None:
                    return result, None
                m = cls.re_dict_end.match(data, offset)
            offset = m.end()
            m = cls.re_stream_start.match(data, offset)
            if m:
                try:
                    stream_len_str = result.get(b"Length")
                    stream_len = int(stream_len_str)
                except (TypeError, ValueError) as e:
                    msg = f"bad or missing Length in stream dict ({stream_len_str})"
                    raise PdfFormatError(msg) from e
                stream_data = data[m.end() : m.end() + stream_len]
                m = cls.re_stream_end.match(data, m.end() + stream_len)
                check_format_condition(m, "stream end not found")
                offset = m.end()
                result = PdfStream(PdfDict(result), stream_data)
            else:
                result = PdfDict(result)
            return result, offset
        m = cls.re_array_start.match(data, offset)
        if m:
            offset = m.end()
            result = []
            m = cls.re_array_end.match(data, offset)
            while not m:
                value, offset = cls.get_value(data, offset, max_nesting=max_nesting - 1)
                result.append(value)
                if offset is None:
                    return result, None
                m = cls.re_array_end.match(data, offset)
            return result, m.end()
        m = cls.re_null.match(data, offset)
        if m:
            return None, m.end()
        m = cls.re_true.match(data, offset)
        if m:
            return True, m.end()
        m = cls.re_false.match(data, offset)
        if m:
            return False, m.end()
        m = cls.re_name.match(data, offset)
        if m:
            return PdfName(cls.interpret_name(m.group(1))), m.end()
        m = cls.re_int.match(data, offset)
        if m:
            return int(m.group(1)), m.end()
        m = cls.re_real.match(data, offset)
        if m:
            # XXX Decimal instead of float???
            return float(m.group(1)), m.end()
        m = cls.re_string_hex.match(data, offset)
        if m:
            # filter out whitespace
            hex_string = bytearray(
                b for b in m.group(1) if b in b"0123456789abcdefABCDEF"
            )
            if len(hex_string) % 2 == 1:
                # append a 0 if the length is not even - yes, at the end
                hex_string.append(ord(b"0"))
            return bytearray.fromhex(hex_string.decode("us-ascii")), m.end()
        m = cls.re_string_lit.match(data, offset)
        if m:
            return cls.get_literal_string(data, m.end())
        # return None, offset  # fallback (only for debugging)
        msg = f"unrecognized object: {repr(data[offset : offset + 32])}"
        raise PdfFormatError(msg)

    re_lit_str_token = re.compile(
        rb"(\\[nrtbf()\\])|(\\[0-9]{1,3})|(\\(\r\n|\r|\n))|(\r\n|\r|\n)|(\()|(\))"
    )
    escaped_chars = {
        b"n": b"\n",
        b"r": b"\r",
        b"t": b"\t",
        b"b": b"\b",
        b"f": b"\f",
        b"(": b"(",
        b")": b")",
        b"\\": b"\\",
        ord(b"n"): b"\n",
        ord(b"r"): b"\r",
        ord(b"t"): b"\t",
        ord(b"b"): b"\b",
        ord(b"f"): b"\f",
        ord(b"("): b"(",
        ord(b")"): b")",
        ord(b"\\"): b"\\",
    }

    @classmethod
    def get_literal_string(cls, data, offset):
        nesting_depth = 0
        result = bytearray()
        for m in cls.re_lit_str_token.finditer(data, offset):
            result.extend(data[offset : m.start()])
            if m.group(1):
                result.extend(cls.escaped_chars[m.group(1)[1]])
            elif m.group(2):
                result.append(int(m.group(2)[1:], 8))
            elif m.group(3):
                pass
            elif m.group(5):
                result.extend(b"\n")
            elif m.group(6):
                result.extend(b"(")
                nesting_depth += 1
            elif m.group(7):
                if nesting_depth == 0:
                    return bytes(result), m.end()
                result.extend(b")")
                nesting_depth -= 1
            offset = m.end()
        msg = "unfinished literal string"
        raise PdfFormatError(msg)

    re_xref_section_start = re.compile(whitespace_optional + rb"xref" + newline)
    re_xref_subsection_start = re.compile(
        whitespace_optional
        + rb"([0-9]+)"
        + whitespace_mandatory
        + rb"([0-9]+)"
        + whitespace_optional
        + newline_only
    )
    re_xref_entry = re.compile(rb"([0-9]{10}) ([0-9]{5}) ([fn])( \r| \n|\r\n)")

    def read_xref_table(self, xref_section_offset):
        subsection_found = False
        m = self.re_xref_section_start.match(
            self.buf, xref_section_offset + self.start_offset
        )
        check_format_condition(m, "xref section start not found")
        offset = m.end()
        while True:
            m = self.re_xref_subsection_start.match(self.buf, offset)
            if not m:
                check_format_condition(
                    subsection_found, "xref subsection start not found"
                )
                break
            subsection_found = True
            offset = m.end()
            first_object = int(m.group(1))
            num_objects = int(m.group(2))
            for i in range(first_object, first_object + num_objects):
                m = self.re_xref_entry.match(self.buf, offset)
                check_format_condition(m, "xref entry not found")
                offset = m.end()
                is_free = m.group(3) == b"f"
                if not is_free:
                    generation = int(m.group(2))
                    new_entry = (int(m.group(1)), generation)
                    if i not in self.xref_table:
                        self.xref_table[i] = new_entry
        return offset

    def read_indirect(self, ref, max_nesting=-1):
        offset, generation = self.xref_table[ref[0]]
        check_format_condition(
            generation == ref[1],
            f"expected to find generation {ref[1]} for object ID {ref[0]} in xref "
            f"table, instead found generation {generation} at offset {offset}",
        )
        value = self.get_value(
            self.buf,
            offset + self.start_offset,
            expect_indirect=IndirectReference(*ref),
            max_nesting=max_nesting,
        )[0]
        self.cached_objects[ref] = value
        return value

    def linearize_page_tree(self, node=None):
        if node is None:
            node = self.page_tree_root
        check_format_condition(
            node[b"Type"] == b"Pages", "/Type of page tree node is not /Pages"
        )
        pages = []
        for kid in node[b"Kids"]:
            kid_object = self.read_indirect(kid)
            if kid_object[b"Type"] == b"Page":
                pages.append(kid)
            else:
                pages.extend(self.linearize_page_tree(node=kid_object))
        return pages
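
# Illustrative usage sketch, not part of the module above.  "example.pdf" is a
# placeholder path; within Pillow this parser is normally driven by
# PdfImagePlugin rather than called directly.
from PIL.PdfParser import PdfParser

with PdfParser(filename="example.pdf") as parser:
    print(len(parser.pages), "page object reference(s)")
    for page_ref in parser.pages:
        page = parser.read_indirect(page_ref)  # PdfDict for one /Page object
        print(page_ref, page.get(b"MediaBox"))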