Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/ImageFile.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

483 statements  

1# 

2# The Python Imaging Library. 

3# $Id$ 

4# 

5# base class for image file handlers 

6# 

7# history: 

8# 1995-09-09 fl Created 

9# 1996-03-11 fl Fixed load mechanism. 

10# 1996-04-15 fl Added pcx/xbm decoders. 

11# 1996-04-30 fl Added encoders. 

12# 1996-12-14 fl Added load helpers 

13# 1997-01-11 fl Use encode_to_file where possible 

14# 1997-08-27 fl Flush output in _save 

15# 1998-03-05 fl Use memory mapping for some modes 

16# 1999-02-04 fl Use memory mapping also for "I;16" and "I;16B" 

17# 1999-05-31 fl Added image parser 

18# 2000-10-12 fl Set readonly flag on memory-mapped images 

19# 2002-03-20 fl Use better messages for common decoder errors 

20# 2003-04-21 fl Fall back on mmap/map_buffer if map is not available 

21# 2003-10-30 fl Added StubImageFile class 

22# 2004-02-25 fl Made incremental parser more robust 

23# 

24# Copyright (c) 1997-2004 by Secret Labs AB 

25# Copyright (c) 1995-2004 by Fredrik Lundh 

26# 

27# See the README file for information on usage and redistribution. 

28# 

29from __future__ import annotations 

30 

31import abc 

32import io 

33import itertools 

34import logging 

35import os 

36import struct 

37from typing import IO, Any, NamedTuple, cast 

38 

39from . import ExifTags, Image 

40from ._util import DeferredError, is_path 

41 

42TYPE_CHECKING = False 

43if TYPE_CHECKING: 

44 from ._typing import StrOrBytesPath 

45 

# Module-level logger; used by ImageFile.close() to report errors at debug level.
logger = logging.getLogger(__name__)

# Default block size in bytes; copied into ImageFile.decodermaxblock and used
# as the lower bound for the encoder buffer size in _save().
MAXBLOCK = 65536
"""
By default, Pillow processes image data in blocks. This helps to prevent excessive use
of resources. Codecs may disable this behaviour with ``_pulls_fd`` or ``_pushes_fd``.

When reading an image, this is the number of bytes to read at once.

When writing an image, this is the number of bytes to write at once.
If the image width times 4 is greater, then that will be used instead.
Plugins may also set a greater number.

User code may set this to another number.
"""

# Largest single read issued by _safe_read(); bigger requests are split into
# blocks of at most this many bytes.
SAFEBLOCK = 1024 * 1024

# Checked by ImageFile.load(): when true, running out of data ends decoding
# quietly instead of raising OSError.
LOAD_TRUNCATED_IMAGES = False
"""Whether or not to load truncated image files. User code may change this."""

# Fallback error-code -> message table, consulted by _get_oserror() when
# Image.core has no getcodecstatus attribute.
ERRORS = {
    -1: "image buffer overrun error",
    -2: "decoding error",
    -3: "unknown error",
    -8: "bad configuration",
    -9: "out of memory error",
}
"""
Dict of known error codes returned from :meth:`.PyDecoder.decode`,
:meth:`.PyEncoder.encode` :meth:`.PyEncoder.encode_to_pyfd` and
:meth:`.PyEncoder.encode_to_file`.
"""

79 

80 

81# 

82# -------------------------------------------------------------------- 

83# Helpers 

84 

85 

def _get_oserror(error: int, *, encoder: bool) -> OSError:
    """
    Build (but do not raise) the OSError describing a codec error code.

    :param error: Error code reported by a decoder or encoder.
    :param encoder: True if the code came from an encoder, False for a decoder.
    :returns: An OSError whose message names the failure and the direction
              (reading or writing) in which it occurred.
    """
    if encoder:
        role, action = "encoder", "writing"
    else:
        role, action = "decoder", "reading"

    try:
        description = Image.core.getcodecstatus(error)
    except AttributeError:
        # The core module has no status lookup; use the Python-side table.
        description = ERRORS.get(error)

    if not description:
        # Unknown code: fall back to a generic message carrying the number.
        description = f"{role} error {error}"

    return OSError(description + f" when {action} image file")

