Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/psparser.py: 94%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

466 statements  

1#!/usr/bin/env python3 

2import contextlib 

3import io 

4import logging 

5import re 

6from collections.abc import Iterator 

7from typing import ( 

8 Any, 

9 BinaryIO, 

10 Generic, 

11 TypeVar, 

12 Union, 

13) 

14 

15from pdfminer import psexceptions, settings 

16from pdfminer.utils import choplist 

17 

18log = logging.getLogger(__name__) 

19 

20 

# Adding aliases for these exceptions for backwards compatibility:
# older code imports them directly from pdfminer.psparser.
PSException = psexceptions.PSException
PSEOF = psexceptions.PSEOF
PSSyntaxError = psexceptions.PSSyntaxError
PSTypeError = psexceptions.PSTypeError
PSValueError = psexceptions.PSValueError

27 

28 

class PSObject:
    """Common base class shared by every PS/PDF data type in this module."""

31 

32 

33class PSLiteral(PSObject): 

34 """A class that represents a PostScript literal. 

35 

36 Postscript literals are used as identifiers, such as 

37 variable names, property names and dictionary keys. 

38 Literals are case sensitive and denoted by a preceding 

39 slash sign (e.g. "/Name") 

40 

41 Note: Do not create an instance of PSLiteral directly. 

42 Always use PSLiteralTable.intern(). 

43 """ 

44 

45 NameType = Union[str, bytes] 

46 

47 def __init__(self, name: NameType) -> None: 

48 self.name = name 

49 

50 def __repr__(self) -> str: 

51 name = self.name 

52 return f"/{name!r}" 

53 

54 

55class PSKeyword(PSObject): 

56 """A class that represents a PostScript keyword. 

57 

58 PostScript keywords are a dozen of predefined words. 

59 Commands and directives in PostScript are expressed by keywords. 

60 They are also used to denote the content boundaries. 

61 

62 Note: Do not create an instance of PSKeyword directly. 

63 Always use PSKeywordTable.intern(). 

64 """ 

65 

66 def __init__(self, name: bytes) -> None: 

67 self.name = name 

68 

69 def __repr__(self) -> str: 

70 name = self.name 

71 return f"/{name!r}" 

72 

73 

_SymbolT = TypeVar("_SymbolT", PSLiteral, PSKeyword)


class PSSymbolTable(Generic[_SymbolT]):
    """Interning table for PSLiteral/PSKeyword objects.

    Each distinct name maps to exactly one object, so interned symbols
    may be compared for identity with the "is" operator.
    """

    def __init__(self, klass: type[_SymbolT]) -> None:
        self.dict: dict[PSLiteral.NameType, _SymbolT] = {}
        self.klass: type[_SymbolT] = klass

    def intern(self, name: PSLiteral.NameType) -> _SymbolT:
        """Return the unique symbol for *name*, creating it on first use."""
        try:
            return self.dict[name]
        except KeyError:
            # Type confusion issue: PSKeyword always takes bytes as name
            # PSLiteral uses either str or bytes
            sym = self.klass(name)  # type: ignore[arg-type]
            self.dict[name] = sym
            return sym

96 

97 

# Global interning tables: identical names always yield the same object,
# so literals/keywords elsewhere are compared with "is" / "==" on identity.
PSLiteralTable = PSSymbolTable(PSLiteral)
PSKeywordTable = PSSymbolTable(PSKeyword)
# Shorthand constructors used throughout the package.
LIT = PSLiteralTable.intern
KWD = PSKeywordTable.intern
# Pre-interned keywords for the structural delimiters recognized by the parser.
KEYWORD_PROC_BEGIN = KWD(b"{")
KEYWORD_PROC_END = KWD(b"}")
KEYWORD_ARRAY_BEGIN = KWD(b"[")
KEYWORD_ARRAY_END = KWD(b"]")
KEYWORD_DICT_BEGIN = KWD(b"<<")
KEYWORD_DICT_END = KWD(b">>")

108 

109 

def literal_name(x: Any) -> str:
    """Return the name of a PSLiteral as a str.

    Bytes names are decoded as UTF-8 when possible, otherwise str()-ified.
    Non-literals raise PSTypeError in strict mode; otherwise they are
    converted with str().
    """
    if not isinstance(x, PSLiteral):
        if settings.STRICT:
            raise PSTypeError(f"Literal required: {x!r}")
        return str(x)
    name = x.name
    if isinstance(name, str):
        return name
    try:
        return str(name, "utf-8")
    except UnicodeDecodeError:
        return str(name)

