1from __future__ import annotations
2
3from io import BytesIO
4from typing import IO, Any
5
6from . import Image, ImageFile
7
try:
    # The `_webp` module may not be importable; SUPPORTED records whether the
    # import succeeded so the rest of this module can check before using it.
    from . import _webp

    SUPPORTED = True
except ImportError:
    SUPPORTED = False
14
15
# Modes that _save_all() hands to the animation encoder without converting.
_VALID_WEBP_MODES = dict.fromkeys(("RGBX", "RGBA", "RGB"), True)

# Modes that _save() hands to the single-frame encoder without converting.
_VALID_WEBP_LEGACY_MODES = dict.fromkeys(("RGB", "RGBA"), True)

# Chunk identifier read from bytes 12-16 of the file, mapped to an image mode.
_VP8_MODES_BY_IDENTIFIER = {
    b"VP8 ": "RGB",
    b"VP8X": "RGBA",
    # lossless
    b"VP8L": "RGBA",
}
25
26
def _accept(prefix: bytes) -> bool | str:
    """Decide whether ``prefix`` looks like the start of a WebP file.

    :param prefix: Leading bytes of the file being identified.
    :returns: ``False`` when the header does not match; ``True`` when it
        matches and WebP support is available; otherwise a message string
        explaining that WebP support is not installed.
    """
    # All three header checks are evaluated up front, then combined.
    header_checks = (
        prefix[:4] == b"RIFF",
        prefix[8:12] == b"WEBP",
        prefix[12:16] in _VP8_MODES_BY_IDENTIFIER,
    )
    if not all(header_checks):
        return False
    if SUPPORTED:
        return True
    return (
        "image file could not be identified because WEBP support not installed"
    )
39
40
class WebPImageFile(ImageFile.ImageFile):
    """Image file plugin that reads WebP data through the ``_webp`` module.

    When ``_webp.HAVE_WEBPANIM`` is false, the file is decoded once in
    ``_open`` (legacy mode). Otherwise a ``WebPAnimDecoder`` is stored on the
    instance and frames are decoded on demand by ``load``.
    """

    format = "WEBP"
    format_description = "WebP image"
    # Seek state: the frame whose data was last decoded by load(), and the
    # frame most recently requested through seek().
    __loaded = 0
    __logical_frame = 0

    def _open(self) -> None:
        """Read size, mode and metadata from ``self.fp`` and set up decoding."""
        if not _webp.HAVE_WEBPANIM:
            # Legacy mode: one WebPDecode call yields pixel data and metadata.
            decoded = _webp.WebPDecode(self.fp.read())
            pixels, width, height, self._mode, icc_profile, exif = decoded
            for key, payload in (("icc_profile", icc_profile), ("exif", exif)):
                if payload:
                    self.info[key] = payload
            self._size = (width, height)
            self.fp = BytesIO(pixels)
            self.tile = [("raw", (0, 0) + self.size, 0, self.mode)]
            self.n_frames = 1
            self.is_animated = False
            return

        # Use the newer AnimDecoder API to parse the (possibly) animated file,
        # and access muxed chunks like ICC/EXIF/XMP.
        self._decoder = _webp.WebPAnimDecoder(self.fp.read())

        # Pull the header information out of the decoder.
        header = self._decoder.get_info()
        width, height, loop_count, bgcolor, frame_count, mode = header
        self._size = (width, height)
        self.info["loop"] = loop_count

        # bgcolor is split into four bytes, highest byte first (A, R, G, B),
        # and stored in info as (R, G, B, A).
        bg_a, bg_r, bg_g, bg_b = (
            (bgcolor >> shift) & 0xFF for shift in (24, 16, 8, 0)
        )
        self.info["background"] = (bg_r, bg_g, bg_b, bg_a)

        self.n_frames = frame_count
        self.is_animated = self.n_frames > 1
        # The decoder's mode is kept as the raw mode; RGBX is exposed as RGB.
        self.rawmode = mode
        self._mode = "RGB" if mode == "RGBX" else mode
        self.tile = []

        # Attempt to read ICC / EXIF / XMP chunks from the file; every chunk
        # is fetched before any of them is stored.
        chunks = [
            (key, self._decoder.get_chunk(fourcc))
            for fourcc, key in (
                ("ICCP", "icc_profile"),
                ("EXIF", "exif"),
                ("XMP ", "xmp"),
            )
        ]
        for key, payload in chunks:
            if payload:
                self.info[key] = payload

        # Initialize seek state without touching the freshly made decoder.
        self._reset(reset=False)

    def _getexif(self) -> dict[str, Any] | None:
        """Return the merged EXIF dictionary, or ``None`` without EXIF data."""
        if "exif" in self.info:
            return self.getexif()._get_merged_dict()
        return None

    def seek(self, frame: int) -> None:
        """Record ``frame`` as the frame to decode on the next ``load``."""
        if self._seek_check(frame):
            # Only the logical position changes here; decoding is deferred.
            self.__logical_frame = frame

    def _reset(self, reset: bool = True) -> None:
        """Clear the frame bookkeeping, optionally resetting the decoder too.

        :param reset: When true, ``self._decoder.reset()`` is called first.
        """
        if reset:
            self._decoder.reset()
        self.__physical_frame = 0
        self.__loaded = -1
        self.__timestamp = 0

    def _get_next(self):
        """Decode the next frame from the decoder.

        :returns: A ``(data, timestamp, duration)`` tuple, where ``timestamp``
            is the frame start and ``duration`` is the difference between
            this frame's reported timestamp and the previous one.
        :raises EOFError: If the decoder returns ``None``. The decoder state
            is reset and the image is sought back to frame 0 first.
        """
        result = self._decoder.get_next()
        self.__physical_frame += 1

        if result is None:
            # Decoding failed: reset just to be safe, then report it.
            self._reset()
            self.seek(0)
            msg = "failed to decode next frame in WebP file"
            raise EOFError(msg)

        pixels, frame_end = result
        span = frame_end - self.__timestamp
        self.__timestamp = frame_end

        # libwebp gives frame end, adjust to start of frame
        return pixels, frame_end - span, span

    def _seek(self, frame: int) -> None:
        """Advance the decoder until ``frame`` is the next frame to decode."""
        if frame < self.__physical_frame:
            # The target is behind the current position: start over.
            self._reset()
        while self.__physical_frame < frame:
            # Decode and discard frames up to the requested one.
            self._get_next()

    def load(self):
        """Decode the requested frame if needed, then run the base loader."""
        if _webp.HAVE_WEBPANIM and self.__loaded != self.__logical_frame:
            self._seek(self.__logical_frame)

            # Decode this frame and publish its timing information.
            pixels, timestamp, duration = self._get_next()
            self.info["timestamp"] = timestamp
            self.info["duration"] = duration
            self.__loaded = self.__logical_frame

            # Replace the file pointer with the decoded data and describe it
            # with a single raw tile.
            if self.fp and self._exclusive_fp:
                self.fp.close()
            self.fp = BytesIO(pixels)
            self.tile = [("raw", (0, 0) + self.size, 0, self.rawmode)]

        return super().load()

    def load_seek(self, pos: int) -> None:
        """Do nothing; ``pos`` is ignored."""

    def tell(self) -> int:
        """Return the current frame number."""
        if _webp.HAVE_WEBPANIM:
            return self.__logical_frame
        return super().tell()