95 

96 

97def _tilesort(t: _Tile) -> int: 

98 # sort on offset 

99 return t[2] 

100 

101 

class _Tile(NamedTuple):
    """
    Descriptor for one tile of encoded image data.

    ImageFile.load() and _encode_tile() unpack each entry positionally as
    ``(codec_name, extents, offset, args)``.
    """

    # Name handed to Image._getdecoder / Image._getencoder to select the codec.
    codec_name: str
    # Rectangle passed to the codec's setimage(); may be None.
    extents: tuple[int, int, int, int] | None
    # Position passed to seek() before the tile is processed; also the
    # sort key used by _tilesort().
    offset: int = 0
    # Codec arguments forwarded to Image._getdecoder / Image._getencoder.
    args: tuple[Any, ...] | str | None = None

107 

108 

109# 

110# -------------------------------------------------------------------- 

111# ImageFile base class 

112 

113 

class ImageFile(Image.Image):
    """Base class for image file format handlers."""

    def __init__(
        self, fp: StrOrBytesPath | IO[bytes], filename: str | bytes | None = None
    ) -> None:
        """
        Open the image source and let the format handler parse its header.

        :param fp: Either a path (opened here in binary mode) or an already
                   open binary stream.
        :param filename: Name recorded for stream input; ignored when ``fp``
                         is a path.
        :exception SyntaxError: If ``_open()`` fails with one of the
                   "malformed data" exceptions listed below, or leaves the
                   image without a mode or with a non-positive size.
        """
        super().__init__()

        self._min_frame = 0

        self.custom_mimetype: str | None = None

        self.tile: list[_Tile] = []
        """ A list of tile descriptors """

        self.readonly = 1  # until we know better

        self.decoderconfig: tuple[Any, ...] = ()
        self.decodermaxblock = MAXBLOCK

        self.fp: IO[bytes] | None
        self._fp: IO[bytes] | DeferredError
        if is_path(fp):
            # filename
            self.fp = open(fp, "rb")
            self.filename = os.fspath(fp)
            self._exclusive_fp = True
        else:
            # stream
            self.fp = cast(IO[bytes], fp)
            self.filename = filename if filename is not None else ""
            # can be overridden
            self._exclusive_fp = False

        try:
            try:
                self._open()
            except (
                IndexError,  # end of data
                TypeError,  # end of data (ord)
                KeyError,  # unsupported mode
                EOFError,  # got header but not the first frame
                struct.error,
            ) as v:
                raise SyntaxError(v) from v

            if not self.mode or self.size[0] <= 0 or self.size[1] <= 0:
                msg = "not identified by this driver"
                raise SyntaxError(msg)
        except BaseException:
            # close the file only if we have opened it this constructor
            if self._exclusive_fp:
                self.fp.close()
            raise

    def _open(self) -> None:
        """(Hook) Parse the file header; called from the constructor."""
        pass

    # Context manager support
    def __enter__(self) -> ImageFile:
        return self

    def _close_fp(self) -> None:
        """Close ``_fp`` (if set and distinct from ``fp``) and then ``fp``."""
        if getattr(self, "_fp", False) and not isinstance(self._fp, DeferredError):
            if self._fp != self.fp:
                self._fp.close()
            # Any later use of _fp raises instead of touching a closed file.
            self._fp = DeferredError(ValueError("Operation on closed image"))
        if self.fp:
            self.fp.close()

    def __exit__(self, *args: object) -> None:
        # Only close files this object owns; always drop the reference.
        if getattr(self, "_exclusive_fp", False):
            self._close_fp()
        self.fp = None

    def close(self) -> None:
        """
        Closes the file pointer, if possible.

        This operation will destroy the image core and release its memory.
        The image data will be unusable afterward.

        This function is required to close images that have multiple frames or
        have not had their file read and closed by the
        :py:meth:`~PIL.Image.Image.load` method. See :ref:`file-handling` for
        more information.
        """
        try:
            self._close_fp()
            self.fp = None
        except Exception as msg:
            # Best effort: a failure to close is logged, never raised.
            logger.debug("Error closing: %s", msg)

        super().close()

    def get_child_images(self) -> list[ImageFile]:
        """
        Load the images referenced from this image's Exif data.

        Candidates are every SubIFD listed under ``ExifTags.Base.SubIFDs`` and
        IFD1 when it carries a ``JpegIFOffset`` entry. The position of
        ``self.fp`` is restored before returning.

        :returns: A list of the loaded child images.
        """
        child_images = []
        exif = self.getexif()
        ifds = []
        if ExifTags.Base.SubIFDs in exif:
            subifd_offsets = exif[ExifTags.Base.SubIFDs]
            if subifd_offsets:
                if not isinstance(subifd_offsets, tuple):
                    subifd_offsets = (subifd_offsets,)
                for subifd_offset in subifd_offsets:
                    ifds.append((exif._get_ifd_dict(subifd_offset), subifd_offset))
        ifd1 = exif.get_ifd(ExifTags.IFD.IFD1)
        if ifd1 and ifd1.get(ExifTags.Base.JpegIFOffset):
            assert exif._info is not None
            ifds.append((ifd1, exif._info.next))

        offset = None
        for ifd, ifd_offset in ifds:
            assert self.fp is not None
            current_offset = self.fp.tell()
            if offset is None:
                # Remember where the file pointer was before the first child.
                offset = current_offset

            fp = self.fp
            # Reset for every entry: the value is read below even when ``ifd``
            # is None, so it must be neither unbound nor left over from the
            # previous iteration.
            thumbnail_offset = None
            if ifd is not None:
                thumbnail_offset = ifd.get(ExifTags.Base.JpegIFOffset)
                if thumbnail_offset is not None:
                    thumbnail_offset += getattr(self, "_exif_offset", 0)
                    self.fp.seek(thumbnail_offset)

                    length = ifd.get(ExifTags.Base.JpegIFByteCount)
                    assert isinstance(length, int)
                    data = self.fp.read(length)
                    # Open the embedded data from its own in-memory stream.
                    fp = io.BytesIO(data)

            with Image.open(fp) as im:
                from . import TiffImagePlugin

                if thumbnail_offset is None and isinstance(
                    im, TiffImagePlugin.TiffImageFile
                ):
                    # No embedded JPEG: point the TIFF reader at this IFD.
                    im._frame_pos = [ifd_offset]
                    im._seek(0)
                im.load()
                child_images.append(im)

        if offset is not None:
            assert self.fp is not None
            self.fp.seek(offset)
        return child_images

    def get_format_mimetype(self) -> str | None:
        """
        Return the MIME type for this image.

        ``custom_mimetype`` wins when set; otherwise the upper-cased format is
        looked up in ``Image.MIME``. Returns None if neither yields a value.
        """
        if self.custom_mimetype:
            return self.custom_mimetype
        if self.format is not None:
            return Image.MIME.get(self.format.upper())
        return None

    def __getstate__(self) -> list[Any]:
        # Extend the base class state with the filename.
        return super().__getstate__() + [self.filename]

    def __setstate__(self, state: list[Any]) -> None:
        self.tile = []
        # The filename is the optional sixth element added by __getstate__.
        if len(state) > 5:
            self.filename = state[5]
        super().__setstate__(state)

    def verify(self) -> None:
        """Check file integrity"""

        # raise exception if something's wrong.  must be called
        # directly after open, and closes file when finished.
        if self._exclusive_fp and self.fp:
            self.fp.close()
        self.fp = None

    def load(self) -> Image.core.PixelAccess | None:
        """Load image data based on tile list"""

        if not self.tile and self._im is None:
            msg = "cannot load this image"
            raise OSError(msg)

        pixel = Image.Image.load(self)
        if not self.tile:
            # Nothing left to decode.
            return pixel

        self.map: mmap.mmap | None = None
        use_mmap = self.filename and len(self.tile) == 1

        assert self.fp is not None
        readonly = 0

        # look for read/seek overrides
        if hasattr(self, "load_read"):
            read = self.load_read
            # don't use mmap if there are custom read/seek functions
            use_mmap = False
        else:
            read = self.fp.read

        if hasattr(self, "load_seek"):
            seek = self.load_seek
            use_mmap = False
        else:
            seek = self.fp.seek

        if use_mmap:
            # try memory mapping
            decoder_name, extents, offset, args = self.tile[0]
            if isinstance(args, str):
                args = (args, 0, 1)
            if (
                decoder_name == "raw"
                and isinstance(args, tuple)
                and len(args) >= 3
                and args[0] == self.mode
                and args[0] in Image._MAPMODES
            ):
                if offset < 0:
                    msg = "Tile offset cannot be negative"
                    raise ValueError(msg)
                try:
                    # use mmap, if possible
                    import mmap

                    with open(self.filename) as fp:
                        self.map = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ)
                    if offset + self.size[1] * args[1] > self.map.size():
                        msg = "buffer is not large enough"
                        raise OSError(msg)
                    self.im = Image.core.map_buffer(
                        self.map, self.size, decoder_name, offset, args
                    )
                    readonly = 1
                    # After trashing self.im,
                    # we might need to reload the palette data.
                    if self.palette:
                        self.palette.dirty = 1
                except (AttributeError, OSError, ImportError):
                    # Mapping failed; fall through to the regular decoder path.
                    self.map = None

        self.load_prepare()
        err_code = -3  # initialize to unknown error
        if not self.map:
            # sort tiles in file order
            self.tile.sort(key=_tilesort)

            # FIXME: This is a hack to handle TIFF's JpegTables tag.
            prefix = getattr(self, "tile_prefix", b"")

            # Remove consecutive duplicates that only differ by their offset
            self.tile = [
                list(tiles)[-1]
                for _, tiles in itertools.groupby(
                    self.tile, lambda tile: (tile[0], tile[1], tile[3])
                )
            ]
            for i, (decoder_name, extents, offset, args) in enumerate(self.tile):
                seek(offset)
                decoder = Image._getdecoder(
                    self.mode, decoder_name, args, self.decoderconfig
                )
                try:
                    decoder.setimage(self.im, extents)
                    if decoder.pulls_fd:
                        # The decoder reads from the file object by itself.
                        decoder.setfd(self.fp)
                        err_code = decoder.decode(b"")[1]
                    else:
                        b = prefix
                        while True:
                            read_bytes = self.decodermaxblock
                            if i + 1 < len(self.tile):
                                # Use the distance to the next tile as the
                                # read size when that tile starts later.
                                next_offset = self.tile[i + 1].offset
                                if next_offset > offset:
                                    read_bytes = next_offset - offset
                            try:
                                s = read(read_bytes)
                            except (IndexError, struct.error) as e:
                                # truncated png/gif
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = "image file is truncated"
                                    raise OSError(msg) from e

                            if not s:  # truncated jpeg
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = (
                                        "image file is truncated "
                                        f"({len(b)} bytes not processed)"
                                    )
                                    raise OSError(msg)

                            b = b + s
                            n, err_code = decoder.decode(b)
                            if n < 0:
                                # Negative count: the decoder is finished.
                                break
                            # Keep only the bytes the decoder did not consume.
                            b = b[n:]
                finally:
                    # Need to cleanup here to prevent leaks
                    decoder.cleanup()

        self.tile = []
        self.readonly = readonly

        self.load_end()

        if self._exclusive_fp and self._close_exclusive_fp_after_loading:
            self.fp.close()
        self.fp = None

        if not self.map and not LOAD_TRUNCATED_IMAGES and err_code < 0:
            # still raised if decoder fails to return anything
            raise _get_oserror(err_code, encoder=False)

        return Image.Image.load(self)

    def load_prepare(self) -> None:
        """Allocate the core image (and palette for mode "P") before decoding."""
        # create image memory if necessary
        if self._im is None:
            self.im = Image.core.new(self.mode, self.size)
        # create palette (optional)
        if self.mode == "P":
            Image.Image.load(self)

    def load_end(self) -> None:
        """(Hook) Called by load() after all tiles have been processed."""
        # may be overridden
        pass

    # may be defined for contained formats
    # def load_seek(self, pos: int) -> None:
    #     pass

    # may be defined for blocked formats (e.g. PNG)
    # def load_read(self, read_bytes: int) -> bytes:
    #     pass

    def _seek_check(self, frame: int) -> bool:
        """
        Validate a frame index.

        :param frame: Requested frame number.
        :returns: True if ``frame`` differs from the current ``tell()``.
        :exception EOFError: If ``frame`` is outside the sequence.
        """
        if (
            frame < self._min_frame
            # Only check upper limit on frames if additional seek operations
            # are not required to do so
            or (
                not (hasattr(self, "_n_frames") and self._n_frames is None)
                and frame >= getattr(self, "n_frames") + self._min_frame
            )
        ):
            msg = "attempt to seek outside sequence"
            raise EOFError(msg)

        return self.tell() != frame

