Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pikepdf/models/image.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

544 statements  

1# SPDX-FileCopyrightText: 2022 James R. Barlow 

2# SPDX-License-Identifier: MPL-2.0 

3 

4"""Extract images embedded in PDF.""" 

5 

6from __future__ import annotations 

7 

8from abc import ABC, abstractmethod 

9from collections.abc import Callable 

10from decimal import Decimal 

11from io import BytesIO 

12from itertools import zip_longest 

13from pathlib import Path 

14from shutil import copyfileobj 

15from typing import TYPE_CHECKING, Any, BinaryIO, NamedTuple, TypeVar, cast 

16 

17from pikepdf import jbig2 

18from pikepdf._core import Buffer, Pdf, PdfError, StreamDecodeLevel 

19from pikepdf._exceptions import DependencyError 

20from pikepdf.models import _transcoding 

21from pikepdf.models._transcoding import ImageDecompressionError 

22from pikepdf.objects import ( 

23 Array, 

24 Dictionary, 

25 Name, 

26 Object, 

27 Stream, 

28 String, 

29) 

30 

31if TYPE_CHECKING: 

32 from PIL import Image 

33 from PIL.ImageCms import ImageCmsProfile 

34 

35 

36T = TypeVar('T') 

37 

38RGBDecodeArray = tuple[float, float, float, float, float, float] 

39GrayDecodeArray = tuple[float, float] 

40CMYKDecodeArray = tuple[float, float, float, float, float, float, float, float] 

41DecodeArray = RGBDecodeArray | GrayDecodeArray | CMYKDecodeArray 

42 

43 

class UnsupportedImageTypeError(Exception):
    """This image is formatted in a way pikepdf does not support."""

46 

47 

class NotExtractableError(Exception):
    """Raised when an image cannot be extracted directly from the PDF."""

50 

51 

class HifiPrintImageNotTranscodableError(NotExtractableError):
    """Raised for images carrying high fidelity printing data, which cannot be extracted."""

54 

55 

class InvalidPdfImageError(Exception):
    """Raised when an image does not conform to the PDF 1.7 specification."""

58 

59 

def _array_str(value: Object | str | list):
    """Reduce a pikepdf value to a list of plain Python values.

    Lists and pikepdf Arrays are converted recursively. Names and ``str``
    become ``str``; pikepdf Strings become ``bytes``; Streams, Dictionaries,
    ``bytes`` and ``int`` are passed through unchanged. A result that is not
    already a list is wrapped in a one-element list.

    Raises:
        NotImplementedError: an item has a type not listed above. The
            exception argument is the original top-level *value*.
    """

    def _simplify(node):
        if isinstance(node, list | Array):
            return list(map(_simplify, node))
        if isinstance(node, Stream | Dictionary | bytes | int):
            return node
        if isinstance(node, Name | str):
            return str(node)
        if isinstance(node, String):
            return bytes(node)
        raise NotImplementedError(value)

    simplified = _simplify(value)
    return simplified if isinstance(simplified, list) else [simplified]

78 

79 

80def _ensure_list(value: list[Object] | Dictionary | Array | Object) -> list[Object]: 

81 """Ensure value is a list of pikepdf.Object, if it was not already. 

82 

83 To support DecodeParms which can be present as either an array of dicts or a single 

84 dict. It's easier to convert to an array of one dict. 

85 """ 

86 if isinstance(value, list): 

87 return value 

88 return list(value.wrap_in_array().as_list()) 

89 

90 

91def _metadata_from_obj( 

92 obj: Dictionary | Stream, name: str, type_: Callable[[Any], T], default: T 

93) -> T | None: 

94 """Retrieve metadata from a dictionary or stream and wrangle types.""" 

95 val = getattr(obj, name, default) 

96 try: 

97 return type_(val) 

98 except TypeError: 

99 if val is None: 

100 return None 

101 raise NotImplementedError('Metadata access for ' + name) 

102 

103 

class PaletteData(NamedTuple):
    """Color space and raw lookup table of an indexed image's palette.

    ``base_colorspace`` is typically ``"RGB"`` or ``"L"`` (for grayscale).

    ``palette`` holds the packed palette entries: typically 256 bytes for
    grayscale or 256*3=768 bytes for RGB, one value or one RGB triplet per
    palette index.
    """

    base_colorspace: str
    palette: bytes

115 

116 

