Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/ImageFile.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

474 statements  

1# 

2# The Python Imaging Library. 

3# $Id$ 

4# 

5# base class for image file handlers 

6# 

7# history: 

8# 1995-09-09 fl Created 

9# 1996-03-11 fl Fixed load mechanism. 

10# 1996-04-15 fl Added pcx/xbm decoders. 

11# 1996-04-30 fl Added encoders. 

12# 1996-12-14 fl Added load helpers 

13# 1997-01-11 fl Use encode_to_file where possible 

14# 1997-08-27 fl Flush output in _save 

15# 1998-03-05 fl Use memory mapping for some modes 

16# 1999-02-04 fl Use memory mapping also for "I;16" and "I;16B" 

17# 1999-05-31 fl Added image parser 

18# 2000-10-12 fl Set readonly flag on memory-mapped images 

19# 2002-03-20 fl Use better messages for common decoder errors 

20# 2003-04-21 fl Fall back on mmap/map_buffer if map is not available 

21# 2003-10-30 fl Added StubImageFile class 

22# 2004-02-25 fl Made incremental parser more robust 

23# 

24# Copyright (c) 1997-2004 by Secret Labs AB 

25# Copyright (c) 1995-2004 by Fredrik Lundh 

26# 

27# See the README file for information on usage and redistribution. 

28# 

29from __future__ import annotations 

30 

31import abc 

32import io 

33import itertools 

34import logging 

35import os 

36import struct 

37from typing import IO, Any, NamedTuple, cast 

38 

39from . import ExifTags, Image 

40from ._deprecate import deprecate 

41from ._util import DeferredError, is_path 

42 

43TYPE_CHECKING = False 

44if TYPE_CHECKING: 

45 from ._typing import StrOrBytesPath 

46 

47logger = logging.getLogger(__name__) 

48 

49MAXBLOCK = 65536 

50 

51SAFEBLOCK = 1024 * 1024 

52 

53LOAD_TRUNCATED_IMAGES = False 

54"""Whether or not to load truncated image files. User code may change this.""" 

55 

56ERRORS = { 

57 -1: "image buffer overrun error", 

58 -2: "decoding error", 

59 -3: "unknown error", 

60 -8: "bad configuration", 

61 -9: "out of memory error", 

62} 

63""" 

64Dict of known error codes returned from :meth:`.PyDecoder.decode`, 

65:meth:`.PyEncoder.encode` :meth:`.PyEncoder.encode_to_pyfd` and 

66:meth:`.PyEncoder.encode_to_file`. 

67""" 

68 

69 

70# 

71# -------------------------------------------------------------------- 

72# Helpers 

73 

74 

75def _get_oserror(error: int, *, encoder: bool) -> OSError: 

76 try: 

77 msg = Image.core.getcodecstatus(error) 

78 except AttributeError: 

79 msg = ERRORS.get(error) 

80 if not msg: 

81 msg = f"{'encoder' if encoder else 'decoder'} error {error}" 

82 msg += f" when {'writing' if encoder else 'reading'} image file" 

83 return OSError(msg) 

84 

85 

86def raise_oserror(error: int) -> OSError: 

87 deprecate( 

88 "raise_oserror", 

89 12, 

90 action="It is only useful for translating error codes returned by a codec's " 

91 "decode() method, which ImageFile already does automatically.", 

92 ) 

93 raise _get_oserror(error, encoder=False) 

94 

95 

96def _tilesort(t: _Tile) -> int: 

97 # sort on offset 

98 return t[2] 

99 

100 

101class _Tile(NamedTuple): 

102 codec_name: str 

103 extents: tuple[int, int, int, int] | None 

104 offset: int = 0 

105 args: tuple[Any, ...] | str | None = None 

106 

107 

108# 

109# -------------------------------------------------------------------- 

110# ImageFile base class 

111 

112 

113class ImageFile(Image.Image): 

114 """Base class for image file format handlers.""" 

115 

116 def __init__( 

117 self, fp: StrOrBytesPath | IO[bytes], filename: str | bytes | None = None 

118 ) -> None: 

119 super().__init__() 

120 

121 self._min_frame = 0 

122 

123 self.custom_mimetype: str | None = None 

124 

125 self.tile: list[_Tile] = [] 

126 """ A list of tile descriptors """ 

127 

128 self.readonly = 1 # until we know better 

129 

130 self.decoderconfig: tuple[Any, ...] = () 

