Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/ImageFile.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

474 statements  

1# 

2# The Python Imaging Library. 

3# $Id$ 

4# 

5# base class for image file handlers 

6# 

7# history: 

8# 1995-09-09 fl Created 

9# 1996-03-11 fl Fixed load mechanism. 

10# 1996-04-15 fl Added pcx/xbm decoders. 

11# 1996-04-30 fl Added encoders. 

12# 1996-12-14 fl Added load helpers 

13# 1997-01-11 fl Use encode_to_file where possible 

14# 1997-08-27 fl Flush output in _save 

15# 1998-03-05 fl Use memory mapping for some modes 

16# 1999-02-04 fl Use memory mapping also for "I;16" and "I;16B" 

17# 1999-05-31 fl Added image parser 

18# 2000-10-12 fl Set readonly flag on memory-mapped images 

19# 2002-03-20 fl Use better messages for common decoder errors 

20# 2003-04-21 fl Fall back on mmap/map_buffer if map is not available 

21# 2003-10-30 fl Added StubImageFile class 

22# 2004-02-25 fl Made incremental parser more robust 

23# 

24# Copyright (c) 1997-2004 by Secret Labs AB 

25# Copyright (c) 1995-2004 by Fredrik Lundh 

26# 

27# See the README file for information on usage and redistribution. 

28# 

29from __future__ import annotations 

30 

31import abc 

32import io 

33import itertools 

34import logging 

35import os 

36import struct 

37from typing import IO, Any, NamedTuple, cast 

38 

39from . import ExifTags, Image 

40from ._util import DeferredError, is_path 

41 

42TYPE_CHECKING = False 

43if TYPE_CHECKING: 

44 from ._typing import StrOrBytesPath 

45 

46logger = logging.getLogger(__name__) 

47 

48MAXBLOCK = 65536 

49""" 

50By default, Pillow processes image data in blocks. This helps to prevent excessive use 

51of resources. Codecs may disable this behaviour with ``_pulls_fd`` or ``_pushes_fd``. 

52 

53When reading an image, this is the number of bytes to read at once. 

54 

55When writing an image, this is the number of bytes to write at once. 

56If the image width times 4 is greater, then that will be used instead. 

57Plugins may also set a greater number. 

58 

59User code may set this to another number. 

60""" 

61 

62SAFEBLOCK = 1024 * 1024 

63 

64LOAD_TRUNCATED_IMAGES = False 

65"""Whether or not to load truncated image files. User code may change this.""" 

66 

67ERRORS = { 

68 -1: "image buffer overrun error", 

69 -2: "decoding error", 

70 -3: "unknown error", 

71 -8: "bad configuration", 

72 -9: "out of memory error", 

73} 

74""" 

75Dict of known error codes returned from :meth:`.PyDecoder.decode`, 

76:meth:`.PyEncoder.encode` :meth:`.PyEncoder.encode_to_pyfd` and 

77:meth:`.PyEncoder.encode_to_file`. 

78""" 

79 

80 

81# 

82# -------------------------------------------------------------------- 

83# Helpers 

84 

85 

86def _get_oserror(error: int, *, encoder: bool) -> OSError: 

87 try: 

88 msg = Image.core.getcodecstatus(error) 

89 except AttributeError: 

90 msg = ERRORS.get(error) 

91 if not msg: 

92 msg = f"{'encoder' if encoder else 'decoder'} error {error}" 

93 msg += f" when {'writing' if encoder else 'reading'} image file" 

94 return OSError(msg) 

95 

96 

97def _tilesort(t: _Tile) -> int: 

98 # sort on offset 

99 return t[2] 

100 

101 

102class _Tile(NamedTuple): 

103 codec_name: str 

104 extents: tuple[int, int, int, int] | None 

105 offset: int = 0 

106 args: tuple[Any, ...] | str | None = None 

107 

108 

109# 

110# -------------------------------------------------------------------- 

111# ImageFile base class 

112 

113 

114class ImageFile(Image.Image): 

115 """Base class for image file format handlers.""" 

116 

117 def __init__( 

118 self, fp: StrOrBytesPath | IO[bytes], filename: str | bytes | None = None 

119 ) -> None: 

120 super().__init__() 

121 

122 self._min_frame = 0 

123 

124 self.custom_mimetype: str | None = None 

125 

126 self.tile: list[_Tile] = [] 

127 """ A list of tile descriptors """ 

128 

