Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/pikepdf/models/image.py: 26%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

539 statements  

1# SPDX-FileCopyrightText: 2022 James R. Barlow 

2# SPDX-License-Identifier: MPL-2.0 

3 

4"""Extract images embedded in PDF.""" 

5 

6from __future__ import annotations 

7 

8from abc import ABC, abstractmethod 

9from collections.abc import Callable 

10from decimal import Decimal 

11from io import BytesIO 

12from itertools import zip_longest 

13from pathlib import Path 

14from shutil import copyfileobj 

15from typing import Any, BinaryIO, NamedTuple, TypeVar, cast 

16 

17from PIL import Image 

18from PIL.ImageCms import ImageCmsProfile 

19 

20from pikepdf import jbig2 

21from pikepdf._core import Buffer, Pdf, PdfError, StreamDecodeLevel 

22from pikepdf._exceptions import DependencyError 

23from pikepdf.models import _transcoding 

24from pikepdf.models._transcoding import ImageDecompressionError 

25from pikepdf.objects import ( 

26 Array, 

27 Dictionary, 

28 Name, 

29 Object, 

30 Stream, 

31 String, 

32) 

33 

# Type of the value produced by the conversion callable passed to the
# metadata accessors below.
T = TypeVar('T')

# Shapes of a /Decode array: two floats per color component.
RGBDecodeArray = tuple[float, float, float, float, float, float]
GrayDecodeArray = tuple[float, float]
CMYKDecodeArray = tuple[float, float, float, float, float, float, float, float]
# Any of the /Decode array shapes that the image classes return.
DecodeArray = RGBDecodeArray | GrayDecodeArray | CMYKDecodeArray

40 

41 

class UnsupportedImageTypeError(Exception):
    """This image is formatted in a way pikepdf does not support."""

44 

45 

class NotExtractableError(Exception):
    """Indicates that an image cannot be directly extracted.

    Base class of :class:`HifiPrintImageNotTranscodableError`.
    """

48 

49 

class HifiPrintImageNotTranscodableError(NotExtractableError):
    """Image contains high fidelity printing information and cannot be extracted.

    Raised when transcoding an image whose mode is ``DeviceN`` or ``Separation``.
    """

52 

53 

class InvalidPdfImageError(Exception):
    """This image is not valid according to the PDF 1.7 specification.

    Raised, for example, when /BitsPerComponent has an unsupported value.
    """

56 

57 

def _array_str(value: Object | str | list):
    """Flatten pikepdf objects into a (possibly nested) list of plain values.

    Names and strings become ``str``, pikepdf ``String`` becomes ``bytes``,
    and arrays/lists are converted element by element. Streams, dictionaries,
    ``bytes`` and ``int`` values pass through untouched. A scalar input is
    wrapped in a one-element list so the result is always a list.

    Raises:
        NotImplementedError: if an item of any other type is encountered; the
            exception carries the original top-level ``value``.
    """

    def _simplify(node):
        # The order of these checks is significant and mirrors the type
        # hierarchy of pikepdf objects.
        if isinstance(node, list | Array):
            return [_simplify(child) for child in node]
        if isinstance(node, Stream | Dictionary | bytes | int):
            return node
        if isinstance(node, Name | str):
            return str(node)
        if isinstance(node, (String)):
            return bytes(node)
        raise NotImplementedError(value)

    simplified = _simplify(value)
    return simplified if isinstance(simplified, list) else [simplified]

76 

77 

78def _ensure_list(value: list[Object] | Dictionary | Array | Object) -> list[Object]: 

79 """Ensure value is a list of pikepdf.Object, if it was not already. 

80 

81 To support DecodeParms which can be present as either an array of dicts or a single 

82 dict. It's easier to convert to an array of one dict. 

83 """ 

84 if isinstance(value, list): 

85 return value 

86 return list(value.wrap_in_array().as_list()) 

87 

88 

89def _metadata_from_obj( 

90 obj: Dictionary | Stream, name: str, type_: Callable[[Any], T], default: T 

91) -> T | None: 

92 """Retrieve metadata from a dictionary or stream and wrangle types.""" 

93 val = getattr(obj, name, default) 

94 try: 

95 return type_(val) 

96 except TypeError: 

97 if val is None: 

98 return None 

99 raise NotImplementedError('Metadata access for ' + name) 

100 

101 

class PaletteData(NamedTuple):
    """The color space and binary representation of an image palette.

    ``base_colorspace`` is typically ``"RGB"`` or ``"L"`` (for grayscale).

    ``palette`` is typically 256 or 256*3=768 bytes, for grayscale and RGB color
    respectively, with each unit/triplet being the grayscale/RGB triplet values.
    """

    # Mode-like name of the palette's base colorspace, e.g. "RGB" or "L".
    base_colorspace: str
    # Raw lookup table bytes.
    palette: bytes

113 

114 

