1# SPDX-FileCopyrightText: 2022 James R. Barlow
2# SPDX-License-Identifier: MPL-2.0
3
4"""Extract images embedded in PDF."""
5
6from __future__ import annotations
7
8from abc import ABC, abstractmethod
9from copy import copy
10from decimal import Decimal
11from io import BytesIO
12from itertools import zip_longest
13from pathlib import Path
14from shutil import copyfileobj
15from typing import Any, BinaryIO, Callable, NamedTuple, TypeVar, Union, cast
16
17from PIL import Image
18from PIL.ImageCms import ImageCmsProfile
19
20from pikepdf import jbig2
21from pikepdf._core import Buffer, Pdf, PdfError, StreamDecodeLevel
22from pikepdf._exceptions import DependencyError
23from pikepdf.models import _transcoding
24from pikepdf.models._transcoding import ImageDecompressionError
25from pikepdf.objects import (
26 Array,
27 Dictionary,
28 Name,
29 Object,
30 Stream,
31 String,
32)
33
# Generic result type of the conversion callable passed to the metadata helpers.
T = TypeVar('T')

# Shapes of a /Decode array, expressed as flat tuples of floats:
# 6 values for RGB, 2 for grayscale, 8 for CMYK.
RGBDecodeArray = tuple[float, float, float, float, float, float]
GrayDecodeArray = tuple[float, float]
CMYKDecodeArray = tuple[float, float, float, float, float, float, float, float]
# Any of the /Decode array shapes above.
DecodeArray = Union[RGBDecodeArray, GrayDecodeArray, CMYKDecodeArray]
40
41
class UnsupportedImageTypeError(Exception):
    """This image is formatted in a way pikepdf does not support."""
44
45
class NotExtractableError(Exception):
    """Indicates that an image cannot be directly extracted.

    Base class of :class:`HifiPrintImageNotTranscodableError`.
    """
48
49
class HifiPrintImageNotTranscodableError(NotExtractableError):
    """Image contains high fidelity printing information and cannot be extracted.

    Raised when transcoding is attempted on an image whose mode is ``DeviceN``
    or ``Separation``.
    """
52
53
class InvalidPdfImageError(Exception):
    """This image is not valid according to the PDF 1.7 specification.

    Raised, for example, for an unsupported ``BitsPerComponent`` value or when
    an ICC profile is requested from an inline image.
    """
56
57
def _array_str(value: Object | str | list):
    """Reduce a pikepdf value to a list in which names are plain ``str``.

    Lists and arrays are converted element by element, recursively. Streams,
    dictionaries, ``bytes`` and ``int`` values pass through untouched, names and
    ``str`` become ``str``, and PDF strings become ``bytes``. A value that does
    not convert to a list is wrapped in a one-element list.

    Raises:
        NotImplementedError: if an element of an unexpected type is encountered.
    """

    def simplify(node):
        if isinstance(node, (list, Array)):
            return list(map(simplify, node))
        if isinstance(node, (Stream, Dictionary, bytes, int)):
            return node
        if isinstance(node, (Name, str)):
            return str(node)
        if isinstance(node, String):
            return bytes(node)
        # The error deliberately reports the whole input, not just the element.
        raise NotImplementedError(value)

    simplified = simplify(value)
    return simplified if isinstance(simplified, list) else [simplified]
76
77
78def _ensure_list(value: list[Object] | Dictionary | Array | Object) -> list[Object]:
79 """Ensure value is a list of pikepdf.Object, if it was not already.
80
81 To support DecodeParms which can be present as either an array of dicts or a single
82 dict. It's easier to convert to an array of one dict.
83 """
84 if isinstance(value, list):
85 return value
86 return list(value.wrap_in_array().as_list())
87
88
89def _metadata_from_obj(
90 obj: Dictionary | Stream, name: str, type_: Callable[[Any], T], default: T
91) -> T | None:
92 """Retrieve metadata from a dictionary or stream and wrangle types."""
93 val = getattr(obj, name, default)
94 try:
95 return type_(val)
96 except TypeError:
97 if val is None:
98 return None
99 raise NotImplementedError('Metadata access for ' + name)
100
101
class PaletteData(NamedTuple):
    """Color space and binary representation of an image palette.

    ``base_colorspace`` is typically ``"RGB"`` or ``"L"`` (for grayscale).

    ``palette`` is typically 256 or 256*3=768 bytes, for grayscale and RGB color
    respectively, with each unit/triplet being the grayscale/RGB triplet values.
    """

    # Mode-like name of the palette's base color space, as produced by
    # PdfImageBase.palette (e.g. "RGB", "L", "CMYK", "DeviceN", "Separation").
    base_colorspace: str
    # Raw bytes of the palette lookup table.
    palette: bytes
