1from __future__ import annotations
2
3from io import BytesIO
4from typing import IO, Any
5
6from . import Image, ImageFile
7
# The _webp module is optional. Record whether it could be imported so that
# the rest of this module can check SUPPORTED instead of failing on import.
try:
    from . import _webp

    SUPPORTED = True
except ImportError:
    SUPPORTED = False
14
15
# Maps the four-byte chunk identifier that follows the "RIFF....WEBP" header
# to the image mode associated with that kind of bitstream.
_VP8_MODES_BY_IDENTIFIER = {
    b"VP8 ": "RGB",
    # "VP8X" and "VP8L" (lossless) both map to RGBA.
    **dict.fromkeys((b"VP8X", b"VP8L"), "RGBA"),
}
21
22
def _accept(prefix: bytes) -> bool | str:
    """Decide whether ``prefix`` looks like the start of a WebP file.

    :param prefix: Leading bytes of the file being identified.
    :returns: ``False`` if the header does not match, ``True`` if it matches
        and WebP support is available, or an explanatory message if it
        matches but the ``_webp`` module could not be imported.
    """
    header_checks = (
        prefix.startswith(b"RIFF"),
        prefix[8:12] == b"WEBP",
        prefix[12:16] in _VP8_MODES_BY_IDENTIFIER,
    )
    if not all(header_checks):
        return False
    if SUPPORTED:
        return True
    return "image file could not be identified because WEBP support not installed"
35
36
class WebPImageFile(ImageFile.ImageFile):
    """Image plugin that reads still and animated WebP files.

    Frames are pulled one after another from a ``_webp.WebPAnimDecoder``.
    ``seek()`` only records which frame is wanted; the decoding itself is
    done by ``load()``.
    """

    format = "WEBP"
    format_description = "WebP image"
    # Frame whose pixel data was last decoded by load(); _reset() sets it to -1.
    __loaded = 0
    # Frame selected through seek() and reported by tell().
    __logical_frame = 0

    def _open(self) -> None:
        """Create the decoder and fill in size, mode, frame count and info."""
        # Use the newer AnimDecoder API to parse the (possibly) animated file,
        # and access muxed chunks like ICC/EXIF/XMP.
        decoder = _webp.WebPAnimDecoder(self.fp.read())
        self._decoder = decoder

        # Get info from decoder
        self._size, loop_count, bgcolor, frame_count, mode = decoder.get_info()
        self.info["loop"] = loop_count

        # bgcolor is unpacked as A, R, G, B from the most significant byte
        # down, while info["background"] stores the channels as RGBA.
        bg_a, bg_r, bg_g, bg_b = (
            (bgcolor >> shift) & 0xFF for shift in (24, 16, 8, 0)
        )
        self.info["background"] = (bg_r, bg_g, bg_b, bg_a)

        self.n_frames = frame_count
        self.is_animated = self.n_frames > 1
        # The decoder's mode is kept as the raw mode; "RGBX" is exposed as "RGB".
        self.rawmode = mode
        self._mode = "RGB" if mode == "RGBX" else mode

        # Attempt to read ICC / EXIF / XMP chunks from file. All three are
        # fetched first; only the non-empty ones are then stored in info.
        chunks = {
            info_key: decoder.get_chunk(fourcc)
            for info_key, fourcc in (
                ("icc_profile", "ICCP"),
                ("exif", "EXIF"),
                ("xmp", "XMP "),
            )
        }
        self.info.update(
            {info_key: chunk for info_key, chunk in chunks.items() if chunk}
        )

        # Initialize seek state
        self._reset(reset=False)

    def _getexif(self) -> dict[int, Any] | None:
        """Return the merged EXIF dictionary, or ``None`` without EXIF data."""
        if "exif" in self.info:
            return self.getexif()._get_merged_dict()
        return None

    def seek(self, frame: int) -> None:
        """Select ``frame`` as the frame to be decoded by the next ``load()``."""
        if self._seek_check(frame):
            # Set logical frame to requested position
            self.__logical_frame = frame

    def _reset(self, reset: bool = True) -> None:
        """Clear the decoding position, optionally rewinding the decoder.

        :param reset: If true, also call ``reset()`` on the decoder.
        """
        if reset:
            self._decoder.reset()
        self.__physical_frame, self.__loaded, self.__timestamp = 0, -1, 0

    def _get_next(self) -> tuple[bytes, int, int]:
        """Decode the next frame.

        :returns: ``(data, timestamp, duration)`` where ``timestamp`` is the
            start of the frame.
        :raises EOFError: If the decoder returns no frame. The decoding state
            is reset and frame 0 is selected before raising.
        """
        # Get next frame
        result = self._decoder.get_next()
        self.__physical_frame += 1

        # Check if an error occurred
        if result is None:
            self._reset()  # Reset just to be safe
            self.seek(0)
            msg = "failed to decode next frame in WebP file"
            raise EOFError(msg)

        # Compute duration relative to the previously decoded frame
        data, frame_end = result
        duration = frame_end - self.__timestamp
        self.__timestamp = frame_end

        # libwebp gives frame end, adjust to start of frame
        return data, frame_end - duration, duration

    def _seek(self, frame: int) -> None:
        """Position the decoder so that the next decoded frame is ``frame``."""
        if frame < self.__physical_frame:
            self._reset()  # Rewind to beginning
        # Advance to the requested frame; nothing to do when already there.
        for _ in range(frame - self.__physical_frame):
            self._get_next()

    def load(self) -> Image.core.PixelAccess | None:
        """Decode the selected frame if necessary, then load its pixel data."""
        wanted = self.__logical_frame
        if self.__loaded != wanted:
            self._seek(wanted)

            # We need to load the image data for this frame
            data, timestamp, duration = self._get_next()
            self.info.update(timestamp=timestamp, duration=duration)
            self.__loaded = wanted

            # Set tile: the decoded frame is exposed as a raw in-memory file
            if self.fp and self._exclusive_fp:
                self.fp.close()
            self.fp = BytesIO(data)
            self.tile = [ImageFile._Tile("raw", (0, 0) + self.size, 0, self.rawmode)]

        return super().load()

    def load_seek(self, pos: int) -> None:
        """Do nothing; ``pos`` is ignored."""

    def tell(self) -> int:
        """Return the index of the currently selected frame."""
        return self.__logical_frame
