Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/WebPImagePlugin.py: 43%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

192 statements  

1from __future__ import annotations 

2 

3from io import BytesIO 

4from typing import IO, Any 

5 

6from . import Image, ImageFile 

7 

# The ``_webp`` module is optional: when it cannot be imported, the plugin
# still loads and SUPPORTED records that no WebP codec is available.
try:
    from . import _webp

    SUPPORTED = True
except ImportError:
    SUPPORTED = False

14 

15 

# Maps the four-byte identifier found at bytes 12-16 of a file prefix to an
# image mode name. ``_accept`` only tests membership of the identifier here.
_VP8_MODES_BY_IDENTIFIER = {
    b"VP8 ": "RGB",
    b"VP8X": "RGBA",
    b"VP8L": "RGBA",  # lossless
}

21 

22 

def _accept(prefix: bytes) -> bool | str:
    """Decide whether *prefix* looks like the start of a WebP file.

    :param prefix: Leading bytes of the candidate file.
    :returns: ``False`` when the bytes do not form a RIFF/WEBP header with a
        known identifier in ``_VP8_MODES_BY_IDENTIFIER``; ``True`` when they
        do and ``SUPPORTED`` is set; otherwise a message string explaining
        that WebP support is not installed.
    """
    looks_like_webp = (
        prefix.startswith(b"RIFF")
        and prefix[8:12] == b"WEBP"
        and prefix[12:16] in _VP8_MODES_BY_IDENTIFIER
    )
    if not looks_like_webp:
        return False
    if SUPPORTED:
        return True
    return "image file could not be identified because WEBP support not installed"

35 

36 

class WebPImageFile(ImageFile.ImageFile):
    """Image file handler that decodes WebP data via ``_webp.WebPAnimDecoder``.

    Two frame positions are tracked. The *logical* frame is the one requested
    through :meth:`seek`; the *physical* frame counts how many frames have
    been pulled from the decoder since the last reset. Pixel data is only
    decoded when :meth:`load` is called.
    """

    format = "WEBP"
    format_description = "WebP image"
    # Index of the frame whose data is currently loaded (-1 after _reset()).
    __loaded = 0
    # Index of the frame most recently requested through seek().
    __logical_frame = 0

    def _open(self) -> None:
        """Create the decoder from the file contents and populate metadata."""
        # The AnimDecoder API parses (possibly) animated files and also
        # exposes muxed chunks such as ICC/EXIF/XMP.
        self._decoder = _webp.WebPAnimDecoder(self.fp.read())

        # Basic properties reported by the decoder
        decoder_info = self._decoder.get_info()
        self._size, loop_count, bgcolor, frame_count, mode = decoder_info
        self.info["loop"] = loop_count

        # bgcolor is unpacked as 0xAARRGGBB and stored in R, G, B, A order
        alpha = (bgcolor >> 24) & 0xFF
        red = (bgcolor >> 16) & 0xFF
        green = (bgcolor >> 8) & 0xFF
        blue = bgcolor & 0xFF
        self.info["background"] = (red, green, blue, alpha)

        self.n_frames = frame_count
        self.is_animated = self.n_frames > 1
        self.rawmode = mode
        self._mode = mode if mode != "RGBX" else "RGB"

        # Copy whichever of the ICC / EXIF / XMP chunks are present into info
        for fourcc, info_key in (
            ("ICCP", "icc_profile"),
            ("EXIF", "exif"),
            ("XMP ", "xmp"),
        ):
            chunk = self._decoder.get_chunk(fourcc)
            if chunk:
                self.info[info_key] = chunk

        # Start with clean seek state; the decoder itself is freshly created
        self._reset(reset=False)

    def _getexif(self) -> dict[int, Any] | None:
        """Return the merged EXIF dictionary, or ``None`` without EXIF data."""
        if "exif" in self.info:
            return self.getexif()._get_merged_dict()
        return None

    def seek(self, frame: int) -> None:
        """Record *frame* as the logical position; decoding waits for load()."""
        if self._seek_check(frame):
            self.__logical_frame = frame

    def _reset(self, reset: bool = True) -> None:
        """Clear the frame bookkeeping, optionally rewinding the decoder too.

        :param reset: When true, ``reset()`` is also called on the decoder.
        """
        if reset:
            self._decoder.reset()
        self.__timestamp = 0
        self.__loaded = -1
        self.__physical_frame = 0

    def _get_next(self) -> tuple[bytes, int, int]:
        """Pull the next frame from the decoder.

        :returns: ``(data, timestamp, duration)`` where *timestamp* is the
            start of the frame and *duration* is the difference from the
            previous frame's end timestamp.
        :raises EOFError: If the decoder returns ``None``. The seek state is
            reset to frame 0 before raising.
        """
        result = self._decoder.get_next()
        self.__physical_frame += 1

        if result is None:
            self._reset()  # Reset just to be safe
            self.seek(0)
            msg = "failed to decode next frame in WebP file"
            raise EOFError(msg)

        data, end_timestamp = result
        duration = end_timestamp - self.__timestamp
        self.__timestamp = end_timestamp

        # libwebp gives frame end, so subtract the duration to get the start
        start_timestamp = end_timestamp - duration
        return data, start_timestamp, duration

    def _seek(self, frame: int) -> None:
        """Advance (or rewind, then advance) the decoder up to *frame*."""
        position = self.__physical_frame
        if position == frame:
            return  # Already there
        if position > frame:
            self._reset()  # Can only move forwards, so restart from the top
        while self.__physical_frame < frame:
            self._get_next()  # Discard frames before the requested one

    def load(self) -> Image.core.PixelAccess | None:
        """Decode the logical frame if it is not already loaded, then load it."""
        wanted = self.__logical_frame
        if self.__loaded == wanted:
            return super().load()

        self._seek(wanted)

        # Decode the requested frame and publish its timing information
        data, timestamp, duration = self._get_next()
        self.info["timestamp"] = timestamp
        self.info["duration"] = duration
        self.__loaded = self.__logical_frame

        # Swap the file pointer for the decoded pixels and read them as raw
        if self.fp and self._exclusive_fp:
            self.fp.close()
        self.fp = BytesIO(data)
        extents = (0, 0) + self.size
        self.tile = [ImageFile._Tile("raw", extents, 0, self.rawmode)]

        return super().load()

    def load_seek(self, pos: int) -> None:
        """Do nothing; the decoded frame buffer is always read from its start."""
        pass

    def tell(self) -> int:
        """Return the logical frame index."""
        return self.__logical_frame

