Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/ImageFile.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

482 statements  

1# 

2# The Python Imaging Library. 

3# $Id$ 

4# 

5# base class for image file handlers 

6# 

7# history: 

8# 1995-09-09 fl Created 

9# 1996-03-11 fl Fixed load mechanism. 

10# 1996-04-15 fl Added pcx/xbm decoders. 

11# 1996-04-30 fl Added encoders. 

12# 1996-12-14 fl Added load helpers 

13# 1997-01-11 fl Use encode_to_file where possible 

14# 1997-08-27 fl Flush output in _save 

15# 1998-03-05 fl Use memory mapping for some modes 

16# 1999-02-04 fl Use memory mapping also for "I;16" and "I;16B" 

17# 1999-05-31 fl Added image parser 

18# 2000-10-12 fl Set readonly flag on memory-mapped images 

19# 2002-03-20 fl Use better messages for common decoder errors 

20# 2003-04-21 fl Fall back on mmap/map_buffer if map is not available 

21# 2003-10-30 fl Added StubImageFile class 

22# 2004-02-25 fl Made incremental parser more robust 

23# 

24# Copyright (c) 1997-2004 by Secret Labs AB 

25# Copyright (c) 1995-2004 by Fredrik Lundh 

26# 

27# See the README file for information on usage and redistribution. 

28# 

29from __future__ import annotations 

30 

31import abc 

32import io 

33import itertools 

34import logging 

35import os 

36import struct 

37from typing import IO, Any, NamedTuple, cast 

38 

39from . import ExifTags, Image 

40from ._util import DeferredError, is_path 

41 

42TYPE_CHECKING = False 

43if TYPE_CHECKING: 

44 from ._typing import StrOrBytesPath 

45 

# Module-level logger; ImageFile.close() uses it to report, at debug level,
# any exception raised while closing the underlying file.
logger = logging.getLogger(__name__)

# Default block size; copied into ImageFile.decodermaxblock for reading and
# used by _save() as the lower bound of the write buffer size.
MAXBLOCK = 65536
"""
By default, Pillow processes image data in blocks. This helps to prevent excessive use
of resources. Codecs may disable this behaviour with ``_pulls_fd`` or ``_pushes_fd``.

When reading an image, this is the number of bytes to read at once.

When writing an image, this is the number of bytes to write at once.
If the image width times 4 is greater, then that will be used instead.
Plugins may also set a greater number.

User code may set this to another number.
"""

# Largest size (1 MiB) that _safe_read() requests from a single fp.read()
# call; bigger requests are read in pieces of at most this many bytes.
SAFEBLOCK = 1024 * 1024

# Checked by ImageFile.load(): when true, running out of data stops decoding
# quietly and a negative decoder error code is not raised.
LOAD_TRUNCATED_IMAGES = False
"""Whether or not to load truncated image files. User code may change this."""

# Fallback messages used by _get_oserror() when Image.core has no
# getcodecstatus attribute.
ERRORS = {
    -1: "image buffer overrun error",
    -2: "decoding error",
    -3: "unknown error",
    -8: "bad configuration",
    -9: "out of memory error",
}
"""
Dict of known error codes returned from :meth:`.PyDecoder.decode`,
:meth:`.PyEncoder.encode` :meth:`.PyEncoder.encode_to_pyfd` and
:meth:`.PyEncoder.encode_to_file`.
"""

79 

80 

81# 

82# -------------------------------------------------------------------- 

83# Helpers 

84 

85 

def _get_oserror(error: int, *, encoder: bool) -> OSError:
    """
    Build the :py:exc:`OSError` that describes a codec failure.

    The description is looked up through ``Image.core.getcodecstatus``; if the
    core module has no such attribute, :data:`ERRORS` is consulted instead.
    When neither yields a message, a generic one naming the code is used.

    :param error: Error code reported by the codec.
    :param encoder: ``True`` if an encoder failed (writing), ``False`` if a
                    decoder failed (reading).
    :returns: The exception instance. It is returned, not raised.
    """
    role, action = ("encoder", "writing") if encoder else ("decoder", "reading")
    try:
        description = Image.core.getcodecstatus(error)
    except AttributeError:
        description = ERRORS.get(error)
    if not description:
        description = f"{role} error {error}"
    return OSError(description + f" when {action} image file")

