1# SPDX-FileCopyrightText: 2022 James R. Barlow
2# SPDX-License-Identifier: MPL-2.0
3
4"""Extract images embedded in PDF."""
5
6from __future__ import annotations
7
8from abc import ABC, abstractmethod
9from copy import copy
10from decimal import Decimal
11from io import BytesIO
12from itertools import zip_longest
13from pathlib import Path
14from shutil import copyfileobj
15from typing import Any, BinaryIO, Callable, NamedTuple, TypeVar, Union, cast
16
17from PIL import Image
18from PIL.ImageCms import ImageCmsProfile
19
20from pikepdf import jbig2
21from pikepdf._core import Buffer, Pdf, PdfError, StreamDecodeLevel
22from pikepdf._exceptions import DependencyError
23from pikepdf.models import _transcoding
24from pikepdf.models._transcoding import ImageDecompressionError
25from pikepdf.objects import (
26 Array,
27 Dictionary,
28 Name,
29 Object,
30 Stream,
31 String,
32)
33
# Generic type of the value produced by a metadata converter callable.
T = TypeVar('T')

# Shapes of /Decode arrays as returned by PdfImageBase._decode_array:
# two floats per color component.
RGBDecodeArray = tuple[float, float, float, float, float, float]  # 3 components
GrayDecodeArray = tuple[float, float]  # 1 component
CMYKDecodeArray = tuple[float, float, float, float, float, float, float, float]  # 4
DecodeArray = Union[RGBDecodeArray, GrayDecodeArray, CMYKDecodeArray]
40
41
class UnsupportedImageTypeError(Exception):
    """Raised when an image is stored in a form that pikepdf does not support."""
44
45
class NotExtractableError(Exception):
    """Raised when an image cannot be extracted directly."""
48
49
class HifiPrintImageNotTranscodableError(NotExtractableError):
    """Raised for images carrying high fidelity printing data; not extractable."""
52
53
class InvalidPdfImageError(Exception):
    """Raised when an image does not conform to the PDF 1.7 specification."""
56
57
def _array_str(value: Object | str | list):
    """Reduce a pikepdf value to a list in which names are plain ``str``.

    Lists and arrays are converted recursively. Names and ``str`` become
    ``str``, PDF strings become ``bytes``, while streams, dictionaries,
    ``bytes`` and ``int`` are passed through unchanged. A value that does not
    convert to a list is wrapped in a one-element list.

    Raises:
        NotImplementedError: if an item of any other type is encountered.
    """

    def _simplify(node):
        if isinstance(node, (list, Array)):
            return [_simplify(child) for child in node]
        if isinstance(node, (Stream, Dictionary, bytes, int)):
            return node
        if isinstance(node, (Name, str)):
            return str(node)
        if isinstance(node, String):
            return bytes(node)
        raise NotImplementedError(value)

    simplified = _simplify(value)
    return simplified if isinstance(simplified, list) else [simplified]
76
77
78def _ensure_list(value: list[Object] | Dictionary | Array | Object) -> list[Object]:
79 """Ensure value is a list of pikepdf.Object, if it was not already.
80
81 To support DecodeParms which can be present as either an array of dicts or a single
82 dict. It's easier to convert to an array of one dict.
83 """
84 if isinstance(value, list):
85 return value
86 return list(value.wrap_in_array().as_list())
87
88
89def _metadata_from_obj(
90 obj: Dictionary | Stream, name: str, type_: Callable[[Any], T], default: T
91) -> T | None:
92 """Retrieve metadata from a dictionary or stream and wrangle types."""
93 val = getattr(obj, name, default)
94 try:
95 return type_(val)
96 except TypeError:
97 if val is None:
98 return None
99 raise NotImplementedError('Metadata access for ' + name)
100
101
class PaletteData(NamedTuple):
    """Color space and raw bytes of an image's color palette.

    ``base_colorspace`` is typically ``"RGB"`` or ``"L"`` (grayscale).

    ``palette`` is typically 256 bytes for grayscale or 256*3=768 bytes for RGB,
    each unit (grayscale) or triplet (RGB) giving the value of one palette
    entry.
    """

    base_colorspace: str
    palette: bytes
