Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/ImageFile.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

471 statements  

1# 

2# The Python Imaging Library. 

3# $Id$ 

4# 

5# base class for image file handlers 

6# 

7# history: 

8# 1995-09-09 fl Created 

9# 1996-03-11 fl Fixed load mechanism. 

10# 1996-04-15 fl Added pcx/xbm decoders. 

11# 1996-04-30 fl Added encoders. 

12# 1996-12-14 fl Added load helpers 

13# 1997-01-11 fl Use encode_to_file where possible 

14# 1997-08-27 fl Flush output in _save 

15# 1998-03-05 fl Use memory mapping for some modes 

16# 1999-02-04 fl Use memory mapping also for "I;16" and "I;16B" 

17# 1999-05-31 fl Added image parser 

18# 2000-10-12 fl Set readonly flag on memory-mapped images 

19# 2002-03-20 fl Use better messages for common decoder errors 

20# 2003-04-21 fl Fall back on mmap/map_buffer if map is not available 

21# 2003-10-30 fl Added StubImageFile class 

22# 2004-02-25 fl Made incremental parser more robust 

23# 

24# Copyright (c) 1997-2004 by Secret Labs AB 

25# Copyright (c) 1995-2004 by Fredrik Lundh 

26# 

27# See the README file for information on usage and redistribution. 

28# 

29from __future__ import annotations 

30 

31import abc 

32import io 

33import itertools 

34import logging 

35import os 

36import struct 

37from typing import IO, Any, NamedTuple, cast 

38 

39from . import ExifTags, Image 

40from ._util import DeferredError, is_path 

41 

42TYPE_CHECKING = False 

43if TYPE_CHECKING: 

44 from ._typing import StrOrBytesPath 

45 

46logger = logging.getLogger(__name__) 

47 

48MAXBLOCK = 65536 

49""" 

50By default, Pillow processes image data in blocks. This helps to prevent excessive use 

51of resources. Codecs may disable this behaviour with ``_pulls_fd`` or ``_pushes_fd``. 

52 

53When reading an image, this is the number of bytes to read at once. 

54 

55When writing an image, this is the number of bytes to write at once. 

56If the image width times 4 is greater, then that will be used instead. 

57Plugins may also set a greater number. 

58 

59User code may set this to another number. 

60""" 

61 

62SAFEBLOCK = 1024 * 1024 

63 

64LOAD_TRUNCATED_IMAGES = False 

65"""Whether or not to load truncated image files. User code may change this.""" 

66 

67ERRORS = { 

68 -1: "image buffer overrun error", 

69 -2: "decoding error", 

70 -3: "unknown error", 

71 -8: "bad configuration", 

72 -9: "out of memory error", 

73} 

74""" 

75Dict of known error codes returned from :meth:`.PyDecoder.decode`, 

76:meth:`.PyEncoder.encode` :meth:`.PyEncoder.encode_to_pyfd` and 

77:meth:`.PyEncoder.encode_to_file`. 

78""" 

79 

80 

81# 

82# -------------------------------------------------------------------- 

83# Helpers 

84 

85 

86def _get_oserror(error: int, *, encoder: bool) -> OSError: 

87 try: 

88 msg = Image.core.getcodecstatus(error) 

89 except AttributeError: 

90 msg = ERRORS.get(error) 

91 if not msg: 

92 msg = f"{'encoder' if encoder else 'decoder'} error {error}" 

93 msg += f" when {'writing' if encoder else 'reading'} image file" 

94 return OSError(msg) 

95 

96 

97def _tilesort(t: _Tile) -> int: 

98 # sort on offset 

99 return t[2] 

100 

101 

102class _Tile(NamedTuple): 

103 codec_name: str 

104 extents: tuple[int, int, int, int] | None 

105 offset: int = 0 

106 args: tuple[Any, ...] | str | None = None 

107 

108 

109# 

110# -------------------------------------------------------------------- 

111# ImageFile base class 

112 

113 

114class ImageFile(Image.Image): 

115 """Base class for image file format handlers.""" 

116 

117 def __init__( 

118 self, fp: StrOrBytesPath | IO[bytes], filename: str | bytes | None = None 

119 ) -> None: 

120 super().__init__() 

121 

122 self._min_frame = 0 

123 

124 self.custom_mimetype: str | None = None 

125 

126 self.tile: list[_Tile] = [] 

127 """ A list of tile descriptors """ 

128 