95 

96 

def _tilesort(t: _Tile) -> int:
    """Sort key for tile descriptors: the offset stored at index 2."""
    offset = t[2]
    return offset

100 

101 

class _Tile(NamedTuple):
    """
    Tile descriptor: names a codec and the data it should process.

    ``ImageFile.load()`` and ``_encode_tile()`` unpack each entry as
    ``(codec_name, extents, offset, args)``.
    """

    # codec name handed to Image._getdecoder() / Image._getencoder()
    codec_name: str
    # region passed to the codec's setimage(); may be None
    extents: tuple[int, int, int, int] | None
    # file position to seek to before the codec runs; also the sort key
    # used by _tilesort()
    offset: int = 0
    # extra codec arguments, forwarded to Image._getdecoder() / _getencoder()
    args: tuple[Any, ...] | str | None = None

107 

108 

109# 

110# -------------------------------------------------------------------- 

111# ImageFile base class 

112 

113 

class ImageFile(Image.Image):
    """Base class for image file format handlers."""

    def __init__(
        self, fp: StrOrBytesPath | IO[bytes], filename: str | bytes | None = None
    ) -> None:
        """
        Attach a file to the image and let the format handler identify it.

        :param fp: A path, which is opened here in binary mode, or an already
                   open binary stream.
        :param filename: Name to record when ``fp`` is a stream. Ignored when
                         ``fp`` is a path.
        :exception SyntaxError: If ``_open()`` raises one of the errors listed
           below, or leaves the image without a mode or with a non-positive
           size.
        """
        super().__init__()

        self._min_frame = 0

        self.custom_mimetype: str | None = None

        self.tile: list[_Tile] = []
        """ A list of tile descriptors """

        self.readonly = 1  # until we know better

        self.decoderconfig: tuple[Any, ...] = ()
        self.decodermaxblock = MAXBLOCK

        self.fp: IO[bytes] | None
        self._fp: IO[bytes] | DeferredError
        if is_path(fp):
            # filename
            self.fp = open(fp, "rb")
            self.filename = os.fspath(fp)
            self._exclusive_fp = True
        else:
            # stream
            self.fp = cast(IO[bytes], fp)
            self.filename = filename if filename is not None else ""
            # can be overridden
            self._exclusive_fp = False

        try:
            try:
                self._open()
            except (
                IndexError,  # end of data
                TypeError,  # end of data (ord)
                KeyError,  # unsupported mode
                EOFError,  # got header but not the first frame
                struct.error,
            ) as v:
                raise SyntaxError(v) from v

            if not self.mode or self.size[0] <= 0 or self.size[1] <= 0:
                msg = "not identified by this driver"
                raise SyntaxError(msg)
        except BaseException:
            # close the file only if we have opened it this constructor
            if self._exclusive_fp:
                self.fp.close()
            raise

    def _open(self) -> None:
        """(Hook) Called by ``__init__``; the base implementation does nothing."""
        pass

    # Context manager support
    def __enter__(self) -> ImageFile:
        return self

    def _close_fp(self) -> None:
        """
        Close the file objects held by this image.

        ``_fp`` is closed only when it is a different object from ``fp``, and
        is then replaced by a :class:`DeferredError` so that later use fails
        with "Operation on closed image". ``fp`` is closed if it is set.
        """
        if getattr(self, "_fp", False) and not isinstance(self._fp, DeferredError):
            if self._fp != self.fp:
                self._fp.close()
            self._fp = DeferredError(ValueError("Operation on closed image"))
        if self.fp:
            self.fp.close()

    def __exit__(self, *args: object) -> None:
        # Files are closed only when this object owns them; the reference to
        # ``fp`` is dropped in either case.
        if getattr(self, "_exclusive_fp", False):
            self._close_fp()
        self.fp = None

    def close(self) -> None:
        """
        Closes the file pointer, if possible.

        This operation will destroy the image core and release its memory.
        The image data will be unusable afterward.

        This function is required to close images that have multiple frames or
        have not had their file read and closed by the
        :py:meth:`~PIL.Image.Image.load` method. See :ref:`file-handling` for
        more information.
        """
        try:
            self._close_fp()
            self.fp = None
        except Exception as msg:
            # best effort: a failure to close is logged, not raised
            logger.debug("Error closing: %s", msg)

        super().close()

    def get_child_images(self) -> list[ImageFile]:
        """
        Open and load the images referenced from this image's EXIF data.

        Candidates are the IFDs listed under the ``SubIFDs`` tag, plus IFD1
        when it carries a ``JpegIFOffset`` entry. An IFD with a
        ``JpegIFOffset`` is read from the file into memory and opened from
        there; otherwise the image is opened from ``self.fp`` and, if it turns
        out to be a TIFF, positioned on the IFD's offset.

        The position of ``self.fp`` is restored before returning.

        :returns: The loaded child images, in the order they were found.
        """
        child_images = []
        exif = self.getexif()
        ifds = []
        if ExifTags.Base.SubIFDs in exif:
            subifd_offsets = exif[ExifTags.Base.SubIFDs]
            if subifd_offsets:
                if not isinstance(subifd_offsets, tuple):
                    subifd_offsets = (subifd_offsets,)
                for subifd_offset in subifd_offsets:
                    ifds.append((exif._get_ifd_dict(subifd_offset), subifd_offset))
        ifd1 = exif.get_ifd(ExifTags.IFD.IFD1)
        if ifd1 and ifd1.get(ExifTags.Base.JpegIFOffset):
            assert exif._info is not None
            ifds.append((ifd1, exif._info.next))

        offset = None
        for ifd, ifd_offset in ifds:
            assert self.fp is not None
            current_offset = self.fp.tell()
            if offset is None:
                # remember where the file was before the first seek
                offset = current_offset

            fp = self.fp
            # Reset for every IFD: the check below reads this name even when
            # ``ifd`` is None, so it must neither be unbound nor carry over
            # the value from a previous iteration.
            thumbnail_offset = None
            if ifd is not None:
                thumbnail_offset = ifd.get(ExifTags.Base.JpegIFOffset)
                if thumbnail_offset is not None:
                    thumbnail_offset += getattr(self, "_exif_offset", 0)
                    self.fp.seek(thumbnail_offset)

                    length = ifd.get(ExifTags.Base.JpegIFByteCount)
                    assert isinstance(length, int)
                    data = self.fp.read(length)
                    fp = io.BytesIO(data)

            with Image.open(fp) as im:
                from . import TiffImagePlugin

                if thumbnail_offset is None and isinstance(
                    im, TiffImagePlugin.TiffImageFile
                ):
                    im._frame_pos = [ifd_offset]
                    im._seek(0)
                # load before the ``with`` block closes the image's file
                im.load()
                child_images.append(im)

        if offset is not None:
            assert self.fp is not None
            self.fp.seek(offset)
        return child_images

    def get_format_mimetype(self) -> str | None:
        """
        Return the MIME type of this image.

        :returns: ``custom_mimetype`` if set, otherwise the ``Image.MIME``
                  entry for the upper-cased format, or ``None``.
        """
        if self.custom_mimetype:
            return self.custom_mimetype
        if self.format is not None:
            return Image.MIME.get(self.format.upper())
        return None

    def __getstate__(self) -> list[Any]:
        # the filename is appended after the base class state
        return super().__getstate__() + [self.filename]

    def __setstate__(self, state: list[Any]) -> None:
        self.tile = []
        # state written by __getstate__ above carries the filename at index 5
        if len(state) > 5:
            self.filename = state[5]
        super().__setstate__(state)

    def verify(self) -> None:
        """Check file integrity"""

        # raise exception if something's wrong.  must be called
        # directly after open, and closes file when finished.
        if self._exclusive_fp and self.fp:
            self.fp.close()
        self.fp = None

    def load(self) -> Image.core.PixelAccess | None:
        """
        Load image data based on tile list

        A single "raw" tile whose raw mode equals the image mode and is listed
        in ``Image._MAPMODES`` is memory-mapped from ``self.filename`` when
        possible. Otherwise each tile is decoded in file order, either by
        letting the decoder pull from the file itself or by feeding it blocks
        read from the file.

        :returns: The result of :py:meth:`PIL.Image.Image.load`.
        :exception OSError: If there is nothing to load, the file is truncated
           (unless ``LOAD_TRUNCATED_IMAGES`` is set), or the decoder reports
           an error.
        :exception ValueError: If the tile considered for memory mapping has
           a negative offset.
        """

        if not self.tile and self._im is None:
            msg = "cannot load this image"
            raise OSError(msg)

        pixel = Image.Image.load(self)
        if not self.tile:
            return pixel

        self.map: mmap.mmap | None = None
        use_mmap = self.filename and len(self.tile) == 1

        assert self.fp is not None
        readonly = 0

        # look for read/seek overrides
        if hasattr(self, "load_read"):
            read = self.load_read
            # don't use mmap if there are custom read/seek functions
            use_mmap = False
        else:
            read = self.fp.read

        if hasattr(self, "load_seek"):
            seek = self.load_seek
            use_mmap = False
        else:
            seek = self.fp.seek

        if use_mmap:
            # try memory mapping
            decoder_name, extents, offset, args = self.tile[0]
            if isinstance(args, str):
                args = (args, 0, 1)
            if (
                decoder_name == "raw"
                and isinstance(args, tuple)
                and len(args) >= 3
                and args[0] == self.mode
                and args[0] in Image._MAPMODES
            ):
                if offset < 0:
                    msg = "Tile offset cannot be negative"
                    raise ValueError(msg)
                try:
                    # use mmap, if possible
                    import mmap

                    with open(self.filename) as fp:
                        self.map = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ)
                    if offset + self.size[1] * args[1] > self.map.size():
                        msg = "buffer is not large enough"
                        raise OSError(msg)
                    self.im = Image.core.map_buffer(
                        self.map, self.size, decoder_name, offset, args
                    )
                    readonly = 1
                    # After trashing self.im,
                    # we might need to reload the palette data.
                    if self.palette:
                        self.palette.dirty = 1
                except (AttributeError, OSError, ImportError):
                    # mapping failed: fall back to the decoders below
                    self.map = None

        self.load_prepare()
        err_code = -3  # initialize to unknown error
        if not self.map:
            # sort tiles in file order
            self.tile.sort(key=_tilesort)

            # FIXME: This is a hack to handle TIFF's JpegTables tag.
            prefix = getattr(self, "tile_prefix", b"")

            # Remove consecutive duplicates that only differ by their offset
            self.tile = [
                list(tiles)[-1]
                for _, tiles in itertools.groupby(
                    self.tile, lambda tile: (tile[0], tile[1], tile[3])
                )
            ]
            for i, (decoder_name, extents, offset, args) in enumerate(self.tile):
                seek(offset)
                decoder = Image._getdecoder(
                    self.mode, decoder_name, args, self.decoderconfig
                )
                try:
                    decoder.setimage(self.im, extents)
                    if decoder.pulls_fd:
                        # the decoder reads from the file by itself
                        decoder.setfd(self.fp)
                        err_code = decoder.decode(b"")[1]
                    else:
                        b = prefix
                        while True:
                            read_bytes = self.decodermaxblock
                            if i + 1 < len(self.tile):
                                # read up to the start of the next tile
                                next_offset = self.tile[i + 1].offset
                                if next_offset > offset:
                                    read_bytes = next_offset - offset
                            try:
                                s = read(read_bytes)
                            except (IndexError, struct.error) as e:
                                # truncated png/gif
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = "image file is truncated"
                                    raise OSError(msg) from e

                            if not s:  # truncated jpeg
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = (
                                        "image file is truncated "
                                        f"({len(b)} bytes not processed)"
                                    )
                                    raise OSError(msg)

                            b = b + s
                            n, err_code = decoder.decode(b)
                            if n < 0:
                                # the decoder is done with this tile
                                break
                            # keep what the decoder has not consumed yet
                            b = b[n:]
                finally:
                    # Need to cleanup here to prevent leaks
                    decoder.cleanup()

        self.tile = []
        self.readonly = readonly

        self.load_end()

        if self._exclusive_fp and self._close_exclusive_fp_after_loading:
            self.fp.close()
        self.fp = None

        if not self.map and not LOAD_TRUNCATED_IMAGES and err_code < 0:
            # still raised if decoder fails to return anything
            raise _get_oserror(err_code, encoder=False)

        return Image.Image.load(self)

    def load_prepare(self) -> None:
        """Allocate the core image if there is none yet; called by ``load()``."""
        # create image memory if necessary
        if self._im is None:
            self.im = Image.core.new(self.mode, self.size)
        # create palette (optional)
        if self.mode == "P":
            Image.Image.load(self)

    def load_end(self) -> None:
        """(Hook) Called by ``load()`` once the tiles have been processed."""
        # may be overridden
        pass

    # may be defined for contained formats
    # def load_seek(self, pos: int) -> None:
    #     pass

    # may be defined for blocked formats (e.g. PNG)
    # def load_read(self, read_bytes: int) -> bytes:
    #     pass

    def _seek_check(self, frame: int) -> bool:
        """
        Validate a frame number before seeking.

        :param frame: Requested frame number.
        :returns: ``True`` if ``frame`` differs from the current frame.
        :exception EOFError: If ``frame`` lies outside the sequence.
        """
        if (
            frame < self._min_frame
            # Only check upper limit on frames if additional seek operations
            # are not required to do so
            or (
                not (hasattr(self, "_n_frames") and self._n_frames is None)
                and frame >= getattr(self, "n_frames") + self._min_frame
            )
        ):
            msg = "attempt to seek outside sequence"
            raise EOFError(msg)

        return self.tell() != frame