class PdfImageBase(ABC):
    """Abstract base class for images.

    Subclasses supply :meth:`_metadata`, :attr:`icc` and :meth:`as_pil_image`.
    The properties defined here derive size, colorspace, mode, filter and
    palette information from the values that ``_metadata`` returns.
    """

    SIMPLE_COLORSPACES = {'/DeviceRGB', '/DeviceGray', '/CalRGB', '/CalGray'}
    MAIN_COLORSPACES = SIMPLE_COLORSPACES | {'/DeviceCMYK', '/CalCMYK', '/ICCBased'}
    PRINT_COLORSPACES = {'/Separation', '/DeviceN'}

    @abstractmethod
    def _metadata(self, name: str, type_: Callable[[Any], T], default: T) -> T:
        """Get metadata for this image type."""

    @property
    def width(self) -> int:
        """Width of the image data in pixels."""
        return self._metadata('Width', int, 0)

    @property
    def height(self) -> int:
        """Height of the image data in pixels."""
        return self._metadata('Height', int, 0)

    @property
    def image_mask(self) -> bool:
        """Return ``True`` if this is an image mask."""
        return self._metadata('ImageMask', bool, False)

    @property
    def _bpc(self) -> int | None:
        """Bits per component for this image (low-level)."""
        return self._metadata('BitsPerComponent', int, 0)

    @property
    def _colorspaces(self):
        """Colorspace (low-level)."""
        return self._metadata('ColorSpace', _array_str, [])

    @property
    def filters(self):
        """List of names of the filters that we applied to encode this image."""
        return self._metadata('Filter', _array_str, [])

    @property
    def _decode_array(self) -> DecodeArray:
        """Extract the /Decode array.

        An explicit /Decode entry with 2, 6 or 8 values is returned as a tuple
        of floats. Otherwise a default is chosen from the colorspace (or, for
        image masks, ``(0.0, 1.0)``).

        Raises:
            NotImplementedError: if no default is known for this image.
        """
        decode: list = self._metadata('Decode', _ensure_list, [])
        if decode and len(decode) in (2, 6, 8):
            return cast(DecodeArray, tuple(float(value) for value in decode))

        colorspace = self.colorspace
        if colorspace in ('/DeviceGray', '/CalGray'):
            return (0.0, 1.0)
        if colorspace in ('/DeviceRGB', '/CalRGB'):
            return (0.0, 1.0, 0.0, 1.0, 0.0, 1.0)
        if colorspace == '/DeviceCMYK':
            return (0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0)
        if colorspace == '/ICCBased':
            # Resolve the ICC mode once; doing so may open the ICC profile.
            icc_mode = self._approx_mode_from_icc()
            if icc_mode == 'L':
                return (0.0, 1.0)
            if icc_mode == 'RGB':
                return (0.0, 1.0, 0.0, 1.0, 0.0, 1.0)
        if self.image_mask:
            return (0.0, 1.0)  # Default for image masks; per RM 8.9.6.2

        raise NotImplementedError(
            "Don't know how to retrieve default /Decode array for image "
            + repr(self)
        )

    @property
    def decode_parms(self):
        """List of the /DecodeParms, arguments to filters."""
        return self._metadata('DecodeParms', _ensure_list, [])

    @property
    def colorspace(self) -> str | None:
        """PDF name of the colorspace that best describes this image.

        For /Indexed images this is the name of the base colorspace. Returns
        ``None`` for image masks.

        Raises:
            NotImplementedError: if the colorspace entry is missing or has a
                form this method does not recognize.
        """
        if self.image_mask:
            return None  # Undefined for image masks
        colorspaces = self._colorspaces
        if colorspaces:
            primary = colorspaces[0]
            if primary in self.MAIN_COLORSPACES:
                return primary
            if primary == '/Indexed':
                subspace = colorspaces[1]
                if isinstance(subspace, str) and subspace in self.MAIN_COLORSPACES:
                    return subspace
                if isinstance(subspace, list) and subspace[0] in (
                    '/ICCBased',
                    '/DeviceN',
                    '/CalGray',
                    '/CalRGB',
                ):
                    return subspace[0]
            if primary == '/DeviceN':
                return '/DeviceN'

        raise NotImplementedError(
            "not sure how to get colorspace: " + repr(colorspaces)
        )

    @property
    def bits_per_component(self) -> int:
        """Bits per component of this image.

        When the low-level value is missing or zero, defaults to 1 for image
        masks and 8 otherwise.
        """
        bpc = self._bpc
        if bpc is None or bpc == 0:
            return 1 if self.image_mask else 8
        return bpc

    @property
    @abstractmethod
    def icc(self) -> ImageCmsProfile | None:
        """Return ICC profile for this image if one is defined."""

    @property
    def indexed(self) -> bool:
        """Check if the image has a defined color palette."""
        return '/Indexed' in self._colorspaces

    def _colorspace_has_name(self, name):
        """Return True if the colorspace, or its /Indexed base, is *name*."""
        try:
            cs = self._colorspaces
            if cs[0] == '/Indexed' and cs[1][0] == name:
                return True
            if cs[0] == name:
                return True
        except (IndexError, AttributeError, KeyError):
            # A missing or short colorspace entry simply means "no match".
            pass
        return False

    @property
    def is_device_n(self) -> bool:
        """Check if image has a /DeviceN (complex printing) colorspace."""
        return self._colorspace_has_name('/DeviceN')

    @property
    def is_separation(self) -> bool:
        """Check if image has a /Separation (complex printing) colorspace."""
        return self._colorspace_has_name('/Separation')

    @property
    def size(self) -> tuple[int, int]:
        """Size of image as (width, height)."""
        return self.width, self.height

    def _approx_mode_from_icc(self):
        """Approximate a Pillow mode from the attached ICC profile.

        Returns ``'L'`` for single-channel profiles, ``'RGB'`` or ``'CMYK'``
        when the profile's color space says so, and ``''`` otherwise.
        """
        if self.indexed:
            icc_profile = self._colorspaces[1][1]
        else:
            icc_profile = self._colorspaces[1]
        icc_profile_nchannels = int(icc_profile['/N'])

        if icc_profile_nchannels == 1:
            return 'L'

        # Multiple channels, need to open the profile and look
        mode_from_xcolor_space = {'RGB ': 'RGB', 'CMYK': 'CMYK'}
        xcolor_space = self.icc.profile.xcolor_space
        return mode_from_xcolor_space.get(xcolor_space, '')

    @property
    def mode(self) -> str:
        """``PIL.Image.mode`` equivalent for this image, where possible.

        If an ICC profile is attached to the image, we still attempt to resolve a Pillow
        mode. The non-Pillow values ``'DeviceN'`` and ``'Separation'`` are
        returned for those print colorspaces.

        Raises:
            NotImplementedError: if no mode can be determined.
        """
        m = ''
        if self.is_device_n:
            m = 'DeviceN'
        elif self.is_separation:
            m = 'Separation'
        elif self.indexed:
            m = 'P'
        elif self.colorspace == '/DeviceGray' and self.bits_per_component == 1:
            m = '1'
        elif self.colorspace == '/DeviceGray' and self.bits_per_component > 1:
            m = 'L'
        elif self.colorspace == '/DeviceRGB':
            m = 'RGB'
        elif self.colorspace == '/DeviceCMYK':
            m = 'CMYK'
        elif self.colorspace == '/ICCBased':
            try:
                m = self._approx_mode_from_icc()
            except (ValueError, TypeError) as e:
                raise NotImplementedError(
                    "Not sure how to handle PDF image of this type"
                ) from e
        if m == '':
            raise NotImplementedError(
                "Not sure how to handle PDF image of this type"
            ) from None
        return m

    @property
    def filter_decodeparms(self):
        """Return normalized the Filter and DecodeParms data.

        PDF has a lot of possible data structures concerning /Filter and
        /DecodeParms. /Filter can be absent or a name or an array, /DecodeParms
        can be absent or a dictionary (if /Filter is a name) or an array (if
        /Filter is an array). When both are arrays the lengths match.

        Normalize this into:
        [(/FilterName, {/DecodeParmName: Value, ...}), ...]

        The order of /Filter matters as indicates the encoding/decoding sequence.
        """
        return list(zip_longest(self.filters, self.decode_parms, fillvalue={}))

    @property
    def palette(self) -> PaletteData | None:
        """Retrieve the color palette for this image if applicable.

        Returns:
            ``None`` when the image is not indexed, otherwise the base mode
            name and the raw lookup table bytes.

        Raises:
            ValueError: if the /Indexed colorspace does not have four entries.
            NotImplementedError: if the base colorspace is not recognized.
        """
        if not self.indexed:
            return None
        try:
            _idx, base, _hival, lookup = self._colorspaces
        except ValueError as e:
            raise ValueError('Not sure how to interpret this palette') from e
        if self.icc or self.is_device_n or self.is_separation or isinstance(base, list):
            # The base is an array such as [/ICCBased stream]; keep its name.
            base = str(base[0])
        else:
            base = str(base)
        lookup = bytes(lookup)
        if base not in self.MAIN_COLORSPACES and base not in self.PRINT_COLORSPACES:
            raise NotImplementedError(f"not sure how to interpret this palette: {base}")
        if base in ('/DeviceRGB', '/CalRGB'):
            base = 'RGB'
        elif base in ('/DeviceGray', '/CalGray'):
            base = 'L'
        elif base == '/DeviceCMYK':
            base = 'CMYK'
        elif base == '/DeviceN':
            base = 'DeviceN'
        elif base == '/Separation':
            base = 'Separation'
        elif base == '/ICCBased':
            base = self._approx_mode_from_icc()
        else:
            raise NotImplementedError(f"not sure how to interpret this palette: {base}")
        return PaletteData(base, lookup)

    @abstractmethod
    def as_pil_image(self) -> Image.Image:
        """Convert this PDF image to a Python PIL (Pillow) image."""

    def _repr_png_(self) -> bytes:
        """Display hook for IPython/Jupyter."""
        b = BytesIO()
        with self.as_pil_image() as im:
            im.save(b, 'PNG')
        return b.getvalue()

