Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/jbig2.py: 21%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

211 statements  

1import math 

2import os 

3from struct import calcsize, pack, unpack 

4from typing import BinaryIO, Dict, Iterable, List, Optional, Tuple, Union, cast 

5 

6from pdfminer.pdfexceptions import PDFValueError 

7 

# Fixed-order layout of a segment header as (struct format, field name) pairs.
# The reader and the writer walk this list and look up an optional
# ``parse_<name>`` / ``encode_<name>`` method for every field.
SEG_STRUCT = [
    (">L", "number"),
    (">B", "flags"),
    (">B", "retention_flags"),
    (">B", "page_assoc"),
    (">L", "data_length"),
]

# Bits of the segment header "flags" byte.
HEADER_FLAG_DEFERRED = 1 << 7
HEADER_FLAG_PAGE_ASSOC_LONG = 1 << 6

# Low six bits of the "flags" byte carry the segment type.
SEG_TYPE_MASK = 0x3F

# Referred-to segment count: short form lives in the top three bits of the
# retention byte, long form in the low 29 bits of a 32-bit word.  A short
# count equal to REF_COUNT_LONG selects the long form.
REF_COUNT_SHORT_MASK = 0xE0
REF_COUNT_LONG_MASK = 0x1FFFFFFF
REF_COUNT_LONG = 7

# Sentinel value of "data_length" that the reader treats as "unknown length".
DATA_LEN_UNKNOWN = 0xFFFFFFFF

# Segment type codes referenced by this module.
SEG_TYPE_IMMEDIATE_GEN_REGION = 38
SEG_TYPE_END_OF_PAGE = 49
SEG_TYPE_END_OF_FILE = 51

# File header: the 8-byte identifier and the "sequential" file flag.
FILE_HEADER_ID = b"\x97JB2\r\n\x1a\n"
FILE_HEAD_FLAG_SEQUENTIAL = 1

37 

38 

def bit_set(bit_pos: int, value: int) -> bool:
    """Return True when bit number *bit_pos* (0 = least significant) of *value* is 1."""
    shifted = value >> bit_pos
    return shifted & 1 == 1

41 

42 

def check_flag(flag: int, value: int) -> bool:
    """Return True when *value* shares at least one set bit with *flag*."""
    return (flag & value) != 0

45 

46 

def masked_value(mask: int, value: int) -> int:
    """Extract the bit field selected by *mask* from *value*.

    The field is shifted right so that the lowest set bit of *mask* ends up
    at bit 0 of the result, e.g. ``masked_value(0b11100000, 0b10100101)``
    is ``0b101``.

    Raises:
        PDFValueError: if *mask* has no bit set.
    """
    if mask == 0:
        raise PDFValueError("Invalid mask or value")

    # ``mask & -mask`` isolates the lowest set bit; its bit length minus one
    # is that bit's position.  Unlike scanning ``range(31)`` this also
    # handles a mask whose only set bit is bit 31 (or higher).
    shift = (mask & -mask).bit_length() - 1
    return (value & mask) >> shift

53 

54 

def mask_value(mask: int, value: int) -> int:
    """Place *value* into the bit field selected by *mask*.

    *value* is truncated to the width of the field and shifted left so that
    its bit 0 lines up with the lowest set bit of *mask*, e.g.
    ``mask_value(0b11100000, 5)`` is ``0b10100000``.

    Raises:
        PDFValueError: if *mask* has no bit set.
    """
    if mask == 0:
        raise PDFValueError("Invalid mask or value")

    # Position of the lowest set bit of the mask.  Computing it directly
    # (instead of scanning ``range(31)``) also covers a mask whose only set
    # bit is bit 31 (or higher).
    shift = (mask & -mask).bit_length() - 1
    return (value & (mask >> shift)) << shift

61 

62 

def unpack_int(format: str, buffer: bytes) -> int:
    """Decode *buffer* as one big-endian unsigned integer.

    *format* must be one of the struct formats ``">B"``, ``">I"`` or
    ``">L"``; ``struct.unpack`` raises ``struct.error`` when *buffer* does
    not have exactly the size that format requires.
    """
    assert format in {">B", ">I", ">L"}
    values = cast(Tuple[int], unpack(format, buffer))
    (number,) = values
    return number

67 

68 

# Parsed "flags" header byte: a mapping of flag name to bool or int.
JBIG2SegmentFlags = Dict[str, Union[int, bool]]

# Parsed retention information: an int count plus lists of ints / bools.
JBIG2RetentionFlags = Dict[
    str,
    Union[int, List[int], List[bool]],
]

# One segment: header field name (or auxiliary key) mapped to its value.
JBIG2Segment = Dict[
    str,
    Union[
        bool,
        int,
        bytes,
        JBIG2SegmentFlags,
        JBIG2RetentionFlags,
    ],
]