463 

464 

class StubHandler(abc.ABC):
    """Interface of the external handlers that stub image files delegate to."""

    def open(self, im: StubImageFile) -> None:
        """Hook receiving the stub image; the default does nothing."""
        return None

    @abc.abstractmethod
    def load(self, im: StubImageFile) -> Image.Image:
        """Produce the real image for ``im``. Subclasses must implement this."""

472 

473 

class StubImageFile(ImageFile, metaclass=abc.ABCMeta):
    """
    Base class for stub image loaders.

    A stub loader can identify files of a certain format, but leaves the
    actual loading of the file to external code.
    """

    @abc.abstractmethod
    def _open(self) -> None:
        """(Hook) Identify the file. Subclasses must implement this."""

    def load(self) -> Image.core.PixelAccess | None:
        """
        Load the image through the handler returned by ``_load()``.

        On success this object takes over the class and the attributes of the
        image produced by the handler.

        :returns: The result of calling ``load()`` on the handler's image.
        :exception OSError: If ``_load()`` returns ``None``.
        """
        handler = self._load()
        if handler is None:
            msg = f"cannot find loader for this {self.format} file"
            raise OSError(msg)
        loaded = handler.load(self)
        assert loaded is not None
        # become the other object (!)
        self.__class__ = loaded.__class__  # type: ignore[assignment]
        self.__dict__ = loaded.__dict__
        return loaded.load()

    @abc.abstractmethod
    def _load(self) -> StubHandler | None:
        """(Hook) Find actual image loader."""