129 self.readonly = 1 # until we know better 

130 

131 self.decoderconfig: tuple[Any, ...] = () 

132 self.decodermaxblock = MAXBLOCK 

133 

134 if is_path(fp): 

135 # filename 

136 self.fp = open(fp, "rb") 

137 self.filename = os.fspath(fp) 

138 self._exclusive_fp = True 

139 else: 

140 # stream 

141 self.fp = cast(IO[bytes], fp) 

142 self.filename = filename if filename is not None else "" 

143 # can be overridden 

144 self._exclusive_fp = False 

145 

146 try: 

147 try: 

148 self._open() 

149 except ( 

150 IndexError, # end of data 

151 TypeError, # end of data (ord) 

152 KeyError, # unsupported mode 

153 EOFError, # got header but not the first frame 

154 struct.error, 

155 ) as v: 

156 raise SyntaxError(v) from v 

157 

158 if not self.mode or self.size[0] <= 0 or self.size[1] <= 0: 

159 msg = "not identified by this driver" 

160 raise SyntaxError(msg) 

161 except BaseException: 

162 # close the file only if we have opened it this constructor 

163 if self._exclusive_fp: 

164 self.fp.close() 

165 raise 

166 

167 def _open(self) -> None: 

168 pass 

169 

170 def _close_fp(self): 

171 if getattr(self, "_fp", False) and not isinstance(self._fp, DeferredError): 

172 if self._fp != self.fp: 

173 self._fp.close() 

174 self._fp = DeferredError(ValueError("Operation on closed image")) 

175 if self.fp: 

176 self.fp.close() 

177 

178 def close(self) -> None: 

179 """ 

180 Closes the file pointer, if possible. 

181 

182 This operation will destroy the image core and release its memory. 

183 The image data will be unusable afterward. 

184 

185 This function is required to close images that have multiple frames or 

186 have not had their file read and closed by the 

187 :py:meth:`~PIL.Image.Image.load` method. See :ref:`file-handling` for 

188 more information. 

189 """ 

190 try: 

191 self._close_fp() 

192 self.fp = None 

193 except Exception as msg: 

194 logger.debug("Error closing: %s", msg) 

195 

196 super().close() 

197 

198 def get_child_images(self) -> list[ImageFile]: 

199 child_images = [] 

200 exif = self.getexif() 

201 ifds = [] 

202 if ExifTags.Base.SubIFDs in exif: 

203 subifd_offsets = exif[ExifTags.Base.SubIFDs] 

204 if subifd_offsets: 

205 if not isinstance(subifd_offsets, tuple): 

206 subifd_offsets = (subifd_offsets,) 

207 for subifd_offset in subifd_offsets: 

208 ifds.append((exif._get_ifd_dict(subifd_offset), subifd_offset)) 

209 ifd1 = exif.get_ifd(ExifTags.IFD.IFD1) 

210 if ifd1 and ifd1.get(ExifTags.Base.JpegIFOffset): 

211 assert exif._info is not None 

212 ifds.append((ifd1, exif._info.next)) 

213 

214 offset = None 

215 for ifd, ifd_offset in ifds: 

216 assert self.fp is not None 

217 current_offset = self.fp.tell() 

218 if offset is None: 

219 offset = current_offset 

220 

221 fp = self.fp 

222 if ifd is not None: 

223 thumbnail_offset = ifd.get(ExifTags.Base.JpegIFOffset) 

224 if thumbnail_offset is not None: 

225 thumbnail_offset += getattr(self, "_exif_offset", 0) 

226 self.fp.seek(thumbnail_offset) 

227 

228 length = ifd.get(ExifTags.Base.JpegIFByteCount) 

229 assert isinstance(length, int) 

230 data = self.fp.read(length) 

231 fp = io.BytesIO(data) 

232 

233 with Image.open(fp) as im: 

234 from . import TiffImagePlugin 

235 

236 if thumbnail_offset is None and isinstance( 

237 im, TiffImagePlugin.TiffImageFile 

238 ): 

239 im._frame_pos = [ifd_offset] 

240 im._seek(0) 

241 im.load() 

242 child_images.append(im) 

243 

244 if offset is not None: 

245 assert self.fp is not None 

246 self.fp.seek(offset) 

247 return child_images 

248 

249 def get_format_mimetype(self) -> str | None: 

250 if self.custom_mimetype: 

251 return self.custom_mimetype 