113
114
class PdfImageBase(ABC):
    """Abstract base class for images.

    Subclasses supply :meth:`_metadata`, :attr:`icc` and :meth:`as_pil_image`;
    every other property here is derived from the image dictionary through
    :meth:`_metadata`.
    """

    SIMPLE_COLORSPACES = {'/DeviceRGB', '/DeviceGray', '/CalRGB', '/CalGray'}
    MAIN_COLORSPACES = SIMPLE_COLORSPACES | {'/DeviceCMYK', '/CalCMYK', '/ICCBased'}
    PRINT_COLORSPACES = {'/Separation', '/DeviceN'}

    @abstractmethod
    def _metadata(self, name: str, type_: Callable[[Any], T], default: T) -> T:
        """Get metadata for this image type."""

    @property
    def width(self) -> int:
        """Width of the image data in pixels."""
        return self._metadata('Width', int, 0)

    @property
    def height(self) -> int:
        """Height of the image data in pixels."""
        return self._metadata('Height', int, 0)

    @property
    def image_mask(self) -> bool:
        """Return ``True`` if this is an image mask."""
        return self._metadata('ImageMask', bool, False)

    @property
    def _bpc(self) -> int | None:
        """Bits per component for this image (low-level)."""
        return self._metadata('BitsPerComponent', int, 0)

    @property
    def _colorspaces(self):
        """Colorspace (low-level), simplified to a list by ``_array_str``."""
        return self._metadata('ColorSpace', _array_str, [])

    @property
    def filters(self):
        """List of names of the filters that we applied to encode this image."""
        return self._metadata('Filter', _array_str, [])

    @property
    def _decode_array(self) -> DecodeArray:
        """Extract the /Decode array, or the default for this image's colorspace.

        An explicit /Decode entry with 2, 6 or 8 values is returned as a tuple
        of floats. Otherwise the identity mapping for the colorspace is
        returned.

        Raises:
            NotImplementedError: if no default is known for the colorspace.
        """
        decode: list = self._metadata('Decode', _ensure_list, [])
        if decode and len(decode) in (2, 6, 8):
            return cast(DecodeArray, tuple(float(value) for value in decode))

        # Image masks have no colorspace; their default /Decode is [0 1].
        if self.image_mask:
            return (0.0, 1.0)

        colorspace = self.colorspace
        if colorspace in ('/DeviceGray', '/CalGray'):
            return (0.0, 1.0)
        if colorspace in ('/DeviceRGB', '/CalRGB'):
            return (0.0, 1.0, 0.0, 1.0, 0.0, 1.0)
        if colorspace == '/DeviceCMYK':
            return (0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0)
        if colorspace == '/ICCBased':
            # Resolve the profile once; this may need to open the ICC profile.
            icc_mode = self._approx_mode_from_icc()
            if icc_mode == 'L':
                return (0.0, 1.0)
            if icc_mode == 'RGB':
                return (0.0, 1.0, 0.0, 1.0, 0.0, 1.0)
            if icc_mode == 'CMYK':
                return (0.0, 1.0, 0.0, 1.0, 0.0, 1.0, 0.0, 1.0)

        raise NotImplementedError(
            "Don't know how to retrieve default /Decode array for image "
            + repr(self)
        )

    @property
    def decode_parms(self):
        """List of the /DecodeParms, arguments to filters."""
        return self._metadata('DecodeParms', _ensure_list, [])

    @property
    def colorspace(self) -> str | None:
        """PDF name of the colorspace that best describes this image.

        For /Indexed images this is the base colorspace of the palette.
        Returns ``None`` for image masks.

        Raises:
            NotImplementedError: if the colorspace entry is missing or is not
                one this class knows how to classify.
        """
        if self.image_mask:
            return None  # Undefined for image masks
        if self._colorspaces:
            if self._colorspaces[0] in self.MAIN_COLORSPACES:
                return self._colorspaces[0]
            if self._colorspaces[0] == '/Indexed':
                subspace = self._colorspaces[1]
                if isinstance(subspace, str) and subspace in self.MAIN_COLORSPACES:
                    return subspace
                if isinstance(subspace, list) and subspace[0] in (
                    '/ICCBased',
                    '/DeviceN',
                    '/CalGray',
                    '/CalRGB',
                ):
                    return subspace[0]
            if self._colorspaces[0] == '/DeviceN':
                return '/DeviceN'

        raise NotImplementedError(
            "not sure how to get colorspace: " + repr(self._colorspaces)
        )

    @property
    def bits_per_component(self) -> int:
        """Bits per component of this image.

        When the entry is absent or zero, defaults to 1 for image masks and 8
        otherwise.
        """
        if self._bpc is None or self._bpc == 0:
            return 1 if self.image_mask else 8
        return self._bpc

    @property
    @abstractmethod
    def icc(self) -> ImageCmsProfile | None:
        """Return ICC profile for this image if one is defined."""

    @property
    def indexed(self) -> bool:
        """Check if the image has a defined color palette."""
        return '/Indexed' in self._colorspaces

    def _colorspace_has_name(self, name):
        """Return True if the colorspace, or the base of an /Indexed one, is *name*."""
        try:
            cs = self._colorspaces
            if cs[0] == '/Indexed' and cs[1][0] == name:
                return True
            if cs[0] == name:
                return True
        except (IndexError, AttributeError, KeyError):
            # Missing or oddly shaped colorspace entry: treat as "no match".
            pass
        return False

    @property
    def is_device_n(self) -> bool:
        """Check if image has a /DeviceN (complex printing) colorspace."""
        return self._colorspace_has_name('/DeviceN')

    @property
    def is_separation(self) -> bool:
        """Check if image has a /Separation (complex printing) colorspace."""
        return self._colorspace_has_name('/Separation')

    @property
    def size(self) -> tuple[int, int]:
        """Size of image as (width, height)."""
        return self.width, self.height

    def _approx_mode_from_icc(self):
        """Approximate a Pillow mode from the ICC profile of this image.

        Returns ``'L'`` when the profile stream declares one channel; otherwise
        ``'RGB'`` or ``'CMYK'`` according to the profile's color space, or
        ``''`` if the color space is not recognized.
        """
        if self.indexed:
            icc_profile = self._colorspaces[1][1]
        else:
            icc_profile = self._colorspaces[1]
        icc_profile_nchannels = int(icc_profile['/N'])

        if icc_profile_nchannels == 1:
            return 'L'

        # Multiple channels, need to open the profile and look
        mode_from_xcolor_space = {'RGB ': 'RGB', 'CMYK': 'CMYK'}
        xcolor_space = self.icc.profile.xcolor_space
        return mode_from_xcolor_space.get(xcolor_space, '')

    @property
    def mode(self) -> str:
        """``PIL.Image.mode`` equivalent for this image, where possible.

        If an ICC profile is attached to the image, we still attempt to resolve a Pillow
        mode.

        Raises:
            NotImplementedError: if no mode can be determined.
        """
        m = ''
        if self.is_device_n:
            m = 'DeviceN'
        elif self.is_separation:
            m = 'Separation'
        elif self.indexed:
            m = 'P'
        elif self.colorspace == '/DeviceGray' and self.bits_per_component == 1:
            m = '1'
        elif self.colorspace == '/DeviceGray' and self.bits_per_component > 1:
            m = 'L'
        elif self.colorspace == '/DeviceRGB':
            m = 'RGB'
        elif self.colorspace == '/DeviceCMYK':
            m = 'CMYK'
        elif self.colorspace == '/ICCBased':
            try:
                m = self._approx_mode_from_icc()
            except (ValueError, TypeError) as e:
                raise NotImplementedError(
                    "Not sure how to handle PDF image of this type"
                ) from e
        if m == '':
            raise NotImplementedError(
                "Not sure how to handle PDF image of this type"
            ) from None
        return m

    @property
    def filter_decodeparms(self):
        """Return normalized the Filter and DecodeParms data.

        PDF has a lot of possible data structures concerning /Filter and
        /DecodeParms. /Filter can be absent or a name or an array, /DecodeParms
        can be absent or a dictionary (if /Filter is a name) or an array (if
        /Filter is an array). When both are arrays the lengths match.

        Normalize this into:
        [(/FilterName, {/DecodeParmName: Value, ...}), ...]

        The order of /Filter matters as indicates the encoding/decoding sequence.
        """
        return list(zip_longest(self.filters, self.decode_parms, fillvalue={}))

    @property
    def palette(self) -> PaletteData | None:
        """Retrieve the color palette for this image if applicable.

        Returns:
            ``None`` when the image is not indexed, otherwise the palette's
            base colorspace (as a mode-like name) and its lookup bytes.

        Raises:
            ValueError: if the /Indexed colorspace does not have four entries.
            NotImplementedError: if the base colorspace is not supported.
        """
        if not self.indexed:
            return None
        try:
            _idx, base, _hival, lookup = self._colorspaces
        except ValueError as e:
            raise ValueError('Not sure how to interpret this palette') from e
        if self.icc or self.is_device_n or self.is_separation or isinstance(base, list):
            # Base colorspace is itself an array; its first entry is the name.
            base = str(base[0])
        else:
            base = str(base)
        lookup = bytes(lookup)
        if base not in self.MAIN_COLORSPACES and base not in self.PRINT_COLORSPACES:
            raise NotImplementedError(f"not sure how to interpret this palette: {base}")
        if base in ('/DeviceRGB', '/CalRGB'):
            base = 'RGB'
        elif base in ('/DeviceGray', '/CalGray'):
            base = 'L'
        elif base == '/DeviceCMYK':
            base = 'CMYK'
        elif base == '/DeviceN':
            base = 'DeviceN'
        elif base == '/Separation':
            base = 'Separation'
        elif base == '/ICCBased':
            base = self._approx_mode_from_icc()
        else:
            raise NotImplementedError(f"not sure how to interpret this palette: {base}")
        return PaletteData(base, lookup)

    @abstractmethod
    def as_pil_image(self) -> Image.Image:
        """Convert this PDF image to a Python PIL (Pillow) image."""

    def _repr_png_(self) -> bytes:
        """Display hook for IPython/Jupyter."""
        b = BytesIO()
        with self.as_pil_image() as im:
            im.save(b, 'PNG')
        return b.getvalue()
