Coverage for /pythoncovmergedfiles/medio/medio/src/pdfminer.six/pdfminer/jbig2.py: 22%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

210 statements  

1import math 

2import os 

3from collections.abc import Iterable 

4from struct import calcsize, pack, unpack 

5from typing import BinaryIO, ClassVar, cast 

6 

7from pdfminer.pdfexceptions import PDFValueError 

8 

# segment structure base
# Ordered (struct format, field name) pairs for the fixed part of a segment
# header.  The reader and the writer both walk this list and, for each field,
# dispatch to an optional ``parse_<name>`` / ``encode_<name>`` method.
SEG_STRUCT = [
    (">L", "number"),
    (">B", "flags"),
    (">B", "retention_flags"),
    (">B", "page_assoc"),
    (">L", "data_length"),
]

# segment header literals
# Single-bit flags tested against the one-byte "flags" field.
HEADER_FLAG_DEFERRED = 0b10000000
HEADER_FLAG_PAGE_ASSOC_LONG = 0b01000000

# The low six bits of the "flags" byte hold the segment type.
SEG_TYPE_MASK = 0b00111111

# The top three bits of the first "retention_flags" byte hold the short
# referred-to segment count.  A count equal to REF_COUNT_LONG selects the long
# form, where the count is taken from a 32-bit word masked with
# REF_COUNT_LONG_MASK.
REF_COUNT_SHORT_MASK = 0b11100000
REF_COUNT_LONG_MASK = 0x1FFFFFFF
REF_COUNT_LONG = 7

# "data_length" value that the reader treats as "length unknown".
DATA_LEN_UNKNOWN = 0xFFFFFFFF

# segment types
SEG_TYPE_IMMEDIATE_GEN_REGION = 38
SEG_TYPE_END_OF_PAGE = 49
SEG_TYPE_END_OF_FILE = 51

# file literals
# Magic bytes written at the start of a standalone JBIG2 file.
FILE_HEADER_ID = b"\x97\x4a\x42\x32\x0d\x0a\x1a\x0a"
# File-header flag bit written by JBIG2StreamWriter.write_file.
FILE_HEAD_FLAG_SEQUENTIAL = 0b00000001

38 

39 

def bit_set(bit_pos: int, value: int) -> bool:
    """Return True when bit number *bit_pos* (0 = least significant) of *value* is 1."""
    probe = 1 << bit_pos
    return (value & probe) != 0

42 

43 

def check_flag(flag: int, value: int) -> bool:
    """Return True when *value* shares at least one set bit with *flag*."""
    overlap = value & flag
    return overlap != 0

46 

47 

def masked_value(mask: int, value: int) -> int:
    """Extract the bit field selected by *mask* from *value*.

    The bits of *value* covered by *mask* are kept and shifted right so that
    the lowest set bit of *mask* lands on bit 0.

    Raises:
        PDFValueError: if *mask* has no set bit.
    """
    if not mask:
        raise PDFValueError("Invalid mask or value")

    # ``mask & -mask`` isolates the lowest set bit, so its bit_length gives
    # the shift directly.  This works for a mask of any width, including one
    # whose lowest set bit is bit 31.
    shift = (mask & -mask).bit_length() - 1
    return (value & mask) >> shift

54 

55 

def mask_value(mask: int, value: int) -> int:
    """Place *value* into the bit field selected by *mask*.

    *value* is truncated to the bits available in the field and shifted left
    so that its bit 0 lands on the lowest set bit of *mask*.  This is the
    inverse of ``masked_value``.

    Raises:
        PDFValueError: if *mask* has no set bit.
    """
    if not mask:
        raise PDFValueError("Invalid mask or value")

    # ``mask & -mask`` isolates the lowest set bit, so its bit_length gives
    # the shift directly.  This works for a mask of any width, including one
    # whose lowest set bit is bit 31.
    shift = (mask & -mask).bit_length() - 1
    return (value & (mask >> shift)) << shift

62 

63 