252 if self.format is not None: 

253 return Image.MIME.get(self.format.upper()) 

254 return None 

255 

256 def __getstate__(self) -> list[Any]: 

257 return super().__getstate__() + [self.filename] 

258 

259 def __setstate__(self, state: list[Any]) -> None: 

260 self.tile = [] 

261 if len(state) > 5: 

262 self.filename = state[5] 

263 super().__setstate__(state) 

264 

265 def verify(self) -> None: 

266 """Check file integrity""" 

267 

268 # raise exception if something's wrong. must be called 

269 # directly after open, and closes file when finished. 

270 if self._exclusive_fp: 

271 self.fp.close() 

272 self.fp = None 

273 

274 def load(self) -> Image.core.PixelAccess | None: 

275 """Load image data based on tile list""" 

276 

277 if not self.tile and self._im is None: 

278 msg = "cannot load this image" 

279 raise OSError(msg) 

280 

281 pixel = Image.Image.load(self) 

282 if not self.tile: 

283 return pixel 

284 

285 self.map: mmap.mmap | None = None 

286 use_mmap = self.filename and len(self.tile) == 1 

287 

288 readonly = 0 

289 

290 # look for read/seek overrides 

291 if hasattr(self, "load_read"): 

292 read = self.load_read 

293 # don't use mmap if there are custom read/seek functions 

294 use_mmap = False 

295 else: 

296 read = self.fp.read 

297 

298 if hasattr(self, "load_seek"): 

299 seek = self.load_seek 

300 use_mmap = False 

301 else: 

302 seek = self.fp.seek 

303 

304 if use_mmap: 

305 # try memory mapping 

306 decoder_name, extents, offset, args = self.tile[0] 

307 if isinstance(args, str): 

308 args = (args, 0, 1) 

309 if ( 

310 decoder_name == "raw" 

311 and isinstance(args, tuple) 

312 and len(args) >= 3 

313 and args[0] == self.mode 

314 and args[0] in Image._MAPMODES 

315 ): 

316 try: 

317 # use mmap, if possible 

318 import mmap 

319 

320 with open(self.filename) as fp: 

321 self.map = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) 

322 if offset + self.size[1] * args[1] > self.map.size(): 

323 msg = "buffer is not large enough" 

324 raise OSError(msg) 

325 self.im = Image.core.map_buffer( 

326 self.map, self.size, decoder_name, offset, args 

327 ) 

328 readonly = 1 

329 # After trashing self.im, 

330 # we might need to reload the palette data. 

331 if self.palette: 

332 self.palette.dirty = 1 

333 except (AttributeError, OSError, ImportError): 

334 self.map = None 

335 

336 self.load_prepare() 

337 err_code = -3 # initialize to unknown error 

338 if not self.map: 

339 # sort tiles in file order 

340 self.tile.sort(key=_tilesort) 

341 

342 # FIXME: This is a hack to handle TIFF's JpegTables tag. 

343 prefix = getattr(self, "tile_prefix", b"") 

344 

345 # Remove consecutive duplicates that only differ by their offset 

346 self.tile = [ 

347 list(tiles)[-1] 

348 for _, tiles in itertools.groupby( 

349 self.tile, lambda tile: (tile[0], tile[1], tile[3]) 

350 ) 

351 ] 

352 for i, (decoder_name, extents, offset, args) in enumerate(self.tile): 

353 seek(offset) 

354 decoder = Image._getdecoder( 

355 self.mode, decoder_name, args, self.decoderconfig 

356 ) 

357 try: 

358 decoder.setimage(self.im, extents) 

359 if decoder.pulls_fd: 

360 decoder.setfd(self.fp) 

361 err_code = decoder.decode(b"")[1] 

362 else: 

363 b = prefix 

364 while True: 

365 read_bytes = self.decodermaxblock 

366 if i + 1 < len(self.tile): 

367 next_offset = self.tile[i + 1].offset 

368 if next_offset > offset: 

369 read_bytes = next_offset - offset 

370 try: 

371 s = read(read_bytes) 

372 except (IndexError, struct.error) as e: 

373 # truncated png/gif 

374 if LOAD_TRUNCATED_IMAGES: 

375 break 

376 else: 

377 msg = "image file is truncated" 

378 raise OSError(msg) from e 

379 

380 if not s: # truncated jpeg 

381 if LOAD_TRUNCATED_IMAGES: 

382 break 

383 else: 