class PdfImageBase(ABC):
    """Abstract base class for images.

    Subclasses supply ``_metadata`` (raw access to image dictionary entries),
    ``icc`` and ``as_pil_image``; everything else here is derived from those.
    """

    SIMPLE_COLORSPACES = {'/DeviceRGB', '/DeviceGray', '/CalRGB', '/CalGray'}
    MAIN_COLORSPACES = SIMPLE_COLORSPACES | {'/DeviceCMYK', '/CalCMYK', '/ICCBased'}
    PRINT_COLORSPACES = {'/Separation', '/DeviceN'}

    @abstractmethod
    def _metadata(self, name: str, type_: Callable[[Any], T], default: T) -> T:
        """Get metadata for this image type."""

    @property
    def width(self) -> int:
        """Width of the image data in pixels."""
        return self._metadata('Width', int, 0)

    @property
    def height(self) -> int:
        """Height of the image data in pixels."""
        return self._metadata('Height', int, 0)

    @property
    def image_mask(self) -> bool:
        """Return ``True`` if this is an image mask."""
        return self._metadata('ImageMask', bool, False)

    @property
    def _bpc(self) -> int | None:
        """Bits per component for this image (low-level)."""
        return self._metadata('BitsPerComponent', int, 0)

    @property
    def _colorspaces(self):
        """Colorspace (low-level)."""
        return self._metadata('ColorSpace', _array_str, [])

    @property
    def filters(self):
        """List of names of the filters that we applied to encode this image."""
        return self._metadata('Filter', _array_str, [])

    @property
    def _decode_array(self) -> DecodeArray:
        """Extract the /Decode array, or the default for this colorspace.

        Raises:
            NotImplementedError: no usable /Decode entry is present and no
                default is known for the image's colorspace.
        """
        decode: list = self._metadata('Decode', _ensure_list, [])
        if decode and len(decode) in (2, 6, 8):
            return cast(DecodeArray, tuple(float(value) for value in decode))

        colorspace = self.colorspace
        if colorspace in ('/DeviceGray', '/CalGray'):
            return (0.0, 1.0)
        if colorspace in ('/DeviceRGB', '/CalRGB'):
            return (0.0, 1.0, 0.0, 1.0, 0.0, 1.0)
        if colorspace == '/DeviceCMYK':
            return (0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0)
        if colorspace == '/ICCBased':
            # Resolve the ICC mode once; it may require opening the profile.
            icc_mode = self._approx_mode_from_icc()
            if icc_mode == 'L':
                return (0.0, 1.0)
            if icc_mode == 'RGB':
                return (0.0, 1.0, 0.0, 1.0, 0.0, 1.0)
        if self.image_mask:
            return (0.0, 1.0)  # Default for image masks; per RM 8.9.6.2

        raise NotImplementedError(
            "Don't know how to retrieve default /Decode array for image "
            + repr(self)
        )

    @property
    def decode_parms(self):
        """List of the /DecodeParms, arguments to filters."""
        return self._metadata('DecodeParms', _ensure_list, [])

    @property
    def colorspace(self) -> str | None:
        """PDF name of the colorspace that best describes this image.

        Returns ``None`` for image masks. For /Indexed images the base
        colorspace name is returned.

        Raises:
            NotImplementedError: the colorspace entry is missing, malformed
                (e.g. a truncated /Indexed array) or of an unhandled kind.
        """
        if self.image_mask:
            return None  # Undefined for image masks
        colorspaces = self._colorspaces
        if colorspaces:
            if colorspaces[0] in self.MAIN_COLORSPACES:
                return colorspaces[0]
            # Guard the length: a damaged PDF may carry a bare [/Indexed];
            # report it like any other colorspace we cannot interpret
            # instead of leaking an IndexError.
            if colorspaces[0] == '/Indexed' and len(colorspaces) > 1:
                subspace = colorspaces[1]
                if isinstance(subspace, str) and subspace in self.MAIN_COLORSPACES:
                    return subspace
                if (
                    isinstance(subspace, list)
                    and subspace
                    and subspace[0]
                    in (
                        '/ICCBased',
                        '/DeviceN',
                        '/CalGray',
                        '/CalRGB',
                    )
                ):
                    return subspace[0]
            if colorspaces[0] == '/DeviceN':
                return '/DeviceN'

        raise NotImplementedError(
            "not sure how to get colorspace: " + repr(colorspaces)
        )

    @property
    def bits_per_component(self) -> int:
        """Bits per component of this image.

        When the entry is absent or zero, defaults to 1 for image masks and 8
        otherwise.
        """
        if self._bpc is None or self._bpc == 0:
            return 1 if self.image_mask else 8
        return self._bpc

    @property
    @abstractmethod
    def icc(self) -> ImageCmsProfile | None:
        """Return ICC profile for this image if one is defined."""

    @property
    def indexed(self) -> bool:
        """Check if the image has a defined color palette."""
        return '/Indexed' in self._colorspaces

    def _colorspace_has_name(self, name):
        """Return True if the colorspace, or the base of an /Indexed one, is *name*.

        Any lookup failure on a malformed colorspace array yields ``False``.
        """
        try:
            cs = self._colorspaces
            if cs[0] == '/Indexed' and cs[1][0] == name:
                return True
            if cs[0] == name:
                return True
        except (IndexError, AttributeError, KeyError, TypeError):
            # TypeError: the /Indexed base entry is not subscriptable
            # (e.g. an integer in a damaged file).
            pass
        return False

    @property
    def is_device_n(self) -> bool:
        """Check if image has a /DeviceN (complex printing) colorspace."""
        return self._colorspace_has_name('/DeviceN')

    @property
    def is_separation(self) -> bool:
        """Check if image has a /Separation (complex printing) colorspace."""
        return self._colorspace_has_name('/Separation')

    @property
    def size(self) -> tuple[int, int]:
        """Size of image as (width, height)."""
        return self.width, self.height

    def _approx_mode_from_icc(self):
        """Guess a Pillow mode from the ICC profile attached to the colorspace.

        Returns ``'L'`` when the profile stream declares one channel,
        otherwise ``'RGB'`` or ``'CMYK'`` according to the profile's color
        space, or ``''`` when that color space is not recognized.
        """
        if self.indexed:
            icc_profile = self._colorspaces[1][1]
        else:
            icc_profile = self._colorspaces[1]
        icc_profile_nchannels = int(icc_profile['/N'])

        if icc_profile_nchannels == 1:
            return 'L'

        # Multiple channels, need to open the profile and look
        mode_from_xcolor_space = {'RGB ': 'RGB', 'CMYK': 'CMYK'}
        xcolor_space = self.icc.profile.xcolor_space
        return mode_from_xcolor_space.get(xcolor_space, '')

    @property
    def mode(self) -> str:
        """``PIL.Image.mode`` equivalent for this image, where possible.

        If an ICC profile is attached to the image, we still attempt to resolve a Pillow
        mode.

        Raises:
            NotImplementedError: no mode could be determined.
        """
        m = ''
        if self.is_device_n:
            m = 'DeviceN'
        elif self.is_separation:
            m = 'Separation'
        elif self.indexed:
            m = 'P'
        elif self.colorspace == '/DeviceGray' and self.bits_per_component == 1:
            m = '1'
        elif self.colorspace == '/DeviceGray' and self.bits_per_component > 1:
            m = 'L'
        elif self.colorspace == '/DeviceRGB':
            m = 'RGB'
        elif self.colorspace == '/DeviceCMYK':
            m = 'CMYK'
        elif self.colorspace == '/ICCBased':
            try:
                m = self._approx_mode_from_icc()
            except (ValueError, TypeError) as e:
                raise NotImplementedError(
                    "Not sure how to handle PDF image of this type"
                ) from e
        if m == '':
            raise NotImplementedError(
                "Not sure how to handle PDF image of this type"
            ) from None
        return m

    @property
    def filter_decodeparms(self):
        """Return normalized the Filter and DecodeParms data.

        PDF has a lot of possible data structures concerning /Filter and
        /DecodeParms. /Filter can be absent or a name or an array, /DecodeParms
        can be absent or a dictionary (if /Filter is a name) or an array (if
        /Filter is an array). When both are arrays the lengths match.

        Normalize this into:
        [(/FilterName, {/DecodeParmName: Value, ...}), ...]

        The order of /Filter matters as indicates the encoding/decoding sequence.
        """
        return list(zip_longest(self.filters, self.decode_parms, fillvalue={}))

    @property
    def palette(self) -> PaletteData | None:
        """Retrieve the color palette for this image if applicable.

        Returns ``None`` for non-indexed images.

        Raises:
            ValueError: the /Indexed array does not have exactly four entries.
            NotImplementedError: the base colorspace is not one we can map.
        """
        if not self.indexed:
            return None
        try:
            _idx, base, _hival, lookup = self._colorspaces
        except ValueError as e:
            raise ValueError('Not sure how to interpret this palette') from e
        if self.icc or self.is_device_n or self.is_separation or isinstance(base, list):
            base = str(base[0])
        else:
            base = str(base)
        lookup = bytes(lookup)
        if base not in self.MAIN_COLORSPACES and base not in self.PRINT_COLORSPACES:
            raise NotImplementedError(f"not sure how to interpret this palette: {base}")
        if base in ('/DeviceRGB', '/CalRGB'):
            base = 'RGB'
        elif base in ('/DeviceGray', '/CalGray'):
            base = 'L'
        elif base == '/DeviceCMYK':
            base = 'CMYK'
        elif base == '/DeviceN':
            base = 'DeviceN'
        elif base == '/Separation':
            base = 'Separation'
        elif base == '/ICCBased':
            base = self._approx_mode_from_icc()
        else:
            raise NotImplementedError(f"not sure how to interpret this palette: {base}")
        return PaletteData(base, lookup)

    @abstractmethod
    def as_pil_image(self) -> Image.Image:
        """Convert this PDF image to a Python PIL (Pillow) image."""

    def _repr_png_(self) -> bytes:
        """Display hook for IPython/Jupyter."""
        b = BytesIO()
        with self.as_pil_image() as im:
            im.save(b, 'PNG')
        return b.getvalue()