129 self.readonly = 1 # until we know better 

130 

131 self.decoderconfig: tuple[Any, ...] = () 

132 self.decodermaxblock = MAXBLOCK 

133 

134 if is_path(fp): 

135 # filename 

136 self.fp = open(fp, "rb") 

137 self.filename = os.fspath(fp) 

138 self._exclusive_fp = True 

139 else: 

140 # stream 

141 self.fp = cast(IO[bytes], fp) 

142 self.filename = filename if filename is not None else "" 

143 # can be overridden 

144 self._exclusive_fp = False 

145 

146 try: 

147 try: 

148 self._open() 

149 except ( 

150 IndexError, # end of data 

151 TypeError, # end of data (ord) 

152 KeyError, # unsupported mode 

153 EOFError, # got header but not the first frame 

154 struct.error, 

155 ) as v: 

156 raise SyntaxError(v) from v 

157 

158 if not self.mode or self.size[0] <= 0 or self.size[1] <= 0: 

159 msg = "not identified by this driver" 

160 raise SyntaxError(msg) 

161 except BaseException: 

162 # close the file only if we have opened it this constructor 

163 if self._exclusive_fp: 

164 self.fp.close() 

165 raise 

166 

167 def _open(self) -> None: 

168 pass 

169 

170 def _close_fp(self): 

171 if getattr(self, "_fp", False) and not isinstance(self._fp, DeferredError): 

172 if self._fp != self.fp: 

173 self._fp.close() 

174 self._fp = DeferredError(ValueError("Operation on closed image")) 

175 if self.fp: 

176 self.fp.close() 

177 

178 def close(self) -> None: 

179 """ 

180 Closes the file pointer, if possible. 

181 

182 This operation will destroy the image core and release its memory. 

183 The image data will be unusable afterward. 

184 

185 This function is required to close images that have multiple frames or 

186 have not had their file read and closed by the 

187 :py:meth:`~PIL.Image.Image.load` method. See :ref:`file-handling` for 

188 more information. 

189 """ 

190 try: 

191 self._close_fp() 

192 self.fp = None 

193 except Exception as msg: 

194 logger.debug("Error closing: %s", msg) 

195 

196 super().close() 

197 

198 def get_child_images(self) -> list[ImageFile]: 

199 child_images = [] 

200 exif = self.getexif() 

201 ifds = [] 

202 if ExifTags.Base.SubIFDs in exif: 

203 subifd_offsets = exif[ExifTags.Base.SubIFDs] 

204 if subifd_offsets: 

205 if not isinstance(subifd_offsets, tuple): 

206 subifd_offsets = (subifd_offsets,) 

207 for subifd_offset in subifd_offsets: 

208 ifds.append((exif._get_ifd_dict(subifd_offset), subifd_offset)) 

209 ifd1 = exif.get_ifd(ExifTags.IFD.IFD1) 

210 if ifd1 and ifd1.get(ExifTags.Base.JpegIFOffset): 

211 assert exif._info is not None 

212 ifds.append((ifd1, exif._info.next)) 

213 

214 offset = None 

215 for ifd, ifd_offset in ifds: 

216 assert self.fp is not None 

217 current_offset = self.fp.tell() 

218 if offset is None: 

219 offset = current_offset 

220 

221 fp = self.fp 

222 if ifd is not None: 

223 thumbnail_offset = ifd.get(ExifTags.Base.JpegIFOffset) 

224 if thumbnail_offset is not None: 

225 thumbnail_offset += getattr(self, "_exif_offset", 0) 

226 self.fp.seek(thumbnail_offset) 

227 

228 length = ifd.get(ExifTags.Base.JpegIFByteCount) 

229 assert isinstance(length, int) 

230 data = self.fp.read(length) 

231 fp = io.BytesIO(data) 

232 

233 with Image.open(fp) as im: 

234 from . import TiffImagePlugin 

235 

236 if thumbnail_offset is None and isinstance( 

237 im, TiffImagePlugin.TiffImageFile 

238 ): 

239 im._frame_pos = [ifd_offset] 

240 im._seek(0) 

241 im.load() 

242 child_images.append(im) 

243 

244 if offset is not None: 

245 assert self.fp is not None 

246 self.fp.seek(offset) 

247 return child_images 

248 

249 def get_format_mimetype(self) -> str | None: 

250 if self.custom_mimetype: 

251 return self.custom_mimetype 

252 if self.format is not None: 