384 msg = ( 

385 "image file is truncated " 

386 f"({len(b)} bytes not processed)" 

387 ) 

388 raise OSError(msg) 

389 

390 b = b + s 

391 n, err_code = decoder.decode(b) 

392 if n < 0: 

393 break 

394 b = b[n:] 

395 finally: 

396 # Need to cleanup here to prevent leaks 

397 decoder.cleanup() 

398 

399 self.tile = [] 

400 self.readonly = readonly 

401 

402 self.load_end() 

403 

404 if self._exclusive_fp and self._close_exclusive_fp_after_loading: 

405 self.fp.close() 

406 self.fp = None 

407 

408 if not self.map and not LOAD_TRUNCATED_IMAGES and err_code < 0: 

409 # still raised if decoder fails to return anything 

410 raise _get_oserror(err_code, encoder=False) 

411 

412 return Image.Image.load(self) 

413 

414 def load_prepare(self) -> None: 

415 # create image memory if necessary 

416 if self._im is None: 

417 self.im = Image.core.new(self.mode, self.size) 

418 # create palette (optional) 

419 if self.mode == "P": 

420 Image.Image.load(self) 

421 

422 def load_end(self) -> None: 

423 # may be overridden 

424 pass 

425 

426 # may be defined for contained formats 

427 # def load_seek(self, pos: int) -> None: 

428 # pass 

429 

430 # may be defined for blocked formats (e.g. PNG) 

431 # def load_read(self, read_bytes: int) -> bytes: 

432 # pass 

433 

434 def _seek_check(self, frame: int) -> bool: 

435 if ( 

436 frame < self._min_frame 

437 # Only check upper limit on frames if additional seek operations 

438 # are not required to do so 

439 or ( 

440 not (hasattr(self, "_n_frames") and self._n_frames is None) 

441 and frame >= getattr(self, "n_frames") + self._min_frame 

442 ) 

443 ): 

444 msg = "attempt to seek outside sequence" 

445 raise EOFError(msg) 

446 

447 return self.tell() != frame 

448 

449 

450class StubHandler(abc.ABC): 

451 def open(self, im: StubImageFile) -> None: 

452 pass 

453 

454 @abc.abstractmethod 

455 def load(self, im: StubImageFile) -> Image.Image: 

456 pass 

457 

458 

459class StubImageFile(ImageFile, metaclass=abc.ABCMeta): 

460 """ 

461 Base class for stub image loaders. 

462 

463 A stub loader is an image loader that can identify files of a 

464 certain format, but relies on external code to load the file. 

465 """ 

466 

467 @abc.abstractmethod 

468 def _open(self) -> None: 

469 pass 

470 

471 def load(self) -> Image.core.PixelAccess | None: 

472 loader = self._load() 

473 if loader is None: 

474 msg = f"cannot find loader for this {self.format} file" 

475 raise OSError(msg) 

476 image = loader.load(self) 

477 assert image is not None 

478 # become the other object (!) 

479 self.__class__ = image.__class__ # type: ignore[assignment] 

480 self.__dict__ = image.__dict__ 

481 return image.load() 

482 

483 @abc.abstractmethod 

484 def _load(self) -> StubHandler | None: 

485 """(Hook) Find actual image loader.""" 

486 pass 

487 

488 

489class Parser: 

490 """ 

491 Incremental image parser. This class implements the standard 

492 feed/close consumer interface. 

493 """ 

494 

495 incremental = None 

496 image: Image.Image | None = None 

497 data: bytes | None = None 

498 decoder: Image.core.ImagingDecoder | PyDecoder | None = None 

499 offset = 0 

500 finished = 0 

501 

502 def reset(self) -> None: 

503 """ 

504 (Consumer) Reset the parser. Note that you can only call this 

505 method immediately after you've created a parser; parser 

506 instances cannot be reused. 

507 """ 

508 assert self.data is None, "cannot reuse parsers" 

509 

510 def feed(self, data: bytes) -> None: 

511 """ 

512 (Consumer) Feed data to the parser. 

513 

514 :param data: A string buffer. 

515 :exception OSError: If the parser failed to parse the image file. 

516 """ 

517 # collect data 

518 

519 if self.finished: 

520 return 

521 

522 if self.data is None: 

523 self.data = data 

524 else: 

525 self.data = self.data + data 

526 

527 # parse what we have 

528 if self.decoder: 

529 if self.offset > 0: 

