Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/lzw.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

73 statements  

1import logging 

2from io import BytesIO 

3from typing import BinaryIO, Iterator, List, Optional, cast 

4 

5from pdfminer.pdfexceptions import PDFEOFError, PDFException 

6 

7logger = logging.getLogger(__name__) 

8 

9 

class CorruptDataError(PDFException):
    """Raised by the LZW decoder when a code cannot be resolved against the table."""

12 

13 

class LZWDecoder:
    """Incremental decoder for an LZW-compressed byte stream.

    Codes are read most-significant-bit first from ``fp`` using a variable
    width that starts at 9 bits and grows up to 12 bits as the string table
    fills. Code 256 resets the table and code 257 produces no output.
    """

    def __init__(self, fp: BinaryIO) -> None:
        """Prepare to decode from *fp*, which is read one byte at a time."""
        self.fp = fp
        # Most recently read input byte and how many of its bits are consumed.
        # Starting at bpos == 8 ("exhausted") forces a read on the first call.
        self.buff = 0
        self.bpos = 8
        # Width in bits of the next code to read.
        self.nbits = 9
        # NB: self.table stores None only in indices 256 and 257
        self.table: List[Optional[bytes]] = []
        # Output of the previous data code; empty/None right after a reset.
        self.prevbuf: Optional[bytes] = None

    def readbits(self, bits: int) -> int:
        """Return the next *bits* bits of the stream as an unsigned integer.

        Raises:
            PDFEOFError: if the stream ends before *bits* bits are available.
        """
        v = 0
        while True:
            # the number of remaining bits we can get from the current buffer.
            r = 8 - self.bpos
            if bits <= r:
                # |-----8-bits-----|
                # |-bpos-|-bits-|  |
                # |      |----r----|
                v = (v << bits) | ((self.buff >> (r - bits)) & ((1 << bits) - 1))
                self.bpos += bits
                break

            # |-----8-bits-----|
            # |-bpos-|---bits----...
            # |      |----r----|
            # Take everything left in the current byte, then fetch the next.
            v = (v << r) | (self.buff & ((1 << r) - 1))
            bits -= r
            x = self.fp.read(1)
            if not x:
                raise PDFEOFError
            self.buff = ord(x)
            self.bpos = 0
        return v

    def feed(self, code: int) -> bytes:
        """Process one LZW *code* and return the bytes it decodes to.

        Returns ``b""`` for the two control codes (256 and 257).

        Raises:
            CorruptDataError: if *code* refers to a table entry that does not
                exist and cannot be derived from the previous output.
        """
        x = b""
        if code == 256:
            # Reset: rebuild the single-byte entries and the two reserved slots.
            self.table = [bytes((c,)) for c in range(256)]  # 0-255
            self.table.append(None)  # 256
            self.table.append(None)  # 257
            self.prevbuf = b""
            self.nbits = 9
        elif code == 257:
            pass
        elif not self.prevbuf:
            # First data code after a reset, or a stream that never sent 256.
            # The table is then empty or holds exactly 258 entries, so an
            # out-of-range code is corrupt input rather than a programming
            # error; report it the same way as the other bad-code path.
            if code >= len(self.table):
                raise CorruptDataError
            x = self.prevbuf = cast(bytes, self.table[code])  # assume not None
        else:
            if code < len(self.table):
                x = cast(bytes, self.table[code])  # assume not None
                self.table.append(self.prevbuf + x[:1])
            elif code == len(self.table):
                # The code names the entry that is about to be created: it is
                # the previous output extended by its own first byte.
                self.table.append(self.prevbuf + self.prevbuf[:1])
                x = cast(bytes, self.table[code])
            else:
                raise CorruptDataError
            # Widen the code size one entry before the table outgrows it.
            table_length = len(self.table)
            if table_length == 511:
                self.nbits = 10
            elif table_length == 1023:
                self.nbits = 11
            elif table_length == 2047:
                self.nbits = 12
            self.prevbuf = x
        return x

    def run(self) -> Iterator[bytes]:
        """Yield decoded chunks until the input ends or turns out to be corrupt.

        Both conditions end the iteration silently; no exception is raised
        for a ``CorruptDataError`` coming out of ``feed``.
        """
        while True:
            try:
                code = self.readbits(self.nbits)
            except EOFError:
                # NOTE(review): readbits raises PDFEOFError; this handler only
                # matches if that class derives from EOFError - confirm in
                # pdfminer.pdfexceptions.
                break
            try:
                x = self.feed(code)
            except CorruptDataError:
                # just ignore corrupt data and stop yielding there
                break
            yield x

            # Runs when the consumer asks for the next chunk. Guarded so the
            # table slice is not built for every code when debug is disabled.
            if logger.isEnabledFor(logging.DEBUG):
                logger.debug(
                    "nbits=%d, code=%d, output=%r, table=%r",
                    self.nbits,
                    code,
                    x,
                    self.table[258:],
                )

100 

101 

def lzwdecode(data: bytes) -> bytes:
    """Decode the LZW-compressed *data* and return the concatenated output."""
    decoder = LZWDecoder(BytesIO(data))
    return b"".join(decoder.run())

105 return b"".join(s)