363 

364 

class PdfImage(PdfImageBase):
    """Support class to provide a consistent API for manipulating PDF images.

    The data structure for images inside PDFs is irregular and complex,
    making it difficult to use without introducing errors for less
    typical cases. This class addresses these difficulties by providing a
    regular, Pythonic API similar in spirit (and convertible to) the Python
    Pillow imaging library.
    """

    # The image XObject stream this wrapper describes.
    obj: Stream
    # Lazily created ICC profile; populated on first access to ``icc``.
    _icc: ImageCmsProfile | None
    # Set through ``_set_pdf_source``; ``None`` until then.
    _pdf_source: Pdf | None

    def __new__(cls, obj: Stream):
        """Construct a PdfImage... or a PdfJpxImage if that is what we really are."""
        try:
            # Check if JPXDecode is called for and initialize as PdfJpxImage
            filters = _ensure_list(obj.Filter)
            if Name.JPXDecode in filters:
                return super().__new__(PdfJpxImage)
        except (AttributeError, KeyError):
            # __init__ will deal with any other errors
            pass
        return super().__new__(PdfImage)

    def __init__(self, obj: Stream):
        """Construct a PDF image from a Image XObject inside a PDF.

        ``pim = PdfImage(page.Resources.XObject['/ImageNN'])``

        Args:
            obj: an Image XObject

        Raises:
            TypeError: if *obj* is a stream whose /Subtype is not /Image.
        """
        if isinstance(obj, Stream) and obj.stream_dict.get("/Subtype") != "/Image":
            raise TypeError("can't construct PdfImage from non-image")
        self.obj = obj
        self._icc = None
        # Initialize so the attribute always exists, even if
        # _set_pdf_source() is never called.
        self._pdf_source = None

    def __eq__(self, other):
        if not isinstance(other, PdfImageBase):
            return NotImplemented
        return self.obj == other.obj

    @classmethod
    def _from_pil_image(cls, *, pdf, page, name, image):  # pragma: no cover
        """Insert a PIL image into a PDF (rudimentary).

        Args:
            pdf (pikepdf.Pdf): the PDF to attach the image to
            page (pikepdf.Object): the page to attach the image to
            name (str or pikepdf.Name): the name to set the image
            image (PIL.Image.Image): the image to insert
        """
        data = image.tobytes()

        imstream = Stream(pdf, data)
        imstream.Type = Name('/XObject')
        imstream.Subtype = Name('/Image')
        if image.mode == 'RGB':
            imstream.ColorSpace = Name('/DeviceRGB')
        elif image.mode in ('1', 'L'):
            imstream.ColorSpace = Name('/DeviceGray')
        imstream.BitsPerComponent = 1 if image.mode == '1' else 8
        imstream.Width = image.width
        imstream.Height = image.height

        page.Resources.XObject[name] = imstream

        return cls(imstream)

    def _metadata(self, name, type_, default):
        """Read image metadata from the underlying stream object."""
        return _metadata_from_obj(self.obj, name, type_, default)

    @property
    def _iccstream(self):
        """Return the ICC profile stream referenced by the colorspace.

        Raises:
            NotImplementedError: if the colorspace is not /ICCBased.
        """
        if self.colorspace == '/ICCBased':
            if not self.indexed:
                return self._colorspaces[1]
            assert isinstance(self._colorspaces[1], list)
            return self._colorspaces[1][1]
        raise NotImplementedError("Don't know how to find ICC stream for image")

    @property
    def icc(self) -> ImageCmsProfile | None:
        """If an ICC profile is attached, return a Pillow object that describe it.

        Most of the information may be found in ``icc.profile``.

        Raises:
            UnsupportedImageTypeError: if Pillow reports that the profile
                cannot be opened.
        """
        if self.colorspace not in ('/ICCBased', '/Indexed'):
            return None
        if not self._icc:
            iccstream = self._iccstream
            iccbuffer = iccstream.get_stream_buffer()
            iccbytesio = BytesIO(iccbuffer)
            try:
                self._icc = ImageCmsProfile(iccbytesio)
            except OSError as e:
                # Any other OSError is not re-raised; the method then returns
                # the still-unset profile (None).
                if str(e) == 'cannot open profile from string':
                    # ICC profile is corrupt
                    raise UnsupportedImageTypeError(
                        "ICC profile corrupt or not readable"
                    ) from e
        return self._icc

    def _remove_simple_filters(self):
        """Remove simple lossless compression where it appears.

        Returns:
            A ``(data, filters)`` tuple: the stream bytes with the leading
            simple filters decoded, and the list of filters still applied to
            those bytes.

        Raises:
            NotImplementedError: if more than one complex filter is present.
        """
        COMPLEX_FILTERS = {
            '/DCTDecode',
            '/JPXDecode',
            '/JBIG2Decode',
            '/CCITTFaxDecode',
        }
        indices = [n for n, filt in enumerate(self.filters) if filt in COMPLEX_FILTERS]
        if len(indices) > 1:
            raise NotImplementedError(
                f"Object {self.obj.objgen} has compound complex filters: "
                f"{self.filters}. We cannot decompress this."
            )
        if len(indices) == 0:
            # No complex filter indices, so all filters are simple - remove them all
            return self.obj.read_bytes(StreamDecodeLevel.specialized), []

        n = indices[0]
        if n == 0:
            # The only filter is complex, so return
            return self.obj.read_raw_bytes(), self.filters

        # Put copy in a temporary PDF to ensure we don't permanently modify self
        with Pdf.new() as tmp_pdf:
            obj_copy = tmp_pdf.copy_foreign(self.obj)
            obj_copy.Filter = Array([Name(f) for f in self.filters[:n]])
            obj_copy.DecodeParms = Array(self.decode_parms[:n])
            return obj_copy.read_bytes(StreamDecodeLevel.specialized), self.filters[n:]

    def _extract_direct(self, *, stream: BinaryIO) -> str | None:
        """Attempt to extract the image directly to a usable image file.

        Only CCITT fax data (written as TIFF) and suitable DCT data (written
        as JPEG) are handled here. The type and format of image generated
        will vary.

        Args:
            stream: Writable file stream to write data to, e.g. an open file

        Returns:
            The file extension of the data written, or ``None`` if the image
            cannot be extracted without decompressing or transcoding.
        """

        # NOTE(review): both helpers read the decode parameters of the first
        # filter; confirm this is intended when a simple filter precedes
        # /DCTDecode.
        def normal_dct_rgb() -> bool:
            # Normal DCTDecode RGB images have the default value of
            # /ColorTransform 1 and are actually in YUV. Such a file can be
            # saved as a standard JPEG. RGB JPEGs without YUV conversion can't
            # be saved as JPEGs, and are probably bugs. Some software in the
            # wild actually produces RGB JPEGs in PDFs (probably a bug).
            DEFAULT_CT_RGB = 1
            ct = DEFAULT_CT_RGB
            if self.filter_decodeparms[0][1] is not None:
                ct = self.filter_decodeparms[0][1].get(
                    '/ColorTransform', DEFAULT_CT_RGB
                )
            return self.mode == 'RGB' and ct == DEFAULT_CT_RGB

        def normal_dct_cmyk() -> bool:
            # Normal DCTDecode CMYKs have /ColorTransform 0 and can be saved.
            # There is a YUVK colorspace but CMYK JPEGs don't generally use it
            DEFAULT_CT_CMYK = 0
            ct = DEFAULT_CT_CMYK
            if self.filter_decodeparms[0][1] is not None:
                ct = self.filter_decodeparms[0][1].get(
                    '/ColorTransform', DEFAULT_CT_CMYK
                )
            return self.mode == 'CMYK' and ct == DEFAULT_CT_CMYK

        data, filters = self._remove_simple_filters()

        if filters == ['/CCITTFaxDecode']:
            if self.colorspace == '/ICCBased':
                icc = self._iccstream.read_bytes()
            else:
                icc = None
            stream.write(self._generate_ccitt_header(data, icc=icc))
            stream.write(data)
            return '.tif'
        if filters == ['/DCTDecode'] and (
            self.mode == 'L' or normal_dct_rgb() or normal_dct_cmyk()
        ):
            stream.write(data)
            return '.jpg'

        return None

    def _extract_transcoded_1248bits(self) -> Image.Image:
        """Extract an image when there are 1/2/4/8 bits packed in byte data.

        Raises:
            InvalidPdfImageError: if bits per component is not 2, 4 or 8.
        """
        stride = 0  # tell Pillow to calculate stride from line width
        scale = 0 if self.mode == 'L' else 1
        if self.bits_per_component in (2, 4):
            buffer, stride = _transcoding.unpack_subbyte_pixels(
                self.read_bytes(), self.size, self.bits_per_component, scale
            )
        elif self.bits_per_component == 8:
            buffer = cast(memoryview, self.get_stream_buffer())
        else:
            raise InvalidPdfImageError("BitsPerComponent must be 1, 2, 4, 8, or 16")

        if self.mode == 'P' and self.palette is not None:
            base_mode, palette = self.palette
            im = _transcoding.image_from_buffer_and_palette(
                buffer,
                self.size,
                stride,
                base_mode,
                palette,
            )
        else:
            im = _transcoding.image_from_byte_buffer(buffer, self.size, stride)
        return im

    def _extract_transcoded_1bit(self) -> Image.Image:
        """Decode a 1-bit image, applying its palette if it has one.

        Raises:
            UnsupportedImageTypeError: for 1-bit RGB or CMYK images.
            DependencyError: if the stream cannot be filtered and no JBIG2
                decoder is available.
        """
        if not self.image_mask and self.mode in ('RGB', 'CMYK'):
            raise UnsupportedImageTypeError("1-bit RGB and CMYK are not supported")
        try:
            data = self.read_bytes()
        except (RuntimeError, PdfError) as e:
            if (
                'read_bytes called on unfilterable stream' in str(e)
                and not jbig2.get_decoder().available()
            ):
                raise DependencyError(
                    "jbig2dec - not installed or installed version is too old "
                    "(older than version 0.15)"
                ) from None
            raise

        im = Image.frombytes('1', self.size, data)

        if self.palette is not None:
            base_mode, palette = self.palette
            im = _transcoding.fix_1bit_palette_image(im, base_mode, palette)

        return im

    def _extract_transcoded_mask(self) -> Image.Image:
        """Decode an image mask; masks are handled as 1-bit images."""
        return self._extract_transcoded_1bit()

    def _extract_transcoded(self) -> Image.Image:
        """Decompress the image data and build a Pillow image from it.

        Raises:
            HifiPrintImageNotTranscodableError: for DeviceN/Separation images.
            UnsupportedImageTypeError: if no decoding path matches the image.
        """
        if self.image_mask:
            return self._extract_transcoded_mask()

        if self.mode in {'DeviceN', 'Separation'}:
            raise HifiPrintImageNotTranscodableError()

        if self.mode == 'RGB' and self.bits_per_component == 8:
            # Cannot use the zero-copy .get_stream_buffer here, we have 3-byte
            # RGB and Pillow needs RGBX.
            im = Image.frombuffer(
                'RGB', self.size, self.read_bytes(), 'raw', 'RGB', 0, 1
            )
        elif self.mode == 'CMYK' and self.bits_per_component == 8:
            im = Image.frombuffer(
                'CMYK', self.size, self.get_stream_buffer(), 'raw', 'CMYK', 0, 1
            )
        # elif self.mode == '1':
        elif self.bits_per_component == 1:
            im = self._extract_transcoded_1bit()
        elif self.mode in ('L', 'P') and self.bits_per_component <= 8:
            im = self._extract_transcoded_1248bits()
        else:
            raise UnsupportedImageTypeError(repr(self) + ", " + repr(self.obj))

        if self.colorspace == '/ICCBased' and self.icc is not None:
            im.info['icc_profile'] = self.icc.tobytes()

        return im

    def _extract_to_stream(self, *, stream: BinaryIO) -> str:
        """Extract the image to a stream.

        If possible, the compressed data is extracted and inserted into
        a compressed image file format without transcoding the compressed
        content. If this is not possible, the data will be decompressed
        and extracted to an appropriate format.

        Args:
            stream: Writable stream to write data to

        Returns:
            The file format extension.

        Raises:
            UnsupportedImageTypeError: if the image cannot be extracted.
        """
        direct_extraction = self._extract_direct(stream=stream)
        if direct_extraction:
            return direct_extraction

        im = None
        try:
            im = self._extract_transcoded()
            if im.mode == 'CMYK':
                im.save(stream, format='tiff', compression='tiff_adobe_deflate')
                return '.tiff'
            if im:
                im.save(stream, format='png')
                return '.png'
        except PdfError as e:
            if 'called on unfilterable stream' in str(e):
                raise UnsupportedImageTypeError(repr(self)) from e
            raise
        finally:
            # The transcoded image is only needed long enough to save it.
            if im:
                im.close()

        raise UnsupportedImageTypeError(repr(self))

    def extract_to(
        self, *, stream: BinaryIO | None = None, fileprefix: str = ''
    ) -> str:
        """Extract the image directly to a usable image file.

        If possible, the compressed data is extracted and inserted into
        a compressed image file format without transcoding the compressed
        content. If this is not possible, the data will be decompressed
        and extracted to an appropriate format.

        Because it is not known until attempted what image format will be
        extracted, users should not assume what format they are getting back.
        When saving the image to a file, use a temporary filename, and then
        rename the file to its final name based on the returned file extension.

        Images might be saved as any of .png, .jpg, or .tiff.

        Examples:
            >>> im.extract_to(stream=bytes_io)  # doctest: +SKIP
            '.png'

            >>> im.extract_to(fileprefix='/tmp/image00')  # doctest: +SKIP
            '/tmp/image00.jpg'

        Args:
            stream: Writable stream to write data to.
            fileprefix (str or Path): The path to write the extracted image to,
                without the file extension.

        Returns:
            If *fileprefix* was provided, then the fileprefix with the
            appropriate extension. If no *fileprefix*, then an extension
            indicating the file type.

        Raises:
            ValueError: if both or neither of *stream* and *fileprefix* are
                given.
        """
        if stream and fileprefix:
            raise ValueError("Cannot set both stream and fileprefix")
        if not stream and not fileprefix:
            raise ValueError("Must set either stream or fileprefix")
        if stream:
            return self._extract_to_stream(stream=stream)

        # Extract to memory first: the extension, and hence the final file
        # name, is only known once extraction has succeeded.
        bio = BytesIO()
        extension = self._extract_to_stream(stream=bio)
        bio.seek(0)
        filepath = Path(str(Path(fileprefix)) + extension)
        with filepath.open('wb') as target:
            copyfileobj(bio, target)
        return str(filepath)

    def read_bytes(
        self, decode_level: StreamDecodeLevel = StreamDecodeLevel.specialized
    ) -> bytes:
        """Decompress this image and return it as unencoded bytes."""
        return self.obj.read_bytes(decode_level=decode_level)

    def get_stream_buffer(
        self, decode_level: StreamDecodeLevel = StreamDecodeLevel.specialized
    ) -> Buffer:
        """Access this image with the buffer protocol."""
        return self.obj.get_stream_buffer(decode_level=decode_level)

    def as_pil_image(self) -> Image.Image:
        """Extract the image as a Pillow Image, using decompression as necessary.

        Caller must close the image.

        Raises:
            UnsupportedImageTypeError: if transcoding yields no image.
        """
        bio = BytesIO()
        direct_extraction = self._extract_direct(stream=bio)
        if direct_extraction:
            bio.seek(0)
            return Image.open(bio)

        im = self._extract_transcoded()
        if not im:
            raise UnsupportedImageTypeError(repr(self))

        return im

    def _generate_ccitt_header(self, data: bytes, icc: bytes | None = None) -> bytes:
        """Construct a CCITT G3 or G4 header from the PDF metadata.

        Args:
            data: the CCITT-encoded image data; only its length is used.
            icc: optional ICC profile bytes to pass along to the header.

        Raises:
            ValueError: if the image has no /DecodeParms.
            UnsupportedImageTypeError: if /EncodedByteAlign is set.
        """
        # https://stackoverflow.com/questions/2641770/
        # https://www.itu.int/itudoc/itu-t/com16/tiff-fx/docs/tiff6.pdf

        if not self.decode_parms:
            raise ValueError("/CCITTFaxDecode without /DecodeParms")

        expected_defaults = [
            ("/EncodedByteAlign", False),
        ]
        for name, val in expected_defaults:
            if self.decode_parms[0].get(name, val) != val:
                raise UnsupportedImageTypeError(
                    f"/CCITTFaxDecode with decode parameter {name} not equal {val}"
                )

        k = self.decode_parms[0].get("/K", 0)
        t4_options = None
        if k < 0:
            ccitt_group = 4  # Group 4
        elif k > 0:
            ccitt_group = 3  # Group 3 2-D
            t4_options = 1
        else:
            ccitt_group = 3  # Group 3 1-D
        black_is_one = self.decode_parms[0].get("/BlackIs1", False)
        decode = self._decode_array
        # PDF spec says:
        # BlackIs1: A flag indicating whether 1 bits shall be interpreted as black
        # pixels and 0 bits as white pixels, the reverse of the normal
        # PDF convention for image data. Default value: false.
        # TIFF spec says:
        # use 0 for white_is_zero (=> black is 1) MINISWHITE
        # use 1 for black_is_zero (=> white is 1) MINISBLACK
        photometry = 1 if black_is_one else 0

        # If Decode is [1, 0] then the photometry is inverted
        if len(decode) == 2 and decode == (1.0, 0.0):
            photometry = 1 - photometry

        img_size = len(data)
        if icc is None:
            icc = b''

        return _transcoding.generate_ccitt_header(
            self.size,
            data_length=img_size,
            ccitt_group=ccitt_group,
            t4_options=t4_options,
            photometry=photometry,
            icc=icc,
        )

    def show(self):  # pragma: no cover
        """Show the image however PIL wants to."""
        self.as_pil_image().show()

    def _set_pdf_source(self, pdf: Pdf):
        """Record the Pdf this image is associated with."""
        self._pdf_source = pdf

    def __repr__(self):
        # The mode may not be determinable; never let repr() raise for that.
        try:
            mode = self.mode
        except NotImplementedError:
            mode = '?'
        return (
            f'<pikepdf.PdfImage image mode={mode} '
            f'size={self.width}x{self.height} at {hex(id(self))}>'
        )