131 self.decodermaxblock = MAXBLOCK 

132 

133 if is_path(fp): 

134 # filename 

135 self.fp = open(fp, "rb") 

136 self.filename = os.fspath(fp) 

137 self._exclusive_fp = True 

138 else: 

139 # stream 

140 self.fp = cast(IO[bytes], fp) 

141 self.filename = filename if filename is not None else "" 

142 # can be overridden 

143 self._exclusive_fp = False 

144 

145 try: 

146 try: 

147 self._open() 

148 except ( 

149 IndexError, # end of data 

150 TypeError, # end of data (ord) 

151 KeyError, # unsupported mode 

152 EOFError, # got header but not the first frame 

153 struct.error, 

154 ) as v: 

155 raise SyntaxError(v) from v 

156 

157 if not self.mode or self.size[0] <= 0 or self.size[1] <= 0: 

158 msg = "not identified by this driver" 

159 raise SyntaxError(msg) 

160 except BaseException: 

161 # close the file only if we have opened it this constructor 

162 if self._exclusive_fp: 

163 self.fp.close() 

164 raise 

165 

166 def _open(self) -> None: 

167 pass 

168 

169 def _close_fp(self): 

170 if getattr(self, "_fp", False) and not isinstance(self._fp, DeferredError): 

171 if self._fp != self.fp: 

172 self._fp.close() 

173 self._fp = DeferredError(ValueError("Operation on closed image")) 

174 if self.fp: 

175 self.fp.close() 

176 

177 def close(self) -> None: 

178 """ 

179 Closes the file pointer, if possible. 

180 

181 This operation will destroy the image core and release its memory. 

182 The image data will be unusable afterward. 

183 

184 This function is required to close images that have multiple frames or 

185 have not had their file read and closed by the 

186 :py:meth:`~PIL.Image.Image.load` method. See :ref:`file-handling` for 

187 more information. 

188 """ 

189 try: 

190 self._close_fp() 

191 self.fp = None 

192 except Exception as msg: 

193 logger.debug("Error closing: %s", msg) 

194 

195 super().close() 

196 

197 def get_child_images(self) -> list[ImageFile]: 

198 child_images = [] 

199 exif = self.getexif() 

200 ifds = [] 

201 if ExifTags.Base.SubIFDs in exif: 

202 subifd_offsets = exif[ExifTags.Base.SubIFDs] 

203 if subifd_offsets: 

204 if not isinstance(subifd_offsets, tuple): 

205 subifd_offsets = (subifd_offsets,) 

206 for subifd_offset in subifd_offsets: 

207 ifds.append((exif._get_ifd_dict(subifd_offset), subifd_offset)) 

208 ifd1 = exif.get_ifd(ExifTags.IFD.IFD1) 

209 if ifd1 and ifd1.get(ExifTags.Base.JpegIFOffset): 

210 assert exif._info is not None 

211 ifds.append((ifd1, exif._info.next)) 

212 

213 offset = None 

214 for ifd, ifd_offset in ifds: 

215 assert self.fp is not None 

216 current_offset = self.fp.tell() 

217 if offset is None: 

218 offset = current_offset 

219 

220 fp = self.fp 

221 if ifd is not None: 

222 thumbnail_offset = ifd.get(ExifTags.Base.JpegIFOffset) 

223 if thumbnail_offset is not None: 

224 thumbnail_offset += getattr(self, "_exif_offset", 0) 

225 self.fp.seek(thumbnail_offset) 

226 

227 length = ifd.get(ExifTags.Base.JpegIFByteCount) 

228 assert isinstance(length, int) 

229 data = self.fp.read(length) 

230 fp = io.BytesIO(data) 

231 

232 with Image.open(fp) as im: 

233 from . import TiffImagePlugin 

234 

235 if thumbnail_offset is None and isinstance( 

236 im, TiffImagePlugin.TiffImageFile 

237 ): 

238 im._frame_pos = [ifd_offset] 

239 im._seek(0) 

240 im.load() 

241 child_images.append(im) 

242 

243 if offset is not None: 

244 assert self.fp is not None 

245 self.fp.seek(offset) 

246 return child_images 

247 

248 def get_format_mimetype(self) -> str | None: 

249 if self.custom_mimetype: 

250 return self.custom_mimetype 

251 if self.format is not None: 

252 return Image.MIME.get(self.format.upper()) 