463 

464 

class StubHandler(abc.ABC):
    """Interface for external code that loads images on behalf of a stub."""

    def open(self, im: StubImageFile) -> None:
        """Optional hook taking the stub image; does nothing by default."""
        return None

    @abc.abstractmethod
    def load(self, im: StubImageFile) -> Image.Image:
        """Return the loaded image for ``im``; subclasses must implement this."""

472 

473 

class StubImageFile(ImageFile, metaclass=abc.ABCMeta):
    """
    Base class for stub image loaders.

    A stub loader recognises files of a particular format but delegates the
    actual loading to a handler returned by :meth:`_load`.
    """

    @abc.abstractmethod
    def _open(self) -> None:
        """(Hook) Identify the file; must be provided by subclasses."""

    def load(self) -> Image.core.PixelAccess | None:
        """
        Load the image through the handler and turn this object into the result.

        :exception OSError: If :meth:`_load` returns no handler.
        """
        handler = self._load()
        if handler is None:
            raise OSError(f"cannot find loader for this {self.format} file")

        replacement = handler.load(self)
        assert replacement is not None
        # become the other object (!): adopt its class and its attribute dict
        self.__class__ = replacement.__class__  # type: ignore[assignment]
        self.__dict__ = replacement.__dict__
        return replacement.load()

    @abc.abstractmethod
    def _load(self) -> StubHandler | None:
        """(Hook) Find actual image loader."""