253 return Image.MIME.get(self.format.upper()) 

254 return None 

255 

256 def __getstate__(self) -> list[Any]: 

257 return super().__getstate__() + [self.filename] 

258 

259 def __setstate__(self, state: list[Any]) -> None: 

260 self.tile = [] 

261 if len(state) > 5: 

262 self.filename = state[5] 

263 super().__setstate__(state) 

264 

265 def verify(self) -> None: 

266 """Check file integrity""" 

267 

268 # raise exception if something's wrong. must be called 

269 # directly after open, and closes file when finished. 

270 if self._exclusive_fp: 

271 self.fp.close() 

272 self.fp = None 

273 

274 def load(self) -> Image.core.PixelAccess | None: 

275 """Load image data based on tile list""" 

276 

277 if not self.tile and self._im is None: 

278 msg = "cannot load this image" 

279 raise OSError(msg) 

280 

281 pixel = Image.Image.load(self) 

282 if not self.tile: 

283 return pixel 

284 

285 self.map: mmap.mmap | None = None 

286 use_mmap = self.filename and len(self.tile) == 1 

287 

288 readonly = 0 

289 

290 # look for read/seek overrides 

291 if hasattr(self, "load_read"): 

292 read = self.load_read 

293 # don't use mmap if there are custom read/seek functions 

294 use_mmap = False 

295 else: 

296 read = self.fp.read 

297 

298 if hasattr(self, "load_seek"): 

299 seek = self.load_seek 

300 use_mmap = False 

301 else: 

302 seek = self.fp.seek 

303 

304 if use_mmap: 

305 # try memory mapping 

306 decoder_name, extents, offset, args = self.tile[0] 

307 if isinstance(args, str): 

308 args = (args, 0, 1) 

309 if ( 

310 decoder_name == "raw" 

311 and isinstance(args, tuple) 

312 and len(args) >= 3 

313 and args[0] == self.mode 

314 and args[0] in Image._MAPMODES 

315 ): 

316 if offset < 0: 

317 msg = "Tile offset cannot be negative" 

318 raise ValueError(msg) 

319 try: 

320 # use mmap, if possible 

321 import mmap 

322 

323 with open(self.filename) as fp: 

324 self.map = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) 

325 if offset + self.size[1] * args[1] > self.map.size(): 

326 msg = "buffer is not large enough" 

327 raise OSError(msg) 

328 self.im = Image.core.map_buffer( 

329 self.map, self.size, decoder_name, offset, args 

330 ) 

331 readonly = 1 

332 # After trashing self.im, 

333 # we might need to reload the palette data. 

334 if self.palette: 

335 self.palette.dirty = 1 

336 except (AttributeError, OSError, ImportError): 

337 self.map = None 

338 

339 self.load_prepare() 

340 err_code = -3 # initialize to unknown error 

341 if not self.map: 

342 # sort tiles in file order 

343 self.tile.sort(key=_tilesort) 

344 

345 # FIXME: This is a hack to handle TIFF's JpegTables tag. 

346 prefix = getattr(self, "tile_prefix", b"") 

347 

348 # Remove consecutive duplicates that only differ by their offset 

349 self.tile = [ 

350 list(tiles)[-1] 

351 for _, tiles in itertools.groupby( 

352 self.tile, lambda tile: (tile[0], tile[1], tile[3]) 

353 ) 

354 ] 

355 for i, (decoder_name, extents, offset, args) in enumerate(self.tile): 

356 seek(offset) 

357 decoder = Image._getdecoder( 

358 self.mode, decoder_name, args, self.decoderconfig 

359 ) 

360 try: 

361 decoder.setimage(self.im, extents) 

362 if decoder.pulls_fd: 

363 decoder.setfd(self.fp) 

364 err_code = decoder.decode(b"")[1] 

365 else: 

366 b = prefix 

367 while True: 

368 read_bytes = self.decodermaxblock 

369 if i + 1 < len(self.tile): 

370 next_offset = self.tile[i + 1].offset 

371 if next_offset > offset: 

372 read_bytes = next_offset - offset 

373 try: 

374 s = read(read_bytes) 

375 except (IndexError, struct.error) as e: 

376 # truncated png/gif 

377 if LOAD_TRUNCATED_IMAGES: 

378 break 

379 else: 

380 msg = "image file is truncated" 

381 raise OSError(msg) from e 

382 

383 if not s: # truncated jpeg 

384 if LOAD_TRUNCATED_IMAGES: 