253 return None 

254 

255 def __getstate__(self) -> list[Any]: 

256 return super().__getstate__() + [self.filename] 

257 

258 def __setstate__(self, state: list[Any]) -> None: 

259 self.tile = [] 

260 if len(state) > 5: 

261 self.filename = state[5] 

262 super().__setstate__(state) 

263 

264 def verify(self) -> None: 

265 """Check file integrity""" 

266 

267 # raise exception if something's wrong. must be called 

268 # directly after open, and closes file when finished. 

269 if self._exclusive_fp: 

270 self.fp.close() 

271 self.fp = None 

272 

273 def load(self) -> Image.core.PixelAccess | None: 

274 """Load image data based on tile list""" 

275 

276 if not self.tile and self._im is None: 

277 msg = "cannot load this image" 

278 raise OSError(msg) 

279 

280 pixel = Image.Image.load(self) 

281 if not self.tile: 

282 return pixel 

283 

284 self.map: mmap.mmap | None = None 

285 use_mmap = self.filename and len(self.tile) == 1 

286 

287 readonly = 0 

288 

289 # look for read/seek overrides 

290 if hasattr(self, "load_read"): 

291 read = self.load_read 

292 # don't use mmap if there are custom read/seek functions 

293 use_mmap = False 

294 else: 

295 read = self.fp.read 

296 

297 if hasattr(self, "load_seek"): 

298 seek = self.load_seek 

299 use_mmap = False 

300 else: 

301 seek = self.fp.seek 

302 

303 if use_mmap: 

304 # try memory mapping 

305 decoder_name, extents, offset, args = self.tile[0] 

306 if isinstance(args, str): 

307 args = (args, 0, 1) 

308 if ( 

309 decoder_name == "raw" 

310 and isinstance(args, tuple) 

311 and len(args) >= 3 

312 and args[0] == self.mode 

313 and args[0] in Image._MAPMODES 

314 ): 

315 try: 

316 # use mmap, if possible 

317 import mmap 

318 

319 with open(self.filename) as fp: 

320 self.map = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) 

321 if offset + self.size[1] * args[1] > self.map.size(): 

322 msg = "buffer is not large enough" 

323 raise OSError(msg) 

324 self.im = Image.core.map_buffer( 

325 self.map, self.size, decoder_name, offset, args 

326 ) 

327 readonly = 1 

328 # After trashing self.im, 

329 # we might need to reload the palette data. 

330 if self.palette: 

331 self.palette.dirty = 1 

332 except (AttributeError, OSError, ImportError): 

333 self.map = None 

334 

335 self.load_prepare() 

336 err_code = -3 # initialize to unknown error 

337 if not self.map: 

338 # sort tiles in file order 

339 self.tile.sort(key=_tilesort) 

340 

341 # FIXME: This is a hack to handle TIFF's JpegTables tag. 

342 prefix = getattr(self, "tile_prefix", b"") 

343 

344 # Remove consecutive duplicates that only differ by their offset 

345 self.tile = [ 

346 list(tiles)[-1] 

347 for _, tiles in itertools.groupby( 

348 self.tile, lambda tile: (tile[0], tile[1], tile[3]) 

349 ) 

350 ] 

351 for i, (decoder_name, extents, offset, args) in enumerate(self.tile): 

352 seek(offset) 

353 decoder = Image._getdecoder( 

354 self.mode, decoder_name, args, self.decoderconfig 

355 ) 

356 try: 

357 decoder.setimage(self.im, extents) 

358 if decoder.pulls_fd: 

359 decoder.setfd(self.fp) 

360 err_code = decoder.decode(b"")[1] 

361 else: 

362 b = prefix 

363 while True: 

364 read_bytes = self.decodermaxblock 

365 if i + 1 < len(self.tile): 

366 next_offset = self.tile[i + 1].offset 

367 if next_offset > offset: 

368 read_bytes = next_offset - offset 

369 try: 

370 s = read(read_bytes) 

371 except (IndexError, struct.error) as e: 

372 # truncated png/gif 

373 if LOAD_TRUNCATED_IMAGES: 

374 break 

375 else: 

376 msg = "image file is truncated" 

377 raise OSError(msg) from e 

378 

379 if not s: # truncated jpeg 

380 if LOAD_TRUNCATED_IMAGES: 

381 break 

382 else: 

383 msg = ( 

384 "image file is truncated " 

385 f"({len(b)} bytes not processed)" 

386 ) 

