Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/WebPImagePlugin.py: 40%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

183 statements  

1from __future__ import annotations 

2 

3from io import BytesIO 

4 

5from . import Image, ImageFile 

6 

7try: 

8 from . import _webp 

9 

10 SUPPORTED = True 

11except ImportError: 

12 SUPPORTED = False 

13 

14TYPE_CHECKING = False 

15if TYPE_CHECKING: 

16 from typing import IO, Any 

17 

18_VP8_MODES_BY_IDENTIFIER = { 

19 b"VP8 ": "RGB", 

20 b"VP8X": "RGBA", 

21 b"VP8L": "RGBA", # lossless 

22} 

23 

24 

25def _accept(prefix: bytes) -> bool | str: 

26 is_riff_file_format = prefix.startswith(b"RIFF") 

27 is_webp_file = prefix[8:12] == b"WEBP" 

28 is_valid_vp8_mode = prefix[12:16] in _VP8_MODES_BY_IDENTIFIER 

29 

30 if is_riff_file_format and is_webp_file and is_valid_vp8_mode: 

31 if not SUPPORTED: 

32 return ( 

33 "image file could not be identified because WEBP support not installed" 

34 ) 

35 return True 

36 return False 

37 

38 

39class WebPImageFile(ImageFile.ImageFile): 

40 format = "WEBP" 

41 format_description = "WebP image" 

42 __loaded = 0 

43 __logical_frame = 0 

44 

45 def _open(self) -> None: 

46 # Use the newer AnimDecoder API to parse the (possibly) animated file, 

47 # and access muxed chunks like ICC/EXIF/XMP. 

48 assert self.fp is not None 

49 self._decoder = _webp.WebPAnimDecoder(self.fp.read()) 

50 

51 # Get info from decoder 

52 self._size, self.info["loop"], bgcolor, self.n_frames, self.rawmode = ( 

53 self._decoder.get_info() 

54 ) 

55 self.info["background"] = ( 

56 (bgcolor >> 16) & 0xFF, # R 

57 (bgcolor >> 8) & 0xFF, # G 

58 bgcolor & 0xFF, # B 

59 (bgcolor >> 24) & 0xFF, # A 

60 ) 

61 self.is_animated = self.n_frames > 1 

62 self._mode = "RGB" if self.rawmode == "RGBX" else self.rawmode 

63 

64 # Attempt to read ICC / EXIF / XMP chunks from file 

65 for key, chunk_name in { 

66 "icc_profile": "ICCP", 

67 "exif": "EXIF", 

68 "xmp": "XMP ", 

69 }.items(): 

70 if value := self._decoder.get_chunk(chunk_name): 

71 self.info[key] = value 

72 

73 # Initialize seek state 

74 self._reset(reset=False) 

75 

76 def _getexif(self) -> dict[int, Any] | None: 

77 if "exif" not in self.info: 

78 return None 

79 return self.getexif()._get_merged_dict() 

80 

81 def seek(self, frame: int) -> None: 

82 if not self._seek_check(frame): 

83 return 

84 

85 # Set logical frame to requested position 

86 self.__logical_frame = frame 

87 

88 def _reset(self, reset: bool = True) -> None: 

89 if reset: 

90 self._decoder.reset() 

91 self.__physical_frame = 0 

92 self.__loaded = -1 

93 self.__timestamp = 0 

94 

95 def _get_next(self) -> tuple[bytes, int, int]: 

96 # Get next frame 

97 ret = self._decoder.get_next() 

98 self.__physical_frame += 1 

99 

100 # Check if an error occurred 

101 if ret is None: 

102 self._reset() # Reset just to be safe 

103 self.seek(0) 

104 msg = "failed to decode next frame in WebP file" 

105 raise EOFError(msg) 

106 

107 # Compute duration 

108 data, timestamp = ret 

109 duration = timestamp - self.__timestamp 

110 self.__timestamp = timestamp 

111 

112 # libwebp gives frame end, adjust to start of frame 

113 timestamp -= duration 

114 return data, timestamp, duration 

115 

116 def _seek(self, frame: int) -> None: 

117 if self.__physical_frame == frame: 

118 return # Nothing to do 

119 if frame < self.__physical_frame: 

120 self._reset() # Rewind to beginning 

121 while self.__physical_frame < frame: 

122 self._get_next() # Advance to the requested frame 

123 

124 def load(self) -> Image.core.PixelAccess | None: 

125 if self.__loaded != self.__logical_frame: 

126 self._seek(self.__logical_frame) 

127 

128 # We need to load the image data for this frame 

129 data, self.info["timestamp"], self.info["duration"] = self._get_next() 

130 self.__loaded = self.__logical_frame 

131 

132 # Set tile 

133 if self.fp and self._exclusive_fp: 

134 self.fp.close() 

135 self.fp = BytesIO(data) 

136 self.tile = [ImageFile._Tile("raw", (0, 0) + self.size, 0, self.rawmode)] 

137 

138 return super().load() 

139 

140 def load_seek(self, pos: int) -> None: 

141 pass 

142 

143 def tell(self) -> int: 

144 return self.__logical_frame 

145 

146 

147def _convert_frame(im: Image.Image) -> Image.Image: 

148 # Make sure image mode is supported 

149 if im.mode not in ("RGBX", "RGBA", "RGB"): 

150 im = im.convert("RGBA" if im.has_transparency_data else "RGB") 

151 return im 

152 

153 

