Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/psparser.py: 94%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#!/usr/bin/env python3
2import contextlib
3import io
4import logging
5import re
6from collections.abc import Iterator
7from typing import (
8 Any,
9 BinaryIO,
10 Generic,
11 TypeVar,
12 Union,
13)
15from pdfminer import psexceptions, settings
16from pdfminer.utils import choplist
18log = logging.getLogger(__name__)
# Adding aliases for these exceptions for backwards compatibility:
# older code imports these names from pdfminer.psparser rather than
# from pdfminer.psexceptions, so re-export them here.
PSException = psexceptions.PSException
PSEOF = psexceptions.PSEOF
PSSyntaxError = psexceptions.PSSyntaxError
PSTypeError = psexceptions.PSTypeError
PSValueError = psexceptions.PSValueError
class PSObject:
    """Common base class shared by all PS/PDF-related data types."""
class PSLiteral(PSObject):
    """A PostScript literal, e.g. "/Name".

    Literals serve as identifiers: variable names, property names and
    dictionary keys.  They are case sensitive and written with a
    leading slash.

    Note: Do not create an instance of PSLiteral directly.
    Always use PSLiteralTable.intern().
    """

    # A literal's name may arrive either decoded (str) or raw (bytes).
    NameType = Union[str, bytes]

    def __init__(self, name: NameType) -> None:
        self.name = name

    def __repr__(self) -> str:
        return f"/{self.name!r}"
class PSKeyword(PSObject):
    """A PostScript keyword.

    Keywords are the small set of predefined words that express
    commands and directives in PostScript; they also mark content
    boundaries.

    Note: Do not create an instance of PSKeyword directly.
    Always use PSKeywordTable.intern().
    """

    def __init__(self, name: bytes) -> None:
        self.name = name

    def __repr__(self) -> str:
        return f"/{self.name!r}"
_SymbolT = TypeVar("_SymbolT", PSLiteral, PSKeyword)


class PSSymbolTable(Generic[_SymbolT]):
    """An interning table for PSLiteral/PSKeyword objects.

    Each distinct name maps to exactly one stored object, so interned
    symbols may be compared for identity with the "is" operator.
    """

    def __init__(self, klass: type[_SymbolT]) -> None:
        self.dict: dict[PSLiteral.NameType, _SymbolT] = {}
        self.klass: type[_SymbolT] = klass

    def intern(self, name: PSLiteral.NameType) -> _SymbolT:
        """Return the unique symbol for *name*, creating it on first use."""
        try:
            return self.dict[name]
        except KeyError:
            # Type confusion issue: PSKeyword always takes bytes as name
            # PSLiteral uses either str or bytes
            sym = self.klass(name)  # type: ignore[arg-type]
            self.dict[name] = sym
            return sym
# Global interning tables plus the canonical shorthands used throughout
# pdfminer for creating literals (LIT) and keywords (KWD).
PSLiteralTable = PSSymbolTable(PSLiteral)
PSKeywordTable = PSSymbolTable(PSKeyword)
LIT = PSLiteralTable.intern
KWD = PSKeywordTable.intern
# Pre-interned keywords for the structural delimiters of PostScript/PDF
# (procedures, arrays, dictionaries).
KEYWORD_PROC_BEGIN = KWD(b"{")
KEYWORD_PROC_END = KWD(b"}")
KEYWORD_ARRAY_BEGIN = KWD(b"[")
KEYWORD_ARRAY_END = KWD(b"]")
KEYWORD_DICT_BEGIN = KWD(b"<<")
KEYWORD_DICT_END = KWD(b">>")
def literal_name(x: Any) -> str:
    """Return the name of a PSLiteral as a str.

    Bytes names are decoded as UTF-8 where possible; undecodable names
    fall back to their repr-style str().  Non-literals raise PSTypeError
    in strict mode, otherwise they are stringified as-is.
    """
    if not isinstance(x, PSLiteral):
        if settings.STRICT:
            raise PSTypeError(f"Literal required: {x!r}")
        return str(x)
    if isinstance(x.name, str):
        return x.name
    try:
        return str(x.name, "utf-8")
    except UnicodeDecodeError:
        return str(x.name)