387 raise OSError(msg) 

388 

389 b = b + s 

390 n, err_code = decoder.decode(b) 

391 if n < 0: 

392 break 

393 b = b[n:] 

394 finally: 

395 # Need to cleanup here to prevent leaks 

396 decoder.cleanup() 

397 

398 self.tile = [] 

399 self.readonly = readonly 

400 

401 self.load_end() 

402 

403 if self._exclusive_fp and self._close_exclusive_fp_after_loading: 

404 self.fp.close() 

405 self.fp = None 

406 

407 if not self.map and not LOAD_TRUNCATED_IMAGES and err_code < 0: 

408 # still raised if decoder fails to return anything 

409 raise _get_oserror(err_code, encoder=False) 

410 

411 return Image.Image.load(self) 

412 

413 def load_prepare(self) -> None: 

414 # create image memory if necessary 

415 if self._im is None: 

416 self.im = Image.core.new(self.mode, self.size) 

417 # create palette (optional) 

418 if self.mode == "P": 

419 Image.Image.load(self) 

420 

421 def load_end(self) -> None: 

422 # may be overridden 

423 pass 

424 

425 # may be defined for contained formats 

426 # def load_seek(self, pos: int) -> None: 

427 # pass 

428 

429 # may be defined for blocked formats (e.g. PNG) 

430 # def load_read(self, read_bytes: int) -> bytes: 

431 # pass 

432 

433 def _seek_check(self, frame: int) -> bool: 

434 if ( 

435 frame < self._min_frame 

436 # Only check upper limit on frames if additional seek operations 

437 # are not required to do so 

438 or ( 

439 not (hasattr(self, "_n_frames") and self._n_frames is None) 

440 and frame >= getattr(self, "n_frames") + self._min_frame 

441 ) 

442 ): 

443 msg = "attempt to seek outside sequence" 

444 raise EOFError(msg) 

445 

446 return self.tell() != frame 

447 

448 

449class StubHandler(abc.ABC): 

450 def open(self, im: StubImageFile) -> None: 

451 pass 

452 

453 @abc.abstractmethod 

454 def load(self, im: StubImageFile) -> Image.Image: 

455 pass 

456 

457 

458class StubImageFile(ImageFile, metaclass=abc.ABCMeta): 

459 """ 

460 Base class for stub image loaders. 

461 

462 A stub loader is an image loader that can identify files of a 

463 certain format, but relies on external code to load the file. 

464 """ 

465 

466 @abc.abstractmethod 

467 def _open(self) -> None: 

468 pass 

469 

470 def load(self) -> Image.core.PixelAccess | None: 

471 loader = self._load() 

472 if loader is None: 

473 msg = f"cannot find loader for this {self.format} file" 

474 raise OSError(msg) 

475 image = loader.load(self) 

476 assert image is not None 

477 # become the other object (!) 

478 self.__class__ = image.__class__ # type: ignore[assignment] 

479 self.__dict__ = image.__dict__ 

480 return image.load() 

481 

482 @abc.abstractmethod 

483 def _load(self) -> StubHandler | None: 

484 """(Hook) Find actual image loader.""" 

485 pass 

486 

487 

488class Parser: 

489 """ 

490 Incremental image parser. This class implements the standard 

491 feed/close consumer interface. 

492 """ 

493 

494 incremental = None 

495 image: Image.Image | None = None 

496 data: bytes | None = None 

497 decoder: Image.core.ImagingDecoder | PyDecoder | None = None 

498 offset = 0 

499 finished = 0 

500 

501 def reset(self) -> None: 

502 """ 

503 (Consumer) Reset the parser. Note that you can only call this 

504 method immediately after you've created a parser; parser 

505 instances cannot be reused. 

506 """ 

507 assert self.data is None, "cannot reuse parsers" 

508 

509 def feed(self, data: bytes) -> None: 

510 """ 

511 (Consumer) Feed data to the parser. 

512 

513 :param data: A string buffer. 

514 :exception OSError: If the parser failed to parse the image file. 

515 """ 

516 # collect data 

517 

518 if self.finished: 

519 return 

520 

521 if self.data is None: 

522 self.data = data 

523 else: 

524 self.data = self.data + data 

525 

526 # parse what we have 

527 if self.decoder: 

528 if self.offset > 0: 

529 # skip header 

530 skip = min(len(self.data), self.offset) 

