1from __future__ import annotations
2
3from io import BytesIO
4
5from . import Image, ImageFile
6
7try:
8 from . import _webp
9
10 SUPPORTED = True
11except ImportError:
12 SUPPORTED = False
13
14TYPE_CHECKING = False
15if TYPE_CHECKING:
16 from typing import IO, Any
17
# Four-byte chunk identifiers that _accept() recognises (it compares them
# against bytes 12-16 of the file prefix), each mapped to a mode name.
_VP8_MODES_BY_IDENTIFIER = {
    b"VP8 ": "RGB",
    b"VP8X": "RGBA",
    b"VP8L": "RGBA",  # lossless
}
23
24
def _accept(prefix: bytes) -> bool | str:
    """Decide whether ``prefix`` looks like the start of a WebP file.

    The prefix must begin with ``RIFF``, carry ``WEBP`` at bytes 8-12 and one
    of the identifiers in ``_VP8_MODES_BY_IDENTIFIER`` at bytes 12-16.

    :param prefix: Leading bytes of the candidate file.
    :returns: ``False`` if the signature does not match, ``True`` if it
        matches and the ``_webp`` module was imported, otherwise a message
        string saying that WEBP support is not installed.
    """
    if not prefix.startswith(b"RIFF"):
        return False
    if prefix[8:12] != b"WEBP":
        return False
    if prefix[12:16] not in _VP8_MODES_BY_IDENTIFIER:
        return False

    # The signature matches; whether it can be decoded depends on _webp
    if SUPPORTED:
        return True
    return (
        "image file could not be identified because WEBP support not installed"
    )
37
38
class WebPImageFile(ImageFile.ImageFile):
    """Image file plugin for WebP, decoded through ``_webp.WebPAnimDecoder``.

    ``seek()`` only records the requested ("logical") frame. The decoder is
    moved to that frame lazily by ``load()``, which keeps a separate count of
    how many frames have been pulled from the decoder (the "physical" frame).
    """

    format = "WEBP"
    format_description = "WebP image"
    # Frame whose pixel data was last loaded; _reset() sets it to -1
    __loaded = 0
    # Frame most recently requested through seek()
    __logical_frame = 0

    def _open(self) -> None:
        """Read the file into the decoder and fill in size, mode and info."""
        # The AnimDecoder API parses the (possibly) animated file and gives
        # access to muxed chunks like ICC/EXIF/XMP.
        decoder = _webp.WebPAnimDecoder(self.fp.read())
        self._decoder = decoder

        # Basic properties reported by the decoder
        self._size, loop_count, bgcolor, frame_count, mode = decoder.get_info()
        self.info["loop"] = loop_count

        # bgcolor is split into four 8-bit fields, alpha in the top byte;
        # info["background"] stores them in (R, G, B, A) order.
        alpha, red, green, blue = (
            (bgcolor >> shift) & 0xFF for shift in (24, 16, 8, 0)
        )
        self.info["background"] = (red, green, blue, alpha)

        self.n_frames = frame_count
        self.is_animated = self.n_frames > 1
        # The decoder's mode is kept as the raw mode; "RGBX" is exposed as "RGB"
        self.rawmode = mode
        self._mode = "RGB" if mode == "RGBX" else mode

        # Fetch the ICC / EXIF / XMP chunks, then record the non-empty ones
        chunks = {
            info_key: decoder.get_chunk(fourcc)
            for info_key, fourcc in (
                ("icc_profile", "ICCP"),
                ("exif", "EXIF"),
                ("xmp", "XMP "),
            )
        }
        for info_key, payload in chunks.items():
            if payload:
                self.info[info_key] = payload

        # Initialize seek state without touching the freshly created decoder
        self._reset(reset=False)

    def _getexif(self) -> dict[int, Any] | None:
        """Return the merged EXIF dict, or ``None`` if no EXIF chunk was read."""
        if "exif" in self.info:
            return self.getexif()._get_merged_dict()
        return None

    def seek(self, frame: int) -> None:
        """Record ``frame`` as the logical frame; decoding is left to ``load()``."""
        if self._seek_check(frame):
            self.__logical_frame = frame

    def _reset(self, reset: bool = True) -> None:
        """Clear the frame bookkeeping, optionally resetting the decoder too.

        :param reset: When true, ``self._decoder.reset()`` is called first.
        """
        if reset:
            self._decoder.reset()
        self.__timestamp = 0
        self.__loaded = -1
        self.__physical_frame = 0

    def _get_next(self) -> tuple[bytes, int, int]:
        """Pull the next frame from the decoder.

        :returns: ``(data, timestamp, duration)``, where ``timestamp`` is the
            start of the frame and ``duration`` is the difference between
            this frame's decoder timestamp and the previous one.
        :raises EOFError: If the decoder returns ``None``; the decoder state
            is reset and the logical frame set back to 0 first.
        """
        result = self._decoder.get_next()
        self.__physical_frame += 1

        if result is None:
            # Decoding failed
            self._reset()  # Reset just to be safe
            self.seek(0)
            msg = "failed to decode next frame in WebP file"
            raise EOFError(msg)

        data, frame_end = result
        duration = frame_end - self.__timestamp
        self.__timestamp = frame_end

        # libwebp gives frame end, so subtract the duration to get the start
        return data, frame_end - duration, duration

    def _seek(self, frame: int) -> None:
        """Position the decoder so that the next frame it yields is ``frame``."""
        if frame == self.__physical_frame:
            return  # Already there
        if frame < self.__physical_frame:
            # The decoder is only advanced with get_next(), so going back
            # means starting over
            self._reset()
        while self.__physical_frame < frame:
            self._get_next()  # Skip frames before the requested one

    def load(self) -> Image.core.PixelAccess | None:
        """Decode the logical frame if it is not the loaded one, then load it."""
        if self.__loaded != self.__logical_frame:
            self._seek(self.__logical_frame)

            # Fetch the pixel data and timing for this frame
            frame_data, frame_start, frame_duration = self._get_next()
            self.info["timestamp"] = frame_start
            self.info["duration"] = frame_duration
            self.__loaded = self.__logical_frame

            # Swap the file pointer for the decoded frame and describe it
            # with a single raw tile
            if self.fp and self._exclusive_fp:
                self.fp.close()
            self.fp = BytesIO(frame_data)
            self.tile = [ImageFile._Tile("raw", (0, 0) + self.size, 0, self.rawmode)]

        return super().load()

    def load_seek(self, pos: int) -> None:
        """Do nothing."""
        pass

    def tell(self) -> int:
        """Return the logical frame number."""
        return self.__logical_frame