113
114
class PdfImageBase(ABC):
    """Abstract base class for images.

    Concrete subclasses provide :meth:`_metadata`, :attr:`icc` and
    :meth:`as_pil_image`; the remaining properties are derived from the
    image's metadata entries (``Width``, ``ColorSpace``, ``Filter``, ...).
    """

    SIMPLE_COLORSPACES = {'/DeviceRGB', '/DeviceGray', '/CalRGB', '/CalGray'}
    MAIN_COLORSPACES = SIMPLE_COLORSPACES | {'/DeviceCMYK', '/CalCMYK', '/ICCBased'}
    PRINT_COLORSPACES = {'/Separation', '/DeviceN'}

    @abstractmethod
    def _metadata(self, name: str, type_: Callable[[Any], T], default: T) -> T:
        """Get metadata for this image type."""

    @property
    def width(self) -> int:
        """Width of the image data in pixels."""
        return self._metadata('Width', int, 0)

    @property
    def height(self) -> int:
        """Height of the image data in pixels."""
        return self._metadata('Height', int, 0)

    @property
    def image_mask(self) -> bool:
        """Return ``True`` if this is an image mask."""
        return self._metadata('ImageMask', bool, False)

    @property
    def _bpc(self) -> int | None:
        """Bits per component for this image (low-level)."""
        return self._metadata('BitsPerComponent', int, 0)

    @property
    def _colorspaces(self):
        """Colorspace (low-level)."""
        return self._metadata('ColorSpace', _array_str, [])

    @property
    def filters(self):
        """List of names of the filters that were applied to encode this image."""
        return self._metadata('Filter', _array_str, [])

    @property
    def _decode_array(self) -> DecodeArray:
        """Extract the /Decode array, or the default one for the colorspace.

        An explicit /Decode array is used when it has 2, 6 or 8 entries.
        Otherwise a ``0.0, 1.0`` pair per component is returned for gray, RGB
        and CMYK images (including ICC-based ones) and for image masks.

        Raises:
            NotImplementedError: if no usable /Decode array is present and no
                default is known for this image.
        """
        decode: list = self._metadata('Decode', _ensure_list, [])
        if decode and len(decode) in (2, 6, 8):
            return cast(DecodeArray, tuple(float(value) for value in decode))

        gray_default = (0.0, 1.0)
        rgb_default = (0.0, 1.0, 0.0, 1.0, 0.0, 1.0)
        cmyk_default = (0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0)

        colorspace = self.colorspace
        if colorspace in ('/DeviceGray', '/CalGray'):
            return gray_default
        if colorspace in ('/DeviceRGB', '/CalRGB'):
            return rgb_default
        if colorspace == '/DeviceCMYK':
            return cmyk_default
        if colorspace == '/ICCBased':
            # Resolve the mode once: this may need to open the ICC profile.
            icc_mode = self._approx_mode_from_icc()
            if icc_mode == 'L':
                return gray_default
            if icc_mode == 'RGB':
                return rgb_default
            if icc_mode == 'CMYK':
                return cmyk_default
        if self.image_mask:
            return gray_default  # Default for image masks; per RM 8.9.6.2

        raise NotImplementedError(
            "Don't know how to retrieve default /Decode array for image "
            + repr(self)
        )

    @property
    def decode_parms(self):
        """List of the /DecodeParms, arguments to filters."""
        return self._metadata('DecodeParms', _ensure_list, [])

    @property
    def colorspace(self) -> str | None:
        """PDF name of the colorspace that best describes this image.

        For /Indexed images this is the name of the base colorspace. Returns
        ``None`` for image masks, whose colorspace is undefined.

        Raises:
            NotImplementedError: if the colorspace is not recognized.
        """
        if self.image_mask:
            return None  # Undefined for image masks
        # Read the property once; every access re-reads the metadata.
        colorspaces = self._colorspaces
        if colorspaces:
            primary = colorspaces[0]
            if primary in self.MAIN_COLORSPACES:
                return primary
            if primary == '/Indexed':
                subspace = colorspaces[1]
                if isinstance(subspace, str) and subspace in self.MAIN_COLORSPACES:
                    return subspace
                if isinstance(subspace, list) and subspace[0] in (
                    '/ICCBased',
                    '/DeviceN',
                    '/CalGray',
                    '/CalRGB',
                ):
                    return subspace[0]
            if primary == '/DeviceN':
                return '/DeviceN'

        raise NotImplementedError(
            "not sure how to get colorspace: " + repr(colorspaces)
        )

    @property
    def bits_per_component(self) -> int:
        """Bits per component of this image.

        When the entry is missing or zero, image masks default to 1 and all
        other images to 8.
        """
        bpc = self._bpc
        if bpc is None or bpc == 0:
            return 1 if self.image_mask else 8
        return bpc

    @property
    @abstractmethod
    def icc(self) -> ImageCmsProfile | None:
        """Return ICC profile for this image if one is defined."""

    @property
    def indexed(self) -> bool:
        """Check if the image has a defined color palette."""
        return '/Indexed' in self._colorspaces

    def _colorspace_has_name(self, name):
        """Check whether *name* is the colorspace, or the base of an /Indexed one."""
        try:
            cs = self._colorspaces
            if cs[0] == '/Indexed' and cs[1][0] == name:
                return True
            if cs[0] == name:
                return True
        except (IndexError, AttributeError, KeyError):
            # Malformed or empty colorspace: treat as "does not have the name"
            pass
        return False

    @property
    def is_device_n(self) -> bool:
        """Check if image has a /DeviceN (complex printing) colorspace."""
        return self._colorspace_has_name('/DeviceN')

    @property
    def is_separation(self) -> bool:
        """Check if image has a /Separation (complex printing) colorspace."""
        return self._colorspace_has_name('/Separation')

    @property
    def size(self) -> tuple[int, int]:
        """Size of image as (width, height)."""
        return self.width, self.height

    def _approx_mode_from_icc(self):
        """Approximate a Pillow mode from the ICC profile of this image.

        Returns ``'L'`` for single channel profiles, ``'RGB'`` or ``'CMYK'``
        when the profile declares that color space, and ``''`` otherwise.
        """
        if self.indexed:
            icc_profile = self._colorspaces[1][1]
        else:
            icc_profile = self._colorspaces[1]
        icc_profile_nchannels = int(icc_profile['/N'])

        if icc_profile_nchannels == 1:
            return 'L'

        # Multiple channels, need to open the profile and look
        mode_from_xcolor_space = {'RGB ': 'RGB', 'CMYK': 'CMYK'}
        xcolor_space = self.icc.profile.xcolor_space
        return mode_from_xcolor_space.get(xcolor_space, '')

    @property
    def mode(self) -> str:
        """``PIL.Image.mode`` equivalent for this image, where possible.

        If an ICC profile is attached to the image, we still attempt to resolve a Pillow
        mode.

        Raises:
            NotImplementedError: if no mode can be determined.
        """
        m = ''
        if self.is_device_n:
            m = 'DeviceN'
        elif self.is_separation:
            m = 'Separation'
        elif self.indexed:
            m = 'P'
        elif self.colorspace == '/DeviceGray' and self.bits_per_component == 1:
            m = '1'
        elif self.colorspace == '/DeviceGray' and self.bits_per_component > 1:
            m = 'L'
        elif self.colorspace == '/DeviceRGB':
            m = 'RGB'
        elif self.colorspace == '/DeviceCMYK':
            m = 'CMYK'
        elif self.colorspace == '/ICCBased':
            try:
                m = self._approx_mode_from_icc()
            except (ValueError, TypeError) as e:
                raise NotImplementedError(
                    "Not sure how to handle PDF image of this type"
                ) from e
        if m == '':
            raise NotImplementedError(
                "Not sure how to handle PDF image of this type"
            ) from None
        return m

    @property
    def filter_decodeparms(self):
        """Return the normalized Filter and DecodeParms data.

        PDF has a lot of possible data structures concerning /Filter and
        /DecodeParms. /Filter can be absent or a name or an array, /DecodeParms
        can be absent or a dictionary (if /Filter is a name) or an array (if
        /Filter is an array). When both are arrays the lengths match.

        Normalize this into:
        [(/FilterName, {/DecodeParmName: Value, ...}), ...]

        The order of /Filter matters as it indicates the encoding/decoding
        sequence.
        """
        return list(zip_longest(self.filters, self.decode_parms, fillvalue={}))

    @property
    def palette(self) -> PaletteData | None:
        """Retrieve the color palette for this image if applicable.

        Returns:
            ``None`` if the image is not indexed, otherwise the base color
            space (as a mode-like string) and the palette bytes.

        Raises:
            ValueError: if the /Indexed colorspace does not have four entries.
            NotImplementedError: if the base colorspace is not understood.
        """
        if not self.indexed:
            return None
        try:
            _idx, base, _hival, lookup = self._colorspaces
        except ValueError as e:
            raise ValueError('Not sure how to interpret this palette') from e
        if self.icc or self.is_device_n or self.is_separation or isinstance(base, list):
            # Base colorspace is itself an array; its first entry is the name
            base = str(base[0])
        else:
            base = str(base)
        lookup = bytes(lookup)
        if base not in self.MAIN_COLORSPACES and base not in self.PRINT_COLORSPACES:
            raise NotImplementedError(f"not sure how to interpret this palette: {base}")
        if base in ('/DeviceRGB', '/CalRGB'):
            base = 'RGB'
        elif base in ('/DeviceGray', '/CalGray'):
            base = 'L'
        elif base == '/DeviceCMYK':
            base = 'CMYK'
        elif base == '/DeviceN':
            base = 'DeviceN'
        elif base == '/Separation':
            base = 'Separation'
        elif base == '/ICCBased':
            base = self._approx_mode_from_icc()
        else:
            raise NotImplementedError(f"not sure how to interpret this palette: {base}")
        return PaletteData(base, lookup)

    @abstractmethod
    def as_pil_image(self) -> Image.Image:
        """Convert this PDF image to a Python PIL (Pillow) image."""

    def _repr_png_(self) -> bytes:
        """Display hook for IPython/Jupyter."""
        b = BytesIO()
        with self.as_pil_image() as im:
            im.save(b, 'PNG')
        return b.getvalue()