530 # skip header 

531 skip = min(len(self.data), self.offset) 

532 self.data = self.data[skip:] 

533 self.offset = self.offset - skip 

534 if self.offset > 0 or not self.data: 

535 return 

536 

537 n, e = self.decoder.decode(self.data) 

538 

539 if n < 0: 

540 # end of stream 

541 self.data = None 

542 self.finished = 1 

543 if e < 0: 

544 # decoding error 

545 self.image = None 

546 raise _get_oserror(e, encoder=False) 

547 else: 

548 # end of image 

549 return 

550 self.data = self.data[n:] 

551 

552 elif self.image: 

553 # if we end up here with no decoder, this file cannot 

554 # be incrementally parsed. wait until we've gotten all 

555 # available data 

556 pass 

557 

558 else: 

559 # attempt to open this file 

560 try: 

561 with io.BytesIO(self.data) as fp: 

562 im = Image.open(fp) 

563 except OSError: 

564 pass # not enough data 

565 else: 

566 flag = hasattr(im, "load_seek") or hasattr(im, "load_read") 

567 if flag or len(im.tile) != 1: 

568 # custom load code, or multiple tiles 

569 self.decode = None 

570 else: 

571 # initialize decoder 

572 im.load_prepare() 

573 d, e, o, a = im.tile[0] 

574 im.tile = [] 

575 self.decoder = Image._getdecoder(im.mode, d, a, im.decoderconfig) 

576 self.decoder.setimage(im.im, e) 

577 

578 # calculate decoder offset 

579 self.offset = o 

580 if self.offset <= len(self.data): 

581 self.data = self.data[self.offset :] 

582 self.offset = 0 

583 

584 self.image = im 

585 

586 def __enter__(self) -> Parser: 

587 return self 

588 

589 def __exit__(self, *args: object) -> None: 

590 self.close() 

591 

592 def close(self) -> Image.Image: 

593 """ 

594 (Consumer) Close the stream. 

595 

596 :returns: An image object. 

597 :exception OSError: If the parser failed to parse the image file either 

598 because it cannot be identified or cannot be 

599 decoded. 

600 """ 

601 # finish decoding 

602 if self.decoder: 

603 # get rid of what's left in the buffers 

604 self.feed(b"") 

605 self.data = self.decoder = None 

606 if not self.finished: 

607 msg = "image was incomplete" 

608 raise OSError(msg) 

609 if not self.image: 

610 msg = "cannot parse this image" 

611 raise OSError(msg) 

612 if self.data: 

613 # incremental parsing not possible; reopen the file 

614 # not that we have all data 

615 with io.BytesIO(self.data) as fp: 

616 try: 

617 self.image = Image.open(fp) 

618 finally: 

619 self.image.load() 

620 return self.image 

621 

622 

623# -------------------------------------------------------------------- 

624 

625 

626def _save(im: Image.Image, fp: IO[bytes], tile: list[_Tile], bufsize: int = 0) -> None: 

627 """Helper to save image based on tile list 

628 

629 :param im: Image object. 

630 :param fp: File object. 

631 :param tile: Tile list. 

632 :param bufsize: Optional buffer size 

633 """ 

634 

635 im.load() 

636 if not hasattr(im, "encoderconfig"): 

637 im.encoderconfig = () 

638 tile.sort(key=_tilesort) 

639 # FIXME: make MAXBLOCK a configuration parameter 

640 # It would be great if we could have the encoder specify what it needs 

641 # But, it would need at least the image size in most cases. RawEncode is 

642 # a tricky case. 

643 bufsize = max(MAXBLOCK, bufsize, im.size[0] * 4) # see RawEncode.c 

644 try: 

645 fh = fp.fileno() 

646 fp.flush() 

647 _encode_tile(im, fp, tile, bufsize, fh) 

648 except (AttributeError, io.UnsupportedOperation) as exc: 

649 _encode_tile(im, fp, tile, bufsize, None, exc) 

650 if hasattr(fp, "flush"): 

651 fp.flush() 

652 

653 

654def _encode_tile( 

655 im: Image.Image, 

656 fp: IO[bytes], 

657 tile: list[_Tile], 

658 bufsize: int, 

659 fh: int | None, 

660 exc: BaseException | None = None, 

661) -> None: 

662 for encoder_name, extents, offset, args in tile: 

663 if offset > 0: 

664 fp.seek(offset) 