def unpack_int(format: str, buffer: bytes) -> int:
    """Decode *buffer* as the single big-endian unsigned integer named by *format*.

    Only the formats ``">B"``, ``">I"`` and ``">L"`` are accepted.
    """
    assert format in {">B", ">I", ">L"}
    decoded = cast(tuple[int], unpack(format, buffer))
    (number,) = decoded
    return number

68 

69 

# Decoded "flags" header byte: the "deferred" and "page_assoc_long" booleans
# plus the integer segment "type" (see JBIG2StreamReader.parse_flags).
JBIG2SegmentFlags = dict[str, int | bool]
# Decoded retention flags: "ref_count" (int), "retain_segments" (list of bool)
# and "ref_segments" (list of int).
JBIG2RetentionFlags = dict[str, int | list[int] | list[bool]]
# One whole segment, keyed by the SEG_STRUCT field names; the reader also
# stores the payload under "raw_data" and may set an internal "_error" marker.
JBIG2Segment = dict[
    str,
    bool | int | bytes | JBIG2SegmentFlags | JBIG2RetentionFlags,
]

76 

77 

class JBIG2StreamReader:
    """Read segments from a JBIG2 byte stream.

    Every segment is returned as a dict keyed by the field names listed in
    ``SEG_STRUCT``; segments with a non-zero data length also carry their
    payload under ``"raw_data"``.
    """

    def __init__(self, stream: BinaryIO) -> None:
        """Wrap *stream*; it must support ``read`` and relative ``seek``."""
        self.stream = stream

    def get_segments(self) -> list[JBIG2Segment]:
        """Parse all segments from the current position to the end of the stream.

        For each ``SEG_STRUCT`` field the fixed-size part is read and decoded,
        then handed to ``parse_<field>`` when such a method exists (that
        method may consume further bytes from the stream).  A segment whose
        fixed-size header field is cut short by the end of the stream is
        dropped instead of being returned.
        """
        segments: list[JBIG2Segment] = []
        while not self.is_eof():
            segment: JBIG2Segment = {}
            for field_format, name in SEG_STRUCT:
                field_len = calcsize(field_format)
                field = self.stream.read(field_len)
                if len(field) < field_len:
                    # Truncated header: mark the segment so it is skipped.
                    segment["_error"] = True
                    break
                value = unpack_int(field_format, field)
                parser = getattr(self, f"parse_{name}", None)
                if callable(parser):
                    value = parser(segment, value, field)
                segment[name] = value

            if not segment.get("_error"):
                segments.append(segment)
        return segments

    def is_eof(self) -> bool:
        """Return True at end of stream, without consuming data otherwise."""
        if self.stream.read(1) == b"":
            return True
        # Undo the one-byte probe so the next read starts at the same place.
        self.stream.seek(-1, os.SEEK_CUR)
        return False

    def parse_flags(
        self,
        segment: JBIG2Segment,
        flags: int,
        field: bytes,
    ) -> JBIG2SegmentFlags:
        """Split the segment header flags byte into its named parts."""
        return {
            "deferred": check_flag(HEADER_FLAG_DEFERRED, flags),
            "page_assoc_long": check_flag(HEADER_FLAG_PAGE_ASSOC_LONG, flags),
            "type": masked_value(SEG_TYPE_MASK, flags),
        }

    def parse_retention_flags(
        self,
        segment: JBIG2Segment,
        flags: int,
        field: bytes,
    ) -> JBIG2RetentionFlags:
        """Decode the referred-to segment count, retain bits and references.

        *flags* is the first byte of the field and *field* its raw bytes;
        ``segment["number"]`` must already be set because it decides how wide
        each referred-to segment number is.  Additional bytes are consumed
        from the stream as required.

        Returns a dict with ``"ref_count"``, ``"retain_segments"`` (one bool
        per retain bit read) and ``"ref_segments"`` (the referred-to segment
        numbers).
        """
        ref_count = masked_value(REF_COUNT_SHORT_MASK, flags)
        retain_segments: list[bool] = []
        ref_segments: list[int] = []

        if ref_count < REF_COUNT_LONG:
            # Short form: the five low bits of the same byte are retain bits.
            retain_segments.extend(bit_set(bit_pos, flags) for bit_pos in range(5))
        else:
            # Long form: the count occupies the low 29 bits of a 32-bit word
            # and is followed by whole bytes of retain bits.
            field += self.stream.read(3)
            ref_count = unpack_int(">L", field)
            ref_count = masked_value(REF_COUNT_LONG_MASK, ref_count)
            ret_bytes_count = math.ceil((ref_count + 1) / 8)
            for _ret_byte_index in range(ret_bytes_count):
                ret_byte = unpack_int(">B", self.stream.read(1))
                # All eight bits of each byte are retain bits; this also
                # matches how JBIG2StreamWriter packs them (8 per byte).
                retain_segments.extend(
                    bit_set(bit_pos, ret_byte) for bit_pos in range(8)
                )

        seg_num = segment["number"]
        assert isinstance(seg_num, int)
        # Referred-to segment numbers are 1, 2 or 4 bytes wide depending on
        # this segment's own number (ITU-T T.88, 7.2.5).  With the ">" prefix
        # "H" is 2 bytes, whereas "I" would be 4.
        if seg_num <= 256:
            ref_format = ">B"
        elif seg_num <= 65536:
            ref_format = ">H"
        else:
            ref_format = ">L"

        ref_size = calcsize(ref_format)

        for _ref_index in range(ref_count):
            ref_data = self.stream.read(ref_size)
            # struct.unpack is used directly because unpack_int only accepts
            # the ">B", ">I" and ">L" formats.
            (ref,) = unpack(ref_format, ref_data)
            ref_segments.append(ref)

        return {
            "ref_count": ref_count,
            "retain_segments": retain_segments,
            "ref_segments": ref_segments,
        }

    def parse_page_assoc(self, segment: JBIG2Segment, page: int, field: bytes) -> int:
        """Return the page association, widening it to 4 bytes when flagged.

        When the already-parsed flags have ``"page_assoc_long"`` set, three
        more bytes are read and the value is decoded as a 32-bit integer.
        """
        if cast(JBIG2SegmentFlags, segment["flags"])["page_assoc_long"]:
            field += self.stream.read(3)
            page = unpack_int(">L", field)
        return page

    def parse_data_length(
        self,
        segment: JBIG2Segment,
        length: int,
        field: bytes,
    ) -> int:
        """Read the segment payload into ``segment["raw_data"]``.

        Nothing is read (and ``"raw_data"`` is not set) when *length* is 0.

        Raises:
            NotImplementedError: for an immediate generic region segment
                whose length is ``DATA_LEN_UNKNOWN``.
        """
        if length:
            if (
                cast(JBIG2SegmentFlags, segment["flags"])["type"]
                == SEG_TYPE_IMMEDIATE_GEN_REGION
            ) and (length == DATA_LEN_UNKNOWN):
                raise NotImplementedError(
                    "Working with unknown segment length is not implemented yet",
                )
            else:
                segment["raw_data"] = self.stream.read(length)

        return length

