1# SPDX-FileCopyrightText: 2022 James R. Barlow
2# SPDX-License-Identifier: MPL-2.0
3
4from __future__ import annotations
5
6import struct
7from collections.abc import Callable
8from typing import TYPE_CHECKING, Any, NamedTuple
9
10if TYPE_CHECKING:
11 from PIL import Image
12
class ImageDecompressionError(Exception):
    """Image decompression error.

    Raised by this module when Pillow rejects a pixel buffer while an image
    is being constructed from it.
    """
15
16
# Binary data that the helpers in this module only read from.
BytesLike = bytes | memoryview
# Binary buffer that the unpacking loops write into by index.
MutableBytesLike = bytearray | memoryview
19
20
21def _next_multiple(n: int, k: int) -> int:
22 """Return the multiple of k that is greater than or equal n.
23
24 >>> _next_multiple(101, 4)
25 104
26 >>> _next_multiple(100, 4)
27 100
28 """
29 div, mod = divmod(n, k)
30 if mod > 0:
31 div += 1
32 return div * k
33
34
def unpack_subbyte_pixels(
    packed: BytesLike, size: tuple[int, int], bits: int, scale: int = 0
) -> tuple[BytesLike, int]:
    """Expand pixels stored at *bits* bits each into one byte per pixel.

    *size* is ``(width, height)``. The result is ``(buffer, stride)`` where
    *stride* is the width rounded up to a whole number of packed bytes' worth
    of pixels, i.e. the number of output bytes per row.

    A *scale* of 0 selects the factor that stretches the largest sub-byte
    value to 0xff. For 2-bit data that yields
        0b00 -> 0x00
        0b01 -> 0x55
        0b10 -> 0xaa
        0b11 -> 0xff
    A *scale* of 1 leaves the values untouched, which is what palette
    indexes need.

    Only 2-bit and 4-bit input is handled; any other depth that gets as far
    as the dispatch raises NotImplementedError.
    """
    width, height = size
    pixels_per_byte = 8 // bits
    stride = _next_multiple(width, pixels_per_byte)
    # NOTE(review): this allocates pixels_per_byte times more than
    # stride * height bytes; the surplus looks like headroom for over-long
    # input rather than something the image needs -- confirm before shrinking.
    unpacked = bytearray(pixels_per_byte * stride * height)
    read_limit = len(unpacked) // pixels_per_byte
    if scale == 0:
        scale = 255 / ((2**bits) - 1)

    # 1-bit data has no unpacker here and falls through to the error below.
    unpackers = {4: _4bit_inner_loop, 2: _2bit_inner_loop}
    unpack = unpackers.get(bits)
    if unpack is None:
        raise NotImplementedError(bits)
    unpack(packed[:read_limit], unpacked, scale)
    return memoryview(unpacked), stride