820 

821 

class PdfJpxImage(PdfImage):
    """Support class for JPEG 2000 images. Implements the same API as :class:`PdfImage`.

    If you call PdfImage(object_that_is_actually_jpeg2000_image), pikepdf will return
    this class instead, due to the check in PdfImage.__new__.
    """

    def __init__(self, obj):
        """Initialize a JPEG 2000 image.

        The image is opened with Pillow immediately and kept, because the
        colorspace may have to be read from the JPEG 2000 data itself.
        """
        super().__init__(obj)
        self._jpxpil = self.as_pil_image()

    def __eq__(self, other):
        if not isinstance(other, PdfImageBase):
            return NotImplemented
        return (
            self.obj == other.obj
            and isinstance(other, PdfJpxImage)
            and self._jpxpil == other._jpxpil
        )

    def _extract_direct(self, *, stream: BinaryIO) -> str | None:
        """Write the raw JPEG 2000 data to *stream* when that is possible.

        Returns:
            ``'.jp2'`` if /JPXDecode is the only filter left after removing
            simple filters, otherwise ``None`` with nothing written.
        """
        data, filters = self._remove_simple_filters()
        if filters != ['/JPXDecode']:
            return None
        stream.write(data)
        return '.jp2'

    def _extract_transcoded(self) -> Image.Image:
        """Transcode exactly as :class:`PdfImage` does."""
        return super()._extract_transcoded()

    @property
    def _colorspaces(self):
        """Return the effective colorspace of a JPEG 2000 image.

        If the ColorSpace dictionary is present, the colorspace embedded in the
        JPEG 2000 data will be ignored, as required by the specification.

        Raises:
            NotImplementedError: if there is no ColorSpace entry and the
                decoded image is neither grayscale nor RGB.
        """
        # (PDF 1.7 Table 89) If ColorSpace is present, any colour space
        # specifications in the JPEG2000 data shall be ignored.
        super_colorspaces = super()._colorspaces
        if super_colorspaces:
            return super_colorspaces
        if self._jpxpil.mode == 'L':
            return ['/DeviceGray']
        if self._jpxpil.mode == 'RGB':
            return ['/DeviceRGB']
        raise NotImplementedError('Complex JP2 colorspace')

    @property
    def _bpc(self) -> int:
        """Return 8, since bpc is not meaningful for JPEG 2000 encoding."""
        # (PDF 1.7 Table 89) If the image stream uses the JPXDecode filter, this
        # entry is optional and shall be ignored if present. The bit depth is
        # determined by the conforming reader in the process of decoding the
        # JPEG2000 image.
        return 8

    @property
    def indexed(self) -> bool:
        """Return False, since JPEG 2000 should not be indexed."""
        # Nothing in the spec precludes an Indexed JPXDecode image, except for
        # the fact that doing so is madness. Let's assume that no one is that
        # insane.
        return False

    def __repr__(self):
        # Mirror PdfImage.__repr__: an undeterminable mode must not make
        # repr() raise, since repr() is used when building error messages.
        try:
            mode = self.mode
        except NotImplementedError:
            mode = '?'
        return (
            f'<pikepdf.PdfJpxImage JPEG2000 image mode={mode} '
            f'size={self.width}x{self.height} at {hex(id(self))}>'
        )