502 

503 

class Parser:
    """
    Incremental image parser.  This class implements the standard
    feed/close consumer interface.
    """

    incremental = None
    image: Image.Image | None = None
    data: bytes | None = None
    decoder: Image.core.ImagingDecoder | PyDecoder | None = None
    offset = 0
    finished = 0

    def reset(self) -> None:
        """
        (Consumer) Reset the parser.  This may only be called right after
        the parser has been created, since parser instances cannot be
        reused.
        """
        assert self.data is None, "cannot reuse parsers"

    def feed(self, data: bytes) -> None:
        """
        (Consumer) Feed data to the parser.

        :param data: A string buffer.
        :exception OSError: If the parser failed to parse the image file.
        """
        if self.finished:
            return

        # append the new chunk to whatever is still pending
        self.data = data if self.data is None else self.data + data

        if self.decoder:
            if self.offset > 0:
                # still in front of the image data: drop what we can
                skipped = min(len(self.data), self.offset)
                self.data = self.data[skipped:]
                self.offset -= skipped
                if self.offset > 0 or not self.data:
                    return

            consumed, status = self.decoder.decode(self.data)
            if consumed >= 0:
                # keep the unconsumed tail for the next call
                self.data = self.data[consumed:]
                return

            # end of stream
            self.data = None
            self.finished = 1
            if status < 0:
                # decoding error
                self.image = None
                raise _get_oserror(status, encoder=False)
            # end of image
            return

        if self.image:
            # An image without a decoder cannot be parsed incrementally;
            # keep collecting until all data is available.
            return

        # attempt to open this file
        try:
            with io.BytesIO(self.data) as fp:
                im = Image.open(fp)
        except OSError:
            return  # not enough data

        uses_custom_io = hasattr(im, "load_seek") or hasattr(im, "load_read")
        if not uses_custom_io and len(im.tile) == 1:
            # initialize decoder
            im.load_prepare()
            codec_name, extents, tile_offset, codec_args = im.tile[0]
            im.tile = []
            self.decoder = Image._getdecoder(
                im.mode, codec_name, codec_args, im.decoderconfig
            )
            self.decoder.setimage(im.im, extents)

            # calculate decoder offset
            self.offset = tile_offset
            if self.offset <= len(self.data):
                self.data = self.data[self.offset :]
                self.offset = 0

        self.image = im

    def __enter__(self) -> Parser:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

    def close(self) -> Image.Image:
        """
        (Consumer) Close the stream.

        :returns: An image object.
        :exception OSError: If the parser failed to parse the image file either
                            because it cannot be identified or cannot be
                            decoded.
        """
        # finish decoding
        if self.decoder:
            # flush what is left in the buffers
            self.feed(b"")
            self.data = self.decoder = None
            if not self.finished:
                msg = "image was incomplete"
                raise OSError(msg)
        if not self.image:
            msg = "cannot parse this image"
            raise OSError(msg)
        if self.data:
            # incremental parsing was not possible; reopen the file
            # now that all data is available
            with io.BytesIO(self.data) as stream:
                try:
                    self.image = Image.open(stream)
                finally:
                    self.image.load()
        return self.image