150
151
def _convert_frame(im: Image.Image) -> Image.Image:
    """Return ``im`` in one of the modes "RGBX", "RGBA" or "RGB".

    An image already in one of those modes is returned unchanged. Any other
    image is converted to "RGBA" if ``im.has_transparency_data`` is true and
    to "RGB" otherwise.
    """
    if im.mode in ("RGBX", "RGBA", "RGB"):
        return im
    target_mode = "RGBA" if im.has_transparency_data else "RGB"
    return im.convert(target_mode)
157
158
def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
    """Encode ``im`` and any ``append_images`` as an animated WebP into ``fp``.

    If the images hold exactly one frame in total, the work is handed to
    ``_save`` instead. Otherwise every frame of every image is added to a
    ``_webp.WebPAnimEncoder`` and the assembled bytes are written to ``fp``.

    Options are read from ``im.encoderinfo``: ``append_images``,
    ``background``, ``duration``, ``loop``, ``minimize_size``, ``kmin``,
    ``kmax``, ``allow_mixed``, ``lossless``, ``quality``, ``alpha_quality``,
    ``method``, ``icc_profile``, ``exif`` and ``xmp``. ``background`` and
    ``duration`` fall back to ``im.info`` when absent.

    :param im: First image of the sequence; its frame position is restored
        after encoding.
    :param fp: Binary file object that receives the encoded data.
    :param filename: Only forwarded to ``_save`` in the single-frame case.
    :raises OSError: If the background is not a 4-item list or tuple of
        values in 0-255, or if the encoder returns ``None``.
    """
    encoderinfo = im.encoderinfo.copy()
    append_images = list(encoderinfo.get("append_images", []))
    sequence = [im] + append_images

    # A single frame in total is saved using the legacy API, which will
    # preserve non-alpha modes
    if sum(getattr(member, "n_frames", 1) for member in sequence) == 1:
        _save(im, fp, filename)
        return

    background: int | tuple[int, ...]
    if "background" in encoderinfo:
        background = encoderinfo["background"]
    else:
        background = im.info.get("background", (0, 0, 0, 0))
        if isinstance(background, int):
            # GifImagePlugin stores a global color table index in
            # info["background"]. So it must be converted to an RGBA value
            palette = im.getpalette()
            if not palette:
                background = (background,) * 3 + (255,)
            else:
                r, g, b = palette[3 * background : 3 * background + 3]
                background = (r, g, b, 255)

    duration = encoderinfo.get("duration", im.info.get("duration", 0))
    loop = encoderinfo.get("loop", 0)
    minimize_size = encoderinfo.get("minimize_size", False)
    allow_mixed = encoderinfo.get("allow_mixed", False)
    # allow_mixed takes precedence over any lossless setting
    lossless = False if allow_mixed else encoderinfo.get("lossless", False)
    quality = encoderinfo.get("quality", 80)
    alpha_quality = encoderinfo.get("alpha_quality", 100)
    method = encoderinfo.get("method", 0)
    icc_profile = encoderinfo.get("icc_profile") or ""
    exif = encoderinfo.get("exif", "")
    if isinstance(exif, Image.Exif):
        exif = exif.tobytes()
    xmp = encoderinfo.get("xmp", "")
    verbose = False

    # Sensible keyframe defaults are from gif2webp.c script
    default_kmin, default_kmax = (9, 17) if lossless else (3, 5)
    kmin = encoderinfo.get("kmin")
    if kmin is None:
        kmin = default_kmin
    kmax = encoderinfo.get("kmax")
    if kmax is None:
        kmax = default_kmax

    # Validate background color
    if not (
        isinstance(background, (list, tuple))
        and len(background) == 4
        and all(0 <= channel < 256 for channel in background)
    ):
        msg = f"Background color is not an RGBA tuple clamped to (0-255): {background}"
        raise OSError(msg)

    # Convert to packed uint, alpha in the top byte
    red, green, blue, alpha = background
    packed_background = (alpha << 24) | (red << 16) | (green << 8) | (blue << 0)

    # Setup the WebP animation encoder
    enc = _webp.WebPAnimEncoder(
        im.size,
        packed_background,
        loop,
        minimize_size,
        kmin,
        kmax,
        allow_mixed,
        verbose,
    )

    # A list/tuple duration is indexed by the running frame number across
    # all images; any other value is applied to every frame
    per_frame = isinstance(duration, (list, tuple))
    frame_number = 0
    elapsed = 0
    restore_frame = im.tell()
    try:
        for member in sequence:
            for index in range(getattr(member, "n_frames", 1)):
                member.seek(index)

                # Append the converted frame to the animation encoder
                enc.add(
                    _convert_frame(member).getim(),
                    round(elapsed),
                    lossless,
                    quality,
                    alpha_quality,
                    method,
                )

                # Advance the timestamp and the running frame number
                elapsed += duration[frame_number] if per_frame else duration
                frame_number += 1
    finally:
        # Put the first image back on the frame it was on
        im.seek(restore_frame)

    # Force encoder to flush frames
    enc.add(None, round(elapsed), lossless, quality, alpha_quality, 0)

    # Get the final output from the encoder
    data = enc.assemble(icc_profile, exif, xmp)
    if data is None:
        msg = "cannot write file as WebP (encoder returned None)"
        raise OSError(msg)

    fp.write(data)