502 

503 

class Parser:
    """
    Incremental image parser.  This class implements the standard
    feed/close consumer interface.
    """

    incremental = None
    # Image opened from the data received so far, once it can be identified.
    image: Image.Image | None = None
    # Bytes received but not yet consumed; None before the first feed() and
    # after decoding has ended.
    data: bytes | None = None
    # Decoder used for incremental parsing; stays None when the image cannot
    # be parsed incrementally.
    decoder: Image.core.ImagingDecoder | PyDecoder | None = None
    # Number of bytes still to skip before the decoder's data begins.
    offset = 0
    # Set to 1 once the decoder reports the end of the stream.
    finished = 0

    def reset(self) -> None:
        """
        (Consumer) Reset the parser.  Note that you can only call this
        method immediately after you've created a parser; parser
        instances cannot be reused.
        """
        assert self.data is None, "cannot reuse parsers"

    def feed(self, data: bytes) -> None:
        """
        (Consumer) Feed data to the parser.

        :param data: A string buffer.
        :exception OSError: If the parser failed to parse the image file.
        """
        # collect data

        if self.finished:
            return

        if self.data is None:
            self.data = data
        else:
            self.data = self.data + data

        # parse what we have
        if self.decoder:
            if self.offset > 0:
                # skip header
                skip = min(len(self.data), self.offset)
                self.data = self.data[skip:]
                self.offset = self.offset - skip
                if self.offset > 0 or not self.data:
                    return

            n, e = self.decoder.decode(self.data)

            if n < 0:
                # end of stream
                self.data = None
                self.finished = 1
                if e < 0:
                    # decoding error
                    self.image = None
                    raise _get_oserror(e, encoder=False)
                else:
                    # end of image
                    return
            # Drop the bytes the decoder consumed.
            self.data = self.data[n:]

        elif self.image:
            # if we end up here with no decoder, this file cannot
            # be incrementally parsed.  wait until we've gotten all
            # available data
            pass

        else:
            # attempt to open this file
            try:
                with io.BytesIO(self.data) as fp:
                    im = Image.open(fp)
            except OSError:
                pass  # not enough data
            else:
                flag = hasattr(im, "load_seek") or hasattr(im, "load_read")
                if flag or len(im.tile) != 1:
                    # custom load code, or multiple tiles: no incremental
                    # decoder; close() reopens the file from the full data
                    self.decoder = None
                else:
                    # initialize decoder
                    im.load_prepare()
                    d, e, o, a = im.tile[0]
                    im.tile = []
                    self.decoder = Image._getdecoder(im.mode, d, a, im.decoderconfig)
                    self.decoder.setimage(im.im, e)

                    # calculate decoder offset
                    self.offset = o
                    if self.offset <= len(self.data):
                        self.data = self.data[self.offset :]
                        self.offset = 0

                self.image = im

    def __enter__(self) -> Parser:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

    def close(self) -> Image.Image:
        """
        (Consumer) Close the stream.

        :returns: An image object.
        :exception OSError: If the parser failed to parse the image file either
                            because it cannot be identified or cannot be
                            decoded.
        """
        # finish decoding
        if self.decoder:
            # get rid of what's left in the buffers
            self.feed(b"")
            self.data = self.decoder = None
            if not self.finished:
                msg = "image was incomplete"
                raise OSError(msg)
        if not self.image:
            msg = "cannot parse this image"
            raise OSError(msg)
        if self.data:
            # incremental parsing not possible; reopen the file
            # now that we have all data
            with io.BytesIO(self.data) as fp:
                try:
                    self.image = Image.open(fp)
                finally:
                    self.image.load()
        return self.image

