Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/psparser.py: 94%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

452 statements  

1#!/usr/bin/env python3 

2import io 

3import logging 

4import re 

5from typing import ( 

6 Any, 

7 BinaryIO, 

8 Dict, 

9 Generic, 

10 Iterator, 

11 List, 

12 Optional, 

13 Tuple, 

14 Type, 

15 TypeVar, 

16 Union, 

17) 

18 

19from pdfminer import psexceptions, settings 

20from pdfminer.utils import choplist 

21 

# Module-level logger, named after this module.
log = logging.getLogger(__name__)


# Adding aliases for these exceptions for backwards compatibility:
# each name below is bound to the class of the same name in `psexceptions`.
PSException = psexceptions.PSException
PSEOF = psexceptions.PSEOF
PSSyntaxError = psexceptions.PSSyntaxError
PSTypeError = psexceptions.PSTypeError
PSValueError = psexceptions.PSValueError

31 

32 

class PSObject:
    """Common ancestor of every PS- or PDF-related data type."""

35 

36 

class PSLiteral(PSObject):
    """A PostScript literal.

    Literals serve as identifiers in PostScript: variable names,
    property names and dictionary keys.  They are case sensitive and
    written with a leading slash (e.g. "/Name").

    Note: Never instantiate PSLiteral yourself.
          Obtain instances through PSLiteralTable.intern().
    """

    # A literal's name may be held either as text or as raw bytes.
    NameType = Union[str, bytes]

    def __init__(self, name: NameType) -> None:
        """Store *name* as the literal's name."""
        self.name = name

    def __repr__(self) -> str:
        """Return a slash followed by the repr of the name."""
        return "/%r" % self.name

57 

58 

class PSKeyword(PSObject):
    """A PostScript keyword.

    Keywords are a small set of predefined words.  PostScript expresses
    its commands and directives with them, and they also mark content
    boundaries.

    Note: Never instantiate PSKeyword yourself.
          Obtain instances through PSKeywordTable.intern().
    """

    def __init__(self, name: bytes) -> None:
        """Store *name* (raw bytes) as the keyword's name."""
        self.name = name

    def __repr__(self) -> str:
        """Return a slash followed by the repr of the name."""
        return "/%r" % self.name

76 

77 

# Constrained type variable: a symbol is either a PSLiteral or a PSKeyword.
# It parameterizes PSSymbolTable so intern() returns the matching class.
_SymbolT = TypeVar("_SymbolT", PSLiteral, PSKeyword)

79 

80 

class PSSymbolTable(Generic[_SymbolT]):
    """Interning table for PSLiteral/PSKeyword objects.

    Every distinct name maps to a single symbol instance, so interned
    symbols can be compared by identity with the "is" operator.
    """

    def __init__(self, klass: Type[_SymbolT]) -> None:
        """Create an empty table whose symbols are instances of *klass*."""
        self.klass: Type[_SymbolT] = klass
        self.dict: Dict[PSLiteral.NameType, _SymbolT] = {}

    def intern(self, name: PSLiteral.NameType) -> _SymbolT:
        """Return the symbol registered under *name*, creating it on first use."""
        try:
            return self.dict[name]
        except KeyError:
            # Type confusion issue: PSKeyword always takes bytes as name
            #                       PSLiteral uses either str or bytes
            symbol = self.klass(name)  # type: ignore[arg-type]
            self.dict[name] = symbol
            return symbol

100 

101 

# Module-wide symbol tables: one holding literals, one holding keywords.
PSLiteralTable = PSSymbolTable(PSLiteral)
PSKeywordTable = PSSymbolTable(PSKeyword)
# Shorthands for interning a literal / a keyword by name.
LIT = PSLiteralTable.intern
KWD = PSKeywordTable.intern
# Pre-interned keywords for the structural delimiters: procedure braces,
# array brackets and dictionary double angle brackets.
KEYWORD_PROC_BEGIN = KWD(b"{")
KEYWORD_PROC_END = KWD(b"}")
KEYWORD_ARRAY_BEGIN = KWD(b"[")
KEYWORD_ARRAY_END = KWD(b"]")
KEYWORD_DICT_BEGIN = KWD(b"<<")
KEYWORD_DICT_END = KWD(b">>")