361
362
class PdfImage(PdfImageBase):
    """Support class to provide a consistent API for manipulating PDF images.

    The data structure for images inside PDFs is irregular and complex,
    making it difficult to use without introducing errors for less
    typical cases. This class addresses these difficulties by providing a
    regular, Pythonic API similar in spirit (and convertible to) the Python
    Pillow imaging library.
    """

    obj: Stream
    _icc: ImageCmsProfile | None
    _pdf_source: Pdf | None

    def __new__(cls, obj: Stream):
        """Construct a PdfImage... or a PdfJpxImage if that is what we really are."""
        try:
            # Check if JPXDecode is called for and initialize as PdfJpxImage
            filters = _ensure_list(obj.Filter)
            if Name.JPXDecode in filters:
                return super().__new__(PdfJpxImage)
        except (AttributeError, KeyError):
            # __init__ will deal with any other errors
            pass
        return super().__new__(PdfImage)

    def __init__(self, obj: Stream):
        """Construct a PDF image from a Image XObject inside a PDF.

        ``pim = PdfImage(page.Resources.XObject['/ImageNN'])``

        Args:
            obj: an Image XObject

        Raises:
            TypeError: if *obj* is a stream whose /Subtype is not /Image.
        """
        if isinstance(obj, Stream) and obj.stream_dict.get("/Subtype") != "/Image":
            raise TypeError("can't construct PdfImage from non-image")
        self.obj = obj
        self._icc = None
        # Always defined so the attribute can be read before _set_pdf_source().
        self._pdf_source = None

    def __eq__(self, other):
        if not isinstance(other, PdfImageBase):
            return NotImplemented
        return self.obj == other.obj

    @classmethod
    def _from_pil_image(cls, *, pdf, page, name, image):  # pragma: no cover
        """Insert a PIL image into a PDF (rudimentary).

        Args:
            pdf (pikepdf.Pdf): the PDF to attach the image to
            page (pikepdf.Object): the page to attach the image to
            name (str or pikepdf.Name): the name to set the image
            image (PIL.Image.Image): the image to insert
        """
        data = image.tobytes()

        imstream = Stream(pdf, data)
        imstream.Type = Name('/XObject')
        imstream.Subtype = Name('/Image')
        if image.mode == 'RGB':
            imstream.ColorSpace = Name('/DeviceRGB')
        elif image.mode in ('1', 'L'):
            imstream.ColorSpace = Name('/DeviceGray')
        imstream.BitsPerComponent = 1 if image.mode == '1' else 8
        imstream.Width = image.width
        imstream.Height = image.height

        page.Resources.XObject[name] = imstream

        return cls(imstream)

    def _metadata(self, name, type_, default):
        return _metadata_from_obj(self.obj, name, type_, default)

    @property
    def _iccstream(self):
        """Return the stream holding the ICC profile of an /ICCBased image."""
        if self.colorspace == '/ICCBased':
            if not self.indexed:
                return self._colorspaces[1]
            assert isinstance(self._colorspaces[1], list)
            return self._colorspaces[1][1]
        raise NotImplementedError("Don't know how to find ICC stream for image")

    @property
    def icc(self) -> ImageCmsProfile | None:
        """If an ICC profile is attached, return a Pillow object that describe it.

        Most of the information may be found in ``icc.profile``.

        Raises:
            UnsupportedImageTypeError: if Pillow reports that the profile
                cannot be opened.
        """
        if self.colorspace not in ('/ICCBased', '/Indexed'):
            return None
        if not self._icc:
            iccstream = self._iccstream
            iccbuffer = iccstream.get_stream_buffer()
            iccbytesio = BytesIO(iccbuffer)
            try:
                self._icc = ImageCmsProfile(iccbytesio)
            except OSError as e:
                if str(e) == 'cannot open profile from string':
                    # ICC profile is corrupt
                    raise UnsupportedImageTypeError(
                        "ICC profile corrupt or not readable"
                    ) from e
                # NOTE(review): any other OSError is ignored here and None is
                # returned; confirm whether that is intentional.
        return self._icc

    def _decode_parms_for(self, filter_name: str, default=None):
        """Return the /DecodeParms entry paired with *filter_name*.

        /DecodeParms entries correspond positionally to /Filter entries, so the
        parameters of a filter are not necessarily the first entry.
        Returns *default* if the image does not use *filter_name*.
        """
        for name, parms in self.filter_decodeparms:
            if name == filter_name:
                return parms
        return default

    def _remove_simple_filters(self):
        """Remove simple lossless compression where it appears.

        Returns:
            A tuple of the image data with the leading simple filters decoded,
            and the list of filter names that remain applied to that data.

        Raises:
            NotImplementedError: if more than one complex filter is present.
        """
        COMPLEX_FILTERS = {
            '/DCTDecode',
            '/JPXDecode',
            '/JBIG2Decode',
            '/CCITTFaxDecode',
        }
        indices = [n for n, filt in enumerate(self.filters) if filt in COMPLEX_FILTERS]
        if len(indices) > 1:
            raise NotImplementedError(
                f"Object {self.obj.objgen} has compound complex filters: "
                f"{self.filters}. We cannot decompress this."
            )
        if len(indices) == 0:
            # No complex filter indices, so all filters are simple - remove them all
            return self.obj.read_bytes(StreamDecodeLevel.specialized), []

        n = indices[0]
        if n == 0:
            # The first filter is complex, so there is nothing simple to remove
            return self.obj.read_raw_bytes(), self.filters

        # Decode only the simple filters that precede the complex one, using a
        # copy so the image object itself is left unmodified.
        obj_copy = copy(self.obj)
        obj_copy.Filter = Array([Name(f) for f in self.filters[:n]])
        obj_copy.DecodeParms = Array(self.decode_parms[:n])
        return obj_copy.read_bytes(StreamDecodeLevel.specialized), self.filters[n:]

    def _extract_direct(self, *, stream: BinaryIO) -> str | None:
        """Attempt to extract the image directly to a usable image file.

        If the image cannot be extracted without decompressing or transcoding,
        nothing is written and ``None`` is returned. The type and format of
        image generated will vary.

        Args:
            stream: Writable file stream to write data to, e.g. an open file

        Returns:
            ``'.tif'`` for CCITT data, ``'.jpg'`` for directly usable JPEG
            data, otherwise ``None``.
        """

        def dct_color_transform(default: int):
            # Read /ColorTransform from the parameters that belong to the
            # DCTDecode filter, which need not be the first filter.
            parms = self._decode_parms_for('/DCTDecode')
            if parms is None:
                return default
            return parms.get('/ColorTransform', default)

        def normal_dct_rgb() -> bool:
            # Normal DCTDecode RGB images have the default value of
            # /ColorTransform 1 and are actually in YUV. Such a file can be
            # saved as a standard JPEG. RGB JPEGs without YUV conversion can't
            # be saved as JPEGs, and are probably bugs. Some software in the
            # wild actually produces RGB JPEGs in PDFs (probably a bug).
            DEFAULT_CT_RGB = 1
            ct = dct_color_transform(DEFAULT_CT_RGB)
            return self.mode == 'RGB' and ct == DEFAULT_CT_RGB

        def normal_dct_cmyk() -> bool:
            # Normal DCTDecode CMYKs have /ColorTransform 0 and can be saved.
            # There is a YUVK colorspace but CMYK JPEGs don't generally use it
            DEFAULT_CT_CMYK = 0
            ct = dct_color_transform(DEFAULT_CT_CMYK)
            return self.mode == 'CMYK' and ct == DEFAULT_CT_CMYK

        data, filters = self._remove_simple_filters()

        if filters == ['/CCITTFaxDecode']:
            if self.colorspace == '/ICCBased':
                icc = self._iccstream.read_bytes()
            else:
                icc = None
            stream.write(self._generate_ccitt_header(data, icc=icc))
            stream.write(data)
            return '.tif'
        if filters == ['/DCTDecode'] and (
            self.mode == 'L' or normal_dct_rgb() or normal_dct_cmyk()
        ):
            stream.write(data)
            return '.jpg'

        return None

    def _extract_transcoded_1248bits(self) -> Image.Image:
        """Extract an image when there are 1/2/4/8 bits packed in byte data.

        Raises:
            InvalidPdfImageError: if BitsPerComponent is not 2, 4 or 8.
        """
        stride = 0  # tell Pillow to calculate stride from line width
        scale = 0 if self.mode == 'L' else 1
        if self.bits_per_component in (2, 4):
            buffer, stride = _transcoding.unpack_subbyte_pixels(
                self.read_bytes(), self.size, self.bits_per_component, scale
            )
        elif self.bits_per_component == 8:
            buffer = cast(memoryview, self.get_stream_buffer())
        else:
            raise InvalidPdfImageError("BitsPerComponent must be 1, 2, 4, 8, or 16")

        # Resolve the palette once; the property re-reads the image metadata.
        palette_data = self.palette if self.mode == 'P' else None
        if palette_data is not None:
            base_mode, palette = palette_data
            im = _transcoding.image_from_buffer_and_palette(
                buffer,
                self.size,
                stride,
                base_mode,
                palette,
            )
        else:
            im = _transcoding.image_from_byte_buffer(buffer, self.size, stride)
        return im

    def _extract_transcoded_1bit(self) -> Image.Image:
        """Decode a 1-bit image (or image mask) into a Pillow ``'1'`` image.

        Raises:
            UnsupportedImageTypeError: for 1-bit RGB or CMYK images.
            DependencyError: if the stream cannot be decoded and no JBIG2
                decoder is available.
        """
        if not self.image_mask and self.mode in ('RGB', 'CMYK'):
            raise UnsupportedImageTypeError("1-bit RGB and CMYK are not supported")
        try:
            data = self.read_bytes()
        except (RuntimeError, PdfError) as e:
            if (
                'read_bytes called on unfilterable stream' in str(e)
                and not jbig2.get_decoder().available()
            ):
                raise DependencyError(
                    "jbig2dec - not installed or installed version is too old "
                    "(older than version 0.15)"
                ) from None
            raise

        im = Image.frombytes('1', self.size, data)

        palette_data = self.palette
        if palette_data is not None:
            base_mode, palette = palette_data
            im = _transcoding.fix_1bit_palette_image(im, base_mode, palette)

        return im

    def _extract_transcoded_mask(self) -> Image.Image:
        """Decode an image mask; masks are handled as 1-bit images."""
        return self._extract_transcoded_1bit()

    def _extract_transcoded(self) -> Image.Image:
        """Decompress the image data and build a Pillow image from it.

        Raises:
            HifiPrintImageNotTranscodableError: for DeviceN/Separation images.
            UnsupportedImageTypeError: if no decoding path fits the image.
        """
        if self.image_mask:
            return self._extract_transcoded_mask()

        if self.mode in {'DeviceN', 'Separation'}:
            raise HifiPrintImageNotTranscodableError()

        if self.mode == 'RGB' and self.bits_per_component == 8:
            # Cannot use the zero-copy .get_stream_buffer here, we have 3-byte
            # RGB and Pillow needs RGBX.
            im = Image.frombuffer(
                'RGB', self.size, self.read_bytes(), 'raw', 'RGB', 0, 1
            )
        elif self.mode == 'CMYK' and self.bits_per_component == 8:
            im = Image.frombuffer(
                'CMYK', self.size, self.get_stream_buffer(), 'raw', 'CMYK', 0, 1
            )
        # elif self.mode == '1':
        elif self.bits_per_component == 1:
            im = self._extract_transcoded_1bit()
        elif self.mode in ('L', 'P') and self.bits_per_component <= 8:
            im = self._extract_transcoded_1248bits()
        else:
            raise UnsupportedImageTypeError(repr(self) + ", " + repr(self.obj))

        if self.colorspace == '/ICCBased' and self.icc is not None:
            im.info['icc_profile'] = self.icc.tobytes()

        return im

    def _extract_to_stream(self, *, stream: BinaryIO) -> str:
        """Extract the image to a stream.

        If possible, the compressed data is extracted and inserted into
        a compressed image file format without transcoding the compressed
        content. If this is not possible, the data will be decompressed
        and extracted to an appropriate format.

        Args:
            stream: Writable stream to write data to

        Returns:
            The file format extension.

        Raises:
            UnsupportedImageTypeError: if the image cannot be decoded.
        """
        direct_extraction = self._extract_direct(stream=stream)
        if direct_extraction:
            return direct_extraction

        im = None
        try:
            im = self._extract_transcoded()
            if im.mode == 'CMYK':
                im.save(stream, format='tiff', compression='tiff_adobe_deflate')
                return '.tiff'
            if im:
                im.save(stream, format='png')
                return '.png'
        except PdfError as e:
            if 'called on unfilterable stream' in str(e):
                raise UnsupportedImageTypeError(repr(self)) from e
            raise
        finally:
            if im:
                im.close()

        raise UnsupportedImageTypeError(repr(self))

    def extract_to(
        self, *, stream: BinaryIO | None = None, fileprefix: str = ''
    ) -> str:
        """Extract the image directly to a usable image file.

        If possible, the compressed data is extracted and inserted into
        a compressed image file format without transcoding the compressed
        content. If this is not possible, the data will be decompressed
        and extracted to an appropriate format.

        Because it is not known until attempted what image format will be
        extracted, users should not assume what format they are getting back.
        When saving the image to a file, use a temporary filename, and then
        rename the file to its final name based on the returned file extension.

        Images might be saved as any of .png, .jpg, .tif, or .tiff
        (JPEG 2000 images are saved as .jp2).

        Examples:
            >>> im.extract_to(stream=bytes_io)  # doctest: +SKIP
            '.png'

            >>> im.extract_to(fileprefix='/tmp/image00')  # doctest: +SKIP
            '/tmp/image00.jpg'

        Args:
            stream: Writable stream to write data to.
            fileprefix (str or Path): The path to write the extracted image to,
                without the file extension.

        Returns:
            If *fileprefix* was provided, then the fileprefix with the
            appropriate extension. If no *fileprefix*, then an extension
            indicating the file type.

        Raises:
            ValueError: unless exactly one of *stream* and *fileprefix* is given.
        """
        if stream and fileprefix:
            raise ValueError("Cannot set both stream and fileprefix")
        if not stream and not fileprefix:
            raise ValueError("Must set exactly one of stream and fileprefix")
        if stream:
            return self._extract_to_stream(stream=stream)

        # Extract to memory first: the extension, and therefore the final file
        # name, is only known once extraction has succeeded.
        bio = BytesIO()
        extension = self._extract_to_stream(stream=bio)
        bio.seek(0)
        filepath = Path(str(Path(fileprefix)) + extension)
        with filepath.open('wb') as target:
            copyfileobj(bio, target)
        return str(filepath)

    def read_bytes(
        self, decode_level: StreamDecodeLevel = StreamDecodeLevel.specialized
    ) -> bytes:
        """Decompress this image and return it as unencoded bytes."""
        return self.obj.read_bytes(decode_level=decode_level)

    def get_stream_buffer(
        self, decode_level: StreamDecodeLevel = StreamDecodeLevel.specialized
    ) -> Buffer:
        """Access this image with the buffer protocol."""
        return self.obj.get_stream_buffer(decode_level=decode_level)

    def as_pil_image(self) -> Image.Image:
        """Extract the image as a Pillow Image, using decompression as necessary.

        Caller must close the image.
        """
        bio = BytesIO()
        direct_extraction = self._extract_direct(stream=bio)
        if direct_extraction:
            bio.seek(0)
            return Image.open(bio)

        im = self._extract_transcoded()
        if not im:
            raise UnsupportedImageTypeError(repr(self))

        return im

    def _generate_ccitt_header(self, data: bytes, icc: bytes | None = None) -> bytes:
        """Construct a CCITT G3 or G4 header from the PDF metadata.

        Args:
            data: the CCITT-encoded image data the header will precede.
            icc: optional ICC profile bytes to pass along to the header.

        Raises:
            ValueError: if the image has no /DecodeParms for /CCITTFaxDecode.
            UnsupportedImageTypeError: if /EncodedByteAlign is set.
        """
        # https://stackoverflow.com/questions/2641770/
        # https://www.itu.int/itudoc/itu-t/com16/tiff-fx/docs/tiff6.pdf

        if not self.decode_parms:
            raise ValueError("/CCITTFaxDecode without /DecodeParms")

        # Use the parameters paired with the CCITT filter; fall back to the
        # first entry if the filter is not listed.
        parms = self._decode_parms_for('/CCITTFaxDecode', self.decode_parms[0])
        if parms is None:
            raise ValueError("/CCITTFaxDecode without /DecodeParms")

        expected_defaults = [
            ("/EncodedByteAlign", False),
        ]
        for name, val in expected_defaults:
            if parms.get(name, val) != val:
                raise UnsupportedImageTypeError(
                    f"/CCITTFaxDecode with decode parameter {name} not equal {val}"
                )

        k = parms.get("/K", 0)
        t4_options = None
        if k < 0:
            ccitt_group = 4  # Group 4
        elif k > 0:
            ccitt_group = 3  # Group 3 2-D
            t4_options = 1
        else:
            ccitt_group = 3  # Group 3 1-D
        black_is_one = parms.get("/BlackIs1", False)
        decode = self._decode_array
        # PDF spec says:
        # BlackIs1: A flag indicating whether 1 bits shall be interpreted as black
        # pixels and 0 bits as white pixels, the reverse of the normal
        # PDF convention for image data. Default value: false.
        # TIFF spec says:
        # use 0 for white_is_zero (=> black is 1) MINISWHITE
        # use 1 for black_is_zero (=> white is 1) MINISBLACK
        photometry = 1 if black_is_one else 0

        # If Decode is [1, 0] then the photometry is inverted
        if len(decode) == 2 and decode == (1.0, 0.0):
            photometry = 1 - photometry

        img_size = len(data)
        if icc is None:
            icc = b''

        return _transcoding.generate_ccitt_header(
            self.size,
            data_length=img_size,
            ccitt_group=ccitt_group,
            t4_options=t4_options,
            photometry=photometry,
            icc=icc,
        )

    def show(self):  # pragma: no cover
        """Show the image however PIL wants to."""
        # as_pil_image() hands ownership to the caller, so close it when done.
        with self.as_pil_image() as im:
            im.show()

    def _set_pdf_source(self, pdf: Pdf):
        """Keep a reference to *pdf* on this image."""
        self._pdf_source = pdf

    def __repr__(self):
        try:
            mode = self.mode
        except NotImplementedError:
            mode = '?'
        return (
            f'<pikepdf.PdfImage image mode={mode} '
            f'size={self.width}x{self.height} at {hex(id(self))}>'
        )