636 

637 

638# -------------------------------------------------------------------- 

639 

640 

def _save(im: Image.Image, fp: IO[bytes], tile: list[_Tile], bufsize: int = 0) -> None:
    """Write an image to a file object by running the encoders in a tile list.

    :param im: Image object.
    :param fp: File object.
    :param tile: Tile list; sorted in place by offset.
    :param bufsize: Optional minimum buffer size.
    """

    im.load()
    if not hasattr(im, "encoderconfig"):
        im.encoderconfig = ()
    tile.sort(key=_tilesort)

    # FIXME: make MAXBLOCK a configuration parameter
    # It would be great if we could have the encoder specify what it needs
    # But, it would need at least the image size in most cases. RawEncode is
    # a tricky case.
    row_bytes = im.size[0] * 4  # see RawEncode.c
    bufsize = max(MAXBLOCK, bufsize, row_bytes)

    try:
        descriptor = fp.fileno()
        fp.flush()
        _encode_tile(im, fp, tile, bufsize, descriptor)
    except (AttributeError, io.UnsupportedOperation) as fallback_reason:
        # No usable file descriptor: encode through the Python file object.
        _encode_tile(im, fp, tile, bufsize, None, fallback_reason)

    if hasattr(fp, "flush"):
        fp.flush()

667 