192 

193 

class JBIG2StreamWriter:
    """Write JBIG2 segments to a file in JBIG2 format.

    Segments are dicts of the shape produced by ``JBIG2StreamReader``.
    """

    # Retention flags used for the synthetic end-of-page / end-of-file
    # segments.  The dict is shared between them and is never mutated here.
    EMPTY_RETENTION_FLAGS: ClassVar[JBIG2RetentionFlags] = {
        "ref_count": 0,
        "ref_segments": cast(list[int], []),
        "retain_segments": cast(list[bool], []),
    }

    def __init__(self, stream: BinaryIO) -> None:
        """Wrap *stream*, the binary sink that segments are written to."""
        self.stream = stream

    def write_segments(
        self,
        segments: Iterable[JBIG2Segment],
        fix_last_page: bool = True,
    ) -> int:
        """Encode and write *segments*; return the number of bytes written.

        With *fix_last_page* set, an end-of-page segment is appended when the
        last page seen was not closed by one; it is numbered one above the
        last segment written.
        """
        data_len = 0
        current_page: int | None = None
        seg_num: int | None = None

        for segment in segments:
            data = self.encode_segment(segment)
            self.stream.write(data)
            data_len += len(data)

            seg_num = cast(int | None, segment["number"])

            if fix_last_page:
                seg_page = cast(int, segment.get("page_assoc"))

                if (
                    cast(JBIG2SegmentFlags, segment["flags"])["type"]
                    == SEG_TYPE_END_OF_PAGE
                ):
                    current_page = None
                elif seg_page:
                    current_page = seg_page

        if fix_last_page and current_page and (seg_num is not None):
            segment = self.get_eop_segment(seg_num + 1, current_page)
            data = self.encode_segment(segment)
            self.stream.write(data)
            data_len += len(data)

        return data_len

    def write_file(
        self,
        segments: Iterable[JBIG2Segment],
        fix_last_page: bool = True,
    ) -> int:
        """Write a standalone JBIG2 file; return the number of bytes written.

        The output is the file header (magic bytes, the sequential flag and a
        page count of 1), the encoded *segments*, and a closing end-of-file
        segment.
        """
        # The segments are needed twice (to write them and to find the last
        # segment number), so materialise them once; otherwise a one-shot
        # iterable would be exhausted by the first pass.
        segments = list(segments)

        header = FILE_HEADER_ID
        header_flags = FILE_HEAD_FLAG_SEQUENTIAL
        header += pack(">B", header_flags)
        # The embedded JBIG2 files in a PDF always
        # only have one page
        number_of_pages = pack(">L", 1)
        header += number_of_pages
        self.stream.write(header)
        data_len = len(header)

        data_len += self.write_segments(segments, fix_last_page)

        seg_num = cast(int, segments[-1]["number"]) if segments else 0

        # Leave room for the end-of-page segment write_segments may have added.
        seg_num_offset = 2 if fix_last_page else 1
        eof_segment = self.get_eof_segment(seg_num + seg_num_offset)
        data = self.encode_segment(eof_segment)

        self.stream.write(data)
        data_len += len(data)

        return data_len

    def encode_segment(self, segment: JBIG2Segment) -> bytes:
        """Serialise one segment (header fields followed by its payload).

        Each ``SEG_STRUCT`` field is encoded by ``encode_<field>`` when such a
        method exists, otherwise packed with the field's struct format.
        """
        parts = []
        for field_format, name in SEG_STRUCT:
            value = segment.get(name)
            encoder = getattr(self, f"encode_{name}", None)
            if callable(encoder):
                parts.append(encoder(value, segment))
            else:
                parts.append(pack(field_format, value))
        return b"".join(parts)

    @staticmethod
    def _page_assoc_is_long(flags: JBIG2SegmentFlags, segment: JBIG2Segment) -> bool:
        """Decide whether the page association needs the 4-byte form.

        True when the flags request it explicitly or when the page number
        does not fit into one byte.
        """
        if flags.get("page_assoc_long"):
            return True
        page = segment.get("page_assoc") or 0
        return isinstance(page, int) and page > 255

    def encode_flags(self, value: JBIG2SegmentFlags, segment: JBIG2Segment) -> bytes:
        """Build the one-byte segment header flags from the flags dict."""
        flags = 0
        if value.get("deferred"):
            flags |= HEADER_FLAG_DEFERRED

        # Must agree with encode_page_assoc, which picks the field width with
        # the same helper.
        if self._page_assoc_is_long(value, segment):
            flags |= HEADER_FLAG_PAGE_ASSOC_LONG

        flags |= mask_value(SEG_TYPE_MASK, value["type"])

        return pack(">B", flags)

    def encode_retention_flags(
        self,
        value: JBIG2RetentionFlags,
        segment: JBIG2Segment,
    ) -> bytes:
        """Encode the referred-to count, retain bits and segment references.

        Counts up to 4 use the one-byte short form; larger counts use the
        long form (32-bit count word followed by retain-bit bytes).  The
        referred-to segment numbers follow, each 1, 2 or 4 bytes wide
        depending on ``segment["number"]``.
        """
        flags: list[int] = []
        flags_format = ">B"
        ref_count = value["ref_count"]
        assert isinstance(ref_count, int)
        retain_segments = cast(list[bool], value.get("retain_segments", []))

        if ref_count <= 4:
            flags_byte = mask_value(REF_COUNT_SHORT_MASK, ref_count)
            # Only bits 0-4 are retain bits; anything beyond would overwrite
            # the count stored in the top three bits.
            for ref_index, ref_retain in enumerate(retain_segments[:5]):
                if ref_retain:
                    flags_byte |= 1 << ref_index
            flags.append(flags_byte)
        else:
            bytes_count = math.ceil((ref_count + 1) / 8)
            flags_format = ">L" + ("B" * bytes_count)
            # Top three bits mark the long form; the low 29 bits carry the
            # actual count, which the reader extracts with REF_COUNT_LONG_MASK.
            flags_dword = mask_value(REF_COUNT_SHORT_MASK, REF_COUNT_LONG) << 24
            flags_dword |= ref_count & REF_COUNT_LONG_MASK
            flags.append(flags_dword)

            for byte_index in range(bytes_count):
                ret_byte = 0
                ret_part = retain_segments[byte_index * 8 : byte_index * 8 + 8]
                for bit_pos, ret_seg in enumerate(ret_part):
                    if ret_seg:
                        ret_byte |= 1 << bit_pos

                flags.append(ret_byte)

        ref_segments = cast(list[int], value.get("ref_segments", []))

        seg_num = cast(int, segment["number"])
        # Referred-to segment numbers are 1, 2 or 4 bytes wide depending on
        # this segment's own number (ITU-T T.88, 7.2.5).  With the ">" prefix
        # "H" is 2 bytes, whereas "I" would be 4.
        if seg_num <= 256:
            ref_format = "B"
        elif seg_num <= 65536:
            ref_format = "H"
        else:
            ref_format = "L"

        for ref in ref_segments:
            flags_format += ref_format
            flags.append(ref)

        return pack(flags_format, *flags)

    def encode_page_assoc(self, value: int, segment: JBIG2Segment) -> bytes:
        """Encode the page association as 1 byte, or 4 bytes in the long form.

        The width is chosen by the same rule encode_flags uses to set the
        long-form bit, so the flag and the field always agree.
        """
        flags = cast(JBIG2SegmentFlags, segment.get("flags") or {})
        page_format = ">L" if self._page_assoc_is_long(flags, segment) else ">B"
        return pack(page_format, value)

    def encode_data_length(self, value: int, segment: JBIG2Segment) -> bytes:
        """Encode the 4-byte data length followed by the segment payload.

        Segments without a ``"raw_data"`` entry (the reader omits it for
        zero-length segments) are written with an empty payload.
        """
        data = pack(">L", value)
        data += cast(bytes, segment.get("raw_data", b""))
        return data

    def get_eop_segment(self, seg_number: int, page_number: int) -> JBIG2Segment:
        """Build an empty end-of-page segment for *page_number*."""
        return {
            "data_length": 0,
            "flags": {"deferred": False, "type": SEG_TYPE_END_OF_PAGE},
            "number": seg_number,
            "page_assoc": page_number,
            "raw_data": b"",
            "retention_flags": JBIG2StreamWriter.EMPTY_RETENTION_FLAGS,
        }

    def get_eof_segment(self, seg_number: int) -> JBIG2Segment:
        """Build an empty end-of-file segment (page association 0)."""
        return {
            "data_length": 0,
            "flags": {"deferred": False, "type": SEG_TYPE_END_OF_FILE},
            "number": seg_number,
            "page_assoc": 0,
            "raw_data": b"",
            "retention_flags": JBIG2StreamWriter.EMPTY_RETENTION_FLAGS,
        }