75 

76 

class JBIG2StreamReader:
    """Read segments from a JBIG2 byte stream.

    The stream must support ``read`` and relative ``seek`` (used by
    :meth:`is_eof` to un-read the probe byte).
    """

    def __init__(self, stream: BinaryIO) -> None:
        self.stream = stream

    def get_segments(self) -> List[JBIG2Segment]:
        """Parse every segment up to the end of the stream.

        Header fields are read in ``SEG_STRUCT`` order.  When a
        ``parse_<field>`` method exists it post-processes the raw value and
        may consume further bytes from the stream.  A segment whose fixed
        header field is cut short by the end of the stream is dropped.

        Returns:
            The list of successfully parsed segments.
        """
        segments: List[JBIG2Segment] = []
        while not self.is_eof():
            segment: JBIG2Segment = {}
            for field_format, name in SEG_STRUCT:
                field_len = calcsize(field_format)
                field = self.stream.read(field_len)
                if len(field) < field_len:
                    # Truncated header: mark the segment so it is skipped.
                    segment["_error"] = True
                    break
                value = unpack_int(field_format, field)
                parser = getattr(self, "parse_%s" % name, None)
                if callable(parser):
                    value = parser(segment, value, field)
                segment[name] = value

            if not segment.get("_error"):
                segments.append(segment)
        return segments

    def is_eof(self) -> bool:
        """Return True when no more bytes can be read from the stream."""
        if self.stream.read(1) == b"":
            return True
        # Not at the end: step back over the byte that was just probed.
        self.stream.seek(-1, os.SEEK_CUR)
        return False

    def parse_flags(
        self,
        segment: JBIG2Segment,
        flags: int,
        field: bytes,
    ) -> JBIG2SegmentFlags:
        """Split the header flags byte into its named components."""
        return {
            "deferred": check_flag(HEADER_FLAG_DEFERRED, flags),
            "page_assoc_long": check_flag(HEADER_FLAG_PAGE_ASSOC_LONG, flags),
            "type": masked_value(SEG_TYPE_MASK, flags),
        }

    def parse_retention_flags(
        self,
        segment: JBIG2Segment,
        flags: int,
        field: bytes,
    ) -> JBIG2RetentionFlags:
        """Parse the referred-to segment count, retain bits and references.

        Args:
            segment: The segment being built; ``segment["number"]`` must
                already be set because it selects the reference size.
            flags: The first retention byte as an integer.
            field: The raw byte(s) that produced *flags*.

        Returns:
            A dict with ``ref_count``, ``retain_segments`` and
            ``ref_segments``.

        Raises:
            struct.error: if the stream ends inside the retention data.
        """
        ref_count = masked_value(REF_COUNT_SHORT_MASK, flags)
        retain_segments: List[bool] = []
        ref_segments: List[int] = []

        if ref_count < REF_COUNT_LONG:
            # Short form: the five low bits of the same byte are retain bits.
            retain_segments.extend(bit_set(bit_pos, flags) for bit_pos in range(5))
        else:
            # Long form: the count is the low 29 bits of a 32-bit word whose
            # first byte has already been read.
            field += self.stream.read(3)
            ref_count = masked_value(REF_COUNT_LONG_MASK, unpack_int(">L", field))
            # One retain bit per referred-to segment plus one, rounded up
            # to whole bytes: ceil((ref_count + 1) / 8).
            ret_bytes_count = (ref_count + 8) // 8
            for _ in range(ret_bytes_count):
                ret_byte = unpack_int(">B", self.stream.read(1))
                # All eight bits of every retain byte carry a flag; the
                # writer packs eight flags per byte as well.
                retain_segments.extend(
                    bit_set(bit_pos, ret_byte) for bit_pos in range(8)
                )

        seg_num = segment["number"]
        assert isinstance(seg_num, int)
        # The size of each referred-to segment number depends on the number
        # of the current segment: 1, 2 or 4 bytes.
        if seg_num <= 256:
            ref_format = ">B"
        elif seg_num <= 65536:
            ref_format = ">H"
        else:
            ref_format = ">L"

        ref_size = calcsize(ref_format)

        for _ in range(ref_count):
            ref_data = self.stream.read(ref_size)
            [ref] = unpack(ref_format, ref_data)
            ref_segments.append(ref)

        return {
            "ref_count": ref_count,
            "retain_segments": retain_segments,
            "ref_segments": ref_segments,
        }

    def parse_page_assoc(self, segment: JBIG2Segment, page: int, field: bytes) -> int:
        """Return the page association, reading 3 more bytes for the long form."""
        if cast(JBIG2SegmentFlags, segment["flags"])["page_assoc_long"]:
            field += self.stream.read(3)
            page = unpack_int(">L", field)
        return page

    def parse_data_length(
        self,
        segment: JBIG2Segment,
        length: int,
        field: bytes,
    ) -> int:
        """Read the segment payload into ``segment["raw_data"]``.

        ``raw_data`` is always set (``b""`` for a zero-length segment) so
        that a parsed segment can be handed to the writer unchanged.

        Raises:
            NotImplementedError: for an immediate generic region segment
                whose length is ``DATA_LEN_UNKNOWN``.
        """
        if not length:
            segment["raw_data"] = b""
            return length

        seg_type = cast(JBIG2SegmentFlags, segment["flags"])["type"]
        if seg_type == SEG_TYPE_IMMEDIATE_GEN_REGION and length == DATA_LEN_UNKNOWN:
            raise NotImplementedError(
                "Working with unknown segment length is not implemented yet",
            )

        segment["raw_data"] = self.stream.read(length)
        return length