148 

149 

150def _convert_frame(im: Image.Image) -> Image.Image: 

151 # Make sure image mode is supported 

152 if im.mode not in ("RGBX", "RGBA", "RGB"): 

153 im = im.convert("RGBA" if im.has_transparency_data else "RGB") 

154 return im 

155 

156 

def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
    """Write *im* plus any ``append_images`` to *fp* as an animated WebP.

    When the total frame count is 1 the work is delegated to ``_save``.
    Options are read from ``im.encoderinfo``.

    :param im: First image of the sequence; its ``encoderinfo`` holds the
        save options and its ``info`` supplies fallbacks for ``background``
        and ``duration``.
    :param fp: Binary file object that receives the encoded data.
    :param filename: Only forwarded to ``_save`` for the single-frame case.
    :raises OSError: If the background is not a four-item sequence of values
        in ``0..255``, or if the encoder returns ``None``.
    """
    encoderinfo = im.encoderinfo.copy()
    append_images = list(encoderinfo.get("append_images", []))
    sequence = [im] + append_images

    # If total frame count is 1, then save using the legacy API, which
    # will preserve non-alpha modes
    if sum(getattr(ims, "n_frames", 1) for ims in sequence) == 1:
        _save(im, fp, filename)
        return

    background: int | tuple[int, ...] = (0, 0, 0, 0)
    if "background" in encoderinfo:
        background = encoderinfo["background"]
    elif "background" in im.info:
        background = im.info["background"]
        if isinstance(background, int):
            # GifImagePlugin stores a global color table index in
            # info["background"]. So it must be converted to an RGBA value
            palette = im.getpalette()
            if palette:
                r, g, b = palette[background * 3 : (background + 1) * 3]
                background = (r, g, b, 255)
            else:
                background = (background, background, background, 255)

    duration = encoderinfo.get("duration", im.info.get("duration", 0))
    loop = encoderinfo.get("loop", 0)
    minimize_size = encoderinfo.get("minimize_size", False)
    kmin = encoderinfo.get("kmin", None)
    kmax = encoderinfo.get("kmax", None)
    allow_mixed = encoderinfo.get("allow_mixed", False)
    verbose = False
    lossless = encoderinfo.get("lossless", False)
    quality = encoderinfo.get("quality", 80)
    alpha_quality = encoderinfo.get("alpha_quality", 100)
    method = encoderinfo.get("method", 0)
    icc_profile = encoderinfo.get("icc_profile") or ""
    exif = encoderinfo.get("exif", "")
    if isinstance(exif, Image.Exif):
        exif = exif.tobytes()
    # Match _save(): drop a leading "Exif\0\0" marker so single-frame and
    # animated files get the same EXIF payload. The isinstance() guard is
    # needed because the default above is a str, not bytes.
    if isinstance(exif, bytes) and exif.startswith(b"Exif\x00\x00"):
        exif = exif[6:]
    xmp = encoderinfo.get("xmp", "")
    if allow_mixed:
        lossless = False

    # Sensible keyframe defaults are from gif2webp.c script
    if kmin is None:
        kmin = 9 if lossless else 3
    if kmax is None:
        kmax = 17 if lossless else 5

    # Validate background color
    if (
        not isinstance(background, (list, tuple))
        or len(background) != 4
        or not all(0 <= v < 256 for v in background)
    ):
        msg = f"Background color is not an RGBA tuple clamped to (0-255): {background}"
        raise OSError(msg)

    # Convert to packed uint
    bg_r, bg_g, bg_b, bg_a = background
    background = (bg_a << 24) | (bg_r << 16) | (bg_g << 8) | (bg_b << 0)

    # Setup the WebP animation encoder
    enc = _webp.WebPAnimEncoder(
        im.size,
        background,
        loop,
        minimize_size,
        kmin,
        kmax,
        allow_mixed,
        verbose,
    )

    # Add each frame
    frame_idx = 0
    timestamp = 0
    cur_idx = im.tell()  # remembered so the caller's frame position survives
    try:
        for ims in sequence:
            # Get number of frames in this image
            nfr = getattr(ims, "n_frames", 1)

            for idx in range(nfr):
                ims.seek(idx)

                frame = _convert_frame(ims)

                # Append the frame to the animation encoder
                enc.add(
                    frame.getim(),
                    round(timestamp),
                    lossless,
                    quality,
                    alpha_quality,
                    method,
                )

                # Update timestamp and frame index; a list/tuple duration is
                # indexed per frame across the whole sequence
                if isinstance(duration, (list, tuple)):
                    timestamp += duration[frame_idx]
                else:
                    timestamp += duration
                frame_idx += 1

    finally:
        im.seek(cur_idx)

    # Force encoder to flush frames
    enc.add(None, round(timestamp), lossless, quality, alpha_quality, 0)

    # Get the final output from the encoder
    data = enc.assemble(icc_profile, exif, xmp)
    if data is None:
        msg = "cannot write file as WebP (encoder returned None)"
        raise OSError(msg)

    fp.write(data)