365 

366 

class PdfImage(PdfImageBase):
    """Support class to provide a consistent API for manipulating PDF images.

    The data structure for images inside PDFs is irregular and complex,
    making it difficult to use without introducing errors for less
    typical cases. This class addresses these difficulties by providing a
    regular, Pythonic API similar in spirit (and convertible to) the Python
    Pillow imaging library.
    """

    obj: Stream
    _icc: ImageCmsProfile | None
    _pdf_source: Pdf | None

    def __new__(cls, obj: Stream):
        """Construct a PdfImage... or a PdfJpxImage if that is what we really are."""
        try:
            # Check if JPXDecode is called for and initialize as PdfJpxImage
            filters = _ensure_list(obj.Filter)
            if Name.JPXDecode in filters:
                return super().__new__(PdfJpxImage)
        except (AttributeError, KeyError):
            # __init__ will deal with any other errors
            pass
        return super().__new__(PdfImage)

    def __init__(self, obj: Stream):
        """Construct a PDF image from a Image XObject inside a PDF.

        ``pim = PdfImage(page.Resources.XObject['/ImageNN'])``

        Args:
            obj: an Image XObject

        Raises:
            TypeError: *obj* is a stream whose /Subtype is not /Image.
        """
        if isinstance(obj, Stream) and obj.stream_dict.get("/Subtype") != "/Image":
            raise TypeError("can't construct PdfImage from non-image")
        self.obj = obj
        self._icc = None

    def __eq__(self, other):
        """Compare by underlying PDF object; defer for non-image operands."""
        if not isinstance(other, PdfImageBase):
            return NotImplemented
        return self.obj == other.obj

    @classmethod
    def _from_pil_image(cls, *, pdf, page, name, image):  # pragma: no cover
        """Insert a PIL image into a PDF (rudimentary).

        Args:
            pdf (pikepdf.Pdf): the PDF to attach the image to
            page (pikepdf.Object): the page to attach the image to
            name (str or pikepdf.Name): the name to set the image
            image (PIL.Image.Image): the image to insert
        """
        data = image.tobytes()

        imstream = Stream(pdf, data)
        imstream.Type = Name('/XObject')
        imstream.Subtype = Name('/Image')
        if image.mode == 'RGB':
            imstream.ColorSpace = Name('/DeviceRGB')
        elif image.mode in ('1', 'L'):
            imstream.ColorSpace = Name('/DeviceGray')
        imstream.BitsPerComponent = 1 if image.mode == '1' else 8
        imstream.Width = image.width
        imstream.Height = image.height

        page.Resources.XObject[name] = imstream

        return cls(imstream)

    def _metadata(self, name, type_, default):
        """Read an image dictionary entry from the underlying stream object."""
        return _metadata_from_obj(self.obj, name, type_, default)

    @property
    def _iccstream(self):
        """Return the ICC profile stream referenced by the colorspace.

        Raises:
            NotImplementedError: the image's colorspace is not /ICCBased.
        """
        if self.colorspace == '/ICCBased':
            if not self.indexed:
                return self._colorspaces[1]
            assert isinstance(self._colorspaces[1], list)
            return self._colorspaces[1][1]
        raise NotImplementedError("Don't know how to find ICC stream for image")

    @property
    def icc(self) -> ImageCmsProfile | None:
        """If an ICC profile is attached, return a Pillow object that describe it.

        Most of the information may be found in ``icc.profile``.

        Raises:
            UnsupportedImageTypeError: Pillow reports that the profile cannot
                be opened.
        """
        if self.colorspace not in ('/ICCBased', '/Indexed'):
            return None
        if not self._icc:
            iccstream = self._iccstream
            iccbuffer = iccstream.get_stream_buffer()
            iccbytesio = BytesIO(iccbuffer)
            try:
                from PIL.ImageCms import ImageCmsProfile

                self._icc = ImageCmsProfile(iccbytesio)
            except OSError as e:
                if str(e) == 'cannot open profile from string':
                    # ICC profile is corrupt
                    raise UnsupportedImageTypeError(
                        "ICC profile corrupt or not readable"
                    ) from e
                # NOTE(review): any other OSError is silently ignored and the
                # property then returns None — confirm this is intended.
        return self._icc

    def _remove_simple_filters(self):
        """Remove simple lossless compression where it appears.

        Returns:
            A ``(data, filters)`` tuple: the stream bytes with the leading
            simple filters decoded, and the list of filters still applied to
            those bytes (empty when everything was decoded).

        Raises:
            NotImplementedError: more than one complex filter is present.
        """
        COMPLEX_FILTERS = {
            '/DCTDecode',
            '/JPXDecode',
            '/JBIG2Decode',
            '/CCITTFaxDecode',
        }
        indices = [n for n, filt in enumerate(self.filters) if filt in COMPLEX_FILTERS]
        if len(indices) > 1:
            raise NotImplementedError(
                f"Object {self.obj.objgen} has compound complex filters: "
                f"{self.filters}. We cannot decompress this."
            )
        if len(indices) == 0:
            # No complex filter indices, so all filters are simple - remove them all
            return self.obj.read_bytes(StreamDecodeLevel.specialized), []

        n = indices[0]
        if n == 0:
            # The first filter is the complex one, so there is nothing to strip
            return self.obj.read_raw_bytes(), self.filters

        # Put copy in a temporary PDF to ensure we don't permanently modify self
        with Pdf.new() as tmp_pdf:
            obj_copy = tmp_pdf.copy_foreign(self.obj)
            obj_copy.Filter = Array([Name(f) for f in self.filters[:n]])
            obj_copy.DecodeParms = Array(self.decode_parms[:n])
            return obj_copy.read_bytes(StreamDecodeLevel.specialized), self.filters[n:]

    def _extract_direct(self, *, stream: BinaryIO) -> str | None:
        """Attempt to extract the image directly to a usable image file.

        If there is no way to extract the image without decompressing or
        transcoding then return ``None``. The type and format of image
        generated will vary.

        Args:
            stream: Writable file stream to write data to, e.g. an open file

        Returns:
            The file extension of the written image (``'.tif'`` or
            ``'.jpg'``), or ``None`` if nothing was written.
        """

        # NOTE(review): both helpers below read the /DecodeParms of the first
        # filter (index 0). When simple filters precede /DCTDecode that entry
        # may not belong to the DCT filter — confirm.
        def normal_dct_rgb() -> bool:
            # Normal DCTDecode RGB images have the default value of
            # /ColorTransform 1 and are actually in YUV. Such a file can be
            # saved as a standard JPEG. RGB JPEGs without YUV conversion can't
            # be saved as JPEGs, and are probably bugs. Some software in the
            # wild actually produces RGB JPEGs in PDFs (probably a bug).
            DEFAULT_CT_RGB = 1
            ct = DEFAULT_CT_RGB
            if self.filter_decodeparms[0][1] is not None:
                ct = self.filter_decodeparms[0][1].get(
                    '/ColorTransform', DEFAULT_CT_RGB
                )
            return self.mode == 'RGB' and ct == DEFAULT_CT_RGB

        def normal_dct_cmyk() -> bool:
            # Normal DCTDecode CMYKs have /ColorTransform 0 and can be saved.
            # There is a YUVK colorspace but CMYK JPEGs don't generally use it
            DEFAULT_CT_CMYK = 0
            ct = DEFAULT_CT_CMYK
            if self.filter_decodeparms[0][1] is not None:
                ct = self.filter_decodeparms[0][1].get(
                    '/ColorTransform', DEFAULT_CT_CMYK
                )
            return self.mode == 'CMYK' and ct == DEFAULT_CT_CMYK

        data, filters = self._remove_simple_filters()

        if filters == ['/CCITTFaxDecode']:
            if self.colorspace == '/ICCBased':
                icc = self._iccstream.read_bytes()
            else:
                icc = None
            stream.write(self._generate_ccitt_header(data, icc=icc))
            stream.write(data)
            return '.tif'
        if filters == ['/DCTDecode'] and (
            self.mode == 'L' or normal_dct_rgb() or normal_dct_cmyk()
        ):
            stream.write(data)
            return '.jpg'

        return None

    def _extract_transcoded_1248bits(self) -> Image.Image:
        """Extract an image when there are 1/2/4/8 bits packed in byte data.

        Raises:
            InvalidPdfImageError: bits per component is not 2, 4 or 8.
        """
        stride = 0  # tell Pillow to calculate stride from line width
        scale = 0 if self.mode == 'L' else 1
        if self.bits_per_component in (2, 4):
            buffer, stride = _transcoding.unpack_subbyte_pixels(
                self.read_bytes(), self.size, self.bits_per_component, scale
            )
        elif self.bits_per_component == 8:
            buffer = cast(memoryview, self.get_stream_buffer())
        else:
            raise InvalidPdfImageError("BitsPerComponent must be 1, 2, 4, 8, or 16")

        if self.mode == 'P' and self.palette is not None:
            base_mode, palette = self.palette
            im = _transcoding.image_from_buffer_and_palette(
                buffer,
                self.size,
                stride,
                base_mode,
                palette,
            )
        else:
            im = _transcoding.image_from_byte_buffer(buffer, self.size, stride)
        return im

    def _extract_transcoded_1bit(self) -> Image.Image:
        """Decode a 1-bit image (or image mask) into a Pillow mode ``'1'`` image.

        Raises:
            UnsupportedImageTypeError: a non-mask image resolves to RGB or CMYK.
            DependencyError: decoding failed on an unfilterable stream and no
                JBIG2 decoder is available.
        """
        from PIL import Image

        if not self.image_mask and self.mode in ('RGB', 'CMYK'):
            raise UnsupportedImageTypeError("1-bit RGB and CMYK are not supported")
        try:
            data = self.read_bytes()
        except (RuntimeError, PdfError) as e:
            if (
                'read_bytes called on unfilterable stream' in str(e)
                and not jbig2.get_decoder().available()
            ):
                raise DependencyError(
                    "jbig2dec - not installed or installed version is too old "
                    "(older than version 0.15)"
                ) from None
            raise

        im = Image.frombytes('1', self.size, data)

        if self.palette is not None:
            base_mode, palette = self.palette
            im = _transcoding.fix_1bit_palette_image(im, base_mode, palette)

        return im

    def _extract_transcoded_mask(self) -> Image.Image:
        """Decode an image mask; handled the same way as a 1-bit image."""
        return self._extract_transcoded_1bit()

    def _extract_transcoded(self) -> Image.Image:
        """Decompress the image data and build a Pillow image from it.

        Raises:
            HifiPrintImageNotTranscodableError: the mode is DeviceN or
                Separation.
            UnsupportedImageTypeError: no decoding path matches the image's
                mode and bit depth.
        """
        from PIL import Image

        if self.image_mask:
            return self._extract_transcoded_mask()

        if self.mode in {'DeviceN', 'Separation'}:
            raise HifiPrintImageNotTranscodableError()

        if self.mode == 'RGB' and self.bits_per_component == 8:
            # Cannot use the zero-copy .get_stream_buffer here, we have 3-byte
            # RGB and Pillow needs RGBX.
            im = Image.frombuffer(
                'RGB', self.size, self.read_bytes(), 'raw', 'RGB', 0, 1
            )
        elif self.mode == 'CMYK' and self.bits_per_component == 8:
            im = Image.frombuffer(
                'CMYK', self.size, self.get_stream_buffer(), 'raw', 'CMYK', 0, 1
            )
        # elif self.mode == '1':
        elif self.bits_per_component == 1:
            im = self._extract_transcoded_1bit()
        elif self.mode in ('L', 'P') and self.bits_per_component <= 8:
            im = self._extract_transcoded_1248bits()
        else:
            raise UnsupportedImageTypeError(repr(self) + ", " + repr(self.obj))

        if self.colorspace == '/ICCBased' and self.icc is not None:
            im.info['icc_profile'] = self.icc.tobytes()

        return im

    def _extract_to_stream(self, *, stream: BinaryIO) -> str:
        """Extract the image to a stream.

        If possible, the compressed data is extracted and inserted into
        a compressed image file format without transcoding the compressed
        content. If this is not possible, the data will be decompressed
        and extracted to an appropriate format.

        Args:
            stream: Writable stream to write data to

        Returns:
            The file format extension.

        Raises:
            UnsupportedImageTypeError: the stream cannot be decoded.
        """
        direct_extraction = self._extract_direct(stream=stream)
        if direct_extraction:
            return direct_extraction

        im = None
        try:
            im = self._extract_transcoded()
            if im.mode == 'CMYK':
                im.save(stream, format='tiff', compression='tiff_adobe_deflate')
                return '.tiff'
            if im:
                im.save(stream, format='png')
                return '.png'
        except PdfError as e:
            if 'called on unfilterable stream' in str(e):
                raise UnsupportedImageTypeError(repr(self)) from e
            raise
        finally:
            # Release the Pillow image whether or not saving succeeded.
            if im:
                im.close()

        raise UnsupportedImageTypeError(repr(self))

    def extract_to(
        self, *, stream: BinaryIO | None = None, fileprefix: str = ''
    ) -> str:
        """Extract the image directly to a usable image file.

        If possible, the compressed data is extracted and inserted into
        a compressed image file format without transcoding the compressed
        content. If this is not possible, the data will be decompressed
        and extracted to an appropriate format.

        Because it is not known until attempted what image format will be
        extracted, users should not assume what format they are getting back.
        When saving the image to a file, use a temporary filename, and then
        rename the file to its final name based on the returned file extension.

        Images might be saved as any of .png, .jpg, or .tiff.

        Examples:
            >>> im.extract_to(stream=bytes_io)  # doctest: +SKIP
            '.png'

            >>> im.extract_to(fileprefix='/tmp/image00')  # doctest: +SKIP
            '/tmp/image00.jpg'

        Args:
            stream: Writable stream to write data to.
            fileprefix (str or Path): The path to write the extracted image to,
                without the file extension.

        Returns:
            If *fileprefix* was provided, then the fileprefix with the
            appropriate extension. If no *fileprefix*, then an extension
            indicating the file type.

        Raises:
            ValueError: both or neither of *stream* and *fileprefix* were
                supplied.
        """
        # Exactly one destination must be given; report the two misuse cases
        # separately so the message matches what the caller actually did.
        if stream and fileprefix:
            raise ValueError("Cannot set both stream and fileprefix")
        if not stream and not fileprefix:
            raise ValueError("Must set one of stream or fileprefix")
        if stream:
            return self._extract_to_stream(stream=stream)

        # The extension is only known after extraction, so buffer in memory
        # first and then write to the final path.
        bio = BytesIO()
        extension = self._extract_to_stream(stream=bio)
        bio.seek(0)
        filepath = Path(str(Path(fileprefix)) + extension)
        with filepath.open('wb') as target:
            copyfileobj(bio, target)
        return str(filepath)

    def read_bytes(
        self, decode_level: StreamDecodeLevel = StreamDecodeLevel.specialized
    ) -> bytes:
        """Decompress this image and return it as unencoded bytes."""
        return self.obj.read_bytes(decode_level=decode_level)

    def get_stream_buffer(
        self, decode_level: StreamDecodeLevel = StreamDecodeLevel.specialized
    ) -> Buffer:
        """Access this image with the buffer protocol."""
        return self.obj.get_stream_buffer(decode_level=decode_level)

    def as_pil_image(self) -> Image.Image:
        """Extract the image as a Pillow Image, using decompression as necessary.

        Caller must close the image.
        """
        from PIL import Image

        bio = BytesIO()
        direct_extraction = self._extract_direct(stream=bio)
        if direct_extraction:
            bio.seek(0)
            return Image.open(bio)

        im = self._extract_transcoded()
        if not im:
            raise UnsupportedImageTypeError(repr(self))

        return im

    def _generate_ccitt_header(self, data: bytes, icc: bytes | None = None) -> bytes:
        """Construct a CCITT G3 or G4 header from the PDF metadata.

        Raises:
            ValueError: the image has no /DecodeParms.
            UnsupportedImageTypeError: /EncodedByteAlign is set to a
                non-default value.
        """
        # https://stackoverflow.com/questions/2641770/
        # https://www.itu.int/itudoc/itu-t/com16/tiff-fx/docs/tiff6.pdf

        if not self.decode_parms:
            raise ValueError("/CCITTFaxDecode without /DecodeParms")

        expected_defaults = [
            ("/EncodedByteAlign", False),
        ]
        for name, val in expected_defaults:
            if self.decode_parms[0].get(name, val) != val:
                raise UnsupportedImageTypeError(
                    f"/CCITTFaxDecode with decode parameter {name} not equal {val}"
                )

        k = self.decode_parms[0].get("/K", 0)
        t4_options = None
        if k < 0:
            ccitt_group = 4  # Group 4
        elif k > 0:
            ccitt_group = 3  # Group 3 2-D
            t4_options = 1
        else:
            ccitt_group = 3  # Group 3 1-D
        black_is_one = self.decode_parms[0].get("/BlackIs1", False)
        decode = self._decode_array
        # PDF spec says:
        # BlackIs1: A flag indicating whether 1 bits shall be interpreted as black
        # pixels and 0 bits as white pixels, the reverse of the normal
        # PDF convention for image data. Default value: false.
        # TIFF spec says:
        # use 0 for white_is_zero (=> black is 1) MINISWHITE
        # use 1 for black_is_zero (=> white is 1) MINISBLACK
        photometry = 1 if black_is_one else 0

        # If Decode is [1, 0] then the photometry is inverted
        if len(decode) == 2 and decode == (1.0, 0.0):
            photometry = 1 - photometry

        img_size = len(data)
        if icc is None:
            icc = b''

        return _transcoding.generate_ccitt_header(
            self.size,
            data_length=img_size,
            ccitt_group=ccitt_group,
            t4_options=t4_options,
            photometry=photometry,
            icc=icc,
        )

    def show(self):  # pragma: no cover
        """Show the image however PIL wants to."""
        self.as_pil_image().show()

    def _set_pdf_source(self, pdf: Pdf):
        """Record the Pdf this image came from."""
        self._pdf_source = pdf

    def __repr__(self):
        """Describe the image; the mode is shown as ``?`` when it cannot be determined."""
        try:
            mode = self.mode
        except NotImplementedError:
            mode = '?'
        return (
            f'<pikepdf.PdfImage image mode={mode} '
            f'size={self.width}x{self.height} at {hex(id(self))}>'
        )