148
149
def _convert_frame(im: Image.Image) -> Image.Image:
    """Return ``im`` in a mode accepted by the encoder.

    Images already in "RGBX", "RGBA" or "RGB" are returned unchanged. Any
    other mode is converted to "RGBA" when the image has transparency data,
    and to "RGB" otherwise.
    """
    if im.mode in ("RGBX", "RGBA", "RGB"):
        return im
    target_mode = "RGBA" if im.has_transparency_data else "RGB"
    return im.convert(target_mode)
155
156
def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
    """Write ``im`` and any ``append_images`` to ``fp`` as an animated WebP.

    When the images contribute exactly one frame in total, the work is
    delegated to ``_save`` instead.

    Options are read from ``im.encoderinfo``: ``append_images``,
    ``background``, ``duration``, ``loop``, ``minimize_size``, ``kmin``,
    ``kmax``, ``allow_mixed``, ``lossless``, ``quality``, ``alpha_quality``,
    ``method``, ``icc_profile``, ``exif`` and ``xmp``.

    :param im: First image of the sequence. Its frame position is restored
        once all frames have been added.
    :param fp: Binary file object that receives the encoded data.
    :param filename: Only forwarded to ``_save`` in the single-frame case.
    :raises OSError: If the background is not a sequence of four values in
        the range 0-255, or if the encoder returns no data.
    """
    encoderinfo = im.encoderinfo.copy()
    append_images = list(encoderinfo.get("append_images", []))
    sequence = [im] + append_images

    # If total frame count is 1, then save using the legacy API, which
    # will preserve non-alpha modes
    if sum(getattr(ims, "n_frames", 1) for ims in sequence) == 1:
        _save(im, fp, filename)
        return

    background: int | tuple[int, ...] = (0, 0, 0, 0)
    if "background" in encoderinfo:
        background = encoderinfo["background"]
    elif "background" in im.info:
        background = im.info["background"]
        if isinstance(background, int):
            # GifImagePlugin stores a global color table index in
            # info["background"]. So it must be converted to an RGBA value
            palette = im.getpalette()
            if palette:
                r, g, b = palette[background * 3 : (background + 1) * 3]
                background = (r, g, b, 255)
            else:
                background = (background, background, background, 255)

    duration = encoderinfo.get("duration", im.info.get("duration", 0))
    loop = encoderinfo.get("loop", 0)
    minimize_size = encoderinfo.get("minimize_size", False)
    kmin = encoderinfo.get("kmin", None)
    kmax = encoderinfo.get("kmax", None)
    allow_mixed = encoderinfo.get("allow_mixed", False)
    verbose = False
    lossless = encoderinfo.get("lossless", False)
    quality = encoderinfo.get("quality", 80)
    alpha_quality = encoderinfo.get("alpha_quality", 100)
    method = encoderinfo.get("method", 0)
    icc_profile = encoderinfo.get("icc_profile") or ""
    exif = encoderinfo.get("exif", "")
    if isinstance(exif, Image.Exif):
        exif = exif.tobytes()
    # Drop the "Exif\0\0" header so that the EXIF chunk holds the same payload
    # that _save writes for single-frame files. The default value is a str,
    # so only bytes are inspected.
    if isinstance(exif, bytes) and exif.startswith(b"Exif\x00\x00"):
        exif = exif[6:]
    xmp = encoderinfo.get("xmp", "")
    if allow_mixed:
        lossless = False

    # Sensible keyframe defaults are from gif2webp.c script
    if kmin is None:
        kmin = 9 if lossless else 3
    if kmax is None:
        kmax = 17 if lossless else 5

    # Validate background color
    if (
        not isinstance(background, (list, tuple))
        or len(background) != 4
        or not all(0 <= v < 256 for v in background)
    ):
        msg = f"Background color is not an RGBA tuple clamped to (0-255): {background}"
        raise OSError(msg)

    # Convert to packed uint
    bg_r, bg_g, bg_b, bg_a = background
    background = (bg_a << 24) | (bg_r << 16) | (bg_g << 8) | (bg_b << 0)

    # Setup the WebP animation encoder
    enc = _webp.WebPAnimEncoder(
        im.size,
        background,
        loop,
        minimize_size,
        kmin,
        kmax,
        allow_mixed,
        verbose,
    )

    # Add each frame
    frame_idx = 0
    timestamp = 0
    cur_idx = im.tell()
    try:
        for ims in sequence:
            # Get number of frames in this image
            nfr = getattr(ims, "n_frames", 1)

            for idx in range(nfr):
                ims.seek(idx)

                frame = _convert_frame(ims)

                # Append the frame to the animation encoder
                enc.add(
                    frame.getim(),
                    round(timestamp),
                    lossless,
                    quality,
                    alpha_quality,
                    method,
                )

                # Update timestamp and frame index; a list or tuple of
                # durations is indexed by the overall frame number.
                if isinstance(duration, (list, tuple)):
                    timestamp += duration[frame_idx]
                else:
                    timestamp += duration
                frame_idx += 1

    finally:
        # Leave the first image on the frame it was on when saving started
        im.seek(cur_idx)

    # Force encoder to flush frames
    enc.add(None, round(timestamp), lossless, quality, alpha_quality, 0)

    # Get the final output from the encoder
    data = enc.assemble(icc_profile, exif, xmp)
    if data is None:
        msg = "cannot write file as WebP (encoder returned None)"
        raise OSError(msg)

    fp.write(data)
