#!/usr/bin/env python3
import io
import logging
import re
from typing import (
    Any,
    BinaryIO,
    Dict,
    Generic,
    Iterator,
    List,
    Optional,
    Tuple,
    Type,
    TypeVar,
    Union,
)

from pdfminer import psexceptions, settings
from pdfminer.utils import choplist

log = logging.getLogger(__name__)


# Adding aliases for these exceptions for backwards compatibility
PSException = psexceptions.PSException
PSEOF = psexceptions.PSEOF
PSSyntaxError = psexceptions.PSSyntaxError
PSTypeError = psexceptions.PSTypeError
PSValueError = psexceptions.PSValueError


class PSObject:
    """Base class for all PS or PDF-related data types."""


class PSLiteral(PSObject):
    """A class that represents a PostScript literal.

    PostScript literals are used as identifiers, such as
    variable names, property names and dictionary keys.
    Literals are case sensitive and denoted by a preceding
    slash sign (e.g. "/Name").

    Note: Do not create an instance of PSLiteral directly.
    Always use PSLiteralTable.intern().
    """

    NameType = Union[str, bytes]

    def __init__(self, name: NameType) -> None:
        self.name = name

    def __repr__(self) -> str:
        name = self.name
        return "/%r" % name


class PSKeyword(PSObject):
    """A class that represents a PostScript keyword.

    PostScript keywords are a small set of predefined words.
    Commands and directives in PostScript are expressed by keywords.
    They are also used to denote the content boundaries.

    Note: Do not create an instance of PSKeyword directly.
    Always use PSKeywordTable.intern().
    """

    def __init__(self, name: bytes) -> None:
        self.name = name

    def __repr__(self) -> str:
        name = self.name
        return "/%r" % name


_SymbolT = TypeVar("_SymbolT", PSLiteral, PSKeyword)


class PSSymbolTable(Generic[_SymbolT]):
    """A utility class for storing PSLiteral/PSKeyword objects.

    Interned objects can be compared by identity with the "is" operator.
    """

    def __init__(self, klass: Type[_SymbolT]) -> None:
        self.dict: Dict[PSLiteral.NameType, _SymbolT] = {}
        self.klass: Type[_SymbolT] = klass

    def intern(self, name: PSLiteral.NameType) -> _SymbolT:
        if name in self.dict:
            lit = self.dict[name]
        else:
            # Type confusion issue: PSKeyword always takes bytes as name
            # PSLiteral uses either str or bytes
            lit = self.klass(name)  # type: ignore[arg-type]
            self.dict[name] = lit
        return lit


PSLiteralTable = PSSymbolTable(PSLiteral)
PSKeywordTable = PSSymbolTable(PSKeyword)
LIT = PSLiteralTable.intern
KWD = PSKeywordTable.intern
KEYWORD_PROC_BEGIN = KWD(b"{")
KEYWORD_PROC_END = KWD(b"}")
KEYWORD_ARRAY_BEGIN = KWD(b"[")
KEYWORD_ARRAY_END = KWD(b"]")
KEYWORD_DICT_BEGIN = KWD(b"<<")
KEYWORD_DICT_END = KWD(b">>")
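

# Example (illustrative, not part of the original module): because literals
# and keywords are interned through the tables above, equal names share a
# single object and can be compared with "is":
#
#     >>> LIT("Name") is LIT("Name")
#     True
#     >>> KWD(b"obj") is KWD(b"obj")
#     True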


def literal_name(x: Any) -> str:
    if isinstance(x, PSLiteral):
        if isinstance(x.name, str):
            return x.name
        try:
            return str(x.name, "utf-8")
        except UnicodeDecodeError:
            return str(x.name)
    else:
        if settings.STRICT:
            raise PSTypeError(f"Literal required: {x!r}")
        return str(x)


def keyword_name(x: Any) -> Any:
    if not isinstance(x, PSKeyword):
        if settings.STRICT:
            raise PSTypeError("Keyword required: %r" % x)
        else:
            name = x
    else:
        name = str(x.name, "utf-8", "ignore")
    return name
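

# Example (illustrative, not part of the original module): literal_name() and
# keyword_name() unwrap interned objects back into plain strings:
#
#     >>> literal_name(LIT("Type"))
#     'Type'
#     >>> keyword_name(KWD(b"endobj"))
#     'endobj'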


EOL = re.compile(rb"[\r\n]")
SPC = re.compile(rb"\s")
NONSPC = re.compile(rb"\S")
HEX = re.compile(rb"[0-9a-fA-F]")
END_LITERAL = re.compile(rb"[#/%\[\]()<>{}\s]")
END_HEX_STRING = re.compile(rb"[^\s0-9a-fA-F]")
HEX_PAIR = re.compile(rb"[0-9a-fA-F]{2}|.")
END_NUMBER = re.compile(rb"[^0-9]")
END_KEYWORD = re.compile(rb"[#/%\[\]()<>{}\s]")
END_STRING = re.compile(rb"[()\134]")
OCT_STRING = re.compile(rb"[0-7]")
ESC_STRING = {
    b"b": 8,
    b"t": 9,
    b"n": 10,
    b"f": 12,
    b"r": 13,
    b"(": 40,
    b")": 41,
    b"\\": 92,
}


PSBaseParserToken = Union[float, bool, PSLiteral, PSKeyword, bytes]


class PSBaseParser:
    """Most basic PostScript parser that performs only tokenization."""

    BUFSIZ = 4096

    def __init__(self, fp: BinaryIO) -> None:
        self.fp = fp
        self.seek(0)

    def __repr__(self) -> str:
        return "<%s: %r, bufpos=%d>" % (self.__class__.__name__, self.fp, self.bufpos)

    def flush(self) -> None:
        pass

    def close(self) -> None:
        self.flush()

    def tell(self) -> int:
        return self.bufpos + self.charpos

    def poll(self, pos: Optional[int] = None, n: int = 80) -> None:
        pos0 = self.fp.tell()
        if not pos:
            pos = self.bufpos + self.charpos
        self.fp.seek(pos)
        log.debug("poll(%d): %r", pos, self.fp.read(n))
        self.fp.seek(pos0)

    def seek(self, pos: int) -> None:
        """Seeks the parser to the given position."""
        log.debug("seek: %r", pos)
        self.fp.seek(pos)
        # reset the status for nextline()
        self.bufpos = pos
        self.buf = b""
        self.charpos = 0
        # reset the status for nexttoken()
        self._parse1 = self._parse_main
        self._curtoken = b""
        self._curtokenpos = 0
        self._tokens: List[Tuple[int, PSBaseParserToken]] = []

    def fillbuf(self) -> None:
        if self.charpos < len(self.buf):
            return
        # fetch next chunk.
        self.bufpos = self.fp.tell()
        self.buf = self.fp.read(self.BUFSIZ)
        if not self.buf:
            raise PSEOF("Unexpected EOF")
        self.charpos = 0

    def nextline(self) -> Tuple[int, bytes]:
        """Fetches the next line, which ends either with \\r or \\n."""
        linebuf = b""
        linepos = self.bufpos + self.charpos
        eol = False
        while 1:
            self.fillbuf()
            if eol:
                c = self.buf[self.charpos : self.charpos + 1]
                # handle b'\r\n'
                if c == b"\n":
                    linebuf += c
                    self.charpos += 1
                break
            m = EOL.search(self.buf, self.charpos)
            if m:
                linebuf += self.buf[self.charpos : m.end(0)]
                self.charpos = m.end(0)
                if linebuf[-1:] == b"\r":
                    eol = True
                else:
                    break
            else:
                linebuf += self.buf[self.charpos :]
                self.charpos = len(self.buf)
        log.debug("nextline: %r, %r", linepos, linebuf)

        return (linepos, linebuf)

    def revreadlines(self) -> Iterator[bytes]:
        """Fetches the next line backward.

        This is used to locate the trailers at the end of a file.
        """
        self.fp.seek(0, io.SEEK_END)
        pos = self.fp.tell()
        buf = b""
        while pos > 0:
            prevpos = pos
            pos = max(0, pos - self.BUFSIZ)
            self.fp.seek(pos)
            s = self.fp.read(prevpos - pos)
            if not s:
                break
            while 1:
                n = max(s.rfind(b"\r"), s.rfind(b"\n"))
                if n == -1:
                    buf = s + buf
                    break
                yield s[n:] + buf
                s = s[:n]
                buf = b""
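
    # Illustrative sketch (assumed behavior, not part of the original module):
    # revreadlines() yields lines last-to-first, each still carrying its
    # leading line terminator, so the trailer can be located without scanning
    # the whole file.
    #
    #     >>> p = PSBaseParser(io.BytesIO(b"1 0 obj\ntrailer\n%%EOF"))
    #     >>> next(p.revreadlines())
    #     b'\n%%EOF'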

    def _parse_main(self, s: bytes, i: int) -> int:
        m = NONSPC.search(s, i)
        if not m:
            return len(s)
        j = m.start(0)
        c = s[j : j + 1]
        self._curtokenpos = self.bufpos + j
        if c == b"%":
            self._curtoken = b"%"
            self._parse1 = self._parse_comment
            return j + 1
        elif c == b"/":
            self._curtoken = b""
            self._parse1 = self._parse_literal
            return j + 1
        elif c in b"-+" or c.isdigit():
            self._curtoken = c
            self._parse1 = self._parse_number
            return j + 1
        elif c == b".":
            self._curtoken = c
            self._parse1 = self._parse_float
            return j + 1
        elif c.isalpha():
            self._curtoken = c
            self._parse1 = self._parse_keyword
            return j + 1
        elif c == b"(":
            self._curtoken = b""
            self.paren = 1
            self._parse1 = self._parse_string
            return j + 1
        elif c == b"<":
            self._curtoken = b""
            self._parse1 = self._parse_wopen
            return j + 1
        elif c == b">":
            self._curtoken = b""
            self._parse1 = self._parse_wclose
            return j + 1
        elif c == b"\x00":
            return j + 1
        else:
            self._add_token(KWD(c))
            return j + 1

    def _add_token(self, obj: PSBaseParserToken) -> None:
        self._tokens.append((self._curtokenpos, obj))

    def _parse_comment(self, s: bytes, i: int) -> int:
        m = EOL.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        self._parse1 = self._parse_main
        # We ignore comments.
        # self._tokens.append(self._curtoken)
        return j

    def _parse_literal(self, s: bytes, i: int) -> int:
        m = END_LITERAL.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b"#":
            self.hex = b""
            self._parse1 = self._parse_literal_hex
            return j + 1
        try:
            name: Union[str, bytes] = str(self._curtoken, "utf-8")
        except Exception:
            name = self._curtoken
        self._add_token(LIT(name))
        self._parse1 = self._parse_main
        return j

    def _parse_literal_hex(self, s: bytes, i: int) -> int:
        c = s[i : i + 1]
        if HEX.match(c) and len(self.hex) < 2:
            self.hex += c
            return i + 1
        if self.hex:
            self._curtoken += bytes((int(self.hex, 16),))
        self._parse1 = self._parse_literal
        return i

    def _parse_number(self, s: bytes, i: int) -> int:
        m = END_NUMBER.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b".":
            self._curtoken += c
            self._parse1 = self._parse_float
            return j + 1
        try:
            self._add_token(int(self._curtoken))
        except ValueError:
            pass
        self._parse1 = self._parse_main
        return j

    def _parse_float(self, s: bytes, i: int) -> int:
        m = END_NUMBER.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        try:
            self._add_token(float(self._curtoken))
        except ValueError:
            pass
        self._parse1 = self._parse_main
        return j

    def _parse_keyword(self, s: bytes, i: int) -> int:
        m = END_KEYWORD.search(s, i)
        if m:
            j = m.start(0)
            self._curtoken += s[i:j]
        else:
            # Use the rest of the stream if no non-keyword character is found. This
            # can happen if the keyword is the final bytes of the stream
            # (https://github.com/pdfminer/pdfminer.six/issues/884).
            j = len(s)
            self._curtoken += s[i:]
        if self._curtoken == b"true":
            token: Union[bool, PSKeyword] = True
        elif self._curtoken == b"false":
            token = False
        else:
            token = KWD(self._curtoken)
        self._add_token(token)
        self._parse1 = self._parse_main
        return j

    def _parse_string(self, s: bytes, i: int) -> int:
        m = END_STRING.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        c = s[j : j + 1]
        if c == b"\\":
            self.oct = b""
            self._parse1 = self._parse_string_1
            return j + 1
        if c == b"(":
            self.paren += 1
            self._curtoken += c
            return j + 1
        if c == b")":
            self.paren -= 1
            if self.paren:
                # Keep nested (balanced) parentheses as part of the string.
                self._curtoken += c
                return j + 1
        self._add_token(self._curtoken)
        self._parse1 = self._parse_main
        return j + 1

    def _parse_string_1(self, s: bytes, i: int) -> int:
        """Parse literal strings

        PDF Reference 3.2.3
        """
        c = s[i : i + 1]
        if OCT_STRING.match(c) and len(self.oct) < 3:
            self.oct += c
            return i + 1

        elif self.oct:
            chrcode = int(self.oct, 8)
            assert chrcode < 256, "Invalid octal %s (%d)" % (repr(self.oct), chrcode)
            self._curtoken += bytes((chrcode,))
            self._parse1 = self._parse_string
            return i

        elif c in ESC_STRING:
            self._curtoken += bytes((ESC_STRING[c],))

        elif c == b"\r" and len(s) > i + 1 and s[i + 1 : i + 2] == b"\n":
            # If the current and next characters are \r\n, skip both, because
            # a line break after a backslash is ignored.
            i += 1

        # default action
        self._parse1 = self._parse_string
        return i + 1
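
    # Illustrative example (assumed behavior, not part of the original module):
    # escape sequences inside literal strings are resolved here, so the input
    # b"(a\\101b)" tokenizes to b"aAb" and b"(line\\n)" to b"line\n", per the
    # octal and mnemonic escapes of PDF Reference 3.2.3.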

    def _parse_wopen(self, s: bytes, i: int) -> int:
        c = s[i : i + 1]
        if c == b"<":
            self._add_token(KEYWORD_DICT_BEGIN)
            self._parse1 = self._parse_main
            i += 1
        else:
            self._parse1 = self._parse_hexstring
        return i

    def _parse_wclose(self, s: bytes, i: int) -> int:
        c = s[i : i + 1]
        if c == b">":
            self._add_token(KEYWORD_DICT_END)
            i += 1
        self._parse1 = self._parse_main
        return i

    def _parse_hexstring(self, s: bytes, i: int) -> int:
        m = END_HEX_STRING.search(s, i)
        if not m:
            self._curtoken += s[i:]
            return len(s)
        j = m.start(0)
        self._curtoken += s[i:j]
        token = HEX_PAIR.sub(
            lambda m: bytes((int(m.group(0), 16),)),
            SPC.sub(b"", self._curtoken),
        )
        self._add_token(token)
        self._parse1 = self._parse_main
        return j

    def nexttoken(self) -> Tuple[int, PSBaseParserToken]:
        while not self._tokens:
            self.fillbuf()
            self.charpos = self._parse1(self.buf, self.charpos)
        token = self._tokens.pop(0)
        log.debug("nexttoken: %r", token)
        return token
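

# Usage sketch (illustrative, not part of the original module): PSBaseParser
# only tokenizes; nexttoken() returns (position, token) pairs and raises PSEOF
# at the end of the input.
#
#     >>> p = PSBaseParser(io.BytesIO(b"/Type /Page 12 0 R"))
#     >>> p.nexttoken()
#     (0, /'Type')
#     >>> p.nexttoken()
#     (6, /'Page')
#     >>> p.nexttoken()
#     (12, 12)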


# Stack slots may be occupied by any of:
#  * the name of a literal
#  * the PSBaseParserToken types
#  * list (via KEYWORD_ARRAY)
#  * dict (via KEYWORD_DICT)
#  * subclass-specific extensions (e.g. PDFStream, PDFObjRef) via ExtraT
ExtraT = TypeVar("ExtraT")
PSStackType = Union[str, float, bool, PSLiteral, bytes, List, Dict, ExtraT]
PSStackEntry = Tuple[int, PSStackType[ExtraT]]


class PSStackParser(PSBaseParser, Generic[ExtraT]):
    def __init__(self, fp: BinaryIO) -> None:
        PSBaseParser.__init__(self, fp)
        self.reset()

    def reset(self) -> None:
        self.context: List[Tuple[int, Optional[str], List[PSStackEntry[ExtraT]]]] = []
        self.curtype: Optional[str] = None
        self.curstack: List[PSStackEntry[ExtraT]] = []
        self.results: List[PSStackEntry[ExtraT]] = []

    def seek(self, pos: int) -> None:
        PSBaseParser.seek(self, pos)
        self.reset()

    def push(self, *objs: PSStackEntry[ExtraT]) -> None:
        self.curstack.extend(objs)

    def pop(self, n: int) -> List[PSStackEntry[ExtraT]]:
        objs = self.curstack[-n:]
        self.curstack[-n:] = []
        return objs

    def popall(self) -> List[PSStackEntry[ExtraT]]:
        objs = self.curstack
        self.curstack = []
        return objs

    def add_results(self, *objs: PSStackEntry[ExtraT]) -> None:
        try:
            log.debug("add_results: %r", objs)
        except Exception:
            log.debug("add_results: (unprintable object)")
        self.results.extend(objs)

    def start_type(self, pos: int, type: str) -> None:
        self.context.append((pos, self.curtype, self.curstack))
        (self.curtype, self.curstack) = (type, [])
        log.debug("start_type: pos=%r, type=%r", pos, type)

    def end_type(self, type: str) -> Tuple[int, List[PSStackType[ExtraT]]]:
        if self.curtype != type:
            raise PSTypeError(f"Type mismatch: {self.curtype!r} != {type!r}")
        objs = [obj for (_, obj) in self.curstack]
        (pos, self.curtype, self.curstack) = self.context.pop()
        log.debug("end_type: pos=%r, type=%r, objs=%r", pos, type, objs)
        return (pos, objs)

    def do_keyword(self, pos: int, token: PSKeyword) -> None:
        pass

    def nextobject(self) -> PSStackEntry[ExtraT]:
        """Returns the next object as a (position, object) pair.

        Arrays and dictionaries are represented as Python lists and
        dictionaries.

        :return: keywords, literals, strings, numbers, arrays and dictionaries.
        """
        while not self.results:
            (pos, token) = self.nexttoken()
            if isinstance(token, (int, float, bool, str, bytes, PSLiteral)):
                # normal token
                self.push((pos, token))
            elif token == KEYWORD_ARRAY_BEGIN:
                # begin array
                self.start_type(pos, "a")
            elif token == KEYWORD_ARRAY_END:
                # end array
                try:
                    self.push(self.end_type("a"))
                except PSTypeError:
                    if settings.STRICT:
                        raise
            elif token == KEYWORD_DICT_BEGIN:
                # begin dictionary
                self.start_type(pos, "d")
            elif token == KEYWORD_DICT_END:
                # end dictionary
                try:
                    (pos, objs) = self.end_type("d")
                    if len(objs) % 2 != 0:
                        error_msg = "Invalid dictionary construct: %r" % objs
                        raise PSSyntaxError(error_msg)
                    d = {
                        literal_name(k): v
                        for (k, v) in choplist(2, objs)
                        if v is not None
                    }
                    self.push((pos, d))
                except PSTypeError:
                    if settings.STRICT:
                        raise
            elif token == KEYWORD_PROC_BEGIN:
                # begin proc
                self.start_type(pos, "p")
            elif token == KEYWORD_PROC_END:
                # end proc
                try:
                    self.push(self.end_type("p"))
                except PSTypeError:
                    if settings.STRICT:
                        raise
            elif isinstance(token, PSKeyword):
                log.debug(
                    "do_keyword: pos=%r, token=%r, stack=%r",
                    pos,
                    token,
                    self.curstack,
                )
                self.do_keyword(pos, token)
            else:
                log.error(
                    "unknown token: pos=%r, token=%r, stack=%r",
                    pos,
                    token,
                    self.curstack,
                )
                self.do_keyword(pos, token)
                raise PSException
            if self.context:
                continue
            else:
                self.flush()
        obj = self.results.pop(0)
        try:
            log.debug("nextobject: %r", obj)
        except Exception:
            log.debug("nextobject: (unprintable object)")
        return obj
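

# A minimal, self-contained usage sketch (assumed; not part of the original
# module). The base PSStackParser never calls add_results() itself, so this
# demo subclass flushes completed objects from the stack into the result queue.
if __name__ == "__main__":

    class _DemoStackParser(PSStackParser[object]):
        def flush(self) -> None:
            # Move every object collected so far into the results queue.
            self.add_results(*self.popall())

    _demo = _DemoStackParser(io.BytesIO(b"<< /Count 3 /Kids [1 2 3] >>"))
    # Prints something like: (0, {'Count': 3, 'Kids': [1, 2, 3]})
    print(_demo.nextobject())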