363
364
class PdfImage(PdfImageBase):
    """Support class to provide a consistent API for manipulating PDF images.

    The data structure for images inside PDFs is irregular and complex,
    making it difficult to use without introducing errors for less
    typical cases. This class addresses these difficulties by providing a
    regular, Pythonic API similar in spirit (and convertible to) the Python
    Pillow imaging library.
    """

    obj: Stream
    _icc: ImageCmsProfile | None
    _pdf_source: Pdf | None

    def __new__(cls, obj: Stream):
        """Construct a PdfImage... or a PdfJpxImage if that is what we really are."""
        try:
            # Check if JPXDecode is called for and initialize as PdfJpxImage
            filters = _ensure_list(obj.Filter)
            if Name.JPXDecode in filters:
                return super().__new__(PdfJpxImage)
        except (AttributeError, KeyError):
            # __init__ will deal with any other errors
            pass
        return super().__new__(PdfImage)

    def __init__(self, obj: Stream):
        """Construct a PDF image from a Image XObject inside a PDF.

        ``pim = PdfImage(page.Resources.XObject['/ImageNN'])``

        Args:
            obj: an Image XObject

        Raises:
            TypeError: if *obj* is a stream whose /Subtype is not /Image.
        """
        if isinstance(obj, Stream) and obj.stream_dict.get("/Subtype") != "/Image":
            raise TypeError("can't construct PdfImage from non-image")
        self.obj = obj
        self._icc = None
        # Always define the attribute so reads never hit AttributeError;
        # _set_pdf_source() replaces it when a source Pdf must be kept alive.
        self._pdf_source = None

    def __eq__(self, other):
        if not isinstance(other, PdfImageBase):
            return NotImplemented
        return self.obj == other.obj

    @classmethod
    def _from_pil_image(cls, *, pdf, page, name, image):  # pragma: no cover
        """Insert a PIL image into a PDF (rudimentary).

        Args:
            pdf (pikepdf.Pdf): the PDF to attach the image to
            page (pikepdf.Object): the page to attach the image to
            name (str or pikepdf.Name): the name to set the image
            image (PIL.Image.Image): the image to insert
        """
        data = image.tobytes()

        imstream = Stream(pdf, data)
        imstream.Type = Name('/XObject')
        imstream.Subtype = Name('/Image')
        if image.mode == 'RGB':
            imstream.ColorSpace = Name('/DeviceRGB')
        elif image.mode in ('1', 'L'):
            imstream.ColorSpace = Name('/DeviceGray')
        imstream.BitsPerComponent = 1 if image.mode == '1' else 8
        imstream.Width = image.width
        imstream.Height = image.height

        page.Resources.XObject[name] = imstream

        return cls(imstream)

    def _metadata(self, name, type_, default):
        return _metadata_from_obj(self.obj, name, type_, default)

    def _decode_parms_for(self, filter_name: str):
        """Return the /DecodeParms entry paired with *filter_name*.

        Returns ``None`` when the filter is not applied to this image. The
        pairing comes from :attr:`filter_decodeparms`, so a filter without a
        matching /DecodeParms entry yields an empty dict.
        """
        for name, parms in self.filter_decodeparms:
            if name == filter_name:
                return parms
        return None

    @property
    def _iccstream(self):
        """Return the stream object holding the ICC profile of this image."""
        if self.colorspace == '/ICCBased':
            if not self.indexed:
                return self._colorspaces[1]
            assert isinstance(self._colorspaces[1], list)
            return self._colorspaces[1][1]
        raise NotImplementedError("Don't know how to find ICC stream for image")

    @property
    def icc(self) -> ImageCmsProfile | None:
        """If an ICC profile is attached, return a Pillow object that describe it.

        Most of the information may be found in ``icc.profile``.

        Raises:
            UnsupportedImageTypeError: if the profile cannot be opened.
        """
        if self.colorspace not in ('/ICCBased', '/Indexed'):
            return None
        if not self._icc:
            iccstream = self._iccstream
            iccbuffer = iccstream.get_stream_buffer()
            iccbytesio = BytesIO(iccbuffer)
            try:
                self._icc = ImageCmsProfile(iccbytesio)
            except OSError as e:
                if str(e) == 'cannot open profile from string':
                    # ICC profile is corrupt
                    raise UnsupportedImageTypeError(
                        "ICC profile corrupt or not readable"
                    ) from e
                # NOTE: an OSError with any other message is ignored here, so
                # the profile stays unset and None is returned.
        return self._icc

    def _remove_simple_filters(self):
        """Remove simple lossless compression where it appears.

        Returns:
            A tuple of the image data with the leading simple filters decoded,
            and the list of filters that remain applied to that data.

        Raises:
            NotImplementedError: if more than one complex filter is applied.
        """
        COMPLEX_FILTERS = {
            '/DCTDecode',
            '/JPXDecode',
            '/JBIG2Decode',
            '/CCITTFaxDecode',
        }
        filters = self.filters
        indices = [n for n, filt in enumerate(filters) if filt in COMPLEX_FILTERS]
        if len(indices) > 1:
            raise NotImplementedError(
                f"Object {self.obj.objgen} has compound complex filters: "
                f"{filters}. We cannot decompress this."
            )
        if len(indices) == 0:
            # No complex filter indices, so all filters are simple - remove them all
            return self.obj.read_bytes(StreamDecodeLevel.specialized), []

        n = indices[0]
        if n == 0:
            # The complex filter comes first, so there is nothing simple to
            # strip in front of it: return the raw stream data untouched
            return self.obj.read_raw_bytes(), filters

        # Decode only the simple filters preceding the complex one, using a
        # copy so that the original object is left unmodified
        obj_copy = copy(self.obj)
        obj_copy.Filter = Array([Name(f) for f in filters[:n]])
        obj_copy.DecodeParms = Array(self.decode_parms[:n])
        return obj_copy.read_bytes(StreamDecodeLevel.specialized), filters[n:]

    def _extract_direct(self, *, stream: BinaryIO) -> str | None:
        """Attempt to extract the image directly to a usable image file.

        If there is no way to extract the image without decompressing or
        transcoding then return ``None``. The type and format of image
        generated will vary.

        Args:
            stream: Writable file stream to write data to, e.g. an open file

        Returns:
            The file extension of the data written, or ``None`` if nothing
            was written.
        """

        def dct_color_transform(default: int):
            # /ColorTransform lives in the /DecodeParms paired with
            # /DCTDecode, which need not be the first filter of the chain
            # (e.g. [/FlateDecode /DCTDecode]).
            parms = self._decode_parms_for('/DCTDecode')
            if parms is None:
                return default
            return parms.get('/ColorTransform', default)

        def normal_dct_rgb() -> bool:
            # Normal DCTDecode RGB images have the default value of
            # /ColorTransform 1 and are actually in YUV. Such a file can be
            # saved as a standard JPEG. RGB JPEGs without YUV conversion can't
            # be saved as JPEGs, and are probably bugs. Some software in the
            # wild actually produces RGB JPEGs in PDFs (probably a bug).
            DEFAULT_CT_RGB = 1
            ct = dct_color_transform(DEFAULT_CT_RGB)
            return self.mode == 'RGB' and ct == DEFAULT_CT_RGB

        def normal_dct_cmyk() -> bool:
            # Normal DCTDecode CMYKs have /ColorTransform 0 and can be saved.
            # There is a YUVK colorspace but CMYK JPEGs don't generally use it
            DEFAULT_CT_CMYK = 0
            ct = dct_color_transform(DEFAULT_CT_CMYK)
            return self.mode == 'CMYK' and ct == DEFAULT_CT_CMYK

        data, filters = self._remove_simple_filters()

        if filters == ['/CCITTFaxDecode']:
            if self.colorspace == '/ICCBased':
                icc = self._iccstream.read_bytes()
            else:
                icc = None
            stream.write(self._generate_ccitt_header(data, icc=icc))
            stream.write(data)
            return '.tif'
        if filters == ['/DCTDecode'] and (
            self.mode == 'L' or normal_dct_rgb() or normal_dct_cmyk()
        ):
            stream.write(data)
            return '.jpg'

        return None

    def _extract_transcoded_1248bits(self) -> Image.Image:
        """Extract an image when there are 1/2/4/8 bits packed in byte data."""
        stride = 0  # tell Pillow to calculate stride from line width
        scale = 0 if self.mode == 'L' else 1
        if self.bits_per_component in (2, 4):
            buffer, stride = _transcoding.unpack_subbyte_pixels(
                self.read_bytes(), self.size, self.bits_per_component, scale
            )
        elif self.bits_per_component == 8:
            buffer = cast(memoryview, self.get_stream_buffer())
        else:
            raise InvalidPdfImageError("BitsPerComponent must be 1, 2, 4, 8, or 16")

        if self.mode == 'P' and self.palette is not None:
            base_mode, palette = self.palette
            im = _transcoding.image_from_buffer_and_palette(
                buffer,
                self.size,
                stride,
                base_mode,
                palette,
            )
        else:
            im = _transcoding.image_from_byte_buffer(buffer, self.size, stride)
        return im

    def _extract_transcoded_1bit(self) -> Image.Image:
        """Extract a 1-bit image, applying its palette if it has one.

        Raises:
            UnsupportedImageTypeError: for 1-bit RGB or CMYK images.
            DependencyError: if the stream cannot be decoded and no JBIG2
                decoder is available.
        """
        if not self.image_mask and self.mode in ('RGB', 'CMYK'):
            raise UnsupportedImageTypeError("1-bit RGB and CMYK are not supported")
        try:
            data = self.read_bytes()
        except (RuntimeError, PdfError) as e:
            if (
                'read_bytes called on unfilterable stream' in str(e)
                and not jbig2.get_decoder().available()
            ):
                raise DependencyError(
                    "jbig2dec - not installed or installed version is too old "
                    "(older than version 0.15)"
                ) from None
            raise

        im = Image.frombytes('1', self.size, data)

        if self.palette is not None:
            base_mode, palette = self.palette
            im = _transcoding.fix_1bit_palette_image(im, base_mode, palette)

        return im

    def _extract_transcoded_mask(self) -> Image.Image:
        """Extract an image mask; masks are handled like 1-bit images."""
        return self._extract_transcoded_1bit()

    def _extract_transcoded(self) -> Image.Image:
        """Decompress the image data and build a Pillow image from it.

        Raises:
            HifiPrintImageNotTranscodableError: for DeviceN/Separation images.
            UnsupportedImageTypeError: if the mode and bit depth combination
                is not handled.
        """
        if self.image_mask:
            return self._extract_transcoded_mask()

        if self.mode in {'DeviceN', 'Separation'}:
            raise HifiPrintImageNotTranscodableError()

        if self.mode == 'RGB' and self.bits_per_component == 8:
            # Cannot use the zero-copy .get_stream_buffer here, we have 3-byte
            # RGB and Pillow needs RGBX.
            im = Image.frombuffer(
                'RGB', self.size, self.read_bytes(), 'raw', 'RGB', 0, 1
            )
        elif self.mode == 'CMYK' and self.bits_per_component == 8:
            im = Image.frombuffer(
                'CMYK', self.size, self.get_stream_buffer(), 'raw', 'CMYK', 0, 1
            )
        # elif self.mode == '1':
        elif self.bits_per_component == 1:
            im = self._extract_transcoded_1bit()
        elif self.mode in ('L', 'P') and self.bits_per_component <= 8:
            im = self._extract_transcoded_1248bits()
        else:
            raise UnsupportedImageTypeError(repr(self) + ", " + repr(self.obj))

        if self.colorspace == '/ICCBased' and self.icc is not None:
            im.info['icc_profile'] = self.icc.tobytes()

        return im

    def _extract_to_stream(self, *, stream: BinaryIO) -> str:
        """Extract the image to a stream.

        If possible, the compressed data is extracted and inserted into
        a compressed image file format without transcoding the compressed
        content. If this is not possible, the data will be decompressed
        and extracted to an appropriate format.

        Args:
            stream: Writable stream to write data to

        Returns:
            The file format extension.

        Raises:
            UnsupportedImageTypeError: if the image cannot be decoded.
        """
        direct_extraction = self._extract_direct(stream=stream)
        if direct_extraction:
            return direct_extraction

        im = None
        try:
            im = self._extract_transcoded()
            if im.mode == 'CMYK':
                im.save(stream, format='tiff', compression='tiff_adobe_deflate')
                return '.tiff'
            if im:
                im.save(stream, format='png')
                return '.png'
        except PdfError as e:
            if 'called on unfilterable stream' in str(e):
                raise UnsupportedImageTypeError(repr(self)) from e
            raise
        finally:
            # Release the transcoded image on every exit path
            if im:
                im.close()

        raise UnsupportedImageTypeError(repr(self))

    def extract_to(
        self, *, stream: BinaryIO | None = None, fileprefix: str = ''
    ) -> str:
        """Extract the image directly to a usable image file.

        If possible, the compressed data is extracted and inserted into
        a compressed image file format without transcoding the compressed
        content. If this is not possible, the data will be decompressed
        and extracted to an appropriate format.

        Because it is not known until attempted what image format will be
        extracted, users should not assume what format they are getting back.
        When saving the image to a file, use a temporary filename, and then
        rename the file to its final name based on the returned file extension.

        Images might be saved as any of .png, .jpg, or .tiff.

        Examples:
            >>> im.extract_to(stream=bytes_io)  # doctest: +SKIP
            '.png'

            >>> im.extract_to(fileprefix='/tmp/image00')  # doctest: +SKIP
            '/tmp/image00.jpg'

        Args:
            stream: Writable stream to write data to.
            fileprefix (str or Path): The path to write the extracted image to,
                without the file extension.

        Returns:
            If *fileprefix* was provided, then the fileprefix with the
            appropriate extension. If no *fileprefix*, then an extension
            indicating the file type.

        Raises:
            ValueError: unless exactly one of *stream* and *fileprefix* is
                given.
        """
        # Test the stream for presence rather than truthiness, so that a
        # stream object that happens to evaluate as false is still honored.
        has_stream = stream is not None
        has_fileprefix = bool(fileprefix)
        if has_stream and has_fileprefix:
            raise ValueError("Cannot set both stream and fileprefix")
        if not has_stream and not has_fileprefix:
            raise ValueError("Must set one of stream or fileprefix")
        if stream is not None:
            return self._extract_to_stream(stream=stream)

        # Extract to memory first: the extension, and so the final file name,
        # is only known once extraction has succeeded.
        bio = BytesIO()
        extension = self._extract_to_stream(stream=bio)
        bio.seek(0)
        filepath = Path(str(Path(fileprefix)) + extension)
        with filepath.open('wb') as target:
            copyfileobj(bio, target)
        return str(filepath)

    def read_bytes(
        self, decode_level: StreamDecodeLevel = StreamDecodeLevel.specialized
    ) -> bytes:
        """Decompress this image and return it as unencoded bytes."""
        return self.obj.read_bytes(decode_level=decode_level)

    def get_stream_buffer(
        self, decode_level: StreamDecodeLevel = StreamDecodeLevel.specialized
    ) -> Buffer:
        """Access this image with the buffer protocol."""
        return self.obj.get_stream_buffer(decode_level=decode_level)

    def as_pil_image(self) -> Image.Image:
        """Extract the image as a Pillow Image, using decompression as necessary.

        Caller must close the image.

        Raises:
            UnsupportedImageTypeError: if no image could be produced.
        """
        bio = BytesIO()
        direct_extraction = self._extract_direct(stream=bio)
        if direct_extraction:
            bio.seek(0)
            return Image.open(bio)

        im = self._extract_transcoded()
        if not im:
            raise UnsupportedImageTypeError(repr(self))

        return im

    def _generate_ccitt_header(self, data: bytes, icc: bytes | None = None) -> bytes:
        """Construct a CCITT G3 or G4 header from the PDF metadata.

        Args:
            data: the CCITT encoded image data; only its length is used.
            icc: ICC profile bytes to pass along, if any.

        Raises:
            ValueError: if the image has no /DecodeParms.
            UnsupportedImageTypeError: if /EncodedByteAlign is set.
        """
        # https://stackoverflow.com/questions/2641770/
        # https://www.itu.int/itudoc/itu-t/com16/tiff-fx/docs/tiff6.pdf

        if not self.decode_parms:
            raise ValueError("/CCITTFaxDecode without /DecodeParms")

        # Use the parameters paired with /CCITTFaxDecode; when simple filters
        # precede it, they are not the first /DecodeParms entry.
        parms = self._decode_parms_for('/CCITTFaxDecode')
        if parms is None:
            parms = self.decode_parms[0]

        expected_defaults = [
            ("/EncodedByteAlign", False),
        ]
        for name, val in expected_defaults:
            if parms.get(name, val) != val:
                raise UnsupportedImageTypeError(
                    f"/CCITTFaxDecode with decode parameter {name} not equal {val}"
                )

        k = parms.get("/K", 0)
        t4_options = None
        if k < 0:
            ccitt_group = 4  # Group 4
        elif k > 0:
            ccitt_group = 3  # Group 3 2-D
            t4_options = 1
        else:
            ccitt_group = 3  # Group 3 1-D
        black_is_one = parms.get("/BlackIs1", False)
        decode = self._decode_array
        # PDF spec says:
        # BlackIs1: A flag indicating whether 1 bits shall be interpreted as black
        # pixels and 0 bits as white pixels, the reverse of the normal
        # PDF convention for image data. Default value: false.
        # TIFF spec says:
        # use 0 for white_is_zero (=> black is 1) MINISWHITE
        # use 1 for black_is_zero (=> white is 1) MINISBLACK
        photometry = 1 if black_is_one else 0

        # If Decode is [1, 0] then the photometry is inverted
        if decode == (1.0, 0.0):
            photometry = 1 - photometry

        img_size = len(data)
        if icc is None:
            icc = b''

        return _transcoding.generate_ccitt_header(
            self.size,
            data_length=img_size,
            ccitt_group=ccitt_group,
            t4_options=t4_options,
            photometry=photometry,
            icc=icc,
        )

    def show(self):  # pragma: no cover
        """Show the image however PIL wants to."""
        # Close the temporary Pillow image once the viewer has been launched
        with self.as_pil_image() as im:
            im.show()

    def _set_pdf_source(self, pdf: Pdf):
        """Keep a reference to *pdf* for as long as this image exists."""
        self._pdf_source = pdf

    def __repr__(self):
        try:
            mode = self.mode
        except NotImplementedError:
            mode = '?'
        return (
            f'<pikepdf.PdfImage image mode={mode} '
            f'size={self.width}x{self.height} at {hex(id(self))}>'
        )