893 

894 

class PdfInlineImage(PdfImageBase):
    """Support class for PDF inline images.

    An inline image is carried in a content stream as a sequence of metadata
    operands plus the raw image data. This wrapper rebuilds the metadata as a
    dictionary equivalent to what an image XObject would have (kept in
    ``self.obj``). To decode the image, it is converted to a regular
    :class:`PdfImage` by way of a temporary PDF.
    """

    # Inline images can contain abbreviations that we write automatically
    ABBREVS = {
        b'/W': b'/Width',
        b'/H': b'/Height',
        b'/BPC': b'/BitsPerComponent',
        b'/IM': b'/ImageMask',
        b'/CS': b'/ColorSpace',
        b'/F': b'/Filter',
        b'/DP': b'/DecodeParms',
        b'/G': b'/DeviceGray',
        b'/RGB': b'/DeviceRGB',
        b'/CMYK': b'/DeviceCMYK',
        b'/I': b'/Indexed',
        b'/AHx': b'/ASCIIHexDecode',
        b'/A85': b'/ASCII85Decode',
        b'/LZW': b'/LZWDecode',
        b'/RL': b'/RunLengthDecode',
        b'/CCF': b'/CCITTFaxDecode',
        b'/DCT': b'/DCTDecode',
    }
    # Inverse mapping (full name -> abbreviation), used when writing the
    # image back out in inline form.
    REVERSE_ABBREVS = {v: k for k, v in ABBREVS.items()}

    _data: Object
    _image_object: tuple[Object, ...]

    def __init__(self, *, image_data: Object, image_object: tuple):
        """Construct wrapper for inline image.

        Args:
            image_data: data stream for image, extracted from content stream
            image_object: the metadata for image, also from content stream

        Raises:
            PdfError: if the reconstructed image dictionary cannot be parsed.
        """
        # Convert the sequence of pikepdf.Object from the content stream into
        # a dictionary object by unparsing it (to bytes), eliminating inline
        # image abbreviations, and constructing a bytes string equivalent to
        # what an image XObject would look like. Then retrieve data from there

        self._data = image_data
        self._image_object = image_object

        reparse = b' '.join(
            self._unparse_obj(obj, remap_names=self.ABBREVS) for obj in image_object
        )
        try:
            reparsed_obj = Object.parse(b'<< ' + reparse + b' >>')
        except PdfError as e:
            # errors='replace' keeps a malformed escape sequence in the
            # offending bytes from raising UnicodeDecodeError here, which
            # would hide the parse error we are trying to report.
            raise PdfError(
                "parsing inline " + reparse.decode('unicode_escape', errors='replace')
            ) from e
        self.obj = reparsed_obj

    def __eq__(self, other):
        """Compare by image dictionary and raw inline image bytes.

        Returns NotImplemented when ``other`` is not a PdfImageBase; any
        PdfImageBase that is not a PdfInlineImage compares unequal.
        """
        if not isinstance(other, PdfImageBase):
            return NotImplemented
        return (
            self.obj == other.obj
            and isinstance(other, PdfInlineImage)
            and (
                self._data._inline_image_raw_bytes()
                == other._data._inline_image_raw_bytes()
            )
        )

    @classmethod
    def _unparse_obj(cls, obj, remap_names):
        """Serialize one inline image metadata operand to PDF syntax.

        Args:
            obj: a pikepdf Object, or a Python bool, int, Decimal or float.
            remap_names: mapping from unparsed name bytes to their
                replacement; names not present in it are emitted unchanged.

        Returns:
            The operand as bytes.

        Raises:
            NotImplementedError: if ``obj`` is of an unsupported type.
        """
        if isinstance(obj, Object):
            if isinstance(obj, Name):
                name = obj.unparse(resolved=True)
                assert isinstance(name, bytes)
                return remap_names.get(name, name)
            return obj.unparse(resolved=True)
        # bool must be tested before int, because bool is a subclass of int.
        if isinstance(obj, bool):
            return b'true' if obj else b'false'  # Lower case for PDF spec
        if isinstance(obj, int):
            return str(obj).encode('ascii')
        if isinstance(obj, Decimal | float):
            # PDF real numbers may not use exponent notation, which str()
            # produces for very small or very large values (e.g. '1e-05',
            # '1E-7'). Going through Decimal(repr(...)) keeps the shortest
            # round-tripping digits of a float while 'f' forces fixed-point.
            number = obj if isinstance(obj, Decimal) else Decimal(repr(obj))
            return format(number, 'f').encode('ascii')
        raise NotImplementedError(repr(obj))

    def _metadata(self, name, type_, default):
        """Look up a metadata entry in the reconstructed image dictionary."""
        return _metadata_from_obj(self.obj, name, type_, default)

    def unparse(self) -> bytes:
        """Create the content stream bytes that reproduce this inline image."""

        def metadata_tokens():
            # Emit each metadata operand, abbreviating any name that has an
            # inline image abbreviation.
            for metadata_obj in self._image_object:
                unparsed = self._unparse_obj(
                    metadata_obj, remap_names=self.REVERSE_ABBREVS
                )
                assert isinstance(unparsed, bytes)
                yield unparsed

        def inline_image_tokens():
            # BI <metadata> ID <raw data> EI
            yield b'BI\n'
            yield b' '.join(metadata_tokens())
            yield b'\nID\n'
            yield self._data._inline_image_raw_bytes()
            yield b'EI'

        return b''.join(inline_image_tokens())

    @property
    def icc(self):  # pragma: no cover
        """Raise an exception since ICC profiles are not supported on inline images."""
        raise InvalidPdfImageError(
            "Inline images with ICC profiles are not supported in the PDF specification"
        )

    def __repr__(self):
        """Return a debug representation; mode is '?' when it is not implemented."""
        try:
            mode = self.mode
        except NotImplementedError:
            mode = '?'
        return (
            f'<pikepdf.PdfInlineImage image mode={mode} '
            f'size={self.width}x{self.height} at {hex(id(self))}>'
        )

    def _convert_to_pdfimage(self) -> PdfImage:
        """Convert this inline image to a PdfImage backed by a temporary PDF."""
        # Construct a temporary PDF that holds this inline image, and...
        tmppdf = Pdf.new()
        tmppdf.add_blank_page(page_size=(self.width, self.height))
        tmppdf.pages[0].contents_add(
            f'{self.width} 0 0 {self.height} 0 0 cm'.encode('ascii'), prepend=True
        )
        tmppdf.pages[0].contents_add(self.unparse())

        # ...externalize it,
        tmppdf.pages[0].externalize_inline_images()
        raw_img = cast(Stream, next(iter(tmppdf.pages[0].images.values())))

        # ...then use the regular PdfImage API to extract it.
        img = PdfImage(raw_img)
        img._set_pdf_source(tmppdf)  # Hold tmppdf open while PdfImage exists
        return img

    def as_pil_image(self) -> Image.Image:
        """Return inline image as a Pillow Image."""
        return self._convert_to_pdfimage().as_pil_image()

    def extract_to(self, *, stream: BinaryIO | None = None, fileprefix: str = ''):
        """Extract the inline image directly to a usable image file.

        See:
            :meth:`PdfImage.extract_to`
        """
        return self._convert_to_pdfimage().extract_to(
            stream=stream, fileprefix=fileprefix
        )

    def read_bytes(self):
        """Return decompressed image bytes."""
        # qpdf does not have an API to return this directly, so convert it.
        return self._convert_to_pdfimage().read_bytes()

    def get_stream_buffer(self):
        """Return decompressed stream buffer."""
        # qpdf does not have an API to return this directly, so convert it.
        return self._convert_to_pdfimage().get_stream_buffer()

1054 

1055 

# Public names exported by this module, in alphabetical order.
# GrayDecodeArray and NotExtractableError are listed alongside their
# already-exported siblings (the other decode-array aliases, and
# HifiPrintImageNotTranscodableError, which derives from NotExtractableError).
__all__ = [
    'CMYKDecodeArray',
    'DecodeArray',
    'GrayDecodeArray',
    'HifiPrintImageNotTranscodableError',
    'ImageDecompressionError',
    'InvalidPdfImageError',
    'NotExtractableError',
    'PaletteData',
    'PdfImage',
    'PdfImageBase',
    'PdfInlineImage',
    'PdfJpxImage',
    'RGBDecodeArray',
    'UnsupportedImageTypeError',
]