154def _save_all(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None: 

155 encoderinfo = im.encoderinfo.copy() 

156 append_images = list(encoderinfo.get("append_images", [])) 

157 

158 # If total frame count is 1, then save using the legacy API, which 

159 # will preserve non-alpha modes 

160 total = 0 

161 for ims in [im] + append_images: 

162 total += getattr(ims, "n_frames", 1) 

163 if total == 1: 

164 _save(im, fp, filename) 

165 return 

166 

167 background: int | tuple[int, ...] = (0, 0, 0, 0) 

168 if "background" in encoderinfo: 

169 background = encoderinfo["background"] 

170 elif "background" in im.info: 

171 background = im.info["background"] 

172 if isinstance(background, int): 

173 # GifImagePlugin stores a global color table index in 

174 # info["background"]. So it must be converted to an RGBA value 

175 palette = im.getpalette() 

176 if palette: 

177 r, g, b = palette[background * 3 : (background + 1) * 3] 

178 background = (r, g, b, 255) 

179 else: 

180 background = (background, background, background, 255) 

181 

182 duration = im.encoderinfo.get("duration", im.info.get("duration", 0)) 

183 loop = im.encoderinfo.get("loop", 0) 

184 minimize_size = im.encoderinfo.get("minimize_size", False) 

185 kmin = im.encoderinfo.get("kmin", None) 

186 kmax = im.encoderinfo.get("kmax", None) 

187 allow_mixed = im.encoderinfo.get("allow_mixed", False) 

188 verbose = False 

189 lossless = im.encoderinfo.get("lossless", False) 

190 quality = im.encoderinfo.get("quality", 80) 

191 alpha_quality = im.encoderinfo.get("alpha_quality", 100) 

192 method = im.encoderinfo.get("method", 0) 

193 icc_profile = im.encoderinfo.get("icc_profile") or "" 

194 exif = im.encoderinfo.get("exif", "") 

195 if isinstance(exif, Image.Exif): 

196 exif = exif.tobytes() 

197 xmp = im.encoderinfo.get("xmp", "") 

198 if allow_mixed: 

199 lossless = False 

200 

201 # Sensible keyframe defaults are from gif2webp.c script 

202 if kmin is None: 

203 kmin = 9 if lossless else 3 

204 if kmax is None: 

205 kmax = 17 if lossless else 5 

206 

207 # Validate background color 

208 if ( 

209 not isinstance(background, (list, tuple)) 

210 or len(background) != 4 

211 or not all(0 <= v < 256 for v in background) 

212 ): 

213 msg = f"Background color is not an RGBA tuple clamped to (0-255): {background}" 

214 raise OSError(msg) 

215 

216 # Convert to packed uint 

217 bg_r, bg_g, bg_b, bg_a = background 

218 background = (bg_a << 24) | (bg_r << 16) | (bg_g << 8) | (bg_b << 0) 

219 

220 # Setup the WebP animation encoder 

221 enc = _webp.WebPAnimEncoder( 

222 im.size, 

223 background, 

224 loop, 

225 minimize_size, 

226 kmin, 

227 kmax, 

228 allow_mixed, 

229 verbose, 

230 ) 

231 

232 # Add each frame 

233 frame_idx = 0 

234 timestamp = 0 

235 cur_idx = im.tell() 

236 try: 

237 for ims in [im] + append_images: 

238 # Get number of frames in this image 

239 nfr = getattr(ims, "n_frames", 1) 

240 

241 for idx in range(nfr): 

242 ims.seek(idx) 

243 

244 frame = _convert_frame(ims) 

245 

246 # Append the frame to the animation encoder 

247 enc.add( 

248 frame.getim(), 

249 round(timestamp), 

250 lossless, 

251 quality, 

252 alpha_quality, 

253 method, 

254 ) 

255 

256 # Update timestamp and frame index 

257 if isinstance(duration, (list, tuple)): 

258 timestamp += duration[frame_idx] 

259 else: 

260 timestamp += duration 

261 frame_idx += 1 

262 

263 finally: 

264 im.seek(cur_idx) 

265 

266 # Force encoder to flush frames 

267 enc.add(None, round(timestamp), lossless, quality, alpha_quality, 0) 

268 

269 # Get the final output from the encoder 

270 data = enc.assemble(icc_profile, exif, xmp) 

271 if data is None: 

272 msg = "cannot write file as WebP (encoder returned None)" 

273 raise OSError(msg) 

274 

275 fp.write(data) 

276 

277 

278def _save(im: Image.Image, fp: IO[bytes], filename: str | bytes) -> None: 

279 lossless = im.encoderinfo.get("lossless", False) 

280 quality = im.encoderinfo.get("quality", 80) 

281 alpha_quality = im.encoderinfo.get("alpha_quality", 100) 

282 icc_profile = im.encoderinfo.get("icc_profile") or "" 

283 exif = im.encoderinfo.get("exif", b"") 

284 if isinstance(exif, Image.Exif): 

285 exif = exif.tobytes() 

286 if exif.startswith(b"Exif\x00\x00"): 

287 exif = exif[6:] 

288 xmp = im.encoderinfo.get("xmp", "") 

289 method = im.encoderinfo.get("method", 4) 

290 exact = 1 if im.encoderinfo.get("exact") else 0 

291 

292 im = _convert_frame(im) 

293 

294 data = _webp.WebPEncode( 

295 im.getim(), 

296 lossless, 

297 float(quality), 

298 float(alpha_quality), 

299 icc_profile, 

300 method, 

301 exact, 

302 exif, 

303 xmp, 

304 ) 

305 if data is None: 

306 msg = "cannot write file as WebP (encoder returned None)" 

307 raise OSError(msg) 

308 

309 fp.write(data) 

310 

311 

312Image.register_open(WebPImageFile.format, WebPImageFile, _accept) 

313if SUPPORTED: 

314 Image.register_save(WebPImageFile.format, _save) 

315 Image.register_save_all(WebPImageFile.format, _save_all) 

316 Image.register_extension(WebPImageFile.format, ".webp") 

317 Image.register_mime(WebPImageFile.format, "image/webp")