828 

829 

class PdfJpxImage(PdfImage):
    """Support class for JPEG 2000 images. Implements the same API as :class:`PdfImage`.

    If you call PdfImage(object_that_is_actually_jpeg2000_image), pikepdf will return
    this class instead, due to the check in PdfImage.__new__.
    """

    def __init__(self, obj):
        """Initialize a JPEG 2000 image.

        The image is opened with Pillow right away; the resulting image is
        kept to answer colorspace queries when the PDF omits /ColorSpace.
        """
        super().__init__(obj)
        self._jpxpil = self.as_pil_image()

    def __eq__(self, other):
        """Equal only to another PdfJpxImage with the same object and decoded image."""
        if not isinstance(other, PdfImageBase):
            return NotImplemented
        return (
            self.obj == other.obj
            and isinstance(other, PdfJpxImage)
            and self._jpxpil == other._jpxpil
        )

    def _extract_direct(self, *, stream: BinaryIO) -> str | None:
        """Write the raw JPEG 2000 data to *stream* if that is all that remains.

        Returns:
            ``'.jp2'`` when the data was written, or ``None`` when filters
            other than /JPXDecode remain after removing simple filters.
        """
        data, filters = self._remove_simple_filters()
        if filters != ['/JPXDecode']:
            return None
        stream.write(data)
        return '.jp2'

    def _extract_transcoded(self) -> Image.Image:
        """Decode via the generic :class:`PdfImage` transcoding path."""
        return super()._extract_transcoded()

    @property
    def _colorspaces(self):
        """Return the effective colorspace of a JPEG 2000 image.

        If the ColorSpace dictionary is present, the colorspace embedded in the
        JPEG 2000 data will be ignored, as required by the specification.

        Raises:
            NotImplementedError: /ColorSpace is absent and the decoded image
                is neither grayscale nor RGB.
        """
        # (PDF 1.7 Table 89) If ColorSpace is present, any colour space
        # specifications in the JPEG2000 data shall be ignored.
        super_colorspaces = super()._colorspaces
        if super_colorspaces:
            return super_colorspaces
        if self._jpxpil.mode == 'L':
            return ['/DeviceGray']
        if self._jpxpil.mode == 'RGB':
            return ['/DeviceRGB']
        raise NotImplementedError('Complex JP2 colorspace')

    @property
    def _bpc(self) -> int:
        """Return 8, since bpc is not meaningful for JPEG 2000 encoding."""
        # (PDF 1.7 Table 89) If the image stream uses the JPXDecode filter, this
        # entry is optional and shall be ignored if present. The bit depth is
        # determined by the conforming reader in the process of decoding the
        # JPEG2000 image.
        return 8

    @property
    def indexed(self) -> bool:
        """Return False, since JPEG 2000 should not be indexed."""
        # Nothing in the spec precludes an Indexed JPXDecode image, except for
        # the fact that doing so is madness. Let's assume no one is that
        # insane.
        return False

    def __repr__(self):
        """Describe the image; the mode is shown as ``?`` when it cannot be determined."""
        # repr() is used while building error messages, so it must not raise
        # for images whose colorspace we cannot interpret. This mirrors the
        # guard in PdfImage.__repr__.
        try:
            mode = self.mode
        except NotImplementedError:
            mode = '?'
        return (
            f'<pikepdf.PdfJpxImage JPEG2000 image mode={mode} '
            f'size={self.width}x{self.height} at {hex(id(self))}>'
        )