668 

def _encode_tile(
    im: Image.Image,
    fp: IO[bytes],
    tile: list[_Tile],
    bufsize: int,
    fh: int | None,
    exc: BaseException | None = None,
) -> None:
    """
    Run the encoder for every tile and write its output.

    :param im: Image being saved.
    :param fp: Destination file object.
    :param tile: Tile descriptors to encode, in order.
    :param bufsize: Buffer size handed to the encoder.
    :param fh: File descriptor used when encoding straight to a file.
    :param exc: If set, encode through ``fp.write`` rather than ``fh``; also
                used as the cause of any error raised.
    :exception OSError: If an encoder reports a negative error code.
    """
    for codec_name, region, position, codec_args in tile:
        if position > 0:
            fp.seek(position)
        encoder = Image._getencoder(im.mode, codec_name, codec_args, im.encoderconfig)
        try:
            encoder.setimage(im.im, region)
            if encoder.pushes_fd:
                # The encoder writes to the file object by itself.
                encoder.setfd(fp)
                errcode = encoder.encode_to_pyfd()[1]
            elif exc:
                # compress to Python file-compatible object
                errcode = 0
                while not errcode:
                    errcode, chunk = encoder.encode(bufsize)[1:]
                    fp.write(chunk)
            else:
                # slight speedup: compress to real file object
                assert fh is not None
                errcode = encoder.encode_to_file(fh, bufsize)
            if errcode < 0:
                raise _get_oserror(errcode, encoder=True) from exc
        finally:
            encoder.cleanup()