633 

634 

635# -------------------------------------------------------------------- 

636 

637 

def _save(im: Image.Image, fp: IO[bytes], tile: list[_Tile], bufsize: int = 0) -> None:
    """Helper to save image based on tile list

    The tile list is sorted in place by offset.  Encoding first targets the
    file descriptor of ``fp``; if ``fp`` has none, the data is written through
    the Python file object instead.

    :param im: Image object.
    :param fp: File object.
    :param tile: Tile list.
    :param bufsize: Optional buffer size
    """

    im.load()
    if not hasattr(im, "encoderconfig"):
        im.encoderconfig = ()
    tile.sort(key=_tilesort)

    # FIXME: make MAXBLOCK a configuration parameter
    # Ideally the encoder would state what it needs, but in most cases it
    # would need at least the image size. RawEncode is a tricky case.
    row_bytes = im.size[0] * 4  # see RawEncode.c
    bufsize = max(MAXBLOCK, bufsize, row_bytes)

    try:
        descriptor = fp.fileno()
        fp.flush()
        _encode_tile(im, fp, tile, bufsize, descriptor)
    except (AttributeError, io.UnsupportedOperation) as exc:
        # no usable file descriptor
        _encode_tile(im, fp, tile, bufsize, None, exc)

    if hasattr(fp, "flush"):
        fp.flush()