122 

123 

def keyword_name(x: Any) -> Any:
    """Return the name of a PSKeyword as a str (undecodable bytes dropped).

    Non-keywords raise PSTypeError in strict mode; otherwise they are
    returned unchanged.
    """
    if isinstance(x, PSKeyword):
        return str(x.name, "utf-8", "ignore")
    if settings.STRICT:
        raise PSTypeError(f"Keyword required: {x!r}")
    return x

133 

134 

# Tokenizer regexes; all operate on bytes.
EOL = re.compile(rb"[\r\n]")  # end-of-line characters
SPC = re.compile(rb"\s")  # any single whitespace byte
NONSPC = re.compile(rb"\S")  # first non-whitespace byte
HEX = re.compile(rb"[0-9a-fA-F]")  # one hex digit
END_LITERAL = re.compile(rb"[#/%\[\]()<>{}\s]")  # bytes that terminate a /literal
END_HEX_STRING = re.compile(rb"[^\s0-9a-fA-F]")  # bytes that terminate a <...> hex string
HEX_PAIR = re.compile(rb"[0-9a-fA-F]{2}|.")  # a hex pair, or any lone trailing byte
END_NUMBER = re.compile(rb"[^0-9]")  # bytes that terminate a digit run
END_KEYWORD = re.compile(rb"[#/%\[\]()<>{}\s]")  # bytes that terminate a keyword
END_STRING = re.compile(rb"[()\134]")  # special bytes inside (...) strings; \134 is backslash
OCT_STRING = re.compile(rb"[0-7]")  # one octal digit
# Escape characters (the byte after a backslash) mapped to the byte value
# they denote inside literal strings (PDF Reference 3.2.3).
ESC_STRING = {
    b"b": 8,
    b"t": 9,
    b"n": 10,
    b"f": 12,
    b"r": 13,
    b"(": 40,
    b")": 41,
    b"\\": 92,
}


# Any token the base tokenizer emits.
# NOTE(review): _parse_number also emits int tokens, which this alias does
# not list explicitly — confirm whether int should be added.
PSBaseParserToken = Union[float, bool, PSLiteral, PSKeyword, bytes]

159 

160 