702 

703 

704def _safe_read(fp: IO[bytes], size: int) -> bytes: 

705 """ 

706 Reads large blocks in a safe way. Unlike fp.read(n), this function 

707 doesn't trust the user. If the requested size is larger than 

708 SAFEBLOCK, the file is read block by block. 

709 

710 :param fp: File handle. Must implement a <b>read</b> method. 

711 :param size: Number of bytes to read. 

712 :returns: A string containing <i>size</i> bytes of data. 

713 

714 Raises an OSError if the file is truncated and the read cannot be completed 

715 

716 """ 

717 if size <= 0: 

718 return b"" 

719 if size <= SAFEBLOCK: 

720 data = fp.read(size) 

721 if len(data) < size: 

722 msg = "Truncated File Read" 

723 raise OSError(msg) 

724 return data 

725 blocks: list[bytes] = [] 

726 remaining_size = size 

727 while remaining_size > 0: 

728 block = fp.read(min(remaining_size, SAFEBLOCK)) 

729 if not block: 

730 break 

731 blocks.append(block) 

732 remaining_size -= len(block) 

733 if sum(len(block) for block in blocks) < size: 

734 msg = "Truncated File Read" 

735 raise OSError(msg) 

736 return b"".join(blocks) 

737 

738 

class PyCodecState:
    """Size and offset of the rectangle a Python codec is working on."""

    def __init__(self) -> None:
        self.xsize = self.ysize = 0
        self.xoff = self.yoff = 0

    def extents(self) -> tuple[int, int, int, int]:
        """Return the rectangle as ``(x0, y0, x1, y1)``."""
        left, top = self.xoff, self.yoff
        return left, top, left + self.xsize, top + self.ysize

748 

749 

class PyCodec:
    """Common base for codecs implemented in Python."""

    fd: IO[bytes] | None

    def __init__(self, mode: str, *args: Any) -> None:
        self.im: Image.core.ImagingCore | None = None
        self.state = PyCodecState()
        self.fd = None
        self.mode = mode
        # Called last so an overriding init() sees the attributes set above.
        self.init(args)

    def init(self, args: tuple[Any, ...]) -> None:
        """
        Hook for codec specific initialization; stores ``args`` by default.

        :param args: Tuple of arg items from the tile entry
        :returns: None
        """
        self.args = args

    def cleanup(self) -> None:
        """
        Hook for codec specific cleanup; does nothing by default.

        :returns: None
        """
        return None

    def setfd(self, fd: IO[bytes]) -> None:
        """
        Called from ImageFile to hand over the Python file-like object.

        :param fd: A Python file-like object
        :returns: None
        """
        self.fd = fd

    def setimage(
        self,
        im: Image.core.ImagingCore,
        extents: tuple[int, int, int, int] | None = None,
    ) -> None:
        """
        Called from ImageFile to set the core image and the tile rectangle.

        :param im: A core image object
        :param extents: a 4 tuple of (x0, y0, x1, y1) defining the rectangle
            for this tile
        :returns: None
        :exception ValueError: If the resulting size is not positive or the
            tile extends outside the image.
        """

        # following c code
        self.im = im
        region = self.state
        x0, y0, x1, y1 = extents if extents else (0, 0, 0, 0)

        if x0 == 0 and x1 == 0:
            # No horizontal range given: use the whole image size.
            region.xsize, region.ysize = self.im.size
        else:
            region.xoff, region.yoff = x0, y0
            region.xsize, region.ysize = x1 - x0, y1 - y0

        if region.xsize <= 0 or region.ysize <= 0:
            raise ValueError("Size must be positive")

        image_width, image_height = self.im.size[0], self.im.size[1]
        if (
            region.xsize + region.xoff > image_width
            or region.ysize + region.yoff > image_height
        ):
            raise ValueError("Tile cannot extend outside image")