112 

113 

def literal_name(x: Any) -> str:
    """Return the name of *x* as a ``str``.

    For a PSLiteral, a ``str`` name is returned unchanged and a ``bytes``
    name is decoded as UTF-8; if decoding fails, ``str()`` of the bytes
    object is returned instead.

    For anything that is not a PSLiteral, PSTypeError is raised when
    ``settings.STRICT`` is set; otherwise ``str(x)`` is returned.
    """
    if not isinstance(x, PSLiteral):
        if settings.STRICT:
            raise PSTypeError(f"Literal required: {x!r}")
        return str(x)

    raw = x.name
    if isinstance(raw, str):
        return raw
    try:
        return str(raw, "utf-8")
    except UnicodeDecodeError:
        return str(raw)

126 

127 

def keyword_name(x: Any) -> Any:
    """Return the name of a PSKeyword as ``str``.

    A PSKeyword's bytes name is decoded as UTF-8, silently dropping
    undecodable bytes.

    For anything that is not a PSKeyword, PSTypeError is raised when
    ``settings.STRICT`` is set; otherwise *x* itself is returned unchanged.

    :param x: the object to take the keyword name from.
    :return: the decoded keyword name, or *x* in non-strict mode.
    :raises PSTypeError: if *x* is not a PSKeyword and STRICT is enabled.
    """
    if isinstance(x, PSKeyword):
        return str(x.name, "utf-8", "ignore")
    if settings.STRICT:
        # Use an f-string with !r rather than '"%r" % x': with the
        # %-operator a tuple argument is unpacked as the format arguments,
        # which raises TypeError (or formats only its single element)
        # instead of reporting the offending object in a PSTypeError.
        raise PSTypeError(f"Keyword required: {x!r}")
    return x

137 

138 

# Byte-oriented regular expressions used by PSBaseParser.
EOL = re.compile(rb"[\r\n]")  # a carriage return or a line feed
SPC = re.compile(rb"\s")  # one whitespace byte
NONSPC = re.compile(rb"\S")  # one non-whitespace byte
HEX = re.compile(rb"[0-9a-fA-F]")  # one hexadecimal digit
# A byte that ends a literal name ("#" is handled there as a hex escape).
END_LITERAL = re.compile(rb"[#/%\[\]()<>{}\s]")
# A byte that is neither whitespace nor a hexadecimal digit.
END_HEX_STRING = re.compile(rb"[^\s0-9a-fA-F]")
# Two hexadecimal digits, or failing that any single byte matched by ".".
HEX_PAIR = re.compile(rb"[0-9a-fA-F]{2}|.")
END_NUMBER = re.compile(rb"[^0-9]")  # first byte that is not a decimal digit
# A byte that ends a keyword (same character class as END_LITERAL).
END_KEYWORD = re.compile(rb"[#/%\[\]()<>{}\s]")
# A parenthesis or a backslash (\134 is the octal escape for "\").
END_STRING = re.compile(rb"[()\134]")
OCT_STRING = re.compile(rb"[0-7]")  # one octal digit
# Maps the byte following a backslash inside a literal string to the
# byte value it stands for.
ESC_STRING = {
    b"b": 8,
    b"t": 9,
    b"n": 10,
    b"f": 12,
    b"r": 13,
    b"(": 40,
    b")": 41,
    b"\\": 92,
}


# Type of the token values that PSBaseParser queues via _add_token().
PSBaseParserToken = Union[float, bool, PSLiteral, PSKeyword, bytes]

163 

164 