65
66
67# def _1bit_inner_loop(in_: BytesLike, out: MutableBytesLike, scale: int) -> None:
68# """Unpack 1-bit values to their 8-bit equivalents.
69
#     Thus *out* must be 8x as long as *in*.
71# """
72# for n, val in enumerate(in_):
73# out[8 * n + 0] = int((val >> 7) & 0b1) * scale
74# out[8 * n + 1] = int((val >> 6) & 0b1) * scale
75# out[8 * n + 2] = int((val >> 5) & 0b1) * scale
76# out[8 * n + 3] = int((val >> 4) & 0b1) * scale
77# out[8 * n + 4] = int((val >> 3) & 0b1) * scale
78# out[8 * n + 5] = int((val >> 2) & 0b1) * scale
79# out[8 * n + 6] = int((val >> 1) & 0b1) * scale
80# out[8 * n + 7] = int((val >> 0) & 0b1) * scale
81
82
83def _2bit_inner_loop(in_: BytesLike, out: MutableBytesLike, scale: int) -> None:
84 """Unpack 2-bit values to their 8-bit equivalents.
85
86 Thus *out* must be 4x at long as *in*.
87
88 Images of this type are quite rare in practice, so we don't
89 optimize this loop.
90 """
91 for n, val in enumerate(in_):
92 out[4 * n] = int((val >> 6) * scale)
93 out[4 * n + 1] = int(((val >> 4) & 0b11) * scale)
94 out[4 * n + 2] = int(((val >> 2) & 0b11) * scale)
95 out[4 * n + 3] = int((val & 0b11) * scale)
96
97
98def _4bit_inner_loop(in_: BytesLike, out: MutableBytesLike, scale: int) -> None:
99 """Unpack 4-bit values to their 8-bit equivalents.
100
101 Thus *out* must be 2x at long as *in*.
102
103 Images of this type are quite rare in practice, so we don't
104 optimize this loop.
105 """
106 for n, val in enumerate(in_):
107 out[2 * n] = int((val >> 4) * scale)
108 out[2 * n + 1] = int((val & 0b1111) * scale)
109
110
def image_from_byte_buffer(buffer: BytesLike, size: tuple[int, int], stride: int):
    """Wrap a one-byte-per-pixel buffer in a single-channel Pillow image.

    *stride* is the number of bytes per row; it matters for data unpacked
    from sub-byte pixels, where rows can be longer than the image width.

    The image is always created in mode 'L', even when it will end up as a
    palette image; the palette is attached by the caller afterwards.

    Raises ImageDecompressionError if Pillow rejects the buffer.
    """
    from PIL import Image

    def build(row_bytes: int):
        # The trailing 1 tells Pillow the rows run top to bottom in memory.
        return Image.frombuffer('L', size, buffer, "raw", 'L', row_bytes, 1)

    try:
        return build(stride)
    except ValueError as e:
        if 'buffer is not large enough' not in str(e):
            raise ImageDecompressionError() from e
        # Pillow says the buffer is too small, so retry on the guess that the
        # rows are padded to a multiple of 4 bytes. In practice the image may
        # simply be corrupted.
        # NOTE(review): (width + 3) // 4 is a quarter of the width rounded up,
        # not the width rounded up to a multiple of 4 -- confirm the intent.
        try:
            return build((size[0] + 3) // 4)
        except ValueError as e2:
            raise ImageDecompressionError(str(e2)) from e2
137
138
139def _make_rgb_palette(gray_palette: bytes) -> bytes:
140 palette = b''
141 for entry in gray_palette:
142 palette += bytes([entry]) * 3
143 return palette
144
145
146def _depalettize_cmyk(buffer: BytesLike, palette: BytesLike):
147 with memoryview(buffer) as mv:
148 output = bytearray(4 * len(mv))
149 for n, pal_idx in enumerate(mv):
150 output[4 * n : 4 * (n + 1)] = palette[4 * pal_idx : 4 * (pal_idx + 1)]
151 return output
152
153
def image_from_buffer_and_palette(
    buffer: BytesLike,
    size: tuple[int, int],
    stride: int,
    base_mode: str,
    palette: BytesLike,
) -> Image.Image:
    """Construct an image from a byte buffer and apply the palette.

    1/2/4-bit images must be unpacked (no scaling!) to byte buffers first, such
    that every 8-bit integer is an index into the palette.

    *size* is ``(width, height)`` and *stride* is the number of index bytes
    per row of *buffer*. *base_mode* names the color space of the palette
    entries: 'RGB', 'L' or 'CMYK'. Any other value raises
    NotImplementedError.
    """
    if base_mode == 'RGB':
        im = image_from_byte_buffer(buffer, size, stride)
        im.putpalette(palette, rawmode=base_mode)
    elif base_mode == 'L':
        # Pillow does not fully support palettes with rawmode='L'.
        # Convert to RGB palette.
        gray_palette = _make_rgb_palette(palette)
        im = image_from_byte_buffer(buffer, size, stride)
        im.putpalette(gray_palette, rawmode='RGB')
    elif base_mode == 'CMYK':
        from PIL import Image

        # Pillow does not support CMYK with palettes; convert manually
        output = _depalettize_cmyk(buffer, palette)
        # Each index byte expanded to four CMYK bytes, so a row that took
        # *stride* index bytes now takes 4 * stride bytes. Passing this on
        # keeps padded rows aligned; 4 * 0 is still 0, which Pillow treats
        # as tightly packed rows.
        im = Image.frombuffer('CMYK', size, output, 'raw', 'CMYK', 4 * stride, 1)
    else:
        raise NotImplementedError(f'palette with {base_mode}')
    return im
184
185
def fix_1bit_palette_image(
    im: Image.Image, base_mode: str, palette: BytesLike
) -> Image.Image:
    """Convert *im* to mode 'P' and attach the colors of a 1-bit palette.

    A two-entry RGB palette (6 bytes) is spread so that its first color sits
    at index 0 and its second at index 255. An 'L' palette is applied
    directly, falling back to an RGB expansion when Pillow does not recognize
    the raw mode. Any other combination returns the converted image without
    a palette being attached.
    """
    paletted = im.convert('P')

    if base_mode == 'RGB' and len(palette) == 6:
        # rgbrgb -> rgb000000...rgb
        first_color, last_color = palette[0:3], palette[3:6]
        filler = b'\x00\x00\x00' * (256 - 2)
        paletted.putpalette(
            b''.join([first_color, filler, last_color]), rawmode='RGB'
        )
        return paletted

    if base_mode != 'L':
        return paletted

    try:
        paletted.putpalette(palette, rawmode='L')
    except ValueError as e:
        # NOTE(review): any other ValueError is dropped here without a
        # palette being applied -- confirm that this is intended.
        if 'unrecognized raw mode' in str(e):
            paletted.putpalette(_make_rgb_palette(palette), rawmode='RGB')
    return paletted
205
206
def generate_ccitt_header(
    size: tuple[int, int],
    *,
    data_length: int,
    ccitt_group: int,
    t4_options: int | None,
    photometry: int,
    icc: bytes,
) -> bytes:
    """Build a little-endian TIFF header for a single CCITT-compressed strip.

    The result is the TIFF preamble, one image file directory and, when
    *icc* is non-empty, the ICC profile bytes appended after the directory.
    The strip offset written into the directory points just past the
    returned bytes, so the compressed data is expected to follow directly.

    *size* is ``(width, height)``, *data_length* the byte count of the
    compressed strip, *ccitt_group* the value for the Compression tag and
    *photometry* the value for PhotometricInterpretation. The T4Options tag
    is written only when *t4_options* is not None.
    """
    from PIL.TiffTags import TAGS_V2 as TIFF_TAGS

    preamble_format = '<' + '2s' + 'H' + 'L' + 'H'
    entry_format = '<HHLL'
    code_by_name = {tag.name: code for code, tag in TIFF_TAGS.items()}  # type: ignore

    # Two values depend on the final number of entries, so they are entered
    # as placeholders and resolved once the directory is complete.
    strip_offset_pending = object()
    icc_offset_pending = object()

    width, height = size
    fields: list[tuple[str, Any, int]] = [
        ('ImageWidth', width, 1),
        ('ImageLength', height, 1),
        ('BitsPerSample', 1, 1),
        ('Compression', ccitt_group, 1),
        ('FillOrder', 1, 1),
    ]
    if t4_options is not None:
        fields.append(('T4Options', t4_options, 1))
    fields.extend(
        [
            ('PhotometricInterpretation', photometry, 1),
            ('StripOffsets', strip_offset_pending, 1),
            ('RowsPerStrip', height, 1),
            ('StripByteCounts', data_length, 1),
        ]
    )
    if icc:
        fields.append(('ICCProfile', icc_offset_pending, len(icc)))

    # Preamble, then every directory entry, then the 4-byte next-IFD offset.
    icc_offset = (
        struct.calcsize(preamble_format)
        + struct.calcsize(entry_format) * len(fields)
        + 4
    )
    image_offset = icc_offset + len(icc)

    packed_values: list[Any] = []
    for tag_name, value, count in fields:
        code = code_by_name[tag_name]
        if value is strip_offset_pending:
            value = image_offset
        elif value is icc_offset_pending:
            value = icc_offset
        packed_values.extend(
            (code, TIFF_TAGS[code].type, count, value)  # type: ignore
        )

    header = struct.pack(
        preamble_format + entry_format[1:] * len(fields) + 'L',
        b'II',  # Byte order indication: Little endian
        42,  # Version number (always 42)
        8,  # Offset to first IFD
        len(fields),  # Number of tags in IFD
        *packed_values,
        0,  # Last IFD
    )
    return header + icc if icc else header