def keyword_name(x: Any) -> Any:
    """Return the name of a PSKeyword decoded as a str.

    Non-keywords raise PSTypeError in strict mode; otherwise the value
    is returned unchanged.
    """
    if isinstance(x, PSKeyword):
        return str(x.name, "utf-8", "ignore")
    if settings.STRICT:
        raise PSTypeError(f"Keyword required: {x!r}")
    return x
# Byte-oriented regexes used by the tokenizer state machine below.
EOL = re.compile(rb"[\r\n]")  # end-of-line: CR or LF
SPC = re.compile(rb"\s")  # any whitespace byte
NONSPC = re.compile(rb"\S")  # first non-whitespace byte
HEX = re.compile(rb"[0-9a-fA-F]")  # a single hex digit
END_LITERAL = re.compile(rb"[#/%\[\]()<>{}\s]")  # terminates a /Name literal
END_HEX_STRING = re.compile(rb"[^\s0-9a-fA-F]")  # terminates a <...> hex string
HEX_PAIR = re.compile(rb"[0-9a-fA-F]{2}|.")  # a hex digit pair (or stray byte)
END_NUMBER = re.compile(rb"[^0-9]")  # terminates an integer run
END_KEYWORD = re.compile(rb"[#/%\[\]()<>{}\s]")  # terminates a bare keyword
END_STRING = re.compile(rb"[()\134]")  # '(', ')' or backslash inside (...) strings
OCT_STRING = re.compile(rb"[0-7]")  # a single octal digit
# Single-character escapes allowed inside (...) literal strings, mapped to
# the byte value they denote (PDF Reference 3.2.3).
ESC_STRING = {
    b"b": 8,
    b"t": 9,
    b"n": 10,
    b"f": 12,
    b"r": 13,
    b"(": 40,
    b")": 41,
    b"\\": 92,
}
# Every value the base tokenizer can emit (ints produced for integer
# tokens are accepted where float appears here).
PSBaseParserToken = Union[float, bool, PSLiteral, PSKeyword, bytes]
class PSBaseParser:
    """Most basic PostScript parser that performs only tokenization.

    Bytes are read from ``fp`` in ``BUFSIZ``-sized chunks.  Lexing is
    driven by a small state machine: ``self._parse1`` always points at
    the handler for the current lexical state (main, comment, literal,
    number, string, ...).  Each handler consumes input starting at a
    buffer offset and returns the next offset to scan.  Finished tokens
    are queued in ``self._tokens`` as ``(position, token)`` pairs.
    """

    BUFSIZ = 4096

    def __init__(self, fp: BinaryIO) -> None:
        self.fp = fp
        self.eof = False
        self.seek(0)

    def __repr__(self) -> str:
        return f"<{self.__class__.__name__}: {self.fp!r}, bufpos={self.bufpos}>"

    def flush(self) -> None:
        # Nothing buffered in the base parser; subclasses may override.
        pass

    def close(self) -> None:
        self.flush()

    def tell(self) -> int:
        """Return the absolute file position of the next unparsed byte."""
        return self.bufpos + self.charpos

    def poll(self, pos: int | None = None, n: int = 80) -> None:
        """Debug helper: log the next *n* bytes at *pos* (default: the
        current parse position) without disturbing the file position."""
        pos0 = self.fp.tell()
        if not pos:
            pos = self.bufpos + self.charpos
        self.fp.seek(pos)
        log.debug(f"poll({pos}): {self.fp.read(n)!r}")
        self.fp.seek(pos0)

    def seek(self, pos: int) -> None:
        """Seeks the parser to the given position."""
        log.debug(f"seek: {pos!r}")
        self.fp.seek(pos)
        # reset the status for nextline()
        self.bufpos = pos
        self.buf = b""
        self.charpos = 0
        # reset the status for nexttoken()
        self._parse1 = self._parse_main
        self._curtoken = b""
        self._curtokenpos = 0
        self._tokens: list[tuple[int, PSBaseParserToken]] = []
        self.eof = False

    def fillbuf(self) -> bool:
        """Refill the internal buffer if it has been fully consumed.

        :return: True if a fresh chunk was fetched from the file (the
            buffer contents changed), False if unread bytes remained.
        :raises PSEOF: if the underlying file is exhausted.
        """
        if self.charpos < len(self.buf):
            return False
        # fetch next chunk.
        self.bufpos = self.fp.tell()
        self.buf = self.fp.read(self.BUFSIZ)
        if not self.buf:
            raise PSEOF("Unexpected EOF")
        self.charpos = 0
        # Bug fix: report that the buffer changed.  nexttoken() relies on
        # this return value to re-parse a token interrupted by a chunk
        # boundary (the issue #1157 workaround); the previous
        # "return False" here made that branch unreachable.
        return True

    def nextline(self) -> tuple[int, bytes]:
        """Fetches a next line that ends either with \\r or \\n."""
        linebuf = b""
        linepos = self.bufpos + self.charpos
        eol = False
        while 1:
            self.fillbuf()
            if eol:
                c = self.buf[self.charpos : self.charpos + 1]
                # handle b'\r\n'
                if c == b"\n":
                    linebuf += c
                    self.charpos += 1
                break
            m = EOL.search(self.buf, self.charpos)
            if m:
                linebuf += self.buf[self.charpos : m.end(0)]
                self.charpos = m.end(0)
                if linebuf[-1:] == b"\r":
                    # Might be the first half of a \r\n pair; peek at the
                    # following byte before deciding where the line ends.
                    eol = True
                else:
                    break
            else:
                linebuf += self.buf[self.charpos :]
                self.charpos = len(self.buf)
        log.debug(f"nextline: {linepos!r}, {linebuf!r}")

        return (linepos, linebuf)

    def revreadlines(self) -> Iterator[bytes]:
        """Fetches a next line backward.

        This is used to locate the trailers at the end of a file.
        """
        self.fp.seek(0, io.SEEK_END)
        pos = self.fp.tell()
        buf = b""
        while pos > 0:
            prevpos = pos
            pos = max(0, pos - self.BUFSIZ)
            self.fp.seek(pos)
            s = self.fp.read(prevpos - pos)
            if not s:
                break
            while 1:
                n = max(s.rfind(b"\r"), s.rfind(b"\n"))
                if n == -1:
                    # No line break in this chunk: keep it and read more.
                    buf = s + buf
                    break
                yield s[n:] + buf
                s = s[:n]
                buf = b""

    def _parse_main(self, s: bytes, i: int) -> int:
        """Dispatch state: skip whitespace, then select the sub-parser
        for the token that starts at the next non-space byte."""
        m = NONSPC.search(s, i)
        if not m:
            return len(s)
        j = m.start(0)
        c = s[j : j + 1]
        self._curtokenpos = self.bufpos + j
        if c == b"%":
            self._curtoken = b"%"
            self._parse1 = self._parse_comment
            return j + 1
        elif c == b"/":
            self._curtoken = b""
            self._parse1 = self._parse_literal
            return j + 1
        elif c in b"-+" or c.isdigit():
            self._curtoken = c
            self._parse1 = self._parse_number
            return j + 1
        elif c == b".":
            self._curtoken = c
            self._parse1 = self._parse_float
            return j + 1
        elif c.isalpha():
            self._curtoken = c
            self._parse1 = self._parse_keyword
            return j + 1
        elif c == b"(":
            self._curtoken = b""
            self.paren = 1
            self._parse1 = self._parse_string
            return j + 1
        elif c == b"<":
            self._curtoken = b""
            self._parse1 = self._parse_wopen
            return j + 1
        elif c == b">":
            self._curtoken = b""
            self._parse1 = self._parse_wclose
            return j + 1
        elif c == b"\x00":
            # NUL bytes are silently skipped.
            return j + 1
        else:
            # Any other single byte is treated as a one-character keyword.
            self._add_token(KWD(c))
            return j + 1

    def _add_token(self, obj: PSBaseParserToken) -> None:
        """Queue a finished token with the position where it started."""
        self._tokens.append((self._curtokenpos, obj))

    def _parse_comment(self, s: bytes, i: int) -> int:
        """Consume bytes until end-of-line; comments produce no token."""
        m = EOL.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        self._parse1 = self._parse_main
        # We ignore comments.
        # self._tokens.append(self._curtoken)
        return j

    def _parse_literal(self, s: bytes, i: int) -> int:
        """Accumulate a /Name literal, handling #xx hex escapes."""
        m = END_LITERAL.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b"#":
            self.hex = b""
            self._parse1 = self._parse_literal_hex
            return j + 1
        try:
            name: str | bytes = str(self._curtoken, "utf-8")
        except Exception:
            # Keep the raw bytes if the name is not valid UTF-8.
            name = self._curtoken
        self._add_token(LIT(name))
        self._parse1 = self._parse_main
        return j

    def _parse_literal_hex(self, s: bytes, i: int) -> int:
        """Collect up to two hex digits after '#' inside a literal."""
        c = s[i : i + 1]
        if HEX.match(c) and len(self.hex) < 2:
            self.hex += c
            return i + 1
        if self.hex:
            self._curtoken += bytes((int(self.hex, 16),))
        self._parse1 = self._parse_literal
        return i

    def _parse_number(self, s: bytes, i: int) -> int:
        """Accumulate an integer; switch to float parsing on '.'."""
        m = END_NUMBER.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b".":
            self._curtoken += c
            self._parse1 = self._parse_float
            return j + 1
        # Malformed numbers (e.g. a lone '-') are silently dropped.
        with contextlib.suppress(ValueError):
            self._add_token(int(self._curtoken))
        self._parse1 = self._parse_main
        return j

    def _parse_float(self, s: bytes, i: int) -> int:
        """Accumulate the fractional part of a real number."""
        m = END_NUMBER.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        with contextlib.suppress(ValueError):
            self._add_token(float(self._curtoken))
        self._parse1 = self._parse_main
        return j

    def _parse_keyword(self, s: bytes, i: int) -> int:
        """Accumulate a bare keyword; 'true'/'false' become booleans."""
        m = END_KEYWORD.search(s, i)
        if m:
            j = m.start(0)
            self._curtoken += s[i:j]
        else:
            self._curtoken += s[i:]
            return len(s)
        if self._curtoken == b"true":
            token: bool | PSKeyword = True
        elif self._curtoken == b"false":
            token = False
        else:
            token = KWD(self._curtoken)
        self._add_token(token)
        self._parse1 = self._parse_main
        return j

    def _parse_string(self, s: bytes, i: int) -> int:
        """Accumulate a (...) literal string, tracking nested parens."""
        m = END_STRING.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b"\\":
            self.oct = b""
            self._parse1 = self._parse_string_1
            return j + 1
        if c == b"(":
            self.paren += 1
            self._curtoken += c
            return j + 1
        if c == b")":
            self.paren -= 1
            if self.paren:
                # WTF, they said balanced parens need no special treatment.
                self._curtoken += c
                return j + 1
            self._add_token(self._curtoken)
            self._parse1 = self._parse_main
        return j + 1

    def _parse_string_1(self, s: bytes, i: int) -> int:
        """Parse literal strings

        PDF Reference 3.2.3
        """
        c = s[i : i + 1]
        if OCT_STRING.match(c) and len(self.oct) < 3:
            # Up to three octal digits form one escaped byte.
            self.oct += c
            return i + 1

        elif self.oct:
            chrcode = int(self.oct, 8)
            assert chrcode < 256, f"Invalid octal {self.oct!r} ({chrcode})"
            self._curtoken += bytes((chrcode,))
            self._parse1 = self._parse_string
            return i

        elif c in ESC_STRING:
            self._curtoken += bytes((ESC_STRING[c],))

        elif c == b"\r" and len(s) > i + 1 and s[i + 1 : i + 2] == b"\n":
            # If current and next character is \r\n skip both because enters
            # after a \ are ignored
            i += 1

        # default action
        self._parse1 = self._parse_string
        return i + 1

    def _parse_wopen(self, s: bytes, i: int) -> int:
        """After '<': either '<<' (dict begin) or the start of a hex string."""
        c = s[i : i + 1]
        if c == b"<":
            self._add_token(KEYWORD_DICT_BEGIN)
            self._parse1 = self._parse_main
            i += 1
        else:
            self._parse1 = self._parse_hexstring
        return i

    def _parse_wclose(self, s: bytes, i: int) -> int:
        """After '>': only '>>' (dict end) is meaningful; a lone '>' is dropped."""
        c = s[i : i + 1]
        if c == b">":
            self._add_token(KEYWORD_DICT_END)
            i += 1
        self._parse1 = self._parse_main
        return i

    def _parse_hexstring(self, s: bytes, i: int) -> int:
        """Accumulate a <...> hex string and decode it pairwise."""
        m = END_HEX_STRING.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        # Drop whitespace, then turn each hex digit pair into one byte.
        token = HEX_PAIR.sub(
            lambda m: bytes((int(m.group(0), 16),)),
            SPC.sub(b"", self._curtoken),
        )
        self._add_token(token)
        self._parse1 = self._parse_main
        return j

    def nexttoken(self) -> tuple[int, PSBaseParserToken]:
        """Return the next (position, token) pair.

        :raises PSEOF: when the input is exhausted and no token remains.
        """
        if self.eof:
            # It's not really unexpected, come on now...
            raise PSEOF("Unexpected EOF")
        while not self._tokens:
            try:
                changed_stream = self.fillbuf()
                if changed_stream and self._curtoken:
                    # Fixes #1157: if the stream is changed in the middle of a token,
                    # try to parse it by tacking on whitespace.
                    self._parse1(b"\n", 0)
                else:
                    self.charpos = self._parse1(self.buf, self.charpos)
            except PSEOF:
                # If we hit EOF in the middle of a token, try to parse
                # it by tacking on whitespace, and delay raising PSEOF
                # until next time around
                self.charpos = self._parse1(b"\n", 0)
                self.eof = True
                # Oh, so there wasn't actually a token there? OK.
                if not self._tokens:
                    raise
        token = self._tokens.pop(0)
        log.debug(f"nexttoken: {token!r}")
        return token