531 self.data = self.data[skip:] 

532 self.offset = self.offset - skip 

533 if self.offset > 0 or not self.data: 

534 return 

535 

536 n, e = self.decoder.decode(self.data) 

537 

538 if n < 0: 

539 # end of stream 

540 self.data = None 

541 self.finished = 1 

542 if e < 0: 

543 # decoding error 

544 self.image = None 

545 raise _get_oserror(e, encoder=False) 

546 else: 

547 # end of image 

548 return 

549 self.data = self.data[n:] 

550 

551 elif self.image: 

552 # if we end up here with no decoder, this file cannot 

553 # be incrementally parsed. wait until we've gotten all 

554 # available data 

555 pass 

556 

557 else: 

558 # attempt to open this file 

559 try: 

560 with io.BytesIO(self.data) as fp: 

561 im = Image.open(fp) 

562 except OSError: 

563 pass # not enough data 

564 else: 

565 flag = hasattr(im, "load_seek") or hasattr(im, "load_read") 

566 if flag or len(im.tile) != 1: 

567 # custom load code, or multiple tiles 

568 self.decode = None 

569 else: 

570 # initialize decoder 

571 im.load_prepare() 

572 d, e, o, a = im.tile[0] 

573 im.tile = [] 

574 self.decoder = Image._getdecoder(im.mode, d, a, im.decoderconfig) 

575 self.decoder.setimage(im.im, e) 

576 

577 # calculate decoder offset 

578 self.offset = o 

579 if self.offset <= len(self.data): 

580 self.data = self.data[self.offset :] 

581 self.offset = 0 

582 

583 self.image = im 

584 

585 def __enter__(self) -> Parser: 

586 return self 

587 

588 def __exit__(self, *args: object) -> None: 

589 self.close() 

590 

591 def close(self) -> Image.Image: 

592 """ 

593 (Consumer) Close the stream. 

594 

595 :returns: An image object. 

596 :exception OSError: If the parser failed to parse the image file either 

597 because it cannot be identified or cannot be 

598 decoded. 

599 """ 

600 # finish decoding 

601 if self.decoder: 

602 # get rid of what's left in the buffers 

603 self.feed(b"") 

604 self.data = self.decoder = None 

605 if not self.finished: 

606 msg = "image was incomplete" 

607 raise OSError(msg) 

608 if not self.image: 

609 msg = "cannot parse this image" 

610 raise OSError(msg) 

611 if self.data: 

612 # incremental parsing not possible; reopen the file 

613 # not that we have all data 

614 with io.BytesIO(self.data) as fp: 

615 try: 

616 self.image = Image.open(fp) 

617 finally: 

618 self.image.load() 

619 return self.image 

620 

621 

622# -------------------------------------------------------------------- 

623 

624 

625def _save(im: Image.Image, fp: IO[bytes], tile: list[_Tile], bufsize: int = 0) -> None: 

626 """Helper to save image based on tile list 

627 

628 :param im: Image object. 

629 :param fp: File object. 

630 :param tile: Tile list. 

631 :param bufsize: Optional buffer size 

632 """ 

633 

634 im.load() 

635 if not hasattr(im, "encoderconfig"): 

636 im.encoderconfig = () 

637 tile.sort(key=_tilesort) 

638 # FIXME: make MAXBLOCK a configuration parameter 

639 # It would be great if we could have the encoder specify what it needs 

640 # But, it would need at least the image size in most cases. RawEncode is 

641 # a tricky case. 

642 bufsize = max(MAXBLOCK, bufsize, im.size[0] * 4) # see RawEncode.c 

643 try: 

644 fh = fp.fileno() 

645 fp.flush() 

646 _encode_tile(im, fp, tile, bufsize, fh) 

647 except (AttributeError, io.UnsupportedOperation) as exc: 

648 _encode_tile(im, fp, tile, bufsize, None, exc) 

649 if hasattr(fp, "flush"): 

650 fp.flush() 

651 

652 

653def _encode_tile( 

654 im: Image.Image, 

655 fp: IO[bytes], 

656 tile: list[_Tile], 

657 bufsize: int, 

658 fh: int | None, 

659 exc: BaseException | None = None, 

660) -> None: 

661 for encoder_name, extents, offset, args in tile: 

662 if offset > 0: 

663 fp.seek(offset) 

664 encoder = Image._getencoder(im.mode, encoder_name, args, im.encoderconfig) 