281
282
def _save(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
    """Encode ``im`` as a single-frame WebP with ``_webp.WebPEncode``.

    Options are read from ``im.encoderinfo``: ``lossless``, ``quality``,
    ``alpha_quality``, ``icc_profile``, ``exif``, ``xmp``, ``method`` and
    ``exact``. A leading ``b"Exif\\x00\\x00"`` is removed from the EXIF data.

    :param im: Image to encode; it is passed through ``_convert_frame``.
    :param fp: Binary file object that receives the encoded data.
    :param filename: Unused.
    :raises OSError: If the encoder returns ``None``.
    """
    # Read every option before im is rebound to the converted frame
    options = im.encoderinfo
    lossless = options.get("lossless", False)
    quality = options.get("quality", 80)
    alpha_quality = options.get("alpha_quality", 100)
    method = options.get("method", 4)
    exact = 1 if options.get("exact") else 0
    icc_profile = options.get("icc_profile") or ""
    xmp = options.get("xmp", "")

    exif = options.get("exif", b"")
    if isinstance(exif, Image.Exif):
        exif = exif.tobytes()
    if exif.startswith(b"Exif\x00\x00"):
        # Drop the 6-byte header
        exif = exif[6:]

    im = _convert_frame(im)

    data = _webp.WebPEncode(
        im.getim(),
        lossless,
        float(quality),
        float(alpha_quality),
        icc_profile,
        method,
        exact,
        exif,
        xmp,
    )
    if data is None:
        msg = "cannot write file as WebP (encoder returned None)"
        raise OSError(msg)

    fp.write(data)
315
316
# The opener is registered even when the _webp extension failed to import, so
# that _accept() can still recognise WebP data and return its "support not
# installed" message. Saving, the file extension and the MIME type are only
# registered when the extension is available.
Image.register_open(WebPImageFile.format, WebPImageFile, _accept)
if SUPPORTED:
    Image.register_save(WebPImageFile.format, _save)
    Image.register_save_all(WebPImageFile.format, _save_all)
    Image.register_extension(WebPImageFile.format, ".webp")
    Image.register_mime(WebPImageFile.format, "image/webp")