385 break 

386 else: 

387 msg = ( 

388 "image file is truncated " 

389 f"({len(b)} bytes not processed)" 

390 ) 

391 raise OSError(msg) 

392 

393 b = b + s 

394 n, err_code = decoder.decode(b) 

395 if n < 0: 

396 break 

397 b = b[n:] 

398 finally: 

399 # Need to cleanup here to prevent leaks 

400 decoder.cleanup() 

401 

402 self.tile = [] 

403 self.readonly = readonly 

404 

405 self.load_end() 

406 

407 if self._exclusive_fp and self._close_exclusive_fp_after_loading: 

408 self.fp.close() 

409 self.fp = None 

410 

411 if not self.map and not LOAD_TRUNCATED_IMAGES and err_code < 0: 

412 # still raised if decoder fails to return anything 

413 raise _get_oserror(err_code, encoder=False) 

414 

415 return Image.Image.load(self) 

416 

417 def load_prepare(self) -> None: 

418 # create image memory if necessary 

419 if self._im is None: 

420 self.im = Image.core.new(self.mode, self.size) 

421 # create palette (optional) 

422 if self.mode == "P": 

423 Image.Image.load(self) 

424 

425 def load_end(self) -> None: 

426 # may be overridden 

427 pass 

428 

429 # may be defined for contained formats 

430 # def load_seek(self, pos: int) -> None: 

431 # pass 

432 

433 # may be defined for blocked formats (e.g. PNG) 

434 # def load_read(self, read_bytes: int) -> bytes: 

435 # pass 

436 

437 def _seek_check(self, frame: int) -> bool: 

438 if ( 

439 frame < self._min_frame 

440 # Only check upper limit on frames if additional seek operations 

441 # are not required to do so 

442 or ( 

443 not (hasattr(self, "_n_frames") and self._n_frames is None) 

444 and frame >= getattr(self, "n_frames") + self._min_frame 

445 ) 

446 ): 

447 msg = "attempt to seek outside sequence" 

448 raise EOFError(msg) 

449 

450 return self.tell() != frame 

451 

452 

453class StubHandler(abc.ABC): 

454 def open(self, im: StubImageFile) -> None: 

455 pass 

456 

457 @abc.abstractmethod 

458 def load(self, im: StubImageFile) -> Image.Image: 

459 pass 

460 

461 

462class StubImageFile(ImageFile, metaclass=abc.ABCMeta): 

463 """ 

464 Base class for stub image loaders. 

465 

466 A stub loader is an image loader that can identify files of a 

467 certain format, but relies on external code to load the file. 

468 """ 

469 

470 @abc.abstractmethod 

471 def _open(self) -> None: 

472 pass 

473 

474 def load(self) -> Image.core.PixelAccess | None: 

475 loader = self._load() 

476 if loader is None: 

477 msg = f"cannot find loader for this {self.format} file" 

478 raise OSError(msg) 

479 image = loader.load(self) 

480 assert image is not None 

481 # become the other object (!) 

482 self.__class__ = image.__class__ # type: ignore[assignment] 

483 self.__dict__ = image.__dict__ 

484 return image.load() 

485 

486 @abc.abstractmethod 

487 def _load(self) -> StubHandler | None: 

488 """(Hook) Find actual image loader.""" 

489 pass 

490 

491 

492class Parser: 

493 """ 

494 Incremental image parser. This class implements the standard 

495 feed/close consumer interface. 

496 """ 

497 

498 incremental = None 

499 image: Image.Image | None = None 

500 data: bytes | None = None 

501 decoder: Image.core.ImagingDecoder | PyDecoder | None = None 

502 offset = 0 

503 finished = 0 

504 

505 def reset(self) -> None: 

506 """ 

507 (Consumer) Reset the parser. Note that you can only call this 

508 method immediately after you've created a parser; parser 

509 instances cannot be reused. 

510 """ 

511 assert self.data is None, "cannot reuse parsers" 

512 

513 def feed(self, data: bytes) -> None: 

514 """ 

515 (Consumer) Feed data to the parser. 

516 

517 :param data: A string buffer. 

518 :exception OSError: If the parser failed to parse the image file. 

519 """ 

520 # collect data 

521 

522 if self.finished: 

523 return 

524 

525 if self.data is None: 

526 self.data = data 

527 else: 

528 self.data = self.data + data 

529 

530 # parse what we have 

531 if self.decoder: 