665 try: 

666 encoder.setimage(im.im, extents) 

667 if encoder.pushes_fd: 

668 encoder.setfd(fp) 

669 errcode = encoder.encode_to_pyfd()[1] 

670 else: 

671 if exc: 

672 # compress to Python file-compatible object 

673 while True: 

674 errcode, data = encoder.encode(bufsize)[1:] 

675 fp.write(data) 

676 if errcode: 

677 break 

678 else: 

679 # slight speedup: compress to real file object 

680 assert fh is not None 

681 errcode = encoder.encode_to_file(fh, bufsize) 

682 if errcode < 0: 

683 raise _get_oserror(errcode, encoder=True) from exc 

684 finally: 

685 encoder.cleanup() 

686 

687 

688def _safe_read(fp: IO[bytes], size: int) -> bytes: 

689 """ 

690 Reads large blocks in a safe way. Unlike fp.read(n), this function 

691 doesn't trust the user. If the requested size is larger than 

692 SAFEBLOCK, the file is read block by block. 

693 

694 :param fp: File handle. Must implement a <b>read</b> method. 

695 :param size: Number of bytes to read. 

696 :returns: A string containing <i>size</i> bytes of data. 

697 

698 Raises an OSError if the file is truncated and the read cannot be completed 

699 

700 """ 

701 if size <= 0: 

702 return b"" 

703 if size <= SAFEBLOCK: 

704 data = fp.read(size) 

705 if len(data) < size: 

706 msg = "Truncated File Read" 

707 raise OSError(msg) 

708 return data 

709 blocks: list[bytes] = [] 

710 remaining_size = size 

711 while remaining_size > 0: 

712 block = fp.read(min(remaining_size, SAFEBLOCK)) 

713 if not block: 

714 break 

715 blocks.append(block) 

716 remaining_size -= len(block) 

717 if sum(len(block) for block in blocks) < size: 

718 msg = "Truncated File Read" 

719 raise OSError(msg) 

720 return b"".join(blocks) 

721 

722 

723class PyCodecState: 

724 def __init__(self) -> None: 

725 self.xsize = 0 

726 self.ysize = 0 

727 self.xoff = 0 

728 self.yoff = 0 

729 

730 def extents(self) -> tuple[int, int, int, int]: 

731 return self.xoff, self.yoff, self.xoff + self.xsize, self.yoff + self.ysize 

732 

733 

734class PyCodec: 

735 fd: IO[bytes] | None 

736 

737 def __init__(self, mode: str, *args: Any) -> None: 

738 self.im: Image.core.ImagingCore | None = None 

739 self.state = PyCodecState() 

740 self.fd = None 

741 self.mode = mode 

742 self.init(args) 

743 

744 def init(self, args: tuple[Any, ...]) -> None: 

745 """ 

746 Override to perform codec specific initialization 

747 

748 :param args: Tuple of arg items from the tile entry 

749 :returns: None 

750 """ 

751 self.args = args 

752 

753 def cleanup(self) -> None: 

754 """ 

755 Override to perform codec specific cleanup 

756 

757 :returns: None 

758 """ 

759 pass 

760 

761 def setfd(self, fd: IO[bytes]) -> None: 

762 """ 

763 Called from ImageFile to set the Python file-like object 

764 

765 :param fd: A Python file-like object 

766 :returns: None 

767 """ 

768 self.fd = fd 

769 

770 def setimage( 

771 self, 

772 im: Image.core.ImagingCore, 

773 extents: tuple[int, int, int, int] | None = None, 

774 ) -> None: 

775 """ 

776 Called from ImageFile to set the core output image for the codec 

777 

778 :param im: A core image object 

779 :param extents: a 4 tuple of (x0, y0, x1, y1) defining the rectangle 

780 for this tile 

781 :returns: None 

782 """ 

783 

784 # following c code 

785 self.im = im 

786 

787 if extents: 

788 (x0, y0, x1, y1) = extents 

789 else: 

790 (x0, y0, x1, y1) = (0, 0, 0, 0) 

791 

792 if x0 == 0 and x1 == 0: 

793 self.state.xsize, self.state.ysize = self.im.size 

794 else: 

795 self.state.xoff = x0 

796 self.state.yoff = y0 

797 self.state.xsize = x1 - x0 

798 self.state.ysize = y1 - y0 

799 