279 

280 

def _save(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None:
    """Encode *im* as a single-frame WebP and write the result to *fp*.

    Options are read from ``im.encoderinfo``. A leading ``Exif\\0\\0`` marker
    is removed from the EXIF payload before encoding.

    :param im: Image to encode; converted with ``_convert_frame`` first.
    :param fp: Binary file object that receives the encoded data.
    :param filename: Unused here.
    :raises OSError: If the encoder returns ``None``.
    """
    options = im.encoderinfo

    exif = options.get("exif", b"")
    if isinstance(exif, Image.Exif):
        exif = exif.tobytes()
    if exif.startswith(b"Exif\x00\x00"):
        exif = exif[6:]

    lossless = options.get("lossless", False)
    quality = float(options.get("quality", 80))
    alpha_quality = float(options.get("alpha_quality", 100))
    icc_profile = options.get("icc_profile") or ""
    xmp = options.get("xmp", "")
    method = options.get("method", 4)
    exact = 1 if options.get("exact") else 0

    frame = _convert_frame(im)
    encoded = _webp.WebPEncode(
        frame.getim(),
        lossless,
        quality,
        alpha_quality,
        icc_profile,
        method,
        exact,
        exif,
        xmp,
    )
    if encoded is None:
        msg = "cannot write file as WebP (encoder returned None)"
        raise OSError(msg)

    fp.write(encoded)

313 

314 

# The opener is registered unconditionally, so _accept can return its
# "support not installed" message when the _webp module is missing.
Image.register_open(WebPImageFile.format, WebPImageFile, _accept)
# Saving, the file extension and the MIME type are only registered when the
# _webp module was imported successfully.
if SUPPORTED:
    Image.register_save(WebPImageFile.format, _save)
    Image.register_save_all(WebPImageFile.format, _save_all)
    Image.register_extension(WebPImageFile.format, ".webp")
    Image.register_mime(WebPImageFile.format, "image/webp")