532 if self.offset > 0: 

533 # skip header 

534 skip = min(len(self.data), self.offset) 

535 self.data = self.data[skip:] 

536 self.offset = self.offset - skip 

537 if self.offset > 0 or not self.data: 

538 return 

539 

540 n, e = self.decoder.decode(self.data) 

541 

542 if n < 0: 

543 # end of stream 

544 self.data = None 

545 self.finished = 1 

546 if e < 0: 

547 # decoding error 

548 self.image = None 

549 raise _get_oserror(e, encoder=False) 

550 else: 

551 # end of image 

552 return 

553 self.data = self.data[n:] 

554 

555 elif self.image: 

556 # if we end up here with no decoder, this file cannot 

557 # be incrementally parsed. wait until we've gotten all 

558 # available data 

559 pass 

560 

561 else: 

562 # attempt to open this file 

563 try: 

564 with io.BytesIO(self.data) as fp: 

565 im = Image.open(fp) 

566 except OSError: 

567 pass # not enough data 

568 else: 

569 flag = hasattr(im, "load_seek") or hasattr(im, "load_read") 

570 if flag or len(im.tile) != 1: 

571 # custom load code, or multiple tiles 

572 self.decode = None 

573 else: 

574 # initialize decoder 

575 im.load_prepare() 

576 d, e, o, a = im.tile[0] 

577 im.tile = [] 

578 self.decoder = Image._getdecoder(im.mode, d, a, im.decoderconfig) 

579 self.decoder.setimage(im.im, e) 

580 

581 # calculate decoder offset 

582 self.offset = o 

583 if self.offset <= len(self.data): 

584 self.data = self.data[self.offset :] 

585 self.offset = 0 

586 

587 self.image = im 

588 

589 def __enter__(self) -> Parser: 

590 return self 

591 

592 def __exit__(self, *args: object) -> None: 

593 self.close() 

594 

595 def close(self) -> Image.Image: 

596 """ 

597 (Consumer) Close the stream. 

598 

599 :returns: An image object. 

600 :exception OSError: If the parser failed to parse the image file either 

601 because it cannot be identified or cannot be 

602 decoded. 

603 """ 

604 # finish decoding 

605 if self.decoder: 

606 # get rid of what's left in the buffers 

607 self.feed(b"") 

608 self.data = self.decoder = None 

609 if not self.finished: 

610 msg = "image was incomplete" 

611 raise OSError(msg) 

612 if not self.image: 

613 msg = "cannot parse this image" 

614 raise OSError(msg) 

615 if self.data: 

616 # incremental parsing not possible; reopen the file 

617 # not that we have all data 

618 with io.BytesIO(self.data) as fp: 

619 try: 

620 self.image = Image.open(fp) 

621 finally: 

622 self.image.load() 

623 return self.image 

624 

625 

626# -------------------------------------------------------------------- 

627 

628 

629def _save(im: Image.Image, fp: IO[bytes], tile: list[_Tile], bufsize: int = 0) -> None: 

630 """Helper to save image based on tile list 

631 

632 :param im: Image object. 

633 :param fp: File object. 

634 :param tile: Tile list. 

635 :param bufsize: Optional buffer size 

636 """ 

637 

638 im.load() 

639 if not hasattr(im, "encoderconfig"): 

640 im.encoderconfig = () 

641 tile.sort(key=_tilesort) 

642 # FIXME: make MAXBLOCK a configuration parameter 

643 # It would be great if we could have the encoder specify what it needs 

644 # But, it would need at least the image size in most cases. RawEncode is 

645 # a tricky case. 

646 bufsize = max(MAXBLOCK, bufsize, im.size[0] * 4) # see RawEncode.c 

647 try: 

648 fh = fp.fileno() 

649 fp.flush() 

650 _encode_tile(im, fp, tile, bufsize, fh) 

651 except (AttributeError, io.UnsupportedOperation) as exc: 

652 _encode_tile(im, fp, tile, bufsize, None, exc) 

653 if hasattr(fp, "flush"): 

654 fp.flush() 

655 

656 

657def _encode_tile( 

658 im: Image.Image, 

659 fp: IO[bytes], 

660 tile: list[_Tile], 

661 bufsize: int, 

662 fh: int | None, 

663 exc: BaseException | None = None, 

664) -> None: 

665 for encoder_name, extents, offset, args in tile: 

666 if offset > 0: 

667 fp.seek(offset) 

