Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/psparser.py: 95%


465 statements  

#!/usr/bin/env python3
import io
import logging
import re
from typing import (
    Any,
    BinaryIO,
    Dict,
    Generic,
    Iterator,
    List,
    Optional,
    Tuple,
    Type,
    TypeVar,
    Union,
)

from pdfminer import psexceptions, settings
from pdfminer.utils import choplist

log = logging.getLogger(__name__)


# Adding aliases for these exceptions for backwards compatibility
PSException = psexceptions.PSException
PSEOF = psexceptions.PSEOF
PSSyntaxError = psexceptions.PSSyntaxError
PSTypeError = psexceptions.PSTypeError
PSValueError = psexceptions.PSValueError


class PSObject:
    """Base class for all PS or PDF-related data types."""


class PSLiteral(PSObject):
    """A class that represents a PostScript literal.

    PostScript literals are used as identifiers, such as
    variable names, property names and dictionary keys.
    Literals are case sensitive and denoted by a preceding
    slash sign (e.g. "/Name").

    Note: Do not create an instance of PSLiteral directly.
    Always use PSLiteralTable.intern().
    """

    NameType = Union[str, bytes]

    def __init__(self, name: NameType) -> None:
        self.name = name

    def __repr__(self) -> str:
        name = self.name
        return "/%r" % name


class PSKeyword(PSObject):
    """A class that represents a PostScript keyword.

    PostScript keywords are a small set of predefined words.
    Commands and directives in PostScript are expressed by keywords.
    They are also used to denote the content boundaries.

    Note: Do not create an instance of PSKeyword directly.
    Always use PSKeywordTable.intern().
    """

    def __init__(self, name: bytes) -> None:
        self.name = name

    def __repr__(self) -> str:
        name = self.name
        return "/%r" % name


_SymbolT = TypeVar("_SymbolT", PSLiteral, PSKeyword)


class PSSymbolTable(Generic[_SymbolT]):
    """A utility class for storing PSLiteral/PSKeyword objects.

    Interned objects can be checked for identity with the "is" operator.
    """

    def __init__(self, klass: Type[_SymbolT]) -> None:
        self.dict: Dict[PSLiteral.NameType, _SymbolT] = {}
        self.klass: Type[_SymbolT] = klass

    def intern(self, name: PSLiteral.NameType) -> _SymbolT:
        if name in self.dict:
            lit = self.dict[name]
        else:
            # Type confusion issue: PSKeyword always takes bytes as name,
            # PSLiteral uses either str or bytes
            lit = self.klass(name)  # type: ignore[arg-type]
            self.dict[name] = lit
        return lit


PSLiteralTable = PSSymbolTable(PSLiteral)
PSKeywordTable = PSSymbolTable(PSKeyword)
LIT = PSLiteralTable.intern
KWD = PSKeywordTable.intern
KEYWORD_PROC_BEGIN = KWD(b"{")
KEYWORD_PROC_END = KWD(b"}")
KEYWORD_ARRAY_BEGIN = KWD(b"[")
KEYWORD_ARRAY_END = KWD(b"]")
KEYWORD_DICT_BEGIN = KWD(b"<<")
KEYWORD_DICT_END = KWD(b">>")
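
# Example (minimal sketch): intern() caches one object per name, so LIT() and
# KWD() always return the same instance for the same name and symbols can be
# compared by identity:
#
#     LIT("Name") is LIT("Name")    # True
#     KWD(b"obj") is KWD(b"obj")    # True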


def literal_name(x: Any) -> str:
    if isinstance(x, PSLiteral):
        if isinstance(x.name, str):
            return x.name
        try:
            return str(x.name, "utf-8")
        except UnicodeDecodeError:
            return str(x.name)
    else:
        if settings.STRICT:
            raise PSTypeError(f"Literal required: {x!r}")
        return str(x)


def keyword_name(x: Any) -> Any:
    if not isinstance(x, PSKeyword):
        if settings.STRICT:
            raise PSTypeError("Keyword required: %r" % x)
        else:
            name = x
    else:
        name = str(x.name, "utf-8", "ignore")
    return name
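
# Example (minimal sketch): literal_name() and keyword_name() unwrap interned
# symbols back into plain strings:
#
#     literal_name(LIT("Pages"))    # -> "Pages"
#     keyword_name(KWD(b"endobj"))  # -> "endobj"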


EOL = re.compile(rb"[\r\n]")
SPC = re.compile(rb"\s")
NONSPC = re.compile(rb"\S")
HEX = re.compile(rb"[0-9a-fA-F]")
END_LITERAL = re.compile(rb"[#/%\[\]()<>{}\s]")
END_HEX_STRING = re.compile(rb"[^\s0-9a-fA-F]")
HEX_PAIR = re.compile(rb"[0-9a-fA-F]{2}|.")
END_NUMBER = re.compile(rb"[^0-9]")
END_KEYWORD = re.compile(rb"[#/%\[\]()<>{}\s]")
END_STRING = re.compile(rb"[()\134]")
OCT_STRING = re.compile(rb"[0-7]")
ESC_STRING = {
    b"b": 8,
    b"t": 9,
    b"n": 10,
    b"f": 12,
    b"r": 13,
    b"(": 40,
    b")": 41,
    b"\\": 92,
}


PSBaseParserToken = Union[float, bool, PSLiteral, PSKeyword, bytes]


class PSBaseParser:
    """Most basic PostScript parser that performs only tokenization."""

    BUFSIZ = 4096

    def __init__(self, fp: BinaryIO) -> None:
        self.fp = fp
        self.eof = False
        self.seek(0)

    def __repr__(self) -> str:
        return "<%s: %r, bufpos=%d>" % (self.__class__.__name__, self.fp, self.bufpos)

    def flush(self) -> None:
        pass

    def close(self) -> None:
        self.flush()

    def tell(self) -> int:
        return self.bufpos + self.charpos

    def poll(self, pos: Optional[int] = None, n: int = 80) -> None:
        pos0 = self.fp.tell()
        if not pos:
            pos = self.bufpos + self.charpos
        self.fp.seek(pos)
        log.debug("poll(%d): %r", pos, self.fp.read(n))
        self.fp.seek(pos0)

    def seek(self, pos: int) -> None:
        """Seeks the parser to the given position."""
        log.debug("seek: %r", pos)
        self.fp.seek(pos)
        # reset the status for nextline()
        self.bufpos = pos
        self.buf = b""
        self.charpos = 0
        # reset the status for nexttoken()
        self._parse1 = self._parse_main
        self._curtoken = b""
        self._curtokenpos = 0
        self._tokens: List[Tuple[int, PSBaseParserToken]] = []
        self.eof = False

    def fillbuf(self) -> None:
        if self.charpos < len(self.buf):
            return
        # fetch next chunk.
        self.bufpos = self.fp.tell()
        self.buf = self.fp.read(self.BUFSIZ)
        if not self.buf:
            raise PSEOF("Unexpected EOF")
        self.charpos = 0

    def nextline(self) -> Tuple[int, bytes]:
        """Fetches the next line, which ends with either \\r or \\n."""
        linebuf = b""
        linepos = self.bufpos + self.charpos
        eol = False
        while 1:
            self.fillbuf()
            if eol:
                c = self.buf[self.charpos : self.charpos + 1]
                # handle b'\r\n'
                if c == b"\n":
                    linebuf += c
                    self.charpos += 1
                break
            m = EOL.search(self.buf, self.charpos)
            if m:
                linebuf += self.buf[self.charpos : m.end(0)]
                self.charpos = m.end(0)
                if linebuf[-1:] == b"\r":
                    eol = True
                else:
                    break
            else:
                linebuf += self.buf[self.charpos :]
                self.charpos = len(self.buf)
        log.debug("nextline: %r, %r", linepos, linebuf)

        return (linepos, linebuf)

    def revreadlines(self) -> Iterator[bytes]:
        """Fetches lines backwards, starting from the end of the file.

        This is used to locate the trailers at the end of a file.
        """
        self.fp.seek(0, io.SEEK_END)
        pos = self.fp.tell()
        buf = b""
        while pos > 0:
            prevpos = pos
            pos = max(0, pos - self.BUFSIZ)
            self.fp.seek(pos)
            s = self.fp.read(prevpos - pos)
            if not s:
                break
            while 1:
                n = max(s.rfind(b"\r"), s.rfind(b"\n"))
                if n == -1:
                    buf = s + buf
                    break
                yield s[n:] + buf
                s = s[:n]
                buf = b""

    def _parse_main(self, s: bytes, i: int) -> int:
        m = NONSPC.search(s, i)
        if not m:
            return len(s)
        j = m.start(0)
        c = s[j : j + 1]
        self._curtokenpos = self.bufpos + j
        if c == b"%":
            self._curtoken = b"%"
            self._parse1 = self._parse_comment
            return j + 1
        elif c == b"/":
            self._curtoken = b""
            self._parse1 = self._parse_literal
            return j + 1
        elif c in b"-+" or c.isdigit():
            self._curtoken = c
            self._parse1 = self._parse_number
            return j + 1
        elif c == b".":
            self._curtoken = c
            self._parse1 = self._parse_float
            return j + 1
        elif c.isalpha():
            self._curtoken = c
            self._parse1 = self._parse_keyword
            return j + 1
        elif c == b"(":
            self._curtoken = b""
            self.paren = 1
            self._parse1 = self._parse_string
            return j + 1
        elif c == b"<":
            self._curtoken = b""
            self._parse1 = self._parse_wopen
            return j + 1
        elif c == b">":
            self._curtoken = b""
            self._parse1 = self._parse_wclose
            return j + 1
        elif c == b"\x00":
            return j + 1
        else:
            self._add_token(KWD(c))
            return j + 1

    def _add_token(self, obj: PSBaseParserToken) -> None:
        self._tokens.append((self._curtokenpos, obj))

    def _parse_comment(self, s: bytes, i: int) -> int:
        m = EOL.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        self._parse1 = self._parse_main
        # We ignore comments.
        # self._tokens.append(self._curtoken)
        return j

    def _parse_literal(self, s: bytes, i: int) -> int:
        m = END_LITERAL.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b"#":
            self.hex = b""
            self._parse1 = self._parse_literal_hex
            return j + 1
        try:
            name: Union[str, bytes] = str(self._curtoken, "utf-8")
        except Exception:
            name = self._curtoken
        self._add_token(LIT(name))
        self._parse1 = self._parse_main
        return j

    def _parse_literal_hex(self, s: bytes, i: int) -> int:
        c = s[i : i + 1]
        if HEX.match(c) and len(self.hex) < 2:
            self.hex += c
            return i + 1
        if self.hex:
            self._curtoken += bytes((int(self.hex, 16),))
        self._parse1 = self._parse_literal
        return i

    def _parse_number(self, s: bytes, i: int) -> int:
        m = END_NUMBER.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b".":
            self._curtoken += c
            self._parse1 = self._parse_float
            return j + 1
        try:
            self._add_token(int(self._curtoken))
        except ValueError:
            pass
        self._parse1 = self._parse_main
        return j

    def _parse_float(self, s: bytes, i: int) -> int:
        m = END_NUMBER.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        try:
            self._add_token(float(self._curtoken))
        except ValueError:
            pass
        self._parse1 = self._parse_main
        return j

    def _parse_keyword(self, s: bytes, i: int) -> int:
        m = END_KEYWORD.search(s, i)
        if m:
            j = m.start(0)
            self._curtoken += s[i:j]
        else:
            self._curtoken += s[i:]
            return len(s)
        if self._curtoken == b"true":
            token: Union[bool, PSKeyword] = True
        elif self._curtoken == b"false":
            token = False
        else:
            token = KWD(self._curtoken)
        self._add_token(token)
        self._parse1 = self._parse_main
        return j

    def _parse_string(self, s: bytes, i: int) -> int:
        m = END_STRING.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b"\\":
            self.oct = b""
            self._parse1 = self._parse_string_1
            return j + 1
        if c == b"(":
            self.paren += 1
            self._curtoken += c
            return j + 1
        if c == b")":
            self.paren -= 1
            if self.paren:
                # WTF, they said balanced parens need no special treatment.
                self._curtoken += c
                return j + 1
        self._add_token(self._curtoken)
        self._parse1 = self._parse_main
        return j + 1

    def _parse_string_1(self, s: bytes, i: int) -> int:
        """Parse literal strings.

        PDF Reference 3.2.3
        """
        c = s[i : i + 1]
        if OCT_STRING.match(c) and len(self.oct) < 3:
            self.oct += c
            return i + 1

        elif self.oct:
            chrcode = int(self.oct, 8)
            assert chrcode < 256, "Invalid octal %s (%d)" % (repr(self.oct), chrcode)
            self._curtoken += bytes((chrcode,))
            self._parse1 = self._parse_string
            return i

        elif c in ESC_STRING:
            self._curtoken += bytes((ESC_STRING[c],))

        elif c == b"\r" and len(s) > i + 1 and s[i + 1 : i + 2] == b"\n":
            # If the current and the next character are \r\n, skip both,
            # because line breaks after a backslash are ignored.
            i += 1

        # default action
        self._parse1 = self._parse_string
        return i + 1

    def _parse_wopen(self, s: bytes, i: int) -> int:
        c = s[i : i + 1]
        if c == b"<":
            self._add_token(KEYWORD_DICT_BEGIN)
            self._parse1 = self._parse_main
            i += 1
        else:
            self._parse1 = self._parse_hexstring
        return i

    def _parse_wclose(self, s: bytes, i: int) -> int:
        c = s[i : i + 1]
        if c == b">":
            self._add_token(KEYWORD_DICT_END)
            i += 1
        self._parse1 = self._parse_main
        return i

    def _parse_hexstring(self, s: bytes, i: int) -> int:
        m = END_HEX_STRING.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        token = HEX_PAIR.sub(
            lambda m: bytes((int(m.group(0), 16),)),
            SPC.sub(b"", self._curtoken),
        )
        self._add_token(token)
        self._parse1 = self._parse_main
        return j

    def nexttoken(self) -> Tuple[int, PSBaseParserToken]:
        if self.eof:
            # It's not really unexpected, come on now...
            raise PSEOF("Unexpected EOF")
        while not self._tokens:
            try:
                self.fillbuf()
                self.charpos = self._parse1(self.buf, self.charpos)
            except PSEOF:
                # If we hit EOF in the middle of a token, try to parse
                # it by tacking on whitespace, and delay raising PSEOF
                # until next time around
                self.charpos = self._parse1(b"\n", 0)
                self.eof = True
                # Oh, so there wasn't actually a token there? OK.
                if not self._tokens:
                    raise
        token = self._tokens.pop(0)
        log.debug("nexttoken: %r", token)
        return token
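
# Example (minimal sketch): tokenizing a small byte stream. nexttoken()
# returns (position, token) pairs and raises PSEOF once the input is
# exhausted:
#
#     parser = PSBaseParser(io.BytesIO(b"/Type /Catalog 10 0 R"))
#     while True:
#         try:
#             pos, token = parser.nexttoken()
#         except PSEOF:
#             break
#         print(pos, token)
#
# yielding roughly: (0, /'Type'), (6, /'Catalog'), (16, 10), (19, 0), (21, /b'R').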


# Stack slots may be occupied by any of:
# * the name of a literal
# * the PSBaseParserToken types
# * list (via KEYWORD_ARRAY)
# * dict (via KEYWORD_DICT)
# * subclass-specific extensions (e.g. PDFStream, PDFObjRef) via ExtraT
ExtraT = TypeVar("ExtraT")
PSStackType = Union[str, float, bool, PSLiteral, bytes, List, Dict, ExtraT]
PSStackEntry = Tuple[int, PSStackType[ExtraT]]


class PSStackParser(PSBaseParser, Generic[ExtraT]):
    def __init__(self, fp: BinaryIO) -> None:
        PSBaseParser.__init__(self, fp)
        self.reset()

    def reset(self) -> None:
        self.context: List[Tuple[int, Optional[str], List[PSStackEntry[ExtraT]]]] = []
        self.curtype: Optional[str] = None
        self.curstack: List[PSStackEntry[ExtraT]] = []
        self.results: List[PSStackEntry[ExtraT]] = []

    def seek(self, pos: int) -> None:
        PSBaseParser.seek(self, pos)
        self.reset()

    def push(self, *objs: PSStackEntry[ExtraT]) -> None:
        self.curstack.extend(objs)

    def pop(self, n: int) -> List[PSStackEntry[ExtraT]]:
        objs = self.curstack[-n:]
        self.curstack[-n:] = []
        return objs

    def popall(self) -> List[PSStackEntry[ExtraT]]:
        objs = self.curstack
        self.curstack = []
        return objs

    def add_results(self, *objs: PSStackEntry[ExtraT]) -> None:
        try:
            log.debug("add_results: %r", objs)
        except Exception:
            log.debug("add_results: (unprintable object)")
        self.results.extend(objs)

    def start_type(self, pos: int, type: str) -> None:
        self.context.append((pos, self.curtype, self.curstack))
        (self.curtype, self.curstack) = (type, [])
        log.debug("start_type: pos=%r, type=%r", pos, type)

    def end_type(self, type: str) -> Tuple[int, List[PSStackType[ExtraT]]]:
        if self.curtype != type:
            raise PSTypeError(f"Type mismatch: {self.curtype!r} != {type!r}")
        objs = [obj for (_, obj) in self.curstack]
        (pos, self.curtype, self.curstack) = self.context.pop()
        log.debug("end_type: pos=%r, type=%r, objs=%r", pos, type, objs)
        return (pos, objs)

    def do_keyword(self, pos: int, token: PSKeyword) -> None:
        pass

    def nextobject(self) -> PSStackEntry[ExtraT]:
        """Returns the next object as a (position, object) pair.

        Arrays and dictionaries are represented as Python lists and
        dictionaries.

        :return: keywords, literals, strings, numbers, arrays and dictionaries.
        """
        while not self.results:
            (pos, token) = self.nexttoken()
            if isinstance(token, (int, float, bool, str, bytes, PSLiteral)):
                # normal token
                self.push((pos, token))
            elif token == KEYWORD_ARRAY_BEGIN:
                # begin array
                self.start_type(pos, "a")
            elif token == KEYWORD_ARRAY_END:
                # end array
                try:
                    self.push(self.end_type("a"))
                except PSTypeError:
                    if settings.STRICT:
                        raise
            elif token == KEYWORD_DICT_BEGIN:
                # begin dictionary
                self.start_type(pos, "d")
            elif token == KEYWORD_DICT_END:
                # end dictionary
                try:
                    (pos, objs) = self.end_type("d")
                    if len(objs) % 2 != 0:
                        error_msg = "Invalid dictionary construct: %r" % objs
                        raise PSSyntaxError(error_msg)
                    d = {
                        literal_name(k): v
                        for (k, v) in choplist(2, objs)
                        if v is not None
                    }
                    self.push((pos, d))
                except PSTypeError:
                    if settings.STRICT:
                        raise
            elif token == KEYWORD_PROC_BEGIN:
                # begin proc
                self.start_type(pos, "p")
            elif token == KEYWORD_PROC_END:
                # end proc
                try:
                    self.push(self.end_type("p"))
                except PSTypeError:
                    if settings.STRICT:
                        raise
            elif isinstance(token, PSKeyword):
                log.debug(
                    "do_keyword: pos=%r, token=%r, stack=%r",
                    pos,
                    token,
                    self.curstack,
                )
                self.do_keyword(pos, token)
            else:
                log.error(
                    "unknown token: pos=%r, token=%r, stack=%r",
                    pos,
                    token,
                    self.curstack,
                )
                self.do_keyword(pos, token)
                raise PSException
            if self.context:
                continue
            else:
                self.flush()
        obj = self.results.pop(0)
        try:
            log.debug("nextobject: %r", obj)
        except Exception:
            log.debug("nextobject: (unprintable object)")
        return obj
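

# Example (minimal sketch): PSStackParser assembles PSBaseParser tokens into
# nested Python objects. do_keyword() and flush() are no-op hooks here, so a
# subclass is assumed to decide what to do with bare keywords and when to
# publish the object stack:
#
#     class _DemoParser(PSStackParser):
#         def do_keyword(self, pos, token):
#             # keep bare keywords on the stack as their byte names
#             self.push((pos, token.name))
#
#         def flush(self):
#             # publish whatever is on the stack as results
#             self.add_results(*self.popall())
#
#     p = _DemoParser(io.BytesIO(b"<< /Kids [1 2 3] >>"))
#     pos, obj = p.nextobject()   # obj == {"Kids": [1, 2, 3]}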