class PSBaseParser:
    """Most basic PostScript parser that performs only tokenization.

    Input is read from a binary file object in BUFSIZ chunks.  A small
    state machine drives lexing: self._parse1 always points at the handler
    for the current lexical state (main, comment, literal, number, string,
    hex string, ...).  Finished tokens accumulate in self._tokens as
    (position, token) pairs and are handed out by nexttoken().
    """

    # Bytes fetched from the underlying file per read.
    BUFSIZ = 4096

    def __init__(self, fp: BinaryIO) -> None:
        self.fp = fp
        self.eof = False
        self.seek(0)

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}: {self.fp!r}, bufpos={self.bufpos}>"

    def flush(self) -> None:
        # Nothing is buffered at this level; subclasses may override.
        pass

    def close(self) -> None:
        self.flush()

    def tell(self) -> int:
        # Absolute stream offset of the next unconsumed byte.
        return self.bufpos + self.charpos

    def poll(self, pos: int | None = None, n: int = 80) -> None:
        """Debugging aid: log the next *n* bytes at *pos* without
        disturbing the underlying file position."""
        pos0 = self.fp.tell()
        if not pos:
            pos = self.bufpos + self.charpos
        self.fp.seek(pos)
        log.debug(f"poll({pos}): {self.fp.read(n)!r}")
        self.fp.seek(pos0)

    def seek(self, pos: int) -> None:
        """Seeks the parser to the given position."""
        log.debug(f"seek: {pos!r}")
        self.fp.seek(pos)
        # reset the status for nextline()
        self.bufpos = pos
        self.buf = b""
        self.charpos = 0
        # reset the status for nexttoken()
        self._parse1 = self._parse_main
        self._curtoken = b""
        self._curtokenpos = 0
        self._tokens: list[tuple[int, PSBaseParserToken]] = []
        self.eof = False

    def fillbuf(self) -> bool:
        """Refill self.buf from the file if it has been fully consumed.

        Raises PSEOF when no further data is available.

        NOTE(review): both paths return False, which makes the
        changed_stream branch in nexttoken() look unreachable as written —
        confirm against the upstream fix referenced there (#1157).
        """
        if self.charpos < len(self.buf):
            return False
        # fetch next chunk.
        self.bufpos = self.fp.tell()
        self.buf = self.fp.read(self.BUFSIZ)
        if not self.buf:
            raise PSEOF("Unexpected EOF")
        self.charpos = 0
        return False

    def nextline(self) -> tuple[int, bytes]:
        """Fetches a next line that ends either with \\r or \\n.

        Returns (position, line) with the end-of-line bytes included.
        """
        linebuf = b""
        linepos = self.bufpos + self.charpos
        eol = False
        while 1:
            self.fillbuf()
            if eol:
                c = self.buf[self.charpos : self.charpos + 1]
                # handle b'\r\n'
                if c == b"\n":
                    linebuf += c
                    self.charpos += 1
                break
            m = EOL.search(self.buf, self.charpos)
            if m:
                linebuf += self.buf[self.charpos : m.end(0)]
                self.charpos = m.end(0)
                if linebuf[-1:] == b"\r":
                    # A lone \r may be followed by \n in the next chunk;
                    # loop once more to check.
                    eol = True
                else:
                    break
            else:
                # No EOL in this chunk: take it all and keep reading.
                linebuf += self.buf[self.charpos :]
                self.charpos = len(self.buf)
        log.debug(f"nextline: {linepos!r}, {linebuf!r}")

        return (linepos, linebuf)

    def revreadlines(self) -> Iterator[bytes]:
        """Fetches a next line backward.

        This is used to locate the trailers at the end of a file.  Lines
        are yielded last-first, each starting with its EOL byte.
        """
        self.fp.seek(0, io.SEEK_END)
        pos = self.fp.tell()
        buf = b""
        while pos > 0:
            # Walk the file backwards in BUFSIZ-sized chunks.
            prevpos = pos
            pos = max(0, pos - self.BUFSIZ)
            self.fp.seek(pos)
            s = self.fp.read(prevpos - pos)
            if not s:
                break
            while 1:
                n = max(s.rfind(b"\r"), s.rfind(b"\n"))
                if n == -1:
                    # No EOL in this chunk: prepend it to the carry-over
                    # and read the next (earlier) chunk.
                    buf = s + buf
                    break
                yield s[n:] + buf
                s = s[:n]
                buf = b""

    def _parse_main(self, s: bytes, i: int) -> int:
        """Top-level lexer state: skip whitespace, then dispatch on the
        next byte to the appropriate sub-parser.  Returns the index at
        which scanning should resume."""
        m = NONSPC.search(s, i)
        if not m:
            return len(s)
        j = m.start(0)
        c = s[j : j + 1]
        self._curtokenpos = self.bufpos + j
        if c == b"%":
            # comment running to end of line
            self._curtoken = b"%"
            self._parse1 = self._parse_comment
            return j + 1
        elif c == b"/":
            # name literal
            self._curtoken = b""
            self._parse1 = self._parse_literal
            return j + 1
        elif c in b"-+" or c.isdigit():
            # integer (may turn into a float at a '.')
            self._curtoken = c
            self._parse1 = self._parse_number
            return j + 1
        elif c == b".":
            self._curtoken = c
            self._parse1 = self._parse_float
            return j + 1
        elif c.isalpha():
            # keyword, or the booleans true/false
            self._curtoken = c
            self._parse1 = self._parse_keyword
            return j + 1
        elif c == b"(":
            # literal string; self.paren tracks nesting depth
            self._curtoken = b""
            self.paren = 1
            self._parse1 = self._parse_string
            return j + 1
        elif c == b"<":
            # either "<<" (dict begin) or the start of a hex string
            self._curtoken = b""
            self._parse1 = self._parse_wopen
            return j + 1
        elif c == b">":
            # expected to complete as ">>" (dict end)
            self._curtoken = b""
            self._parse1 = self._parse_wclose
            return j + 1
        elif c == b"\x00":
            # NUL bytes are skipped silently
            return j + 1
        else:
            # any other single byte becomes a one-character keyword
            self._add_token(KWD(c))
            return j + 1

    def _add_token(self, obj: PSBaseParserToken) -> None:
        # Record a finished token at the position where it started.
        self._tokens.append((self._curtokenpos, obj))

    def _parse_comment(self, s: bytes, i: int) -> int:
        # Accumulate bytes until end of line.
        m = EOL.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        self._parse1 = self._parse_main
        # We ignore comments.
        # self._tokens.append(self._curtoken)
        return j

    def _parse_literal(self, s: bytes, i: int) -> int:
        # Accumulate a /name until a delimiter; '#' introduces a
        # two-digit hex escape handled by _parse_literal_hex.
        m = END_LITERAL.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b"#":
            self.hex = b""
            self._parse1 = self._parse_literal_hex
            return j + 1
        # Prefer a str name when the bytes decode as UTF-8, else keep bytes.
        try:
            name: str | bytes = str(self._curtoken, "utf-8")
        except Exception:
            name = self._curtoken
        self._add_token(LIT(name))
        self._parse1 = self._parse_main
        return j

    def _parse_literal_hex(self, s: bytes, i: int) -> int:
        # Collect up to two hex digits after '#' and append the decoded
        # byte to the literal being built.
        c = s[i : i + 1]
        if HEX.match(c) and len(self.hex) < 2:
            self.hex += c
            return i + 1
        if self.hex:
            self._curtoken += bytes((int(self.hex, 16),))
        self._parse1 = self._parse_literal
        return i

    def _parse_number(self, s: bytes, i: int) -> int:
        # Accumulate digits; '.' switches to float parsing.  A malformed
        # number is silently dropped (suppress ValueError).
        m = END_NUMBER.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b".":
            self._curtoken += c
            self._parse1 = self._parse_float
            return j + 1
        with contextlib.suppress(ValueError):
            self._add_token(int(self._curtoken))
        self._parse1 = self._parse_main
        return j

    def _parse_float(self, s: bytes, i: int) -> int:
        # Accumulate the fractional part; malformed floats are dropped.
        m = END_NUMBER.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        with contextlib.suppress(ValueError):
            self._add_token(float(self._curtoken))
        self._parse1 = self._parse_main
        return j

    def _parse_keyword(self, s: bytes, i: int) -> int:
        # Accumulate an alphanumeric keyword; "true"/"false" become bools.
        m = END_KEYWORD.search(s, i)
        if m:
            j = m.start(0)
            self._curtoken += s[i:j]
        else:
            self._curtoken += s[i:]
            return len(s)
        if self._curtoken == b"true":
            token: bool | PSKeyword = True
        elif self._curtoken == b"false":
            token = False
        else:
            token = KWD(self._curtoken)
        self._add_token(token)
        self._parse1 = self._parse_main
        return j

    def _parse_string(self, s: bytes, i: int) -> int:
        # Accumulate a (...) string, tracking nested parentheses; a
        # backslash hands control to _parse_string_1 for the escape.
        m = END_STRING.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b"\\":
            self.oct = b""
            self._parse1 = self._parse_string_1
            return j + 1
        if c == b"(":
            self.paren += 1
            self._curtoken += c
            return j + 1
        if c == b")":
            self.paren -= 1
            if self.paren:
                # WTF, they said balanced parens need no special treatment.
                self._curtoken += c
                return j + 1
        # paren count reached zero: the string token is complete.
        self._add_token(self._curtoken)
        self._parse1 = self._parse_main
        return j + 1

    def _parse_string_1(self, s: bytes, i: int) -> int:
        """Parse literal strings

        Handles the character following a backslash: up to three octal
        digits, a named escape (ESC_STRING), or a line continuation.

        PDF Reference 3.2.3
        """
        c = s[i : i + 1]
        if OCT_STRING.match(c) and len(self.oct) < 3:
            self.oct += c
            return i + 1

        elif self.oct:
            chrcode = int(self.oct, 8)
            assert chrcode < 256, f"Invalid octal {self.oct!r} ({chrcode})"
            self._curtoken += bytes((chrcode,))
            self._parse1 = self._parse_string
            # Re-scan the current byte in the string state.
            return i

        elif c in ESC_STRING:
            self._curtoken += bytes((ESC_STRING[c],))

        elif c == b"\r" and len(s) > i + 1 and s[i + 1 : i + 2] == b"\n":
            # If current and next character is \r\n skip both because enters
            # after a \ are ignored
            i += 1

        # default action
        self._parse1 = self._parse_string
        return i + 1

    def _parse_wopen(self, s: bytes, i: int) -> int:
        # After '<': a second '<' is the dict-begin keyword, anything
        # else starts a hex string (the byte is re-scanned there).
        c = s[i : i + 1]
        if c == b"<":
            self._add_token(KEYWORD_DICT_BEGIN)
            self._parse1 = self._parse_main
            i += 1
        else:
            self._parse1 = self._parse_hexstring
        return i

    def _parse_wclose(self, s: bytes, i: int) -> int:
        # After '>': a second '>' emits the dict-end keyword; otherwise
        # the lone '>' is dropped and scanning resumes at this byte.
        c = s[i : i + 1]
        if c == b">":
            self._add_token(KEYWORD_DICT_END)
            i += 1
        self._parse1 = self._parse_main
        return i

    def _parse_hexstring(self, s: bytes, i: int) -> int:
        # Accumulate hex digits until '>' (or any non-hex byte), then
        # decode pairwise; an odd trailing digit passes through HEX_PAIR's
        # "." alternative.
        m = END_HEX_STRING.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        token = HEX_PAIR.sub(
            lambda m: bytes((int(m.group(0), 16),)),
            SPC.sub(b"", self._curtoken),
        )
        self._add_token(token)
        self._parse1 = self._parse_main
        return j

    def nexttoken(self) -> tuple[int, PSBaseParserToken]:
        """Return the next (position, token) pair.

        Raises PSEOF once the input is exhausted (and again on every
        subsequent call).
        """
        if self.eof:
            # It's not really unexpected, come on now...
            raise PSEOF("Unexpected EOF")
        while not self._tokens:
            try:
                changed_stream = self.fillbuf()
                if changed_stream and self._curtoken:
                    # Fixes #1157: if the stream is changed in the middle of a token,
                    # try to parse it by tacking on whitespace.
                    # NOTE(review): fillbuf() currently returns False on both
                    # paths, so this branch appears unreachable — confirm.
                    self._parse1(b"\n", 0)
                else:
                    self.charpos = self._parse1(self.buf, self.charpos)
            except PSEOF:
                # If we hit EOF in the middle of a token, try to parse
                # it by tacking on whitespace, and delay raising PSEOF
                # until next time around
                self.charpos = self._parse1(b"\n", 0)
                self.eof = True
                # Oh, so there wasn't actually a token there? OK.
                if not self._tokens:
                    raise
        token = self._tokens.pop(0)
        log.debug(f"nexttoken: {token!r}")
        return token