901 

902 

class PdfInlineImage(PdfImageBase):
    """Support class for PDF inline images.

    An inline image is stored directly in a content stream as a sequence of
    metadata tokens followed by the image data. This wrapper rebuilds an
    equivalent dictionary object from those tokens so that metadata can be
    queried, and converts the image to a regular :class:`PdfImage` whenever
    pixel data must be decoded.
    """

    # Inline images can contain abbreviations that we write automatically
    ABBREVS = {
        b'/W': b'/Width',
        b'/H': b'/Height',
        b'/BPC': b'/BitsPerComponent',
        b'/IM': b'/ImageMask',
        b'/CS': b'/ColorSpace',
        b'/F': b'/Filter',
        b'/DP': b'/DecodeParms',
        b'/G': b'/DeviceGray',
        b'/RGB': b'/DeviceRGB',
        b'/CMYK': b'/DeviceCMYK',
        b'/I': b'/Indexed',
        b'/AHx': b'/ASCIIHexDecode',
        b'/A85': b'/ASCII85Decode',
        b'/LZW': b'/LZWDecode',
        b'/RL': b'/RunLengthDecode',
        b'/CCF': b'/CCITTFaxDecode',
        b'/DCT': b'/DCTDecode',
    }
    # Inverse mapping (full name -> abbreviation), used when writing the
    # image back out in inline form.
    REVERSE_ABBREVS = {v: k for k, v in ABBREVS.items()}

    _data: Object
    _image_object: tuple[Object, ...]

    def __init__(self, *, image_data: Object, image_object: tuple):
        """Construct wrapper for inline image.

        Args:
            image_data: data stream for image, extracted from content stream
            image_object: the metadata for image, also from content stream

        Raises:
            PdfError: if the metadata tokens cannot be parsed as a dictionary
                after abbreviations have been expanded.
        """
        # Convert the sequence of pikepdf.Object from the content stream into
        # a dictionary object by unparsing it (to bytes), eliminating inline
        # image abbreviations, and constructing a bytes string equivalent to
        # what an image XObject would look like. Then retrieve data from there

        self._data = image_data
        self._image_object = image_object

        reparse = b' '.join(
            self._unparse_obj(obj, remap_names=self.ABBREVS) for obj in image_object
        )
        try:
            reparsed_obj = Object.parse(b'<< ' + reparse + b' >>')
        except PdfError as e:
            raise PdfError("parsing inline " + reparse.decode('unicode_escape')) from e
        self.obj = reparsed_obj

    def __eq__(self, other):
        """Compare the reconstructed dictionary and the raw inline image bytes.

        Returns ``NotImplemented`` for objects that are not images at all, and
        ``False`` for image objects that are not inline images.
        """
        if not isinstance(other, PdfImageBase):
            return NotImplemented
        return (
            self.obj == other.obj
            and isinstance(other, PdfInlineImage)
            and (
                self._data._inline_image_raw_bytes()
                == other._data._inline_image_raw_bytes()
            )
        )

    @classmethod
    def _unparse_obj(cls, obj, remap_names):
        """Serialize one metadata token to PDF syntax.

        Args:
            obj: a pikepdf ``Object`` or a native ``bool``, ``int``,
                ``Decimal`` or ``float``.
            remap_names: mapping of unparsed name bytes to replacement bytes;
                names that are not in the mapping are emitted unchanged.

        Returns:
            The token as bytes.

        Raises:
            NotImplementedError: if ``obj`` is of any other type.
        """
        if isinstance(obj, Object):
            if isinstance(obj, Name):
                name = obj.unparse(resolved=True)
                assert isinstance(name, bytes)
                return remap_names.get(name, name)
            return obj.unparse(resolved=True)
        # bool must be tested before int, since bool is a subclass of int.
        if isinstance(obj, bool):
            return b'true' if obj else b'false'  # Lower case for PDF spec
        if isinstance(obj, int):
            return str(obj).encode('ascii')
        if isinstance(obj, Decimal | float):
            # str() switches to exponent notation for very small or very large
            # magnitudes (e.g. Decimal('1E-7'), 1e-05, 1e+16). PDF numeric
            # syntax has no exponent form (ISO 32000-1, 7.3.3), so finite
            # values are written in fixed-point notation instead.
            number = obj if isinstance(obj, Decimal) else Decimal(str(obj))
            if number.is_finite():
                return format(number, 'f').encode('ascii')
            # Non-finite values have no PDF representation; keep the previous
            # output rather than inventing a new failure mode here.
            return str(obj).encode('ascii')
        raise NotImplementedError(repr(obj))

    def _metadata(self, name, type_, default):
        """Look up ``name`` in the reconstructed image dictionary."""
        return _metadata_from_obj(self.obj, name, type_, default)

    def unparse(self) -> bytes:
        """Create the content stream bytes that reproduce this inline image."""

        def metadata_tokens():
            # Full names are mapped back to their inline abbreviations.
            for metadata_obj in self._image_object:
                unparsed = self._unparse_obj(
                    metadata_obj, remap_names=self.REVERSE_ABBREVS
                )
                assert isinstance(unparsed, bytes)
                yield unparsed

        def inline_image_tokens():
            yield b'BI\n'
            yield b' '.join(metadata_tokens())
            yield b'\nID\n'
            yield self._data._inline_image_raw_bytes()
            yield b'EI'

        return b''.join(inline_image_tokens())

    @property
    def icc(self):  # pragma: no cover
        """Raise an exception since ICC profiles are not supported on inline images."""
        raise InvalidPdfImageError(
            "Inline images with ICC profiles are not supported in the PDF specification"
        )

    def __repr__(self):
        """Return a debug string; the mode is shown as '?' if not implemented."""
        try:
            mode = self.mode
        except NotImplementedError:
            mode = '?'
        return (
            f'<pikepdf.PdfInlineImage image mode={mode} '
            f'size={self.width}x{self.height} at {hex(id(self))}>'
        )

    def _convert_to_pdfimage(self) -> PdfImage:
        """Build a regular :class:`PdfImage` equivalent to this inline image.

        The returned image lives in a temporary in-memory PDF, which is kept
        alive by the returned object.
        """
        # Construct a temporary PDF that holds this inline image, and...
        tmppdf = Pdf.new()
        tmppdf.add_blank_page(page_size=(self.width, self.height))
        tmppdf.pages[0].contents_add(
            f'{self.width} 0 0 {self.height} 0 0 cm'.encode('ascii'), prepend=True
        )
        tmppdf.pages[0].contents_add(self.unparse())

        # ...externalize it,
        tmppdf.pages[0].externalize_inline_images()
        raw_img = cast(Stream, next(iter(tmppdf.pages[0].images.values())))

        # ...then use the regular PdfImage API to extract it.
        img = PdfImage(raw_img)
        img._set_pdf_source(tmppdf)  # Hold tmppdf open while PdfImage exists
        return img

    def as_pil_image(self) -> Image.Image:
        """Return inline image as a Pillow Image."""
        return self._convert_to_pdfimage().as_pil_image()

    def extract_to(self, *, stream: BinaryIO | None = None, fileprefix: str = ''):
        """Extract the inline image directly to a usable image file.

        See:
            :meth:`PdfImage.extract_to`
        """
        return self._convert_to_pdfimage().extract_to(
            stream=stream, fileprefix=fileprefix
        )

    def read_bytes(self):
        """Return decompressed image bytes."""
        # qpdf does not have an API to return this directly, so convert it.
        return self._convert_to_pdfimage().read_bytes()

    def get_stream_buffer(self):
        """Return decompressed stream buffer."""
        # qpdf does not have an API to return this directly, so convert it.
        return self._convert_to_pdfimage().get_stream_buffer()

1062 

1063 

# Names exported by ``from ... import *``, listed alphabetically.
__all__ = [
    'CMYKDecodeArray', 'DecodeArray',
    'HifiPrintImageNotTranscodableError',
    'ImageDecompressionError', 'InvalidPdfImageError',
    'PaletteData',
    'PdfImage', 'PdfImageBase', 'PdfInlineImage', 'PdfJpxImage',
    'RGBDecodeArray',
    'UnsupportedImageTypeError',
]