818
819
class PdfJpxImage(PdfImage):
    """Support class for JPEG 2000 images. Implements the same API as :class:`PdfImage`.

    If you call PdfImage(object_that_is_actually_jpeg2000_image), pikepdf will return
    this class instead, due to the check in PdfImage.__new__.
    """

    def __init__(self, obj):
        """Initialize a JPEG 2000 image."""
        super().__init__(obj)
        # Decoded eagerly: the colorspace fallback in _colorspaces needs the
        # mode that Pillow reports for the JPEG 2000 data.
        self._jpxpil = self.as_pil_image()

    def __eq__(self, other):
        if not isinstance(other, PdfImageBase):
            return NotImplemented
        return (
            self.obj == other.obj
            and isinstance(other, PdfJpxImage)
            and self._jpxpil == other._jpxpil
        )

    def _extract_direct(self, *, stream: BinaryIO) -> str | None:
        """Write the JPEG 2000 data to *stream* if it is the only filter left.

        Returns:
            ``'.jp2'`` if data was written, otherwise ``None``.
        """
        data, filters = self._remove_simple_filters()
        if filters != ['/JPXDecode']:
            return None
        stream.write(data)
        return '.jp2'

    def _extract_transcoded(self) -> Image.Image:
        """Defer to the generic transcoding of :class:`PdfImage`."""
        return super()._extract_transcoded()

    @property
    def _colorspaces(self):
        """Return the effective colorspace of a JPEG 2000 image.

        If the ColorSpace dictionary is present, the colorspace embedded in the
        JPEG 2000 data will be ignored, as required by the specification.

        Raises:
            NotImplementedError: if there is no ColorSpace entry and the
                decoded image is neither grayscale nor RGB.
        """
        # (PDF 1.7 Table 89) If ColorSpace is present, any colour space
        # specifications in the JPEG2000 data shall be ignored.
        super_colorspaces = super()._colorspaces
        if super_colorspaces:
            return super_colorspaces
        if self._jpxpil.mode == 'L':
            return ['/DeviceGray']
        if self._jpxpil.mode == 'RGB':
            return ['/DeviceRGB']
        raise NotImplementedError('Complex JP2 colorspace')

    @property
    def _bpc(self) -> int:
        """Return 8, since bpc is not meaningful for JPEG 2000 encoding."""
        # (PDF 1.7 Table 89) If the image stream uses the JPXDecode filter, this
        # entry is optional and shall be ignored if present. The bit depth is
        # determined by the conforming reader in the process of decoding the
        # JPEG2000 image.
        return 8

    @property
    def indexed(self) -> bool:
        """Return False, since JPEG 2000 should not be indexed."""
        # Nothing in the spec precludes an Indexed JPXDecode image, except for
        # the fact that doing so is madness. Let's assume no one is that
        # insane.
        return False

    def __repr__(self):
        # Like the other image classes, never let repr() fail just because
        # the mode cannot be determined.
        try:
            mode = self.mode
        except NotImplementedError:
            mode = '?'
        return (
            f'<pikepdf.PdfJpxImage JPEG2000 image mode={mode} '
            f'size={self.width}x{self.height} at {hex(id(self))}>'
        )