668 encoder = Image._getencoder(im.mode, encoder_name, args, im.encoderconfig) 

669 try: 

670 encoder.setimage(im.im, extents) 

671 if encoder.pushes_fd: 

672 encoder.setfd(fp) 

673 errcode = encoder.encode_to_pyfd()[1] 

674 else: 

675 if exc: 

676 # compress to Python file-compatible object 

677 while True: 

678 errcode, data = encoder.encode(bufsize)[1:] 

679 fp.write(data) 

680 if errcode: 

681 break 

682 else: 

683 # slight speedup: compress to real file object 

684 assert fh is not None 

685 errcode = encoder.encode_to_file(fh, bufsize) 

686 if errcode < 0: 

687 raise _get_oserror(errcode, encoder=True) from exc 

688 finally: 

689 encoder.cleanup() 

690 

691 

692def _safe_read(fp: IO[bytes], size: int) -> bytes: 

693 """ 

694 Reads large blocks in a safe way. Unlike fp.read(n), this function 

695 doesn't trust the user. If the requested size is larger than 

696 SAFEBLOCK, the file is read block by block. 

697 

698 :param fp: File handle. Must implement a <b>read</b> method. 

699 :param size: Number of bytes to read. 

700 :returns: A string containing <i>size</i> bytes of data. 

701 

702 Raises an OSError if the file is truncated and the read cannot be completed 

703 

704 """ 

705 if size <= 0: 

706 return b"" 

707 if size <= SAFEBLOCK: 

708 data = fp.read(size) 

709 if len(data) < size: 

710 msg = "Truncated File Read" 

711 raise OSError(msg) 

712 return data 

713 blocks: list[bytes] = [] 

714 remaining_size = size 

715 while remaining_size > 0: 

716 block = fp.read(min(remaining_size, SAFEBLOCK)) 

717 if not block: 

718 break 

719 blocks.append(block) 

720 remaining_size -= len(block) 

721 if sum(len(block) for block in blocks) < size: 

722 msg = "Truncated File Read" 

723 raise OSError(msg) 

724 return b"".join(blocks) 

725 

726 

727class PyCodecState: 

728 def __init__(self) -> None: 

729 self.xsize = 0 

730 self.ysize = 0 

731 self.xoff = 0 

732 self.yoff = 0 

733 

734 def extents(self) -> tuple[int, int, int, int]: 

735 return self.xoff, self.yoff, self.xoff + self.xsize, self.yoff + self.ysize 

736 

737 

738class PyCodec: 

739 fd: IO[bytes] | None 

740 

741 def __init__(self, mode: str, *args: Any) -> None: 

742 self.im: Image.core.ImagingCore | None = None 

743 self.state = PyCodecState() 

744 self.fd = None 

745 self.mode = mode 

746 self.init(args) 

747 

748 def init(self, args: tuple[Any, ...]) -> None: 

749 """ 

750 Override to perform codec specific initialization 

751 

752 :param args: Tuple of arg items from the tile entry 

753 :returns: None 

754 """ 

755 self.args = args 

756 

757 def cleanup(self) -> None: 

758 """ 

759 Override to perform codec specific cleanup 

760 

761 :returns: None 

762 """ 

763 pass 

764 

765 def setfd(self, fd: IO[bytes]) -> None: 

766 """ 

767 Called from ImageFile to set the Python file-like object 

768 

769 :param fd: A Python file-like object 

770 :returns: None 

771 """ 

772 self.fd = fd 

773 

774 def setimage( 

775 self, 

776 im: Image.core.ImagingCore, 

777 extents: tuple[int, int, int, int] | None = None, 

778 ) -> None: 

779 """ 

780 Called from ImageFile to set the core output image for the codec 

781 

782 :param im: A core image object 

783 :param extents: a 4 tuple of (x0, y0, x1, y1) defining the rectangle 

784 for this tile 

785 :returns: None 

786 """ 

787 

788 # following c code 

789 self.im = im 

790 

791 if extents: 

792 (x0, y0, x1, y1) = extents 

793 else: 

794 (x0, y0, x1, y1) = (0, 0, 0, 0) 

795 

796 if x0 == 0 and x1 == 0: 

797 self.state.xsize, self.state.ysize = self.im.size 

798 else: 

799 self.state.xoff = x0 

800 self.state.yoff = y0 

801 self.state.xsize = x1 - x0 

802 self.state.ysize = y1 - y0 

