Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pdfminer/lzw.py: 18%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1import logging
2from collections.abc import Iterator
3from io import BytesIO
4from typing import BinaryIO, cast
6from pdfminer.pdfexceptions import PDFEOFError, PDFException
8logger = logging.getLogger(__name__)
class CorruptDataError(PDFException):
    """Signals that an LZW code does not fit the decoder's current table."""
class LZWDecoder:
    """Incremental decoder for an LZW-compressed byte stream.

    Codes are read most-significant-bit first from ``fp``.  They start out
    9 bits wide and widen to 10, 11 and finally 12 bits as the string table
    grows.  Code 256 resets the table; code 257 produces no output.
    """

    def __init__(self, fp: BinaryIO) -> None:
        """Prepare to decode the compressed bytes readable from *fp*."""
        self.fp = fp
        # Last byte read from fp and the number of its bits already consumed.
        # bpos == 8 means the byte is used up, forcing a read on first use.
        self.buff = 0
        self.bpos = 8
        # Width, in bits, of the next code to read.
        self.nbits = 9
        # NB: self.table stores None only in indices 256 and 257
        self.table: list[bytes | None] = []
        # Byte string produced by the previous data code.  None until the
        # first clear code is seen, b"" right after a clear code.
        self.prevbuf: bytes | None = None

    def readbits(self, bits: int) -> int:
        """Read the next *bits* bits from the stream as an unsigned integer.

        Raises:
            PDFEOFError: if ``fp`` runs out of bytes before *bits* bits
                could be collected.
        """
        v = 0
        while True:
            # the number of remaining bits we can get from the current buffer.
            r = 8 - self.bpos
            if bits <= r:
                # |-----8-bits-----|
                # |-bpos-|-bits-|  |
                # |      |----r----|
                v = (v << bits) | ((self.buff >> (r - bits)) & ((1 << bits) - 1))
                self.bpos += bits
                break
            else:
                # |-----8-bits-----|
                # |-bpos-|---bits----...
                # |      |----r----|
                v = (v << r) | (self.buff & ((1 << r) - 1))
                bits -= r
                x = self.fp.read(1)
                if not x:
                    raise PDFEOFError
                self.buff = ord(x)
                self.bpos = 0
        return v

    def feed(self, code: int) -> bytes:
        """Process one LZW *code* and return the bytes it decodes to.

        Returns ``b""`` for the control codes 256 (reset the table) and 257.

        Raises:
            CorruptDataError: if *code* refers to a table entry that cannot
                exist yet, including any data code seen before the first
                clear code has initialised the table.
        """
        x = b""
        if code == 256:
            self.table = [bytes((c,)) for c in range(256)]  # 0-255
            self.table.append(None)  # 256
            self.table.append(None)  # 257
            self.prevbuf = b""
            self.nbits = 9
        elif code == 257:
            pass
        elif not self.prevbuf:
            # First data code after a reset (or before any reset at all).
            # There is no previous string to extend, so the code must already
            # be in the table; otherwise the lookup below would escape as an
            # IndexError that run() does not handle.
            if code >= len(self.table):
                raise CorruptDataError
            # 256 and 257 were handled above, so this entry is not None.
            x = self.prevbuf = cast(bytes, self.table[code])
        else:
            if code < len(self.table):
                x = cast(bytes, self.table[code])  # assume not None
                self.table.append(self.prevbuf + x[:1])
            elif code == len(self.table):
                # The code names the very entry being defined right now:
                # it is the previous string plus its own first byte.
                self.table.append(self.prevbuf + self.prevbuf[:1])
                x = cast(bytes, self.table[code])
            else:
                raise CorruptDataError
            # Widen the code size just before the table outgrows it.
            table_length = len(self.table)
            if table_length == 511:
                self.nbits = 10
            elif table_length == 1023:
                self.nbits = 11
            elif table_length == 2047:
                self.nbits = 12
            self.prevbuf = x
        return x

    def run(self) -> Iterator[bytes]:
        """Yield decoded chunks until the input ends or turns out corrupt."""
        while True:
            try:
                code = self.readbits(self.nbits)
            except EOFError:
                # NOTE(review): readbits raises PDFEOFError; this handler
                # relies on that class deriving from EOFError -- confirm in
                # pdfminer.pdfexceptions.
                break
            try:
                x = self.feed(code)
            except CorruptDataError:
                # just ignore corrupt data and stop yielding there
                break
            yield x

            # Slicing the table copies it, so only build the log arguments
            # when the record will actually be emitted.
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(
                    "nbits=%d, code=%d, output=%r, table=%r",
                    self.nbits,
                    code,
                    x,
                    self.table[258:],
                )
def lzwdecode(data: bytes) -> bytes:
    """Decompress LZW-encoded *data* and return the decoded bytes.

    >>> lzwdecode(bytes.fromhex("800b6050220c0c8501"))
    b'-----A---B'
    """
    decoder = LZWDecoder(BytesIO(data))
    return b"".join(decoder.run())