891
892
class PdfInlineImage(PdfImageBase):
    """Support class for PDF inline images."""

    # Inline images can contain abbreviations that we write automatically.
    # NOTE(review): /I is mapped to /Indexed only; the PDF reference also
    # appears to use /I as the abbreviated /Interpolate key - confirm whether
    # that case needs separate handling.
    ABBREVS = {
        b'/W': b'/Width',
        b'/H': b'/Height',
        b'/BPC': b'/BitsPerComponent',
        b'/IM': b'/ImageMask',
        b'/CS': b'/ColorSpace',
        b'/D': b'/Decode',
        b'/F': b'/Filter',
        b'/DP': b'/DecodeParms',
        b'/G': b'/DeviceGray',
        b'/RGB': b'/DeviceRGB',
        b'/CMYK': b'/DeviceCMYK',
        b'/I': b'/Indexed',
        b'/AHx': b'/ASCIIHexDecode',
        b'/A85': b'/ASCII85Decode',
        b'/LZW': b'/LZWDecode',
        b'/Fl': b'/FlateDecode',
        b'/RL': b'/RunLengthDecode',
        b'/CCF': b'/CCITTFaxDecode',
        b'/DCT': b'/DCTDecode',
    }
    REVERSE_ABBREVS = {v: k for k, v in ABBREVS.items()}

    _data: Object
    _image_object: tuple[Object, ...]

    def __init__(self, *, image_data: Object, image_object: tuple):
        """Construct wrapper for inline image.

        Args:
            image_data: data stream for image, extracted from content stream
            image_object: the metadata for image, also from content stream

        Raises:
            PdfError: if the metadata cannot be parsed as a dictionary.
        """
        # Convert the sequence of pikepdf.Object from the content stream into
        # a dictionary object by unparsing it (to bytes), eliminating inline
        # image abbreviations, and constructing a bytes string equivalent to
        # what an image XObject would look like. Then retrieve data from there

        self._data = image_data
        self._image_object = image_object

        reparse = b' '.join(
            self._unparse_obj(obj, remap_names=self.ABBREVS) for obj in image_object
        )
        try:
            reparsed_obj = Object.parse(b'<< ' + reparse + b' >>')
        except PdfError as e:
            raise PdfError("parsing inline " + reparse.decode('unicode_escape')) from e
        self.obj = reparsed_obj

    def __eq__(self, other):
        if not isinstance(other, PdfImageBase):
            return NotImplemented
        return (
            self.obj == other.obj
            and isinstance(other, PdfInlineImage)
            and (
                self._data._inline_image_raw_bytes()
                == other._data._inline_image_raw_bytes()
            )
        )

    @classmethod
    def _unparse_obj(cls, obj, remap_names):
        """Serialize one metadata token to bytes, renaming names via *remap_names*.

        Raises:
            NotImplementedError: if *obj* is of a type that cannot be written.
        """
        if isinstance(obj, Object):
            if isinstance(obj, Name):
                name = obj.unparse(resolved=True)
                assert isinstance(name, bytes)
                return remap_names.get(name, name)
            return obj.unparse(resolved=True)
        # bool must be tested before int, since bool is a subclass of int
        if isinstance(obj, bool):
            return b'true' if obj else b'false'  # Lower case for PDF spec
        if isinstance(obj, (int, Decimal, float)):
            return str(obj).encode('ascii')
        raise NotImplementedError(repr(obj))

    def _metadata(self, name, type_, default):
        return _metadata_from_obj(self.obj, name, type_, default)

    def unparse(self) -> bytes:
        """Create the content stream bytes that reproduce this inline image."""

        def metadata_tokens():
            for metadata_obj in self._image_object:
                unparsed = self._unparse_obj(
                    metadata_obj, remap_names=self.REVERSE_ABBREVS
                )
                assert isinstance(unparsed, bytes)
                yield unparsed

        def inline_image_tokens():
            yield b'BI\n'
            yield b' '.join(metadata_tokens())
            yield b'\nID\n'
            yield self._data._inline_image_raw_bytes()
            yield b'EI'

        return b''.join(inline_image_tokens())

    @property
    def icc(self):  # pragma: no cover
        """Raise an exception since ICC profiles are not supported on inline images."""
        raise InvalidPdfImageError(
            "Inline images with ICC profiles are not supported in the PDF specification"
        )

    def __repr__(self):
        try:
            mode = self.mode
        except NotImplementedError:
            mode = '?'
        return (
            f'<pikepdf.PdfInlineImage image mode={mode} '
            f'size={self.width}x{self.height} at {hex(id(self))}>'
        )

    def _convert_to_pdfimage(self) -> PdfImage:
        """Build a :class:`PdfImage` equivalent to this inline image."""
        # Construct a temporary PDF that holds this inline image, and...
        tmppdf = Pdf.new()
        tmppdf.add_blank_page(page_size=(self.width, self.height))
        tmppdf.pages[0].contents_add(
            f'{self.width} 0 0 {self.height} 0 0 cm'.encode('ascii'), prepend=True
        )
        tmppdf.pages[0].contents_add(self.unparse())

        # ...externalize it,
        tmppdf.pages[0].externalize_inline_images()
        raw_img = cast(Stream, next(iter(tmppdf.pages[0].images.values())))

        # ...then use the regular PdfImage API to extract it.
        img = PdfImage(raw_img)
        img._set_pdf_source(tmppdf)  # Hold tmppdf open while PdfImage exists
        return img

    def as_pil_image(self) -> Image.Image:
        """Return inline image as a Pillow Image."""
        return self._convert_to_pdfimage().as_pil_image()

    def extract_to(
        self, *, stream: BinaryIO | None = None, fileprefix: str = ''
    ) -> str:
        """Extract the inline image directly to a usable image file.

        See:
            :meth:`PdfImage.extract_to`
        """
        return self._convert_to_pdfimage().extract_to(
            stream=stream, fileprefix=fileprefix
        )

    def read_bytes(self):
        """Return decompressed image bytes."""
        # qpdf does not have an API to return this directly, so convert it.
        return self._convert_to_pdfimage().read_bytes()

    def get_stream_buffer(self):
        """Return decompressed stream buffer."""
        # qpdf does not have an API to return this directly, so convert it.
        return self._convert_to_pdfimage().get_stream_buffer()
1052
1053
# Public names of this module, kept in alphabetical order.
__all__ = [
    'CMYKDecodeArray',
    'DecodeArray',
    'GrayDecodeArray',
    'HifiPrintImageNotTranscodableError',
    'ImageDecompressionError',
    'InvalidPdfImageError',
    'NotExtractableError',
    'PaletteData',
    'PdfImage',
    'PdfImageBase',
    'PdfInlineImage',
    'PdfJpxImage',
    'RGBDecodeArray',
    'UnsupportedImageTypeError',
]