521 

522 

# Stack slots may be occupied by any of:
# * the name of a literal
# * the PSBaseParserToken types
# * list (via KEYWORD_ARRAY)
# * dict (via KEYWORD_DICT)
# * subclass-specific extensions (e.g. PDFStream, PDFObjRef) via ExtraT
ExtraT = TypeVar("ExtraT")
PSStackType = Union[
    str, float, bool, PSLiteral, bytes, list[Any], dict[Any, Any], ExtraT
]
# A parsed object paired with the stream position where it started.
PSStackEntry = tuple[int, PSStackType[ExtraT]]

534 

535 

class PSStackParser(PSBaseParser, Generic[ExtraT]):
    """Tokenizer plus a value stack.

    On top of PSBaseParser's tokenization this class understands the
    grouping keywords ([ ], << >>, { }) and assembles arrays,
    dictionaries and procedures, which nextobject() returns one at a time.
    """

    def __init__(self, fp: BinaryIO) -> None:
        PSBaseParser.__init__(self, fp)
        self.reset()

    def reset(self) -> None:
        # context:  saved (pos, type, stack) frames of enclosing constructs
        # curtype:  type tag of the construct being built ("a", "d" or "p")
        # curstack: (pos, obj) slots collected for the current construct
        # results:  finished objects awaiting retrieval by nextobject()
        self.context: list[tuple[int, str | None, list[PSStackEntry[ExtraT]]]] = []
        self.curtype: str | None = None
        self.curstack: list[PSStackEntry[ExtraT]] = []
        self.results: list[PSStackEntry[ExtraT]] = []

    def seek(self, pos: int) -> None:
        # Seeking discards any partially built objects.
        PSBaseParser.seek(self, pos)
        self.reset()

    def push(self, *objs: PSStackEntry[ExtraT]) -> None:
        # Append entries to the current construct's stack.
        self.curstack.extend(objs)

    def pop(self, n: int) -> list[PSStackEntry[ExtraT]]:
        # Remove and return the topmost n entries (in stack order).
        objs = self.curstack[-n:]
        self.curstack[-n:] = []
        return objs

    def popall(self) -> list[PSStackEntry[ExtraT]]:
        # Remove and return every entry on the current stack.
        objs = self.curstack
        self.curstack = []
        return objs

    def add_results(self, *objs: PSStackEntry[ExtraT]) -> None:
        # Queue finished objects for nextobject().  repr() of exotic
        # objects can itself raise, hence the guarded debug log.
        try:
            log.debug(f"add_results: {objs!r}")
        except Exception:
            log.debug("add_results: (unprintable object)")
        self.results.extend(objs)

    def start_type(self, pos: int, type: str) -> None:
        # Open a nested construct: save the enclosing frame and start a
        # fresh stack tagged with `type`.
        self.context.append((pos, self.curtype, self.curstack))
        (self.curtype, self.curstack) = (type, [])
        log.debug(f"start_type: pos={pos!r}, type={type!r}")

    def end_type(self, type: str) -> tuple[int, list[PSStackType[ExtraT]]]:
        # Close the current construct and restore the enclosing frame.
        # Raises PSTypeError on mismatched open/close keywords.
        if self.curtype != type:
            raise PSTypeError(f"Type mismatch: {self.curtype!r} != {type!r}")
        objs = [obj for (_, obj) in self.curstack]
        (pos, self.curtype, self.curstack) = self.context.pop()
        log.debug(f"end_type: pos={pos!r}, type={type!r}, objs={objs!r}")
        return (pos, objs)

    def do_keyword(self, pos: int, token: PSKeyword) -> None:
        # Hook for subclasses: handle non-structural keywords.
        pass

    def nextobject(self) -> PSStackEntry[ExtraT]:
        """Yields a list of objects.

        Arrays and dictionaries are represented as Python lists and
        dictionaries.

        :return: keywords, literals, strings, numbers, arrays and dictionaries.
        """
        while not self.results:
            (pos, token) = self.nexttoken()
            if isinstance(token, (int, float, bool, str, bytes, PSLiteral)):
                # normal token
                self.push((pos, token))
            elif token == KEYWORD_ARRAY_BEGIN:
                # begin array
                self.start_type(pos, "a")
            elif token == KEYWORD_ARRAY_END:
                # end array
                try:
                    self.push(self.end_type("a"))
                except PSTypeError:
                    if settings.STRICT:
                        raise
            elif token == KEYWORD_DICT_BEGIN:
                # begin dictionary
                self.start_type(pos, "d")
            elif token == KEYWORD_DICT_END:
                # end dictionary
                try:
                    (pos, objs) = self.end_type("d")
                    if len(objs) % 2 != 0:
                        error_msg = f"Invalid dictionary construct: {objs!r}"
                        raise PSSyntaxError(error_msg)
                    # Keys and values alternate on the stack; entries
                    # whose value is None are dropped.
                    d = {
                        literal_name(k): v
                        for (k, v) in choplist(2, objs)
                        if v is not None
                    }
                    self.push((pos, d))
                except PSTypeError:
                    if settings.STRICT:
                        raise
            elif token == KEYWORD_PROC_BEGIN:
                # begin proc
                self.start_type(pos, "p")
            elif token == KEYWORD_PROC_END:
                # end proc
                try:
                    self.push(self.end_type("p"))
                except PSTypeError:
                    if settings.STRICT:
                        raise
            elif isinstance(token, PSKeyword):
                # all other keywords are delegated to the subclass hook
                log.debug(
                    f"do_keyword: pos={pos!r}, token={token!r}, stack={self.curstack!r}"
                )
                self.do_keyword(pos, token)
            else:
                # should not happen given the token types above
                log.error(
                    f"unknown token: pos={pos!r}, "
                    f"token={token!r}, stack={self.curstack!r}"
                )
                self.do_keyword(pos, token)
                raise PSException
            if self.context:
                # still inside a composite object: keep consuming tokens
                continue
            else:
                # top level reached: let subclasses publish results
                self.flush()
        obj = self.results.pop(0)
        try:
            log.debug(f"nextobject: {obj!r}")
        except Exception:
            log.debug("nextobject: (unprintable object)")
        return obj