Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/lzw.py: 18%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import logging
2from io import BytesIO
3from typing import BinaryIO, Iterator, List, Optional, cast
5from pdfminer.pdfexceptions import PDFEOFError, PDFException
7logger = logging.getLogger(__name__)
class CorruptDataError(PDFException):
    """Raised by the LZW decoder when it is fed a code it cannot resolve."""
class LZWDecoder:
    """Incremental decoder for an LZW-compressed byte stream.

    Codes are read from ``fp`` most-significant-bit first.  The code width
    starts at 9 bits and is widened to 10, 11 and 12 bits as the string
    table grows.  Code 256 resets the table; code 257 produces no output.
    """

    def __init__(self, fp: BinaryIO) -> None:
        """Prepare to decode the compressed bytes readable from *fp*."""
        self.fp = fp
        # Current input byte and the number of its bits already consumed.
        # bpos == 8 marks the byte as exhausted, forcing a read on first use.
        self.buff = 0
        self.bpos = 8
        # Width, in bits, of the next code to read.
        self.nbits = 9
        # NB: self.table stores None only in indices 256 and 257
        self.table: List[Optional[bytes]] = []
        # Output of the previous code; None until the table is first reset.
        self.prevbuf: Optional[bytes] = None

    def readbits(self, bits: int) -> int:
        """Read the next *bits* bits from the stream as an unsigned integer.

        Raises:
            PDFEOFError: if the input ends before *bits* bits are available.
        """
        v = 0
        while True:
            # the number of remaining bits we can get from the current buffer.
            r = 8 - self.bpos
            if bits <= r:
                # |-----8-bits-----|
                # |-bpos-|-bits-|  |
                # |      |----r----|
                v = (v << bits) | ((self.buff >> (r - bits)) & ((1 << bits) - 1))
                self.bpos += bits
                break
            else:
                # |-----8-bits-----|
                # |-bpos-|---bits----...
                # |      |----r----|
                v = (v << r) | (self.buff & ((1 << r) - 1))
                bits -= r
                x = self.fp.read(1)
                if not x:
                    raise PDFEOFError
                self.buff = ord(x)
                self.bpos = 0
        return v

    def feed(self, code: int) -> bytes:
        """Process one LZW *code* and return the bytes it decodes to.

        Returns ``b""`` for the two control codes (256 and 257).

        Raises:
            CorruptDataError: if *code* does not refer to a table entry that
                exists (or is about to be created) at this point in the
                stream, including any data code seen before the first 256.
        """
        x = b""
        if code == 256:
            # Clear-table code: rebuild the 256 single-byte entries plus two
            # placeholders so that new strings start at index 258.
            self.table = [bytes((c,)) for c in range(256)]  # 0-255
            self.table.append(None)  # 256
            self.table.append(None)  # 257
            self.prevbuf = b""
            self.nbits = 9
        elif code == 257:
            pass
        elif not self.prevbuf:
            # First data code after a reset (or before any reset at all).
            # It must name an existing entry; without this check a stream
            # that lacks the leading 256, or starts with an out-of-range
            # code, would escape as a bare IndexError.
            if code >= len(self.table):
                raise CorruptDataError
            x = self.prevbuf = cast(bytes, self.table[code])  # assume not None
        else:
            if code < len(self.table):
                x = cast(bytes, self.table[code])  # assume not None
                self.table.append(self.prevbuf + x[:1])
            elif code == len(self.table):
                # The code refers to the entry being defined right now.
                self.table.append(self.prevbuf + self.prevbuf[:1])
                x = cast(bytes, self.table[code])
            else:
                raise CorruptDataError
            # Widen the code size once the table reaches these lengths.
            table_length = len(self.table)
            if table_length == 511:
                self.nbits = 10
            elif table_length == 1023:
                self.nbits = 11
            elif table_length == 2047:
                self.nbits = 12
            self.prevbuf = x
        return x

    def run(self) -> Iterator[bytes]:
        """Yield decoded chunks until the input ends or turns out corrupt."""
        while True:
            try:
                code = self.readbits(self.nbits)
            except EOFError:
                # NOTE(review): readbits raises PDFEOFError; this handler
                # relies on that being an EOFError subclass - confirm.
                break
            try:
                x = self.feed(code)
            except CorruptDataError:
                # just ignore corrupt data and stop yielding there
                break
            yield x

            logger.debug(
                "nbits=%d, code=%d, output=%r, table=%r",
                self.nbits,
                code,
                x,
                self.table[258:],
            )
def lzwdecode(data: bytes) -> bytes:
    """Decode LZW-compressed *data* and return the concatenated output."""
    return b"".join(LZWDecoder(BytesIO(data)).run())