664 

665 

def _encode_tile(
    im: Image.Image,
    fp: IO[bytes],
    tile: list[_Tile],
    bufsize: int,
    fh: int | None,
    exc: BaseException | None = None,
) -> None:
    """
    Encode every tile of ``im`` and write the result to ``fp``.

    :param im: Image object.
    :param fp: File object.
    :param tile: Tile list.
    :param bufsize: Buffer size passed to the encoder.
    :param fh: File descriptor to encode to when ``exc`` is not set.
    :param exc: Exception explaining why ``fh`` is unavailable.  When set,
                data is written through ``fp.write`` instead, and the
                exception becomes the cause of any error raised here.
    :exception OSError: If an encoder reports a negative error code.
    """
    for codec_name, box, position, codec_args in tile:
        if position > 0:
            fp.seek(position)
        encoder = Image._getencoder(im.mode, codec_name, codec_args, im.encoderconfig)
        try:
            encoder.setimage(im.im, box)
            if encoder.pushes_fd:
                # the encoder writes to the file object by itself
                encoder.setfd(fp)
                errcode = encoder.encode_to_pyfd()[1]
            elif exc:
                # compress to Python file-compatible object
                errcode = 0
                while not errcode:
                    errcode, chunk = encoder.encode(bufsize)[1:]
                    fp.write(chunk)
            else:
                # slight speedup: compress to real file object
                assert fh is not None
                errcode = encoder.encode_to_file(fh, bufsize)
            if errcode < 0:
                raise _get_oserror(errcode, encoder=True) from exc
        finally:
            encoder.cleanup()