class PSBaseParser:
    """Most basic PostScript parser that performs only tokenization.

    The parser reads the underlying file in chunks of BUFSIZ bytes and runs
    a small state machine over each chunk.  The current state is the bound
    method stored in ``self._parse1``; every ``_parse_*`` state method takes
    the current buffer ``s`` and an index ``i`` into it, consumes some bytes,
    optionally queues tokens / switches state, and returns the index at
    which scanning must resume.
    """

    # Number of bytes requested from the file per read.
    BUFSIZ = 4096

    def __init__(self, fp: BinaryIO) -> None:
        """Attach the parser to *fp* and position it at offset 0."""
        self.fp = fp
        self.seek(0)

    def __repr__(self) -> str:
        return "<%s: %r, bufpos=%d>" % (self.__class__.__name__, self.fp, self.bufpos)

    def flush(self) -> None:
        """Hook invoked by close(); does nothing in this class."""
        pass

    def close(self) -> None:
        """Finish parsing by calling flush()."""
        self.flush()

    def tell(self) -> int:
        """Return the parser position: buffer offset plus index in buffer."""
        return self.bufpos + self.charpos

    def poll(self, pos: Optional[int] = None, n: int = 80) -> None:
        """Log (at debug level) *n* bytes found at *pos* without moving the file.

        :param pos: file offset to peek at; ``None`` means the parser's
            current position.
        :param n: number of bytes to read for the log message.
        """
        pos0 = self.fp.tell()
        # Compare against None explicitly so that an explicit offset of 0
        # is honoured rather than being treated as "not given".
        if pos is None:
            pos = self.bufpos + self.charpos
        self.fp.seek(pos)
        log.debug("poll(%d): %r", pos, self.fp.read(n))
        # Restore the file position that was current on entry.
        self.fp.seek(pos0)

    def seek(self, pos: int) -> None:
        """Seeks the parser to the given position."""
        log.debug("seek: %r", pos)
        self.fp.seek(pos)
        # reset the status for nextline()
        self.bufpos = pos
        self.buf = b""
        self.charpos = 0
        # reset the status for nexttoken()
        self._parse1 = self._parse_main
        self._curtoken = b""
        self._curtokenpos = 0
        self._tokens: List[Tuple[int, PSBaseParserToken]] = []

    def fillbuf(self) -> None:
        """Load the next chunk from the file once the buffer is exhausted.

        Does nothing while unread bytes remain in the buffer.

        :raises PSEOF: if the file has no more data.
        """
        if self.charpos < len(self.buf):
            return
        # fetch next chunk.
        self.bufpos = self.fp.tell()
        self.buf = self.fp.read(self.BUFSIZ)
        if not self.buf:
            raise PSEOF("Unexpected EOF")
        self.charpos = 0

    def nextline(self) -> Tuple[int, bytes]:
        """Fetches a next line that ends either with \\r or \\n.

        :return: ``(linepos, linebuf)``: the position at which the line
            starts and its bytes, including the line terminator.
        :raises PSEOF: propagated from fillbuf() when the data runs out.
        """
        linebuf = b""
        linepos = self.bufpos + self.charpos
        # True once a "\r" has ended the line; the next byte is then
        # inspected to absorb the "\n" of a "\r\n" pair.
        eol = False
        while True:
            self.fillbuf()
            if eol:
                c = self.buf[self.charpos : self.charpos + 1]
                # handle b'\r\n'
                if c == b"\n":
                    linebuf += c
                    self.charpos += 1
                break
            m = EOL.search(self.buf, self.charpos)
            if m:
                linebuf += self.buf[self.charpos : m.end(0)]
                self.charpos = m.end(0)
                if linebuf[-1:] == b"\r":
                    eol = True
                else:
                    break
            else:
                # No terminator in this chunk: take all of it and go on.
                linebuf += self.buf[self.charpos :]
                self.charpos = len(self.buf)
        log.debug("nextline: %r, %r", linepos, linebuf)

        return (linepos, linebuf)

    def revreadlines(self) -> Iterator[bytes]:
        """Fetches lines backward, starting from the end of the file.

        This is used to locate the trailers at the end of a file.

        Each yielded value starts with the terminator byte (\\r or \\n) that
        precedes the line's text.  Data located before the first terminator
        of the file is accumulated but never yielded.
        """
        self.fp.seek(0, io.SEEK_END)
        pos = self.fp.tell()
        # Tail of a line whose beginning lies in an earlier chunk.
        buf = b""
        while pos > 0:
            prevpos = pos
            pos = max(0, pos - self.BUFSIZ)
            self.fp.seek(pos)
            s = self.fp.read(prevpos - pos)
            if not s:
                break
            while True:
                # Index of the last line terminator left in this chunk.
                n = max(s.rfind(b"\r"), s.rfind(b"\n"))
                if n == -1:
                    buf = s + buf
                    break
                yield s[n:] + buf
                s = s[:n]
                buf = b""

    def _parse_main(self, s: bytes, i: int) -> int:
        """Initial state: skip whitespace and dispatch on the next byte."""
        m = NONSPC.search(s, i)
        if not m:
            return len(s)
        j = m.start(0)
        c = s[j : j + 1]
        # Remember where the token that starts here is located.
        self._curtokenpos = self.bufpos + j
        if c == b"%":
            self._curtoken = b"%"
            self._parse1 = self._parse_comment
            return j + 1
        elif c == b"/":
            self._curtoken = b""
            self._parse1 = self._parse_literal
            return j + 1
        elif c in b"-+" or c.isdigit():
            self._curtoken = c
            self._parse1 = self._parse_number
            return j + 1
        elif c == b".":
            self._curtoken = c
            self._parse1 = self._parse_float
            return j + 1
        elif c.isalpha():
            self._curtoken = c
            self._parse1 = self._parse_keyword
            return j + 1
        elif c == b"(":
            self._curtoken = b""
            # Nesting depth of parentheses inside the literal string.
            self.paren = 1
            self._parse1 = self._parse_string
            return j + 1
        elif c == b"<":
            self._curtoken = b""
            self._parse1 = self._parse_wopen
            return j + 1
        elif c == b">":
            self._curtoken = b""
            self._parse1 = self._parse_wclose
            return j + 1
        elif c == b"\x00":
            # NUL bytes are skipped without producing a token.
            return j + 1
        else:
            # Any other single byte becomes a one-byte keyword.
            self._add_token(KWD(c))
            return j + 1

    def _add_token(self, obj: PSBaseParserToken) -> None:
        """Queue *obj* together with the position recorded for the token."""
        self._tokens.append((self._curtokenpos, obj))

    def _parse_comment(self, s: bytes, i: int) -> int:
        """Consume a comment up to (not including) the next line terminator."""
        m = EOL.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        self._parse1 = self._parse_main
        # We ignore comments.
        # self._tokens.append(self._curtoken)
        return j

    def _parse_literal(self, s: bytes, i: int) -> int:
        """Accumulate a literal name and queue it as an interned PSLiteral.

        The name is interned as ``str`` when it is valid UTF-8 and as raw
        ``bytes`` otherwise.
        """
        m = END_LITERAL.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b"#":
            # "#" introduces a hexadecimal character escape in the name.
            self.hex = b""
            self._parse1 = self._parse_literal_hex
            return j + 1
        try:
            name: Union[str, bytes] = str(self._curtoken, "utf-8")
        except UnicodeDecodeError:
            # Decoding bytes is the only operation here that can fail.
            name = self._curtoken
        self._add_token(LIT(name))
        self._parse1 = self._parse_main
        return j

    def _parse_literal_hex(self, s: bytes, i: int) -> int:
        """Collect up to two hex digits after "#" and append the byte."""
        c = s[i : i + 1]
        if HEX.match(c) and len(self.hex) < 2:
            self.hex += c
            return i + 1
        if self.hex:
            self._curtoken += bytes((int(self.hex, 16),))
        # Resume the literal at the current byte, which was not consumed.
        self._parse1 = self._parse_literal
        return i

    def _parse_number(self, s: bytes, i: int) -> int:
        """Accumulate digits; queue an int, or continue as a float on "."."""
        m = END_NUMBER.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b".":
            self._curtoken += c
            self._parse1 = self._parse_float
            return j + 1
        try:
            self._add_token(int(self._curtoken))
        except ValueError:
            # Text that is not a valid integer (e.g. a lone sign) yields
            # no token at all.
            pass
        self._parse1 = self._parse_main
        return j

    def _parse_float(self, s: bytes, i: int) -> int:
        """Accumulate the fractional digits and queue a float."""
        m = END_NUMBER.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        try:
            self._add_token(float(self._curtoken))
        except ValueError:
            # Text that is not a valid float (e.g. a lone ".") yields
            # no token at all.
            pass
        self._parse1 = self._parse_main
        return j

    def _parse_keyword(self, s: bytes, i: int) -> int:
        """Accumulate a keyword and queue it.

        ``true`` and ``false`` are queued as Python booleans; everything
        else is queued as an interned PSKeyword.
        """
        m = END_KEYWORD.search(s, i)
        if m:
            j = m.start(0)
            self._curtoken += s[i:j]
        else:
            # Use the rest of the stream if no non-keyword character is found. This
            # can happen if the keyword is the final bytes of the stream
            # (https://github.com/pdfminer/pdfminer.six/issues/884).
            # NOTE(review): this branch is also taken when the keyword merely
            # runs up to the end of the current buffer chunk; the token is
            # then queued from the bytes seen so far - confirm this is intended.
            j = len(s)
            self._curtoken += s[i:]
        if self._curtoken == b"true":
            token: Union[bool, PSKeyword] = True
        elif self._curtoken == b"false":
            token = False
        else:
            token = KWD(self._curtoken)
        self._add_token(token)
        self._parse1 = self._parse_main
        return j

    def _parse_string(self, s: bytes, i: int) -> int:
        """Accumulate a parenthesized literal string and queue it as bytes."""
        m = END_STRING.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b"\\":
            # Start of an escape sequence; octal digits are collected here.
            self.oct = b""
            self._parse1 = self._parse_string_1
            return j + 1
        if c == b"(":
            self.paren += 1
            self._curtoken += c
            return j + 1
        if c == b")":
            self.paren -= 1
            if self.paren:
                # WTF, they said balanced parens need no special treatment.
                self._curtoken += c
                return j + 1
        # The outermost ")" was reached: the string is complete.
        self._add_token(self._curtoken)
        self._parse1 = self._parse_main
        return j + 1

    def _parse_string_1(self, s: bytes, i: int) -> int:
        """Parse literal strings

        Handles the byte(s) following a backslash inside a literal string:
        up to three octal digits, one of the single-character escapes in
        ESC_STRING, or a backslash-escaped line break.

        PDF Reference 3.2.3
        """
        c = s[i : i + 1]
        if OCT_STRING.match(c) and len(self.oct) < 3:
            self.oct += c
            return i + 1

        elif self.oct:
            # Three octal digits can encode values up to 0o777.  The PDF
            # specification says high-order overflow is ignored, so keep the
            # low 8 bits rather than failing on such input.
            chrcode = int(self.oct, 8) & 0xFF
            self._curtoken += bytes((chrcode,))
            self._parse1 = self._parse_string
            # The current byte is not part of the escape: do not consume it.
            return i

        elif c in ESC_STRING:
            self._curtoken += bytes((ESC_STRING[c],))

        elif c == b"\r" and len(s) > i + 1 and s[i + 1 : i + 2] == b"\n":
            # If current and next character is \r\n skip both because enters
            # after a \ are ignored
            i += 1

        # default action
        self._parse1 = self._parse_string
        return i + 1

    def _parse_wopen(self, s: bytes, i: int) -> int:
        """After "<": a second "<" opens a dictionary, else a hex string."""
        c = s[i : i + 1]
        if c == b"<":
            self._add_token(KEYWORD_DICT_BEGIN)
            self._parse1 = self._parse_main
            i += 1
        else:
            self._parse1 = self._parse_hexstring
        return i

    def _parse_wclose(self, s: bytes, i: int) -> int:
        """After ">": a second ">" closes a dictionary; a lone ">" is dropped."""
        c = s[i : i + 1]
        if c == b">":
            self._add_token(KEYWORD_DICT_END)
            i += 1
        self._parse1 = self._parse_main
        return i

    def _parse_hexstring(self, s: bytes, i: int) -> int:
        """Accumulate a hexadecimal string and queue the decoded bytes."""
        m = END_HEX_STRING.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        # Strip whitespace, then turn each pair of hex digits (or a single
        # leftover digit) into one byte.
        token = HEX_PAIR.sub(
            lambda m: bytes((int(m.group(0), 16),)),
            SPC.sub(b"", self._curtoken),
        )
        self._add_token(token)
        self._parse1 = self._parse_main
        return j

    def nexttoken(self) -> Tuple[int, PSBaseParserToken]:
        """Return the next ``(position, token)`` pair.

        Runs the state machine over the buffer until at least one token
        is queued.

        :raises PSEOF: propagated from fillbuf() when the data runs out.
        """
        while not self._tokens:
            self.fillbuf()
            self.charpos = self._parse1(self.buf, self.charpos)
        token = self._tokens.pop(0)
        log.debug("nexttoken: %r", token)
        return token

