Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/psparser.py: 96%


455 statements  

#!/usr/bin/env python3
import contextlib
import io
import logging
import re
from collections.abc import Iterator
from typing import (
    Any,
    BinaryIO,
    Generic,
    TypeVar,
    Union,
)

from pdfminer import psexceptions, settings
from pdfminer.utils import choplist

log = logging.getLogger(__name__)


# Adding aliases for these exceptions for backwards compatibility
PSException = psexceptions.PSException
PSEOF = psexceptions.PSEOF
PSSyntaxError = psexceptions.PSSyntaxError
PSTypeError = psexceptions.PSTypeError
PSValueError = psexceptions.PSValueError


class PSObject:
    """Base class for all PS or PDF-related data types."""


class PSLiteral(PSObject):
    """A class that represents a PostScript literal.

    PostScript literals are used as identifiers, such as
    variable names, property names and dictionary keys.
    Literals are case sensitive and are denoted by a preceding
    slash (e.g. "/Name").

    Note: Do not create an instance of PSLiteral directly.
    Always use PSLiteralTable.intern().
    """

    NameType = Union[str, bytes]

    def __init__(self, name: NameType) -> None:
        self.name = name

    def __repr__(self) -> str:
        name = self.name
        return f"/{name!r}"


class PSKeyword(PSObject):
    """A class that represents a PostScript keyword.

    PostScript keywords are a small set of predefined words.
    Commands and directives in PostScript are expressed by keywords.
    They are also used to denote content boundaries.

    Note: Do not create an instance of PSKeyword directly.
    Always use PSKeywordTable.intern().
    """

    def __init__(self, name: bytes) -> None:
        self.name = name

    def __repr__(self) -> str:
        name = self.name
        return f"/{name!r}"


_SymbolT = TypeVar("_SymbolT", PSLiteral, PSKeyword)


class PSSymbolTable(Generic[_SymbolT]):
    """A utility class for storing PSLiteral/PSKeyword objects.

    Interned objects can be compared by identity with the "is" operator.
    """

    def __init__(self, klass: type[_SymbolT]) -> None:
        self.dict: dict[PSLiteral.NameType, _SymbolT] = {}
        self.klass: type[_SymbolT] = klass

    def intern(self, name: PSLiteral.NameType) -> _SymbolT:
        if name in self.dict:
            lit = self.dict[name]
        else:
            # Type confusion issue: PSKeyword always takes bytes as name
            # PSLiteral uses either str or bytes
            lit = self.klass(name)  # type: ignore[arg-type]
            self.dict[name] = lit
        return lit


PSLiteralTable = PSSymbolTable(PSLiteral)
PSKeywordTable = PSSymbolTable(PSKeyword)
LIT = PSLiteralTable.intern
KWD = PSKeywordTable.intern
KEYWORD_PROC_BEGIN = KWD(b"{")
KEYWORD_PROC_END = KWD(b"}")
KEYWORD_ARRAY_BEGIN = KWD(b"[")
KEYWORD_ARRAY_END = KWD(b"]")
KEYWORD_DICT_BEGIN = KWD(b"<<")
KEYWORD_DICT_END = KWD(b">>")
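
# Usage sketch (illustrative only): intern() returns the same object for a
# given name, so literals and keywords produced through LIT()/KWD() can be
# compared by identity with "is" rather than by value:
#
#     assert LIT("Name") is LIT("Name")
#     assert KWD(b"obj") is KWD(b"obj")
#     assert KWD(b"<<") is KEYWORD_DICT_BEGIN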


def literal_name(x: Any) -> str:
    if isinstance(x, PSLiteral):
        if isinstance(x.name, str):
            return x.name
        try:
            return str(x.name, "utf-8")
        except UnicodeDecodeError:
            return str(x.name)
    else:
        if settings.STRICT:
            raise PSTypeError(f"Literal required: {x!r}")
        return str(x)


def keyword_name(x: Any) -> Any:
    if not isinstance(x, PSKeyword):
        if settings.STRICT:
            raise PSTypeError(f"Keyword required: {x!r}")
        else:
            name = x
    else:
        name = str(x.name, "utf-8", "ignore")
    return name
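
# Usage sketch (illustrative only): literal_name() and keyword_name()
# normalize interned names to str, decoding bytes names as UTF-8:
#
#     assert literal_name(LIT("Type")) == "Type"
#     assert literal_name(LIT(b"Type")) == "Type"
#     assert keyword_name(KWD(b"endobj")) == "endobj"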


EOL = re.compile(rb"[\r\n]")
SPC = re.compile(rb"\s")
NONSPC = re.compile(rb"\S")
HEX = re.compile(rb"[0-9a-fA-F]")
END_LITERAL = re.compile(rb"[#/%\[\]()<>{}\s]")
END_HEX_STRING = re.compile(rb"[^\s0-9a-fA-F]")
HEX_PAIR = re.compile(rb"[0-9a-fA-F]{2}|.")
END_NUMBER = re.compile(rb"[^0-9]")
END_KEYWORD = re.compile(rb"[#/%\[\]()<>{}\s]")
END_STRING = re.compile(rb"[()\134]")
OCT_STRING = re.compile(rb"[0-7]")
ESC_STRING = {
    b"b": 8,
    b"t": 9,
    b"n": 10,
    b"f": 12,
    b"r": 13,
    b"(": 40,
    b")": 41,
    b"\\": 92,
}
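
# Worked example (illustrative only) of how the hex-string patterns above are
# combined later in PSBaseParser._parse_hexstring(): whitespace is removed
# with SPC, then HEX_PAIR replaces each hex pair with the byte it encodes.
#
#     raw = SPC.sub(b"", b"68 65 6c 6c 6f")
#     decode = lambda m: bytes((int(m.group(0), 16),))
#     assert HEX_PAIR.sub(decode, raw) == b"hello"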


PSBaseParserToken = Union[float, bool, PSLiteral, PSKeyword, bytes]


class PSBaseParser:
    """Most basic PostScript parser that performs only tokenization."""

    BUFSIZ = 4096

    def __init__(self, fp: BinaryIO) -> None:
        self.fp = fp
        self.eof = False
        self.seek(0)

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}: {self.fp!r}, bufpos={self.bufpos}>"

    def flush(self) -> None:
        pass

    def seek(self, pos: int) -> None:
        """Seeks the parser to the given position."""
        log.debug("seek: %r", pos)
        self.fp.seek(pos)
        # reset the status for nextline()
        self.bufpos = pos
        self.buf = b""
        self.charpos = 0
        # reset the status for nexttoken()
        self._parse1 = self._parse_main
        self._curtoken = b""
        self._curtokenpos = 0
        self._tokens: list[tuple[int, PSBaseParserToken]] = []
        self.eof = False

    def fillbuf(self) -> bool:
        if self.charpos < len(self.buf):
            return False
        # fetch next chunk.
        self.bufpos = self.fp.tell()
        self.buf = self.fp.read(self.BUFSIZ)
        if not self.buf:
            raise PSEOF("Unexpected EOF")
        self.charpos = 0
        return False

    def nextline(self) -> tuple[int, bytes]:
        """Fetches the next line, ending with \\r or \\n."""
        linebuf = b""
        linepos = self.bufpos + self.charpos
        eol = False
        while 1:
            self.fillbuf()
            if eol:
                c = self.buf[self.charpos : self.charpos + 1]
                # handle b'\r\n'
                if c == b"\n":
                    linebuf += c
                    self.charpos += 1
                break
            m = EOL.search(self.buf, self.charpos)
            if m:
                linebuf += self.buf[self.charpos : m.end(0)]
                self.charpos = m.end(0)
                if linebuf[-1:] == b"\r":
                    eol = True
                else:
                    break
            else:
                linebuf += self.buf[self.charpos :]
                self.charpos = len(self.buf)
        log.debug("nextline: %r, %r", linepos, linebuf)

        return (linepos, linebuf)
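
    # Usage sketch (illustrative only; the input bytes are arbitrary):
    # nextline() returns (position, line) pairs, keeps the line terminator,
    # and treats b"\r\n" as a single line break.
    #
    #     p = PSBaseParser(io.BytesIO(b"foo\r\nbar\n"))
    #     assert p.nextline() == (0, b"foo\r\n")
    #     assert p.nextline() == (5, b"bar\n")
    #     # a further call raises PSEOF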

    def revreadlines(self) -> Iterator[bytes]:
        """Fetches lines backward, starting from the end of the file.

        This is used to locate the trailers at the end of a file.
        """
        self.fp.seek(0, io.SEEK_END)
        pos = self.fp.tell()
        buf = b""
        while pos > 0:
            prevpos = pos
            pos = max(0, pos - self.BUFSIZ)
            self.fp.seek(pos)
            s = self.fp.read(prevpos - pos)
            if not s:
                break
            while 1:
                n = max(s.rfind(b"\r"), s.rfind(b"\n"))
                if n == -1:
                    buf = s + buf
                    break
                yield s[n:] + buf
                s = s[:n]
                buf = b""
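
    # Usage sketch (hypothetical input): scanning backwards for a marker near
    # the end of a stream, similar to how a PDF "startxref" line is located.
    # Note that each yielded chunk still carries its leading line break.
    #
    #     p = PSBaseParser(io.BytesIO(b"...\nstartxref\n1234\n%%EOF\n"))
    #     for line in p.revreadlines():
    #         if line.strip() == b"startxref":
    #             break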

    def _parse_main(self, s: bytes, i: int) -> int:
        m = NONSPC.search(s, i)
        if not m:
            return len(s)
        j = m.start(0)
        c = s[j : j + 1]
        self._curtokenpos = self.bufpos + j
        if c == b"%":
            self._curtoken = b"%"
            self._parse1 = self._parse_comment
            return j + 1
        elif c == b"/":
            self._curtoken = b""
            self._parse1 = self._parse_literal
            return j + 1
        elif c in b"-+" or c.isdigit():
            self._curtoken = c
            self._parse1 = self._parse_number
            return j + 1
        elif c == b".":
            self._curtoken = c
            self._parse1 = self._parse_float
            return j + 1
        elif c.isalpha():
            self._curtoken = c
            self._parse1 = self._parse_keyword
            return j + 1
        elif c == b"(":
            self._curtoken = b""
            self.paren = 1
            self._parse1 = self._parse_string
            return j + 1
        elif c == b"<":
            self._curtoken = b""
            self._parse1 = self._parse_wopen
            return j + 1
        elif c == b">":
            self._curtoken = b""
            self._parse1 = self._parse_wclose
            return j + 1
        elif c == b"\x00":
            return j + 1
        else:
            self._add_token(KWD(c))
            return j + 1

    def _add_token(self, obj: PSBaseParserToken) -> None:
        self._tokens.append((self._curtokenpos, obj))

    def _parse_comment(self, s: bytes, i: int) -> int:
        m = EOL.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        self._parse1 = self._parse_main
        # We ignore comments.
        # self._tokens.append(self._curtoken)
        return j

    def _parse_literal(self, s: bytes, i: int) -> int:
        m = END_LITERAL.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b"#":
            self.hex = b""
            self._parse1 = self._parse_literal_hex
            return j + 1
        try:
            name: str | bytes = str(self._curtoken, "utf-8")
        except Exception:
            name = self._curtoken
        self._add_token(LIT(name))
        self._parse1 = self._parse_main
        return j

    def _parse_literal_hex(self, s: bytes, i: int) -> int:
        c = s[i : i + 1]
        if HEX.match(c) and len(self.hex) < 2:
            self.hex += c
            return i + 1
        if self.hex:
            self._curtoken += bytes((int(self.hex, 16),))
        self._parse1 = self._parse_literal
        return i

    def _parse_number(self, s: bytes, i: int) -> int:
        m = END_NUMBER.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b".":
            self._curtoken += c
            self._parse1 = self._parse_float
            return j + 1
        with contextlib.suppress(ValueError):
            self._add_token(int(self._curtoken))
        self._parse1 = self._parse_main
        return j

    def _parse_float(self, s: bytes, i: int) -> int:
        m = END_NUMBER.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        with contextlib.suppress(ValueError):
            self._add_token(float(self._curtoken))
        self._parse1 = self._parse_main
        return j

    def _parse_keyword(self, s: bytes, i: int) -> int:
        m = END_KEYWORD.search(s, i)
        if m:
            j = m.start(0)
            self._curtoken += s[i:j]
        else:
            self._curtoken += s[i:]
            return len(s)
        if self._curtoken == b"true":
            token: bool | PSKeyword = True
        elif self._curtoken == b"false":
            token = False
        else:
            token = KWD(self._curtoken)
        self._add_token(token)
        self._parse1 = self._parse_main
        return j

    def _parse_string(self, s: bytes, i: int) -> int:
        m = END_STRING.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b"\\":
            self.oct = b""
            self._parse1 = self._parse_string_1
            return j + 1
        if c == b"(":
            self.paren += 1
            self._curtoken += c
            return j + 1
        if c == b")":
            self.paren -= 1
            if self.paren:
                # Balanced parentheses inside a string need no escaping, so
                # keep the character and only end the string once the nesting
                # depth returns to zero.
                self._curtoken += c
                return j + 1
        self._add_token(self._curtoken)
        self._parse1 = self._parse_main
        return j + 1

    def _parse_string_1(self, s: bytes, i: int) -> int:
        """Parse literal strings.

        PDF Reference 3.2.3
        """
        c = s[i : i + 1]
        if OCT_STRING.match(c) and len(self.oct) < 3:
            self.oct += c
            return i + 1

        elif self.oct:
            chrcode = int(self.oct, 8)
            assert chrcode < 256, f"Invalid octal {self.oct!r} ({chrcode})"
            self._curtoken += bytes((chrcode,))
            self._parse1 = self._parse_string
            return i

        elif c in ESC_STRING:
            self._curtoken += bytes((ESC_STRING[c],))

        elif c == b"\r" and len(s) > i + 1 and s[i + 1 : i + 2] == b"\n":
            # Line breaks after a backslash are ignored; when the break is a
            # \r\n pair, skip both characters.
            i += 1

        # default action
        self._parse1 = self._parse_string
        return i + 1
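
    # Usage sketch (illustrative only; the input bytes are arbitrary): octal
    # escapes become single bytes and balanced inner parentheses are kept, so
    # the two strings below tokenize to b"AB" and b"a(b)c".
    #
    #     p = PSBaseParser(io.BytesIO(b"(\\101\\102) (a(b)c)"))
    #     assert p.nexttoken() == (0, b"AB")
    #     assert p.nexttoken() == (11, b"a(b)c")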

    def _parse_wopen(self, s: bytes, i: int) -> int:
        c = s[i : i + 1]
        if c == b"<":
            self._add_token(KEYWORD_DICT_BEGIN)
            self._parse1 = self._parse_main
            i += 1
        else:
            self._parse1 = self._parse_hexstring
        return i

    def _parse_wclose(self, s: bytes, i: int) -> int:
        c = s[i : i + 1]
        if c == b">":
            self._add_token(KEYWORD_DICT_END)
            i += 1
        self._parse1 = self._parse_main
        return i

    def _parse_hexstring(self, s: bytes, i: int) -> int:
        m = END_HEX_STRING.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        token = HEX_PAIR.sub(
            lambda m: bytes((int(m.group(0), 16),)),
            SPC.sub(b"", self._curtoken),
        )
        self._add_token(token)
        self._parse1 = self._parse_main
        return j

    def nexttoken(self) -> tuple[int, PSBaseParserToken]:
        if self.eof:
            # EOF was already reached on a previous call.
            raise PSEOF("Unexpected EOF")
        while not self._tokens:
            try:
                changed_stream = self.fillbuf()
                if changed_stream and self._curtoken:
                    # Fixes #1157: if the stream is changed in the middle of a token,
                    # try to parse it by tacking on whitespace.
                    self._parse1(b"\n", 0)
                else:
                    self.charpos = self._parse1(self.buf, self.charpos)
            except PSEOF:
                # If we hit EOF in the middle of a token, try to parse
                # it by tacking on whitespace, and delay raising PSEOF
                # until next time around
                self.charpos = self._parse1(b"\n", 0)
                self.eof = True
                # No token was completed before EOF; re-raise.
                if not self._tokens:
                    raise
        token = self._tokens.pop(0)
        log.debug("nexttoken: %r", token)
        return token
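
    # Usage sketch (illustrative only; the input bytes are arbitrary):
    # nexttoken() yields (position, token) pairs until the stream is
    # exhausted, at which point PSEOF is raised.
    #
    #     p = PSBaseParser(io.BytesIO(b"/Name 10 3.14 true (hi) <666f6f>"))
    #     tokens = []
    #     while True:
    #         try:
    #             tokens.append(p.nexttoken())
    #         except PSEOF:
    #             break
    #     # tokens == [(0, /'Name'), (6, 10), (9, 3.14), (14, True),
    #     #            (19, b'hi'), (24, b'foo')]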


# Stack slots may be occupied by any of:
# * the name of a literal
# * the PSBaseParserToken types
# * list (via KEYWORD_ARRAY)
# * dict (via KEYWORD_DICT)
# * subclass-specific extensions (e.g. PDFStream, PDFObjRef) via ExtraT
ExtraT = TypeVar("ExtraT")
PSStackType = Union[
    str, float, bool, PSLiteral, bytes, list[Any], dict[Any, Any], ExtraT
]
PSStackEntry = tuple[int, PSStackType[ExtraT]]


class PSStackParser(PSBaseParser, Generic[ExtraT]):
    def __init__(self, fp: BinaryIO) -> None:
        PSBaseParser.__init__(self, fp)
        self.reset()

    def reset(self) -> None:
        self.context: list[tuple[int, str | None, list[PSStackEntry[ExtraT]]]] = []
        self.curtype: str | None = None
        self.curstack: list[PSStackEntry[ExtraT]] = []
        self.results: list[PSStackEntry[ExtraT]] = []

    def seek(self, pos: int) -> None:
        PSBaseParser.seek(self, pos)
        self.reset()

    def push(self, *objs: PSStackEntry[ExtraT]) -> None:
        self.curstack.extend(objs)

    def pop(self, n: int) -> list[PSStackEntry[ExtraT]]:
        objs = self.curstack[-n:]
        self.curstack[-n:] = []
        return objs

    def popall(self) -> list[PSStackEntry[ExtraT]]:
        objs = self.curstack
        self.curstack = []
        return objs

    def add_results(self, *objs: PSStackEntry[ExtraT]) -> None:
        try:
            log.debug("add_results: %r", objs)
        except Exception:
            log.debug("add_results: (unprintable object)")
        self.results.extend(objs)

    def start_type(self, pos: int, type: str) -> None:
        self.context.append((pos, self.curtype, self.curstack))
        (self.curtype, self.curstack) = (type, [])
        log.debug("start_type: pos=%r, type=%r", pos, type)

    def end_type(self, type: str) -> tuple[int, list[PSStackType[ExtraT]]]:
        if self.curtype != type:
            raise PSTypeError(f"Type mismatch: {self.curtype!r} != {type!r}")
        objs = [obj for (_, obj) in self.curstack]
        (pos, self.curtype, self.curstack) = self.context.pop()
        log.debug("end_type: pos=%r, type=%r, objs=%r", pos, type, objs)
        return (pos, objs)

    def do_keyword(self, pos: int, token: PSKeyword) -> None:
        pass

    def nextobject(self) -> PSStackEntry[ExtraT]:
        """Returns the next object as a (position, object) pair.

        Arrays and dictionaries are represented as Python lists and
        dictionaries.

        :return: keywords, literals, strings, numbers, arrays and dictionaries.
        """
        while not self.results:
            (pos, token) = self.nexttoken()
            if isinstance(token, (int, float, bool, str, bytes, PSLiteral)):
                # normal token
                self.push((pos, token))
            elif token == KEYWORD_ARRAY_BEGIN:
                # begin array
                self.start_type(pos, "a")
            elif token == KEYWORD_ARRAY_END:
                # end array
                try:
                    self.push(self.end_type("a"))
                except PSTypeError:
                    if settings.STRICT:
                        raise
            elif token == KEYWORD_DICT_BEGIN:
                # begin dictionary
                self.start_type(pos, "d")
            elif token == KEYWORD_DICT_END:
                # end dictionary
                try:
                    (pos, objs) = self.end_type("d")
                    if len(objs) % 2 != 0:
                        error_msg = f"Invalid dictionary construct: {objs!r}"
                        raise PSSyntaxError(error_msg)
                    d = {
                        literal_name(k): v
                        for (k, v) in choplist(2, objs)
                        if v is not None
                    }
                    self.push((pos, d))
                except PSTypeError:
                    if settings.STRICT:
                        raise
            elif token == KEYWORD_PROC_BEGIN:
                # begin proc
                self.start_type(pos, "p")
            elif token == KEYWORD_PROC_END:
                # end proc
                try:
                    self.push(self.end_type("p"))
                except PSTypeError:
                    if settings.STRICT:
                        raise
            elif isinstance(token, PSKeyword):
                log.debug(
                    "do_keyword: pos=%r, token=%r, stack=%r",
                    pos,
                    token,
                    self.curstack,
                )
                self.do_keyword(pos, token)
            else:
                log.error(
                    "unknown token: pos=%r, token=%r, stack=%r",
                    pos,
                    token,
                    self.curstack,
                )
                self.do_keyword(pos, token)
                raise PSException
            if self.context:
                continue
            else:
                self.flush()
        obj = self.results.pop(0)
        try:
            log.debug("nextobject: %r", obj)
        except Exception:
            log.debug("nextobject: (unprintable object)")
        return obj
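
# Usage sketch (hypothetical subclass): the base class's flush() is a no-op,
# so finished objects only reach self.results once a subclass overrides
# flush() or do_keyword() to call add_results(), much like pdfminer's own PDF
# parsers do.
#
#     class _DemoParser(PSStackParser[None]):
#         def flush(self) -> None:
#             self.add_results(*self.popall())
#
#     p = _DemoParser(io.BytesIO(b"<< /Type /Page /Count 3 >>"))
#     pos, obj = p.nextobject()
#     # obj == {"Type": /'Page', "Count": 3}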