Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pdfminer/lzw.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

76 statements  

1import logging 

2from collections.abc import Iterator 

3from io import BytesIO 

4from typing import BinaryIO, cast 

5 

6from pdfminer.pdfexceptions import PDFEOFError, PDFException 

7 

8logger = logging.getLogger(__name__) 

9 

10 

class CorruptDataError(PDFException):
    """Signals that an LZW code could not be resolved against the code table."""

13 

14 

class LZWDecoder:
    """Incremental decoder for an LZW-compressed byte stream.

    Codes are read most-significant-bit first from ``fp``.  The code width
    starts at 9 bits and grows up to 12 bits as the string table fills.
    Code 256 resets the table; code 257 produces no output.
    """

    def __init__(self, fp: BinaryIO) -> None:
        """Set up decoding state for the binary stream ``fp``."""
        self.fp = fp
        # Most recently read byte; its bits are consumed from the high end.
        self.buff = 0
        # Number of bits of ``buff`` already consumed (8 forces a fresh read).
        self.bpos = 8
        # Current code width in bits.
        self.nbits = 9
        # NB: self.table stores None only in indices 256 and 257
        self.table: list[bytes | None] = []
        # Output of the previous code; falsy at start and right after a reset.
        self.prevbuf: bytes | None = None

    def readbits(self, bits: int) -> int:
        """Read the next ``bits`` bits from the stream as an unsigned integer.

        Raises:
            PDFEOFError: if the stream ends before ``bits`` bits are available.
        """
        v = 0
        while True:
            # the number of remaining bits we can get from the current buffer.
            r = 8 - self.bpos
            if bits <= r:
                # |-----8-bits-----|
                # |-bpos-|-bits-|  |
                # |      |----r----|
                v = (v << bits) | ((self.buff >> (r - bits)) & ((1 << bits) - 1))
                self.bpos += bits
                break
            else:
                # |-----8-bits-----|
                # |-bpos-|---bits----...
                # |      |----r----|
                v = (v << r) | (self.buff & ((1 << r) - 1))
                bits -= r
                x = self.fp.read(1)
                if not x:
                    raise PDFEOFError
                self.buff = ord(x)
                self.bpos = 0
        return v

    def feed(self, code: int) -> bytes:
        """Process one LZW code and return the bytes it decodes to.

        Code 256 rebuilds the table and resets the code width to 9 bits;
        code 257 is ignored.  Both return ``b""``.

        Raises:
            CorruptDataError: if ``code`` refers to a table entry that does
                not exist and cannot be derived from the previous output.
        """
        x = b""
        if code == 256:
            self.table = [bytes((c,)) for c in range(256)]  # 0-255
            self.table.append(None)  # 256
            self.table.append(None)  # 257
            self.prevbuf = b""
            self.nbits = 9
        elif code == 257:
            pass
        elif not self.prevbuf:
            # First data code after a reset (or at stream start): it must
            # name an existing entry.  Without this check a stream that lacks
            # the leading clear code, or whose first code is out of range,
            # would escape as a bare IndexError instead of CorruptDataError.
            if code >= len(self.table):
                raise CorruptDataError
            x = self.prevbuf = cast(bytes, self.table[code])  # assume not None
        else:
            if code < len(self.table):
                x = cast(bytes, self.table[code])  # assume not None
                self.table.append(self.prevbuf + x[:1])
            elif code == len(self.table):
                # The code refers to the entry being defined right now: it is
                # the previous output extended by its own first byte.
                self.table.append(self.prevbuf + self.prevbuf[:1])
                x = cast(bytes, self.table[code])
            else:
                raise CorruptDataError
            # Widen the code one entry before the current width is exhausted.
            table_length = len(self.table)
            if table_length == 511:
                self.nbits = 10
            elif table_length == 1023:
                self.nbits = 11
            elif table_length == 2047:
                self.nbits = 12
            self.prevbuf = x
        return x

    def run(self) -> Iterator[bytes]:
        """Yield decoded chunks until the input ends or corrupt data is hit."""
        while True:
            try:
                code = self.readbits(self.nbits)
            except EOFError:
                # NOTE(review): readbits raises PDFEOFError; this handler
                # presumably relies on it subclassing EOFError - confirm.
                break
            try:
                x = self.feed(code)
            except CorruptDataError:
                # just ignore corrupt data and stop yielding there
                break
            yield x

            logger.debug(
                "nbits=%d, code=%d, output=%r, table=%r",
                self.nbits,
                code,
                x,
                self.table[258:],
            )

101 

102 

def lzwdecode(data: bytes) -> bytes:
    """Decode LZW-compressed ``data`` and return the concatenated output."""
    decoder = LZWDecoder(BytesIO(data))
    chunks = decoder.run()
    return b"".join(chunks)