511 

512 

# Stack slots may be occupied by any of:
#  * the name of a literal
#  * the PSBaseParserToken types
#  * list (via KEYWORD_ARRAY)
#  * dict (via KEYWORD_DICT)
#  * subclass-specific extensions (e.g. PDFStream, PDFObjRef) via ExtraT
ExtraT = TypeVar("ExtraT")
PSStackType = Union[str, float, bool, PSLiteral, bytes, List, Dict, ExtraT]
# A stack entry pairs a position with the object stored in the slot.
PSStackEntry = Tuple[int, PSStackType[ExtraT]]

522 

523 

class PSStackParser(PSBaseParser, Generic[ExtraT]):
    """Parser that assembles tokens into objects using an operand stack.

    Plain tokens are pushed onto the current stack.  Array, dictionary and
    procedure delimiters open and close nested stacks, and every other
    keyword is handed to do_keyword(), which subclasses override.
    """

    def __init__(self, fp: BinaryIO) -> None:
        """Attach the parser to *fp* and start with empty stacks."""
        PSBaseParser.__init__(self, fp)
        self.reset()

    def reset(self) -> None:
        """Discard all stack state and pending results."""
        # Saved (pos, curtype, curstack) of each enclosing open container.
        self.context: List[Tuple[int, Optional[str], List[PSStackEntry[ExtraT]]]] = []
        # Type tag of the innermost open container, None at top level.
        self.curtype: Optional[str] = None
        # Entries collected for the innermost open container / top level.
        self.curstack: List[PSStackEntry[ExtraT]] = []
        # Completed entries waiting to be returned by nextobject().
        self.results: List[PSStackEntry[ExtraT]] = []

    def seek(self, pos: int) -> None:
        """Seek the tokenizer to *pos* and reset the stack state."""
        PSBaseParser.seek(self, pos)
        self.reset()

    def push(self, *objs: PSStackEntry[ExtraT]) -> None:
        """Append *objs* to the current stack."""
        self.curstack.extend(objs)

    def pop(self, n: int) -> List[PSStackEntry[ExtraT]]:
        """Remove and return the last *n* entries of the current stack.

        Fewer than *n* entries are returned when the stack is shorter.
        A non-positive *n* removes nothing and returns an empty list.
        """
        # Guard explicitly: "curstack[-0:]" is the whole list, so without
        # this check pop(0) would empty the stack instead of leaving it alone.
        if n <= 0:
            return []
        objs = self.curstack[-n:]
        self.curstack[-n:] = []
        return objs

    def popall(self) -> List[PSStackEntry[ExtraT]]:
        """Remove and return every entry of the current stack."""
        objs = self.curstack
        self.curstack = []
        return objs

    def add_results(self, *objs: PSStackEntry[ExtraT]) -> None:
        """Queue *objs* to be returned by nextobject()."""
        try:
            log.debug("add_results: %r", objs)
        except Exception:
            # Logging must never prevent the results from being queued.
            log.debug("add_results: (unprintable object)")
        self.results.extend(objs)

    def start_type(self, pos: int, type: str) -> None:
        """Open a container of the given *type* starting at *pos*.

        The current type and stack are saved on the context stack and a
        fresh, empty stack becomes current.
        """
        self.context.append((pos, self.curtype, self.curstack))
        (self.curtype, self.curstack) = (type, [])
        log.debug("start_type: pos=%r, type=%r", pos, type)

    def end_type(self, type: str) -> Tuple[int, List[PSStackType[ExtraT]]]:
        """Close the innermost container, which must be of the given *type*.

        :return: ``(pos, objs)``: the position saved by start_type() and the
            objects collected since, stripped of their positions.
        :raises PSTypeError: if the innermost open container has another type.
        """
        if self.curtype != type:
            raise PSTypeError(f"Type mismatch: {self.curtype!r} != {type!r}")
        objs = [obj for (_, obj) in self.curstack]
        (pos, self.curtype, self.curstack) = self.context.pop()
        log.debug("end_type: pos=%r, type=%r, objs=%r", pos, type, objs)
        return (pos, objs)

    def do_keyword(self, pos: int, token: PSKeyword) -> None:
        """Handle a keyword that is not a delimiter; does nothing here."""
        pass

    def nextobject(self) -> PSStackEntry[ExtraT]:
        """Returns the next ``(position, object)`` entry.

        Tokens are consumed until add_results() has queued at least one
        entry.  Arrays and dictionaries are represented as Python lists and
        dictionaries.

        :return: keywords, literals, strings, numbers, arrays and dictionaries.
        :raises PSEOF: propagated from nexttoken() when the data runs out.
        """
        while not self.results:
            (pos, token) = self.nexttoken()
            if isinstance(token, (int, float, bool, str, bytes, PSLiteral)):
                # normal token
                self.push((pos, token))
            elif token == KEYWORD_ARRAY_BEGIN:
                # begin array
                self.start_type(pos, "a")
            elif token == KEYWORD_ARRAY_END:
                # end array
                try:
                    self.push(self.end_type("a"))
                except PSTypeError:
                    # An unmatched "]" is ignored unless STRICT is set.
                    if settings.STRICT:
                        raise
            elif token == KEYWORD_DICT_BEGIN:
                # begin dictionary
                self.start_type(pos, "d")
            elif token == KEYWORD_DICT_END:
                # end dictionary
                try:
                    (pos, objs) = self.end_type("d")
                    if len(objs) % 2 != 0:
                        error_msg = "Invalid dictionary construct: %r" % objs
                        raise PSSyntaxError(error_msg)
                    # Pair up keys and values; entries whose value is None
                    # are left out of the dictionary.
                    d = {
                        literal_name(k): v
                        for (k, v) in choplist(2, objs)
                        if v is not None
                    }
                    self.push((pos, d))
                except PSTypeError:
                    # An unmatched ">>" is ignored unless STRICT is set.
                    if settings.STRICT:
                        raise
            elif token == KEYWORD_PROC_BEGIN:
                # begin proc
                self.start_type(pos, "p")
            elif token == KEYWORD_PROC_END:
                # end proc
                try:
                    self.push(self.end_type("p"))
                except PSTypeError:
                    # An unmatched "}" is ignored unless STRICT is set.
                    if settings.STRICT:
                        raise
            elif isinstance(token, PSKeyword):
                log.debug(
                    "do_keyword: pos=%r, token=%r, stack=%r",
                    pos,
                    token,
                    self.curstack,
                )
                self.do_keyword(pos, token)
            else:
                log.error(
                    "unknown token: pos=%r, token=%r, stack=%r",
                    pos,
                    token,
                    self.curstack,
                )
                self.do_keyword(pos, token)
                raise PSException
            if self.context:
                # Still inside an open container: keep reading tokens.
                continue
            else:
                self.flush()
        obj = self.results.pop(0)
        try:
            log.debug("nextobject: %r", obj)
        except Exception:
            log.debug("nextobject: (unprintable object)")
        return obj