826 

827 

class PyDecoder(PyCodec):
    """
    Python implementation of a format decoder. Subclass this and put the
    decoding logic in the :meth:`decode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    _pulls_fd = False

    @property
    def pulls_fd(self) -> bool:
        """Value of the ``_pulls_fd`` class flag."""
        return self._pulls_fd

    def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
        """
        Override to perform the decoding process.

        :param buffer: A bytes object with the data to be decoded.
        :returns: A tuple of ``(bytes consumed, errcode)``.
            If finished with decoding return -1 for the bytes consumed.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        raise NotImplementedError("unavailable in base decoder")

    def set_as_raw(
        self, data: bytes, rawmode: str | None = None, extra: tuple[Any, ...] = ()
    ) -> None:
        """
        Fill the internal image from a buffer of raw data using the "raw" decoder.

        :param data: Bytes to be set
        :param rawmode: The rawmode to be used for the decoder.
            If not specified, it will default to the mode of the image
        :param extra: Extra arguments for the decoder.
        :returns: None
        :exception ValueError: If the raw decoder does not report completion
            or reports an error.
        """
        effective_rawmode = rawmode or self.mode
        raw_decoder = Image._getdecoder(self.mode, "raw", effective_rawmode, extra)
        assert self.im is not None
        raw_decoder.setimage(self.im, self.state.extents())
        outcome = raw_decoder.decode(data)

        # A non-negative first element means the decoder has not finished.
        if outcome[0] >= 0:
            raise ValueError("not enough image data")
        if outcome[1] != 0:
            raise ValueError("cannot decode image data")

880 

881 

class PyEncoder(PyCodec):
    """
    Python implementation of a format encoder. Override this class and
    add the encoding logic in the :meth:`encode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    _pushes_fd = False

    @property
    def pushes_fd(self) -> bool:
        """Value of the ``_pushes_fd`` class flag."""
        return self._pushes_fd

    def encode(self, bufsize: int) -> tuple[int, int, bytes]:
        """
        Override to perform the encoding process.

        :param bufsize: Buffer size.
        :returns: A tuple of ``(bytes encoded, errcode, bytes)``.
            If finished with encoding return 1 for the error code.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        msg = "unavailable in base encoder"
        raise NotImplementedError(msg)

    def encode_to_pyfd(self) -> tuple[int, int]:
        """
        If ``pushes_fd`` is ``True``, then this method will be used,
        and ``encode()`` will only be called once.

        :returns: A tuple of ``(bytes consumed, errcode)``.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        if not self.pushes_fd:
            return 0, -8  # bad configuration
        bytes_consumed, errcode, data = self.encode(0)
        if data:
            assert self.fd is not None
            self.fd.write(data)
        return bytes_consumed, errcode

    def encode_to_file(self, fh: int, bufsize: int) -> int:
        """
        Call ``encode()`` repeatedly and write its output to a file descriptor
        until a non-zero error code is returned.

        :param fh: File handle.
        :param bufsize: Buffer size.

        :returns: If finished successfully, return 0.
            Otherwise, return an error code. Err codes are from
            :data:`.ImageFile.ERRORS`.
        """
        errcode = 0
        while errcode == 0:
            status, errcode, buf = self.encode(bufsize)
            if status > 0:
                # ``status`` is the number of bytes encoded, so write that
                # many bytes from the start of the buffer.
                os.write(fh, buf[:status])
        return errcode