# Stack slots may be occupied by any of:
# * the name of a literal
# * the PSBaseParserToken types
# * list (via KEYWORD_ARRAY)
# * dict (via KEYWORD_DICT)
# * subclass-specific extensions (e.g. PDFStream, PDFObjRef) via ExtraT
ExtraT = TypeVar("ExtraT")
PSStackType = Union[
    str, float, bool, PSLiteral, bytes, list[Any], dict[Any, Any], ExtraT
]
# A stack entry pairs the file position where an object started with
# the object itself.
PSStackEntry = tuple[int, PSStackType[ExtraT]]
class PSStackParser(PSBaseParser, Generic[ExtraT]):
    """Parser that assembles complete objects from the token stream.

    Tokens are pushed onto ``curstack``; the structural keywords
    ``[ ] << >> { }`` open and close nested contexts (saved on
    ``context``), and every completed top-level object is queued in
    ``results`` for nextobject() to return.
    """

    def __init__(self, fp: BinaryIO) -> None:
        PSBaseParser.__init__(self, fp)
        self.reset()

    def reset(self) -> None:
        """Discard all accumulated parsing state."""
        # Saved enclosing scopes: (start position, object type, stack).
        self.context: list[tuple[int, str | None, list[PSStackEntry[ExtraT]]]] = []
        self.curtype: str | None = None
        self.curstack: list[PSStackEntry[ExtraT]] = []
        self.results: list[PSStackEntry[ExtraT]] = []

    def seek(self, pos: int) -> None:
        # Moving the read position invalidates partially-built objects.
        PSBaseParser.seek(self, pos)
        self.reset()

    def push(self, *objs: PSStackEntry[ExtraT]) -> None:
        """Append (pos, object) entries to the current stack."""
        self.curstack.extend(objs)

    def pop(self, n: int) -> list[PSStackEntry[ExtraT]]:
        """Remove and return the topmost *n* entries of the current stack."""
        objs = self.curstack[-n:]
        self.curstack[-n:] = []
        return objs

    def popall(self) -> list[PSStackEntry[ExtraT]]:
        """Remove and return every entry of the current stack."""
        objs = self.curstack
        self.curstack = []
        return objs

    def add_results(self, *objs: PSStackEntry[ExtraT]) -> None:
        """Queue finished objects for nextobject() to hand out."""
        try:
            log.debug(f"add_results: {objs!r}")
        except Exception:
            # repr() of arbitrary objects may itself raise; logging must not.
            log.debug("add_results: (unprintable object)")
        self.results.extend(objs)

    def start_type(self, pos: int, type: str) -> None:
        """Open a nested object of the given type ('a', 'd' or 'p'),
        saving the current scope on the context stack."""
        self.context.append((pos, self.curtype, self.curstack))
        (self.curtype, self.curstack) = (type, [])
        log.debug(f"start_type: pos={pos!r}, type={type!r}")

    def end_type(self, type: str) -> tuple[int, list[PSStackType[ExtraT]]]:
        """Close the innermost nested object and restore the enclosing
        scope.

        :return: (start position, collected objects).
        :raises PSTypeError: if the innermost object is not of *type*.
        """
        if self.curtype != type:
            raise PSTypeError(f"Type mismatch: {self.curtype!r} != {type!r}")
        objs = [obj for (_, obj) in self.curstack]
        (pos, self.curtype, self.curstack) = self.context.pop()
        log.debug(f"end_type: pos={pos!r}, type={type!r}, objs={objs!r}")
        return (pos, objs)

    def do_keyword(self, pos: int, token: PSKeyword) -> None:
        """Hook for subclasses to handle non-structural keywords."""
        pass

    def nextobject(self) -> PSStackEntry[ExtraT]:
        """Yields a list of objects.

        Arrays and dictionaries are represented as Python lists and
        dictionaries.

        :return: keywords, literals, strings, numbers, arrays and dictionaries.
        """
        while not self.results:
            (pos, token) = self.nexttoken()
            if isinstance(token, (int, float, bool, str, bytes, PSLiteral)):
                # normal token
                self.push((pos, token))
            elif token == KEYWORD_ARRAY_BEGIN:
                # begin array
                self.start_type(pos, "a")
            elif token == KEYWORD_ARRAY_END:
                # end array
                try:
                    self.push(self.end_type("a"))
                except PSTypeError:
                    # Unbalanced ']' is ignored unless in strict mode.
                    if settings.STRICT:
                        raise
            elif token == KEYWORD_DICT_BEGIN:
                # begin dictionary
                self.start_type(pos, "d")
            elif token == KEYWORD_DICT_END:
                # end dictionary
                try:
                    (pos, objs) = self.end_type("d")
                    if len(objs) % 2 != 0:
                        error_msg = f"Invalid dictionary construct: {objs!r}"
                        raise PSSyntaxError(error_msg)
                    # Keys are literal names; pairs whose value is None
                    # are dropped.
                    d = {
                        literal_name(k): v
                        for (k, v) in choplist(2, objs)
                        if v is not None
                    }
                    self.push((pos, d))
                except PSTypeError:
                    if settings.STRICT:
                        raise
            elif token == KEYWORD_PROC_BEGIN:
                # begin proc
                self.start_type(pos, "p")
            elif token == KEYWORD_PROC_END:
                # end proc
                try:
                    self.push(self.end_type("p"))
                except PSTypeError:
                    if settings.STRICT:
                        raise
            elif isinstance(token, PSKeyword):
                log.debug(
                    f"do_keyword: pos={pos!r}, token={token!r}, stack={self.curstack!r}"
                )
                self.do_keyword(pos, token)
            else:
                log.error(
                    f"unknown token: pos={pos!r}, "
                    f"token={token!r}, stack={self.curstack!r}"
                )
                self.do_keyword(pos, token)
                raise PSException
            if self.context:
                # Still inside a nested object: keep consuming tokens.
                continue
            else:
                self.flush()
        obj = self.results.pop(0)
        try:
            log.debug(f"nextobject: {obj!r}")
        except Exception:
            log.debug("nextobject: (unprintable object)")
        return obj