665 encoder = Image._getencoder(im.mode, encoder_name, args, im.encoderconfig) 

666 try: 

667 encoder.setimage(im.im, extents) 

668 if encoder.pushes_fd: 

669 encoder.setfd(fp) 

670 errcode = encoder.encode_to_pyfd()[1] 

671 else: 

672 if exc: 

673 # compress to Python file-compatible object 

674 while True: 

675 errcode, data = encoder.encode(bufsize)[1:] 

676 fp.write(data) 

677 if errcode: 

678 break 

679 else: 

680 # slight speedup: compress to real file object 

681 assert fh is not None 

682 errcode = encoder.encode_to_file(fh, bufsize) 

683 if errcode < 0: 

684 raise _get_oserror(errcode, encoder=True) from exc 

685 finally: 

686 encoder.cleanup() 

687 

688 

689def _safe_read(fp: IO[bytes], size: int) -> bytes: 

690 """ 

691 Reads large blocks in a safe way. Unlike fp.read(n), this function 

692 doesn't trust the user. If the requested size is larger than 

693 SAFEBLOCK, the file is read block by block. 

694 

695 :param fp: File handle. Must implement a <b>read</b> method. 

696 :param size: Number of bytes to read. 

697 :returns: A string containing <i>size</i> bytes of data. 

698 

699 Raises an OSError if the file is truncated and the read cannot be completed 

700 

701 """ 

702 if size <= 0: 

703 return b"" 

704 if size <= SAFEBLOCK: 

705 data = fp.read(size) 

706 if len(data) < size: 

707 msg = "Truncated File Read" 

708 raise OSError(msg) 

709 return data 

710 blocks: list[bytes] = [] 

711 remaining_size = size 

712 while remaining_size > 0: 

713 block = fp.read(min(remaining_size, SAFEBLOCK)) 

714 if not block: 

715 break 

716 blocks.append(block) 

717 remaining_size -= len(block) 

718 if sum(len(block) for block in blocks) < size: 

719 msg = "Truncated File Read" 

720 raise OSError(msg) 

721 return b"".join(blocks) 

722 

723 

724class PyCodecState: 

725 def __init__(self) -> None: 

726 self.xsize = 0 

727 self.ysize = 0 

728 self.xoff = 0 

729 self.yoff = 0 

730 

731 def extents(self) -> tuple[int, int, int, int]: 

732 return self.xoff, self.yoff, self.xoff + self.xsize, self.yoff + self.ysize 

733 

734 

735class PyCodec: 

736 fd: IO[bytes] | None 

737 

738 def __init__(self, mode: str, *args: Any) -> None: 

739 self.im: Image.core.ImagingCore | None = None 

740 self.state = PyCodecState() 

741 self.fd = None 

742 self.mode = mode 

743 self.init(args) 

744 

745 def init(self, args: tuple[Any, ...]) -> None: 

746 """ 

747 Override to perform codec specific initialization 

748 

749 :param args: Tuple of arg items from the tile entry 

750 :returns: None 

751 """ 

752 self.args = args 

753 

754 def cleanup(self) -> None: 

755 """ 

756 Override to perform codec specific cleanup 

757 

758 :returns: None 

759 """ 

760 pass 

761 

762 def setfd(self, fd: IO[bytes]) -> None: 

763 """ 

764 Called from ImageFile to set the Python file-like object 

765 

766 :param fd: A Python file-like object 

767 :returns: None 

768 """ 

769 self.fd = fd 

770 

771 def setimage( 

772 self, 

773 im: Image.core.ImagingCore, 

774 extents: tuple[int, int, int, int] | None = None, 

775 ) -> None: 

776 """ 

777 Called from ImageFile to set the core output image for the codec 

778 

779 :param im: A core image object 

780 :param extents: a 4 tuple of (x0, y0, x1, y1) defining the rectangle 

781 for this tile 

782 :returns: None 

783 """ 

784 

785 # following c code 

786 self.im = im 

787 

788 if extents: 

789 (x0, y0, x1, y1) = extents 

790 else: 

791 (x0, y0, x1, y1) = (0, 0, 0, 0) 

792 

793 if x0 == 0 and x1 == 0: 

794 self.state.xsize, self.state.ysize = self.im.size 

795 else: 

796 self.state.xoff = x0 

797 self.state.yoff = y0 

798 self.state.xsize = x1 - x0 

799 self.state.ysize = y1 - y0 

800 