279
280
def _save(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
    """Encode ``im`` as a single-frame WebP and write the result to ``fp``.

    Options are read from ``im.encoderinfo``: ``lossless``, ``quality``,
    ``alpha_quality``, ``icc_profile``, ``exif``, ``xmp``, ``method`` and
    ``exact``.

    :param im: Image to encode; it is converted first if its mode is not
        one the encoder is given directly.
    :param fp: Binary file object that receives the encoded data.
    :param filename: Not used by this function.
    :raises OSError: If the encoder returns no data.
    """
    options = im.encoderinfo

    exif = options.get("exif", b"")
    if isinstance(exif, Image.Exif):
        exif = exif.tobytes()
    # Only the data after the "Exif\0\0" header is handed to the encoder.
    exif_header = b"Exif\x00\x00"
    if exif.startswith(exif_header):
        exif = exif[len(exif_header) :]

    exact = int(bool(options.get("exact")))

    frame = _convert_frame(im)

    data = _webp.WebPEncode(
        frame.getim(),
        options.get("lossless", False),
        float(options.get("quality", 80)),
        float(options.get("alpha_quality", 100)),
        options.get("icc_profile") or "",
        options.get("method", 4),
        exact,
        exif,
        options.get("xmp", ""),
    )
    if data is None:
        msg = "cannot write file as WebP (encoder returned None)"
        raise OSError(msg)

    fp.write(data)
313
314
# The opener is registered unconditionally: when the _webp module is missing,
# _accept returns an explanatory message for WebP data instead of True.
Image.register_open(WebPImageFile.format, WebPImageFile, _accept)
if SUPPORTED:
    # Saving, the file extension and the MIME type are only registered when
    # the _webp module could be imported.
    Image.register_save(WebPImageFile.format, _save)
    Image.register_save_all(WebPImageFile.format, _save_all)
    Image.register_extension(WebPImageFile.format, ".webp")
    Image.register_mime(WebPImageFile.format, "image/webp")