191 

192 

class JBIG2StreamWriter:
    """Write JBIG2 segments to a file in JBIG2 format."""

    # Retention information of a segment that refers to no other segment.
    EMPTY_RETENTION_FLAGS: JBIG2RetentionFlags = {
        "ref_count": 0,
        "ref_segments": cast(List[int], []),
        "retain_segments": cast(List[bool], []),
    }

    def __init__(self, stream: BinaryIO) -> None:
        self.stream = stream

    def write_segments(
        self,
        segments: Iterable[JBIG2Segment],
        fix_last_page: bool = True,
    ) -> int:
        """Encode and write *segments* to the stream.

        Args:
            segments: Segments to write, in order.
            fix_last_page: When true, append an end-of-page segment if the
                last page seen was not closed by one.

        Returns:
            The number of bytes written.
        """
        data_len = 0
        current_page: Optional[int] = None
        seg_num: Optional[int] = None

        for segment in segments:
            data = self.encode_segment(segment)
            self.stream.write(data)
            data_len += len(data)

            seg_num = cast(Optional[int], segment["number"])

            if fix_last_page:
                seg_page = cast(int, segment.get("page_assoc"))

                if (
                    cast(JBIG2SegmentFlags, segment["flags"])["type"]
                    == SEG_TYPE_END_OF_PAGE
                ):
                    # The page was closed explicitly.
                    current_page = None
                elif seg_page:
                    current_page = seg_page

        if fix_last_page and current_page and (seg_num is not None):
            segment = self.get_eop_segment(seg_num + 1, current_page)
            data = self.encode_segment(segment)
            self.stream.write(data)
            data_len += len(data)

        return data_len

    def write_file(
        self,
        segments: Iterable[JBIG2Segment],
        fix_last_page: bool = True,
    ) -> int:
        """Write a complete JBIG2 file: header, *segments*, end-of-file segment.

        Returns:
            The number of bytes written.
        """
        # The segments are needed twice (to write them and to find the last
        # segment number), so materialise one-shot iterables up front.
        segments = list(segments)

        header = FILE_HEADER_ID
        header_flags = FILE_HEAD_FLAG_SEQUENTIAL
        header += pack(">B", header_flags)
        # The embedded JBIG2 files in a PDF always
        # only have one page
        number_of_pages = pack(">L", 1)
        header += number_of_pages
        self.stream.write(header)
        data_len = len(header)

        data_len += self.write_segments(segments, fix_last_page)

        seg_num = cast(int, segments[-1]["number"]) if segments else 0

        # With fix_last_page, write_segments may have used seg_num + 1 for
        # an end-of-page segment, so the end-of-file segment skips it.
        seg_num_offset = 2 if fix_last_page else 1
        eof_segment = self.get_eof_segment(seg_num + seg_num_offset)
        data = self.encode_segment(eof_segment)

        self.stream.write(data)
        data_len += len(data)

        return data_len

    def encode_segment(self, segment: JBIG2Segment) -> bytes:
        """Serialise one segment (header fields plus payload) to bytes.

        Fields are emitted in ``SEG_STRUCT`` order; a field with an
        ``encode_<field>`` method is encoded by it, any other field is
        packed with its ``SEG_STRUCT`` format.
        """
        parts = []
        for field_format, name in SEG_STRUCT:
            value = segment.get(name)
            encoder = getattr(self, "encode_%s" % name, None)
            if callable(encoder):
                parts.append(encoder(value, segment))
            else:
                parts.append(pack(field_format, value))
        return b"".join(parts)

    @staticmethod
    def _page_assoc_is_long(flags: JBIG2SegmentFlags, segment: JBIG2Segment) -> bool:
        """Decide whether the page association needs the 4-byte form.

        An explicit ``page_assoc_long`` entry in *flags* wins; otherwise the
        long form is chosen when the page number does not fit in one byte.
        """
        if "page_assoc_long" in flags:
            return bool(flags["page_assoc_long"])
        return cast(int, segment.get("page_assoc", 0)) > 255

    def encode_flags(self, value: JBIG2SegmentFlags, segment: JBIG2Segment) -> bytes:
        """Encode the header flags byte from its named components."""
        flags = 0
        if value.get("deferred"):
            flags |= HEADER_FLAG_DEFERRED

        if self._page_assoc_is_long(value, segment):
            flags |= HEADER_FLAG_PAGE_ASSOC_LONG

        flags |= mask_value(SEG_TYPE_MASK, value["type"])

        return pack(">B", flags)

    def encode_retention_flags(
        self,
        value: JBIG2RetentionFlags,
        segment: JBIG2Segment,
    ) -> bytes:
        """Encode the referred-to count, retain bits and segment references.

        Counts up to 4 use the one-byte short form; larger counts use the
        long form (a 32-bit word followed by the retain-bit bytes).
        """
        flags: List[int] = []
        flags_format = ">B"
        ref_count = value["ref_count"]
        assert isinstance(ref_count, int)
        retain_segments = cast(List[bool], value.get("retain_segments", []))

        if ref_count <= 4:
            flags_byte = mask_value(REF_COUNT_SHORT_MASK, ref_count)
            # Only the five low bits are retain bits; anything beyond that
            # would spill into the count bits.
            for ref_index, ref_retain in enumerate(retain_segments[:5]):
                if ref_retain:
                    flags_byte |= 1 << ref_index
            flags.append(flags_byte)
        else:
            # ceil((ref_count + 1) / 8) bytes of retain bits.
            bytes_count = (ref_count + 8) // 8
            flags_format = ">L" + ("B" * bytes_count)
            # Top three bits mark the long form, low 29 bits hold the count.
            flags_dword = mask_value(REF_COUNT_SHORT_MASK, REF_COUNT_LONG) << 24
            flags_dword |= mask_value(REF_COUNT_LONG_MASK, ref_count)
            flags.append(flags_dword)

            for byte_index in range(bytes_count):
                ret_byte = 0
                ret_part = retain_segments[byte_index * 8 : byte_index * 8 + 8]
                for bit_pos, ret_seg in enumerate(ret_part):
                    if ret_seg:
                        ret_byte |= 1 << bit_pos

                flags.append(ret_byte)

        ref_segments = cast(List[int], value.get("ref_segments", []))

        # The size of each referred-to segment number depends on the number
        # of the current segment: 1, 2 or 4 bytes.
        seg_num = cast(int, segment["number"])
        if seg_num <= 256:
            ref_format = "B"
        elif seg_num <= 65536:
            ref_format = "H"
        else:
            ref_format = "L"

        flags_format += ref_format * len(ref_segments)
        flags.extend(ref_segments)

        return pack(flags_format, *flags)

    def encode_page_assoc(self, value: int, segment: JBIG2Segment) -> bytes:
        """Encode the page association as 1 byte, or 4 bytes in the long form.

        The choice matches the ``page_assoc_long`` bit written by
        :meth:`encode_flags`.
        """
        seg_flags = cast(JBIG2SegmentFlags, segment["flags"])
        page_format = ">L" if self._page_assoc_is_long(seg_flags, segment) else ">B"
        return pack(page_format, value)

    def encode_data_length(self, value: int, segment: JBIG2Segment) -> bytes:
        """Encode the data length followed by the raw payload.

        A segment without a ``raw_data`` entry is written with an empty
        payload.
        """
        data = pack(">L", value)
        data += cast(bytes, segment.get("raw_data", b""))
        return data

    def get_eop_segment(self, seg_number: int, page_number: int) -> JBIG2Segment:
        """Build an empty end-of-page segment for *page_number*."""
        return {
            "data_length": 0,
            "flags": {"deferred": False, "type": SEG_TYPE_END_OF_PAGE},
            "number": seg_number,
            "page_assoc": page_number,
            "raw_data": b"",
            "retention_flags": JBIG2StreamWriter.EMPTY_RETENTION_FLAGS,
        }

    def get_eof_segment(self, seg_number: int) -> JBIG2Segment:
        """Build an empty end-of-file segment."""
        return {
            "data_length": 0,
            "flags": {"deferred": False, "type": SEG_TYPE_END_OF_FILE},
            "number": seg_number,
            "page_assoc": 0,
            "raw_data": b"",
            "retention_flags": JBIG2StreamWriter.EMPTY_RETENTION_FLAGS,
        }