801 if self.state.xsize <= 0 or self.state.ysize <= 0: 

802 msg = "Size cannot be negative" 

803 raise ValueError(msg) 

804 

805 if ( 

806 self.state.xsize + self.state.xoff > self.im.size[0] 

807 or self.state.ysize + self.state.yoff > self.im.size[1] 

808 ): 

809 msg = "Tile cannot extend outside image" 

810 raise ValueError(msg) 

811 

812 

813class PyDecoder(PyCodec): 

814 """ 

815 Python implementation of a format decoder. Override this class and 

816 add the decoding logic in the :meth:`decode` method. 

817 

818 See :ref:`Writing Your Own File Codec in Python<file-codecs-py>` 

819 """ 

820 

821 _pulls_fd = False 

822 

823 @property 

824 def pulls_fd(self) -> bool: 

825 return self._pulls_fd 

826 

827 def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]: 

828 """ 

829 Override to perform the decoding process. 

830 

831 :param buffer: A bytes object with the data to be decoded. 

832 :returns: A tuple of ``(bytes consumed, errcode)``. 

833 If finished with decoding return -1 for the bytes consumed. 

834 Err codes are from :data:`.ImageFile.ERRORS`. 

835 """ 

836 msg = "unavailable in base decoder" 

837 raise NotImplementedError(msg) 

838 

839 def set_as_raw( 

840 self, data: bytes, rawmode: str | None = None, extra: tuple[Any, ...] = () 

841 ) -> None: 

842 """ 

843 Convenience method to set the internal image from a stream of raw data 

844 

845 :param data: Bytes to be set 

846 :param rawmode: The rawmode to be used for the decoder. 

847 If not specified, it will default to the mode of the image 

848 :param extra: Extra arguments for the decoder. 

849 :returns: None 

850 """ 

851 

852 if not rawmode: 

853 rawmode = self.mode 

854 d = Image._getdecoder(self.mode, "raw", rawmode, extra) 

855 assert self.im is not None 

856 d.setimage(self.im, self.state.extents()) 

857 s = d.decode(data) 

858 

859 if s[0] >= 0: 

860 msg = "not enough image data" 

861 raise ValueError(msg) 

862 if s[1] != 0: 

863 msg = "cannot decode image data" 

864 raise ValueError(msg) 

865 

866 

867class PyEncoder(PyCodec): 

868 """ 

869 Python implementation of a format encoder. Override this class and 

870 add the decoding logic in the :meth:`encode` method. 

871 

872 See :ref:`Writing Your Own File Codec in Python<file-codecs-py>` 

873 """ 

874 

875 _pushes_fd = False 

876 

877 @property 

878 def pushes_fd(self) -> bool: 

879 return self._pushes_fd 

880 

881 def encode(self, bufsize: int) -> tuple[int, int, bytes]: 

882 """ 

883 Override to perform the encoding process. 

884 

885 :param bufsize: Buffer size. 

886 :returns: A tuple of ``(bytes encoded, errcode, bytes)``. 

887 If finished with encoding return 1 for the error code. 

888 Err codes are from :data:`.ImageFile.ERRORS`. 

889 """ 

890 msg = "unavailable in base encoder" 

891 raise NotImplementedError(msg) 

892 

893 def encode_to_pyfd(self) -> tuple[int, int]: 

894 """ 

895 If ``pushes_fd`` is ``True``, then this method will be used, 

896 and ``encode()`` will only be called once. 

897 

898 :returns: A tuple of ``(bytes consumed, errcode)``. 

899 Err codes are from :data:`.ImageFile.ERRORS`. 

900 """ 

901 if not self.pushes_fd: 

902 return 0, -8 # bad configuration 

903 bytes_consumed, errcode, data = self.encode(0) 

904 if data: 

905 assert self.fd is not None 

906 self.fd.write(data) 

907 return bytes_consumed, errcode 

908 

909 def encode_to_file(self, fh: int, bufsize: int) -> int: 

910 """ 

911 :param fh: File handle. 

912 :param bufsize: Buffer size. 

913 

914 :returns: If finished successfully, return 0. 

915 Otherwise, return an error code. Err codes are from 

916 :data:`.ImageFile.ERRORS`. 

917 """ 

918 errcode = 0 

919 while errcode == 0: 

920 status, errcode, buf = self.encode(bufsize) 

921 if status > 0: 

922 os.write(fh, buf[status:]) 

923 return errcode