699 

700 

def _safe_read(fp: IO[bytes], size: int) -> bytes:
    """
    Read ``size`` bytes without trusting the requested size.

    Requests of up to SAFEBLOCK bytes are served by a single ``fp.read``
    call; larger ones are read in pieces of at most SAFEBLOCK bytes.

    :param fp: File handle.  Must implement a <b>read</b> method.
    :param size: Number of bytes to read.
    :returns: A bytes object holding the data read, or an empty one if
              ``size`` is not positive.
    :exception OSError: If the file ends before ``size`` bytes were read.
    """
    if size <= 0:
        return b""

    if size <= SAFEBLOCK:
        data = fp.read(size)
        if len(data) < size:
            msg = "Truncated File Read"
            raise OSError(msg)
        return data

    chunks: list[bytes] = []
    missing = size
    while missing > 0:
        chunk = fp.read(min(missing, SAFEBLOCK))
        if not chunk:
            break
        chunks.append(chunk)
        missing -= len(chunk)
    # ``missing`` is ``size`` minus the total length collected so far
    if missing > 0:
        msg = "Truncated File Read"
        raise OSError(msg)
    return b"".join(chunks)

734 

735 

class PyCodecState:
    """Region a Python codec works on, stored as a size and an offset."""

    def __init__(self) -> None:
        self.xsize = self.ysize = 0
        self.xoff = self.yoff = 0

    def extents(self) -> tuple[int, int, int, int]:
        """Return the region as ``(x0, y0, x1, y1)``."""
        left, top = self.xoff, self.yoff
        return left, top, left + self.xsize, top + self.ysize

745 

746 

class PyCodec:
    """State and hooks shared by codecs that are implemented in Python."""

    fd: IO[bytes] | None

    def __init__(self, mode: str, *args: Any) -> None:
        self.im: Image.core.ImagingCore | None = None
        self.state = PyCodecState()
        self.fd = None
        self.mode = mode
        self.init(args)

    def init(self, args: tuple[Any, ...]) -> None:
        """
        Override to perform codec specific initialization

        The default implementation stores the arguments as ``self.args``.

        :param args: Tuple of arg items from the tile entry
        :returns: None
        """
        self.args = args

    def cleanup(self) -> None:
        """
        Override to perform codec specific cleanup

        :returns: None
        """
        return None

    def setfd(self, fd: IO[bytes]) -> None:
        """
        Called from ImageFile to set the Python file-like object

        :param fd: A Python file-like object
        :returns: None
        """
        self.fd = fd

    def setimage(
        self,
        im: Image.core.ImagingCore,
        extents: tuple[int, int, int, int] | None = None,
    ) -> None:
        """
        Called from ImageFile to set the core output image for the codec

        :param im: A core image object
        :param extents: a 4 tuple of (x0, y0, x1, y1) defining the rectangle
            for this tile
        :returns: None
        :exception ValueError: If the resulting tile has a non-positive size
            or extends outside the image.
        """

        # following c code
        self.im = im

        x0, y0, x1, y1 = extents if extents else (0, 0, 0, 0)

        state = self.state
        if x0 == 0 and x1 == 0:
            # no horizontal range given: cover the whole image
            state.xsize, state.ysize = self.im.size
        else:
            state.xoff, state.yoff = x0, y0
            state.xsize, state.ysize = x1 - x0, y1 - y0

        if state.xsize <= 0 or state.ysize <= 0:
            msg = "Size must be positive"
            raise ValueError(msg)

        image_width, image_height = self.im.size[0], self.im.size[1]
        if (
            state.xsize + state.xoff > image_width
            or state.ysize + state.yoff > image_height
        ):
            msg = "Tile cannot extend outside image"
            raise ValueError(msg)