816
817
class PdfJpxImage(PdfImage):
    """Support class for JPEG 2000 images. Implements the same API as :class:`PdfImage`.

    If you call PdfImage(object_that_is_actually_jpeg2000_image), pikepdf will return
    this class instead, due to the check in PdfImage.__new__.
    """

    def __init__(self, obj):
        """Initialize a JPEG 2000 image."""
        super().__init__(obj)
        # Decoded eagerly: the Pillow image is consulted for the colorspace
        # when the PDF image dictionary does not specify one.
        self._jpxpil = self.as_pil_image()

    def __eq__(self, other):
        if not isinstance(other, PdfImageBase):
            return NotImplemented
        return (
            self.obj == other.obj
            and isinstance(other, PdfJpxImage)
            and self._jpxpil == other._jpxpil
        )

    def _extract_direct(self, *, stream: BinaryIO) -> str | None:
        """Write the raw JPEG 2000 data to *stream* if no other filter remains.

        Returns:
            ``'.jp2'`` on success, or ``None`` (nothing written) if filters
            other than /JPXDecode remain after removing simple filters.
        """
        data, filters = self._remove_simple_filters()
        if filters != ['/JPXDecode']:
            return None
        stream.write(data)
        return '.jp2'

    def _extract_transcoded(self) -> Image.Image:
        return super()._extract_transcoded()

    @property
    def _colorspaces(self):
        """Return the effective colorspace of a JPEG 2000 image.

        If the ColorSpace dictionary is present, the colorspace embedded in the
        JPEG 2000 data will be ignored, as required by the specification.

        Raises:
            NotImplementedError: if ColorSpace is absent and the decoded image
                is neither grayscale nor RGB.
        """
        # (PDF 1.7 Table 89) If ColorSpace is present, any colour space
        # specifications in the JPEG2000 data shall be ignored.
        super_colorspaces = super()._colorspaces
        if super_colorspaces:
            return super_colorspaces
        if self._jpxpil.mode == 'L':
            return ['/DeviceGray']
        if self._jpxpil.mode == 'RGB':
            return ['/DeviceRGB']
        raise NotImplementedError('Complex JP2 colorspace')

    @property
    def _bpc(self) -> int:
        """Return 8, since bpc is not meaningful for JPEG 2000 encoding."""
        # (PDF 1.7 Table 89) If the image stream uses the JPXDecode filter, this
        # entry is optional and shall be ignored if present. The bit depth is
        # determined by the conforming reader in the process of decoding the
        # JPEG2000 image.
        return 8

    @property
    def indexed(self) -> bool:
        """Return False, since JPEG 2000 should not be indexed."""
        # Nothing in the spec precludes an Indexed JPXDecode image, except for
        # the fact that doing so is madness. Let's assume that no one is that
        # insane.
        return False

    def __repr__(self):
        # Resolving the mode can raise NotImplementedError (e.g. for a complex
        # JP2 colorspace); repr() is used in error messages and must not fail
        # for that reason.
        try:
            mode = self.mode
        except NotImplementedError:
            mode = '?'
        return (
            f'<pikepdf.PdfJpxImage JPEG2000 image mode={mode} '
            f'size={self.width}x{self.height} at {hex(id(self))}>'
        )