800 if self.state.xsize <= 0 or self.state.ysize <= 0: 

801 msg = "Size cannot be negative" 

802 raise ValueError(msg) 

803 

804 if ( 

805 self.state.xsize + self.state.xoff > self.im.size[0] 

806 or self.state.ysize + self.state.yoff > self.im.size[1] 

807 ): 

808 msg = "Tile cannot extend outside image" 

809 raise ValueError(msg) 

810 

811 

812class PyDecoder(PyCodec): 

813 """ 

814 Python implementation of a format decoder. Override this class and 

815 add the decoding logic in the :meth:`decode` method. 

816 

817 See :ref:`Writing Your Own File Codec in Python<file-codecs-py>` 

818 """ 

819 

820 _pulls_fd = False 

821 

822 @property 

823 def pulls_fd(self) -> bool: 

824 return self._pulls_fd 

825 

826 def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]: 

827 """ 

828 Override to perform the decoding process. 

829 

830 :param buffer: A bytes object with the data to be decoded. 

831 :returns: A tuple of ``(bytes consumed, errcode)``. 

832 If finished with decoding return -1 for the bytes consumed. 

833 Err codes are from :data:`.ImageFile.ERRORS`. 

834 """ 

835 msg = "unavailable in base decoder" 

836 raise NotImplementedError(msg) 

837 

838 def set_as_raw( 

839 self, data: bytes, rawmode: str | None = None, extra: tuple[Any, ...] = () 

840 ) -> None: 

841 """ 

842 Convenience method to set the internal image from a stream of raw data 

843 

844 :param data: Bytes to be set 

845 :param rawmode: The rawmode to be used for the decoder. 

846 If not specified, it will default to the mode of the image 

847 :param extra: Extra arguments for the decoder. 

848 :returns: None 

849 """ 

850 

851 if not rawmode: 

852 rawmode = self.mode 

853 d = Image._getdecoder(self.mode, "raw", rawmode, extra) 

854 assert self.im is not None 

855 d.setimage(self.im, self.state.extents()) 

856 s = d.decode(data) 

857 

858 if s[0] >= 0: 

859 msg = "not enough image data" 

860 raise ValueError(msg) 

861 if s[1] != 0: 

862 msg = "cannot decode image data" 

863 raise ValueError(msg) 

864 

865 

866class PyEncoder(PyCodec): 

867 """ 

868 Python implementation of a format encoder. Override this class and 

869 add the decoding logic in the :meth:`encode` method. 

870 

871 See :ref:`Writing Your Own File Codec in Python<file-codecs-py>` 

872 """ 

873 

874 _pushes_fd = False 

875 

876 @property 

877 def pushes_fd(self) -> bool: 

878 return self._pushes_fd 

879 

880 def encode(self, bufsize: int) -> tuple[int, int, bytes]: 

881 """ 

882 Override to perform the encoding process. 

883 

884 :param bufsize: Buffer size. 

885 :returns: A tuple of ``(bytes encoded, errcode, bytes)``. 

886 If finished with encoding return 1 for the error code. 

887 Err codes are from :data:`.ImageFile.ERRORS`. 

888 """ 

889 msg = "unavailable in base encoder" 

890 raise NotImplementedError(msg) 

891 

892 def encode_to_pyfd(self) -> tuple[int, int]: 

893 """ 

894 If ``pushes_fd`` is ``True``, then this method will be used, 

895 and ``encode()`` will only be called once. 

896 

897 :returns: A tuple of ``(bytes consumed, errcode)``. 

898 Err codes are from :data:`.ImageFile.ERRORS`. 

899 """ 

900 if not self.pushes_fd: 

901 return 0, -8 # bad configuration 

902 bytes_consumed, errcode, data = self.encode(0) 

903 if data: 

904 assert self.fd is not None 

905 self.fd.write(data) 

906 return bytes_consumed, errcode 

907 

908 def encode_to_file(self, fh: int, bufsize: int) -> int: 

909 """ 

910 :param fh: File handle. 

911 :param bufsize: Buffer size. 

912 

913 :returns: If finished successfully, return 0. 

914 Otherwise, return an error code. Err codes are from 

915 :data:`.ImageFile.ERRORS`. 

916 """ 

917 errcode = 0 

918 while errcode == 0: 

919 status, errcode, buf = self.encode(bufsize) 

920 if status > 0: 

921 os.write(fh, buf[status:]) 

922 return errcode