174
175
def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
    """Write ``im`` and any ``append_images`` to ``fp`` as an animated WebP.

    Options are read from ``im.encoderinfo``: ``append_images``,
    ``background``, ``duration``, ``loop``, ``minimize_size``, ``kmin``,
    ``kmax``, ``allow_mixed``, ``lossless``, ``quality``, ``alpha_quality``,
    ``method``, ``icc_profile``, ``exif`` and ``xmp``.

    :param im: First image; its frame position is restored afterwards.
    :param fp: Binary file object that receives the encoded data.
    :param filename: Passed through to ``_save`` in the single-frame case.
    :raises OSError: If the background colour is not a 4-item list or tuple
        of values in 0-255, or if the encoder returns ``None``.
    """
    encoderinfo = im.encoderinfo.copy()
    append_images = list(encoderinfo.get("append_images", []))
    sources = [im] + append_images

    # If total frame count is 1, then save using the legacy API, which
    # will preserve non-alpha modes
    if sum(getattr(source, "n_frames", 1) for source in sources) == 1:
        _save(im, fp, filename)
        return

    background: int | tuple[int, ...] = (0, 0, 0, 0)
    if "background" in encoderinfo:
        background = encoderinfo["background"]
    elif "background" in im.info:
        background = im.info["background"]
        if isinstance(background, int):
            # GifImagePlugin stores a global color table index in
            # info["background"]. So it must be converted to an RGBA value
            palette = im.getpalette()
            if palette:
                start = background * 3
                red, green, blue = palette[start : start + 3]
                background = (red, green, blue, 255)
            else:
                background = (background,) * 3 + (255,)

    options = im.encoderinfo
    duration = options.get("duration", im.info.get("duration", 0))
    loop = options.get("loop", 0)
    minimize_size = options.get("minimize_size", False)
    allow_mixed = options.get("allow_mixed", False)
    verbose = False
    # Allowing mixed frames overrides any requested lossless setting.
    lossless = False if allow_mixed else options.get("lossless", False)
    quality = options.get("quality", 80)
    alpha_quality = options.get("alpha_quality", 100)
    method = options.get("method", 0)
    icc_profile = options.get("icc_profile") or ""
    exif = options.get("exif", "")
    if isinstance(exif, Image.Exif):
        exif = exif.tobytes()
    xmp = options.get("xmp", "")

    # Sensible keyframe defaults are from gif2webp.c script
    kmin = options.get("kmin", None)
    if kmin is None:
        kmin = 9 if lossless else 3
    kmax = options.get("kmax", None)
    if kmax is None:
        kmax = 17 if lossless else 5

    # Validate background color
    background_ok = (
        isinstance(background, (list, tuple))
        and len(background) == 4
        and all(0 <= v < 256 for v in background)
    )
    if not background_ok:
        msg = f"Background color is not an RGBA tuple clamped to (0-255): {background}"
        raise OSError(msg)

    # Convert to packed uint, alpha in the highest byte
    bg_r, bg_g, bg_b, bg_a = background
    packed_background = (bg_a << 24) | (bg_r << 16) | (bg_g << 8) | bg_b

    # Setup the WebP animation encoder
    enc = _webp.WebPAnimEncoder(
        im.size[0],
        im.size[1],
        packed_background,
        loop,
        minimize_size,
        kmin,
        kmax,
        allow_mixed,
        verbose,
    )

    # Add each frame
    frame_idx = 0
    timestamp = 0
    cur_idx = im.tell()
    try:
        for ims in sources:
            # Walk every frame of this image
            for idx in range(getattr(ims, "n_frames", 1)):
                ims.seek(idx)
                ims.load()

                # Make sure image mode is supported
                if ims.mode in _VALID_WEBP_MODES:
                    frame = ims
                    rawmode = ims.mode
                else:
                    has_alpha = (
                        "A" in ims.mode
                        or "a" in ims.mode
                        or (ims.mode == "P" and "A" in ims.im.getpalettemode())
                    )
                    rawmode = "RGBA" if has_alpha else "RGB"
                    frame = ims.convert(rawmode)

                if rawmode == "RGB":
                    # For faster conversion, use RGBX
                    rawmode = "RGBX"

                # Append the frame to the animation encoder
                enc.add(
                    frame.tobytes("raw", rawmode),
                    round(timestamp),
                    frame.size[0],
                    frame.size[1],
                    rawmode,
                    lossless,
                    quality,
                    alpha_quality,
                    method,
                )

                # Update timestamp and frame index; a list or tuple duration
                # supplies one value per frame.
                timestamp += (
                    duration[frame_idx]
                    if isinstance(duration, (list, tuple))
                    else duration
                )
                frame_idx += 1

    finally:
        # Put the first image back on the frame it was on.
        im.seek(cur_idx)

    # Force encoder to flush frames
    enc.add(None, round(timestamp), 0, 0, "", lossless, quality, alpha_quality, 0)

    # Get the final output from the encoder
    data = enc.assemble(icc_profile, exif, xmp)
    if data is None:
        msg = "cannot write file as WebP (encoder returned None)"
        raise OSError(msg)

    fp.write(data)