889
890
class PdfInlineImage(PdfImageBase):
    """Support class for PDF inline images."""

    # Inline images can contain abbreviations that we write automatically
    ABBREVS = {
        b'/W': b'/Width',
        b'/H': b'/Height',
        b'/BPC': b'/BitsPerComponent',
        b'/IM': b'/ImageMask',
        b'/CS': b'/ColorSpace',
        b'/F': b'/Filter',
        b'/DP': b'/DecodeParms',
        b'/G': b'/DeviceGray',
        b'/RGB': b'/DeviceRGB',
        b'/CMYK': b'/DeviceCMYK',
        b'/I': b'/Indexed',
        b'/AHx': b'/ASCIIHexDecode',
        b'/A85': b'/ASCII85Decode',
        b'/LZW': b'/LZWDecode',
        b'/RL': b'/RunLengthDecode',
        b'/CCF': b'/CCITTFaxDecode',
        b'/DCT': b'/DCTDecode',
    }
    REVERSE_ABBREVS = {v: k for k, v in ABBREVS.items()}

    _data: Object
    _image_object: tuple[Object, ...]

    def __init__(self, *, image_data: Object, image_object: tuple):
        """Construct wrapper for inline image.

        Args:
            image_data: data stream for image, extracted from content stream
            image_object: the metadata for image, also from content stream

        Raises:
            PdfError: if the metadata cannot be parsed as a dictionary.
        """
        # Convert the sequence of pikepdf.Object from the content stream into
        # a dictionary object by unparsing it (to bytes), eliminating inline
        # image abbreviations, and constructing a bytes string equivalent to
        # what an image XObject would look like. Then retrieve data from there

        self._data = image_data
        self._image_object = image_object

        reparse = b' '.join(
            self._unparse_obj(obj, remap_names=self.ABBREVS) for obj in image_object
        )
        try:
            reparsed_obj = Object.parse(b'<< ' + reparse + b' >>')
        except PdfError as e:
            raise PdfError("parsing inline " + reparse.decode('unicode_escape')) from e
        self.obj = reparsed_obj

    def __eq__(self, other):
        if not isinstance(other, PdfImageBase):
            return NotImplemented
        return (
            self.obj == other.obj
            and isinstance(other, PdfInlineImage)
            and (
                self._data._inline_image_raw_bytes()
                == other._data._inline_image_raw_bytes()
            )
        )

    @classmethod
    def _unparse_obj(cls, obj, remap_names):
        """Serialize one inline image token to PDF syntax.

        Args:
            obj: a pikepdf object or a Python bool/int/Decimal/float.
            remap_names: mapping applied to unparsed names, used to expand or
                re-abbreviate inline image keys and values.

        Raises:
            NotImplementedError: if *obj* is of an unsupported type.
        """
        if isinstance(obj, Object):
            if isinstance(obj, Name):
                name = obj.unparse(resolved=True)
                assert isinstance(name, bytes)
                return remap_names.get(name, name)
            return obj.unparse(resolved=True)
        # bool must be tested before int, since bool is a subclass of int.
        if isinstance(obj, bool):
            return b'true' if obj else b'false'  # Lower case for PDF spec
        if isinstance(obj, (int, Decimal, float)):
            text = str(obj)
            if 'e' in text.lower():
                # str() switches to exponent notation for very small or large
                # values (e.g. Decimal('0.0000001') -> '1E-7'), which PDF
                # numeric syntax does not allow; write fixed-point instead.
                text = format(Decimal(text), 'f')
            return text.encode('ascii')
        raise NotImplementedError(repr(obj))

    def _metadata(self, name, type_, default):
        return _metadata_from_obj(self.obj, name, type_, default)

    def unparse(self) -> bytes:
        """Create the content stream bytes that reproduce this inline image."""

        def metadata_tokens():
            for metadata_obj in self._image_object:
                unparsed = self._unparse_obj(
                    metadata_obj, remap_names=self.REVERSE_ABBREVS
                )
                assert isinstance(unparsed, bytes)
                yield unparsed

        def inline_image_tokens():
            yield b'BI\n'
            yield b' '.join(metadata_tokens())
            yield b'\nID\n'
            yield self._data._inline_image_raw_bytes()
            yield b'EI'

        return b''.join(inline_image_tokens())

    @property
    def icc(self):  # pragma: no cover
        """Raise an exception since ICC profiles are not supported on inline images."""
        raise InvalidPdfImageError(
            "Inline images with ICC profiles are not supported in the PDF specification"
        )

    def __repr__(self):
        try:
            mode = self.mode
        except NotImplementedError:
            mode = '?'
        return (
            f'<pikepdf.PdfInlineImage image mode={mode} '
            f'size={self.width}x{self.height} at {hex(id(self))}>'
        )

    def _convert_to_pdfimage(self) -> PdfImage:
        """Return a :class:`PdfImage` equivalent of this inline image."""
        # Construct a temporary PDF that holds this inline image, and...
        tmppdf = Pdf.new()
        tmppdf.add_blank_page(page_size=(self.width, self.height))
        tmppdf.pages[0].contents_add(
            f'{self.width} 0 0 {self.height} 0 0 cm'.encode('ascii'), prepend=True
        )
        tmppdf.pages[0].contents_add(self.unparse())

        # ...externalize it,
        tmppdf.pages[0].externalize_inline_images()
        raw_img = cast(Stream, next(im for im in tmppdf.pages[0].images.values()))

        # ...then use the regular PdfImage API to extract it.
        img = PdfImage(raw_img)
        img._set_pdf_source(tmppdf)  # Hold tmppdf open while PdfImage exists
        return img

    def as_pil_image(self) -> Image.Image:
        """Return inline image as a Pillow Image."""
        return self._convert_to_pdfimage().as_pil_image()

    def extract_to(
        self, *, stream: BinaryIO | None = None, fileprefix: str = ''
    ) -> str:
        """Extract the inline image directly to a usable image file.

        See:
            :meth:`PdfImage.extract_to`
        """
        return self._convert_to_pdfimage().extract_to(
            stream=stream, fileprefix=fileprefix
        )

    def read_bytes(self):
        """Return decompressed image bytes."""
        # qpdf does not have an API to return this directly, so convert it.
        return self._convert_to_pdfimage().read_bytes()

    def get_stream_buffer(self):
        """Return decompressed stream buffer."""
        # qpdf does not have an API to return this directly, so convert it.
        return self._convert_to_pdfimage().get_stream_buffer()
1050
1051
# Public API of this module. Includes every decode-array alias and every
# exception class defined above, so that `import *` exposes the base
# exception together with its subclasses.
__all__ = [
    'CMYKDecodeArray',
    'DecodeArray',
    'GrayDecodeArray',
    'HifiPrintImageNotTranscodableError',
    'ImageDecompressionError',
    'InvalidPdfImageError',
    'NotExtractableError',
    'PaletteData',
    'PdfImage',
    'PdfImageBase',
    'PdfInlineImage',
    'PdfJpxImage',
    'RGBDecodeArray',
    'UnsupportedImageTypeError',
]