823 

824 

class PyDecoder(PyCodec):
    """
    Python implementation of a format decoder. Override this class and
    add the decoding logic in the :meth:`decode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    _pulls_fd = False

    @property
    def pulls_fd(self) -> bool:
        """Value of the ``_pulls_fd`` class flag."""
        return self._pulls_fd

    def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
        """
        Override to perform the decoding process.

        :param buffer: A bytes object with the data to be decoded.
        :returns: A tuple of ``(bytes consumed, errcode)``.
            If finished with decoding return -1 for the bytes consumed.
            Err codes are from :data:`.ImageFile.ERRORS`.
        :exception NotImplementedError: Always, in this base class.
        """
        msg = "unavailable in base decoder"
        raise NotImplementedError(msg)

    def set_as_raw(
        self, data: bytes, rawmode: str | None = None, extra: tuple[Any, ...] = ()
    ) -> None:
        """
        Convenience method to set the internal image from a stream of raw data

        :param data: Bytes to be set
        :param rawmode: The rawmode to be used for the decoder.
            If not specified, it will default to the mode of the image
        :param extra: Extra arguments for the decoder.
        :returns: None
        :exception ValueError: If the raw decoder did not finish with the
            given data, or reported an error.
        """

        raw_decoder = Image._getdecoder(self.mode, "raw", rawmode or self.mode, extra)
        assert self.im is not None
        raw_decoder.setimage(self.im, self.state.extents())
        result = raw_decoder.decode(data)

        # a non-negative first item means the decoder expects more data
        if result[0] >= 0:
            msg = "not enough image data"
            raise ValueError(msg)
        if result[1] != 0:
            msg = "cannot decode image data"
            raise ValueError(msg)

877 

878 

class PyEncoder(PyCodec):
    """
    Python implementation of a format encoder. Override this class and
    add the encoding logic in the :meth:`encode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    _pushes_fd = False

    @property
    def pushes_fd(self) -> bool:
        """Value of the ``_pushes_fd`` class flag."""
        return self._pushes_fd

    def encode(self, bufsize: int) -> tuple[int, int, bytes]:
        """
        Override to perform the encoding process.

        :param bufsize: Buffer size.
        :returns: A tuple of ``(bytes encoded, errcode, bytes)``.
            If finished with encoding return 1 for the error code.
            Err codes are from :data:`.ImageFile.ERRORS`.
        :exception NotImplementedError: Always, in this base class.
        """
        msg = "unavailable in base encoder"
        raise NotImplementedError(msg)

    def encode_to_pyfd(self) -> tuple[int, int]:
        """
        If ``pushes_fd`` is ``True``, then this method will be used,
        and ``encode()`` will only be called once.

        :returns: A tuple of ``(bytes consumed, errcode)``.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        if not self.pushes_fd:
            return 0, -8  # bad configuration
        bytes_consumed, errcode, data = self.encode(0)
        if data:
            assert self.fd is not None
            self.fd.write(data)
        return bytes_consumed, errcode

    def encode_to_file(self, fh: int, bufsize: int) -> int:
        """
        Call ``encode()`` repeatedly and write its output to a file
        descriptor until a non-zero error code is returned.

        :param fh: File handle.
        :param bufsize: Buffer size.

        :returns: If finished successfully, return 0.
            Otherwise, return an error code. Err codes are from
            :data:`.ImageFile.ERRORS`.
        """
        errcode = 0
        while errcode == 0:
            status, errcode, buf = self.encode(bufsize)
            if status > 0:
                # ``buf`` holds the encoded bytes, so the whole buffer is
                # written; slicing from ``status`` would skip exactly the
                # data that was just produced.
                os.write(fh, buf)
        return errcode