317
318
def _save(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
    """Encode ``im`` as a single-frame WebP and write it to ``fp``.

    Options are read from ``im.encoderinfo``: ``lossless``, ``quality``,
    ``alpha_quality``, ``icc_profile``, ``exif``, ``xmp``, ``method`` and
    ``exact``. Images in a mode other than RGB or RGBA are converted first.

    :param im: Image to encode.
    :param fp: Binary file object that receives the encoded data.
    :param filename: Not used by this function.
    :raises OSError: If the encoder returns ``None``.
    """
    options = im.encoderinfo
    lossless = options.get("lossless", False)
    quality = options.get("quality", 80)
    alpha_quality = options.get("alpha_quality", 100)
    icc_profile = options.get("icc_profile") or ""
    xmp = options.get("xmp", "")
    method = options.get("method", 4)
    exact = int(bool(options.get("exact")))

    exif = options.get("exif", b"")
    if isinstance(exif, Image.Exif):
        exif = exif.tobytes()
    # Drop a leading Exif header before handing the data to the encoder.
    exif_header = b"Exif\x00\x00"
    if exif.startswith(exif_header):
        exif = exif[len(exif_header) :]

    if im.mode not in _VALID_WEBP_LEGACY_MODES:
        target_mode = "RGBA" if im.has_transparency_data else "RGB"
        im = im.convert(target_mode)

    width, height = im.size[0], im.size[1]
    data = _webp.WebPEncode(
        im.tobytes(),
        width,
        height,
        lossless,
        float(quality),
        float(alpha_quality),
        im.mode,
        icc_profile,
        method,
        exact,
        exif,
        xmp,
    )
    if data is None:
        msg = "cannot write file as WebP (encoder returned None)"
        raise OSError(msg)

    fp.write(data)
355
356
# The opener is registered unconditionally: _accept() returns a message
# instead of True when WebP support is missing.
Image.register_open(WebPImageFile.format, WebPImageFile, _accept)
if SUPPORTED:
    # Saving, the file extension and the MIME type are only registered when
    # the _webp module was imported successfully.
    Image.register_save(WebPImageFile.format, _save)
    if _webp.HAVE_WEBPANIM:
        # Multi-frame saving is only registered when _webp reports
        # HAVE_WEBPANIM.
        Image.register_save_all(WebPImageFile.format, _save_all)
    Image.register_extension(WebPImageFile.format, ".webp")
    Image.register_mime(WebPImageFile.format, "image/webp")