803 

804 if self.state.xsize <= 0 or self.state.ysize <= 0: 

805 msg = "Size cannot be negative" 

806 raise ValueError(msg) 

807 

808 if ( 

809 self.state.xsize + self.state.xoff > self.im.size[0] 

810 or self.state.ysize + self.state.yoff > self.im.size[1] 

811 ): 

812 msg = "Tile cannot extend outside image" 

813 raise ValueError(msg) 

814 

815 

816class PyDecoder(PyCodec): 

817 """ 

818 Python implementation of a format decoder. Override this class and 

819 add the decoding logic in the :meth:`decode` method. 

820 

821 See :ref:`Writing Your Own File Codec in Python<file-codecs-py>` 

822 """ 

823 

824 _pulls_fd = False 

825 

826 @property 

827 def pulls_fd(self) -> bool: 

828 return self._pulls_fd 

829 

830 def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]: 

831 """ 

832 Override to perform the decoding process. 

833 

834 :param buffer: A bytes object with the data to be decoded. 

835 :returns: A tuple of ``(bytes consumed, errcode)``. 

836 If finished with decoding return -1 for the bytes consumed. 

837 Err codes are from :data:`.ImageFile.ERRORS`. 

838 """ 

839 msg = "unavailable in base decoder" 

840 raise NotImplementedError(msg) 

841 

842 def set_as_raw( 

843 self, data: bytes, rawmode: str | None = None, extra: tuple[Any, ...] = () 

844 ) -> None: 

845 """ 

846 Convenience method to set the internal image from a stream of raw data 

847 

848 :param data: Bytes to be set 

849 :param rawmode: The rawmode to be used for the decoder. 

850 If not specified, it will default to the mode of the image 

851 :param extra: Extra arguments for the decoder. 

852 :returns: None 

853 """ 

854 

855 if not rawmode: 

856 rawmode = self.mode 

857 d = Image._getdecoder(self.mode, "raw", rawmode, extra) 

858 assert self.im is not None 

859 d.setimage(self.im, self.state.extents()) 

860 s = d.decode(data) 

861 

862 if s[0] >= 0: 

863 msg = "not enough image data" 

864 raise ValueError(msg) 

865 if s[1] != 0: 

866 msg = "cannot decode image data" 

867 raise ValueError(msg) 

868 

869 

870class PyEncoder(PyCodec): 

871 """ 

872 Python implementation of a format encoder. Override this class and 

873 add the decoding logic in the :meth:`encode` method. 

874 

875 See :ref:`Writing Your Own File Codec in Python<file-codecs-py>` 

876 """ 

877 

878 _pushes_fd = False 

879 

880 @property 

881 def pushes_fd(self) -> bool: 

882 return self._pushes_fd 

883 

884 def encode(self, bufsize: int) -> tuple[int, int, bytes]: 

885 """ 

886 Override to perform the encoding process. 

887 

888 :param bufsize: Buffer size. 

889 :returns: A tuple of ``(bytes encoded, errcode, bytes)``. 

890 If finished with encoding return 1 for the error code. 

891 Err codes are from :data:`.ImageFile.ERRORS`. 

892 """ 

893 msg = "unavailable in base encoder" 

894 raise NotImplementedError(msg) 

895 

896 def encode_to_pyfd(self) -> tuple[int, int]: 

897 """ 

898 If ``pushes_fd`` is ``True``, then this method will be used, 

899 and ``encode()`` will only be called once. 

900 

901 :returns: A tuple of ``(bytes consumed, errcode)``. 

902 Err codes are from :data:`.ImageFile.ERRORS`. 

903 """ 

904 if not self.pushes_fd: 

905 return 0, -8 # bad configuration 

906 bytes_consumed, errcode, data = self.encode(0) 

907 if data: 

908 assert self.fd is not None 

909 self.fd.write(data) 

910 return bytes_consumed, errcode 

911 

912 def encode_to_file(self, fh: int, bufsize: int) -> int: 

913 """ 

914 :param fh: File handle. 

915 :param bufsize: Buffer size. 

916 

917 :returns: If finished successfully, return 0. 

918 Otherwise, return an error code. Err codes are from 

919 :data:`.ImageFile.ERRORS`. 

920 """ 

921 errcode = 0 

922 while errcode == 0: 

923 status, errcode, buf = self.encode(bufsize) 

924 if status > 0: 

925 os.write(fh, buf[status:]) 

926 return errcode