Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/ImageFile.py: 57%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

482 statements  

1# 

2# The Python Imaging Library. 

3# $Id$ 

4# 

5# base class for image file handlers 

6# 

7# history: 

8# 1995-09-09 fl Created 

9# 1996-03-11 fl Fixed load mechanism. 

10# 1996-04-15 fl Added pcx/xbm decoders. 

11# 1996-04-30 fl Added encoders. 

12# 1996-12-14 fl Added load helpers 

13# 1997-01-11 fl Use encode_to_file where possible 

14# 1997-08-27 fl Flush output in _save 

15# 1998-03-05 fl Use memory mapping for some modes 

16# 1999-02-04 fl Use memory mapping also for "I;16" and "I;16B" 

17# 1999-05-31 fl Added image parser 

18# 2000-10-12 fl Set readonly flag on memory-mapped images 

19# 2002-03-20 fl Use better messages for common decoder errors 

20# 2003-04-21 fl Fall back on mmap/map_buffer if map is not available 

21# 2003-10-30 fl Added StubImageFile class 

22# 2004-02-25 fl Made incremental parser more robust 

23# 

24# Copyright (c) 1997-2004 by Secret Labs AB 

25# Copyright (c) 1995-2004 by Fredrik Lundh 

26# 

27# See the README file for information on usage and redistribution. 

28# 

29from __future__ import annotations 

30 

31import abc 

32import io 

33import itertools 

34import logging 

35import os 

36import struct 

37from typing import IO, Any, NamedTuple, cast 

38 

39from . import ExifTags, Image 

40from ._util import DeferredError, is_path 

41 

42TYPE_CHECKING = False 

43if TYPE_CHECKING: 

44 from ._typing import StrOrBytesPath 

45 

46logger = logging.getLogger(__name__) 

47 

48MAXBLOCK = 65536 

49""" 

50By default, Pillow processes image data in blocks. This helps to prevent excessive use 

51of resources. Codecs may disable this behaviour with ``_pulls_fd`` or ``_pushes_fd``. 

52 

53When reading an image, this is the number of bytes to read at once. 

54 

55When writing an image, this is the number of bytes to write at once. 

56If the image width times 4 is greater, then that will be used instead. 

57Plugins may also set a greater number. 

58 

59User code may set this to another number. 

60""" 

61 

62SAFEBLOCK = 1024 * 1024 

63 

64LOAD_TRUNCATED_IMAGES = False 

65"""Whether or not to load truncated image files. User code may change this.""" 

66 

67ERRORS = { 

68 -1: "image buffer overrun error", 

69 -2: "decoding error", 

70 -3: "unknown error", 

71 -8: "bad configuration", 

72 -9: "out of memory error", 

73} 

74""" 

75Dict of known error codes returned from :meth:`.PyDecoder.decode`, 

76:meth:`.PyEncoder.encode` :meth:`.PyEncoder.encode_to_pyfd` and 

77:meth:`.PyEncoder.encode_to_file`. 

78""" 

79 

80 

81# 

82# -------------------------------------------------------------------- 

83# Helpers 

84 

85 

86def _get_oserror(error: int, *, encoder: bool) -> OSError: 

87 try: 

88 msg = Image.core.getcodecstatus(error) 

89 except AttributeError: 

90 msg = ERRORS.get(error) 

91 if not msg: 

92 msg = f"{'encoder' if encoder else 'decoder'} error {error}" 

93 msg += f" when {'writing' if encoder else 'reading'} image file" 

94 return OSError(msg) 

95 

96 

97def _tilesort(t: _Tile) -> int: 

98 # sort on offset 

99 return t[2] 

100 

101 

102class _Tile(NamedTuple): 

103 codec_name: str 

104 extents: tuple[int, int, int, int] | None 

105 offset: int = 0 

106 args: tuple[Any, ...] | str | None = None 

107 

108 

109# 

110# -------------------------------------------------------------------- 

111# ImageFile base class 

112 

113 

114class ImageFile(Image.Image): 

115 """Base class for image file format handlers.""" 

116 

117 def __init__( 

118 self, fp: StrOrBytesPath | IO[bytes], filename: str | bytes | None = None 

119 ) -> None: 

120 super().__init__() 

121 

122 self._min_frame = 0 

123 

124 self.custom_mimetype: str | None = None 

125 

126 self.tile: list[_Tile] = [] 

127 """ A list of tile descriptors """ 

128 

129 self.readonly = 1 # until we know better 

130 

131 self.decoderconfig: tuple[Any, ...] = () 

132 self.decodermaxblock = MAXBLOCK 

133 

134 self.fp: IO[bytes] | None 

135 self._fp: IO[bytes] | DeferredError 

136 if is_path(fp): 

137 # filename 

138 self.fp = open(fp, "rb") 

139 self.filename = os.fspath(fp) 

140 self._exclusive_fp = True 

141 else: 

142 # stream 

143 self.fp = cast(IO[bytes], fp) 

144 self.filename = filename if filename is not None else "" 

145 # can be overridden 

146 self._exclusive_fp = False 

147 

148 try: 

149 try: 

150 self._open() 

151 

152 if isinstance(self, StubImageFile): 

153 if loader := self._load(): 

154 loader.open(self) 

155 except ( 

156 IndexError, # end of data 

157 TypeError, # end of data (ord) 

158 KeyError, # unsupported mode 

159 EOFError, # got header but not the first frame 

160 struct.error, 

161 ) as v: 

162 raise SyntaxError(v) from v 

163 

164 if not self.mode or self.size[0] <= 0 or self.size[1] <= 0: 

165 msg = "not identified by this driver" 

166 raise SyntaxError(msg) 

167 except BaseException: 

168 # close the file only if we have opened it this constructor 

169 if self._exclusive_fp: 

170 self.fp.close() 

171 raise 

172 

173 def _open(self) -> None: 

174 pass 

175 

176 # Context manager support 

177 def __enter__(self) -> ImageFile: 

178 return self 

179 

180 def _close_fp(self) -> None: 

181 if getattr(self, "_fp", False) and not isinstance(self._fp, DeferredError): 

182 if self._fp != self.fp: 

183 self._fp.close() 

184 self._fp = DeferredError(ValueError("Operation on closed image")) 

185 if self.fp: 

186 self.fp.close() 

187 

188 def __exit__(self, *args: object) -> None: 

189 if getattr(self, "_exclusive_fp", False): 

190 self._close_fp() 

191 self.fp = None 

192 

193 def close(self) -> None: 

194 """ 

195 Closes the file pointer, if possible. 

196 

197 This operation will destroy the image core and release its memory. 

198 The image data will be unusable afterward. 

199 

200 This function is required to close images that have multiple frames or 

201 have not had their file read and closed by the 

202 :py:meth:`~PIL.Image.Image.load` method. See :ref:`file-handling` for 

203 more information. 

204 """ 

205 try: 

206 self._close_fp() 

207 self.fp = None 

208 except Exception as msg: 

209 logger.debug("Error closing: %s", msg) 

210 

211 super().close() 

212 

213 def get_child_images(self) -> list[ImageFile]: 

214 child_images = [] 

215 exif = self.getexif() 

216 ifds = [] 

217 if ExifTags.Base.SubIFDs in exif: 

218 subifd_offsets = exif[ExifTags.Base.SubIFDs] 

219 if subifd_offsets: 

220 if not isinstance(subifd_offsets, tuple): 

221 subifd_offsets = (subifd_offsets,) 

222 ifds = [ 

223 (exif._get_ifd_dict(subifd_offset), subifd_offset) 

224 for subifd_offset in subifd_offsets 

225 ] 

226 ifd1 = exif.get_ifd(ExifTags.IFD.IFD1) 

227 if ifd1 and ifd1.get(ExifTags.Base.JpegIFOffset): 

228 assert exif._info is not None 

229 ifds.append((ifd1, exif._info.next)) 

230 

231 offset = None 

232 for ifd, ifd_offset in ifds: 

233 assert self.fp is not None 

234 current_offset = self.fp.tell() 

235 if offset is None: 

236 offset = current_offset 

237 

238 fp = self.fp 

239 if ifd is not None: 

240 thumbnail_offset = ifd.get(ExifTags.Base.JpegIFOffset) 

241 if thumbnail_offset is not None: 

242 thumbnail_offset += getattr(self, "_exif_offset", 0) 

243 self.fp.seek(thumbnail_offset) 

244 

245 length = ifd.get(ExifTags.Base.JpegIFByteCount) 

246 assert isinstance(length, int) 

247 data = self.fp.read(length) 

248 fp = io.BytesIO(data) 

249 

250 with Image.open(fp) as im: 

251 from . import TiffImagePlugin 

252 

253 if thumbnail_offset is None and isinstance( 

254 im, TiffImagePlugin.TiffImageFile 

255 ): 

256 im._frame_pos = [ifd_offset] 

257 im._seek(0) 

258 im.load() 

259 child_images.append(im) 

260 

261 if offset is not None: 

262 assert self.fp is not None 

263 self.fp.seek(offset) 

264 return child_images 

265 

266 def get_format_mimetype(self) -> str | None: 

267 if self.custom_mimetype: 

268 return self.custom_mimetype 

269 if self.format is not None: 

270 return Image.MIME.get(self.format.upper()) 

271 return None 

272 

273 def __getstate__(self) -> list[Any]: 

274 return super().__getstate__() + [self.filename] 

275 

276 def __setstate__(self, state: list[Any]) -> None: 

277 self.tile = [] 

278 if len(state) > 5: 

279 self.filename = state[5] 

280 super().__setstate__(state) 

281 

282 def verify(self) -> None: 

283 """Check file integrity""" 

284 

285 # raise exception if something's wrong. must be called 

286 # directly after open, and closes file when finished. 

287 if self._exclusive_fp and self.fp: 

288 self.fp.close() 

289 self.fp = None 

290 

291 def load(self) -> Image.core.PixelAccess | None: 

292 """Load image data based on tile list""" 

293 

294 if not self.tile and self._im is None: 

295 msg = "cannot load this image" 

296 raise OSError(msg) 

297 

298 pixel = Image.Image.load(self) 

299 if not self.tile: 

300 return pixel 

301 

302 self.map: mmap.mmap | None = None 

303 use_mmap = self.filename and len(self.tile) == 1 

304 

305 assert self.fp is not None 

306 readonly = 0 

307 

308 # look for read/seek overrides 

309 if hasattr(self, "load_read"): 

310 read = self.load_read 

311 # don't use mmap if there are custom read/seek functions 

312 use_mmap = False 

313 else: 

314 read = self.fp.read 

315 

316 if hasattr(self, "load_seek"): 

317 seek = self.load_seek 

318 use_mmap = False 

319 else: 

320 seek = self.fp.seek 

321 

322 if use_mmap: 

323 # try memory mapping 

324 decoder_name, extents, offset, args = self.tile[0] 

325 if isinstance(args, str): 

326 args = (args, 0, 1) 

327 if ( 

328 decoder_name == "raw" 

329 and isinstance(args, tuple) 

330 and len(args) >= 3 

331 and args[0] == self.mode 

332 and args[0] in Image._MAPMODES 

333 ): 

334 if offset < 0: 

335 msg = "Tile offset cannot be negative" 

336 raise ValueError(msg) 

337 try: 

338 # use mmap, if possible 

339 import mmap 

340 

341 with open(self.filename) as fp: 

342 self.map = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ) 

343 if offset + self.size[1] * args[1] > self.map.size(): 

344 msg = "buffer is not large enough" 

345 raise OSError(msg) 

346 self.im = Image.core.map_buffer( 

347 self.map, self.size, decoder_name, offset, args 

348 ) 

349 readonly = 1 

350 # After trashing self.im, 

351 # we might need to reload the palette data. 

352 if self.palette: 

353 self.palette.dirty = 1 

354 except (AttributeError, OSError, ImportError): 

355 self.map = None 

356 

357 self.load_prepare() 

358 err_code = -3 # initialize to unknown error 

359 if not self.map: 

360 # sort tiles in file order 

361 self.tile.sort(key=_tilesort) 

362 

363 # FIXME: This is a hack to handle TIFF's JpegTables tag. 

364 prefix = getattr(self, "tile_prefix", b"") 

365 

366 # Remove consecutive duplicates that only differ by their offset 

367 self.tile = [ 

368 list(tiles)[-1] 

369 for _, tiles in itertools.groupby( 

370 self.tile, lambda tile: (tile[0], tile[1], tile[3]) 

371 ) 

372 ] 

373 for i, (decoder_name, extents, offset, args) in enumerate(self.tile): 

374 seek(offset) 

375 decoder = Image._getdecoder( 

376 self.mode, decoder_name, args, self.decoderconfig 

377 ) 

378 try: 

379 decoder.setimage(self.im, extents) 

380 if decoder.pulls_fd: 

381 decoder.setfd(self.fp) 

382 err_code = decoder.decode(b"")[1] 

383 else: 

384 b = prefix 

385 while True: 

386 read_bytes = self.decodermaxblock 

387 if i + 1 < len(self.tile): 

388 next_offset = self.tile[i + 1].offset 

389 if next_offset > offset: 

390 read_bytes = next_offset - offset 

391 try: 

392 s = read(read_bytes) 

393 except (IndexError, struct.error) as e: 

394 # truncated png/gif 

395 if LOAD_TRUNCATED_IMAGES: 

396 break 

397 else: 

398 msg = "image file is truncated" 

399 raise OSError(msg) from e 

400 

401 if not s: # truncated jpeg 

402 if LOAD_TRUNCATED_IMAGES: 

403 break 

404 else: 

405 msg = ( 

406 "image file is truncated " 

407 f"({len(b)} bytes not processed)" 

408 ) 

409 raise OSError(msg) 

410 

411 b = b + s 

412 n, err_code = decoder.decode(b) 

413 if n < 0: 

414 break 

415 b = b[n:] 

416 finally: 

417 # Need to cleanup here to prevent leaks 

418 decoder.cleanup() 

419 

420 self.tile = [] 

421 self.readonly = readonly 

422 

423 self.load_end() 

424 

425 if self._exclusive_fp and self._close_exclusive_fp_after_loading: 

426 self.fp.close() 

427 self.fp = None 

428 

429 if not self.map and not LOAD_TRUNCATED_IMAGES and err_code < 0: 

430 # still raised if decoder fails to return anything 

431 raise _get_oserror(err_code, encoder=False) 

432 

433 return Image.Image.load(self) 

434 

435 def load_prepare(self) -> None: 

436 # create image memory if necessary 

437 if self._im is None: 

438 self.im = Image.core.new(self.mode, self.size) 

439 # create palette (optional) 

440 if self.mode == "P": 

441 Image.Image.load(self) 

442 

443 def load_end(self) -> None: 

444 # may be overridden 

445 pass 

446 

447 # may be defined for contained formats 

448 # def load_seek(self, pos: int) -> None: 

449 # pass 

450 

451 # may be defined for blocked formats (e.g. PNG) 

452 # def load_read(self, read_bytes: int) -> bytes: 

453 # pass 

454 

455 def _seek_check(self, frame: int) -> bool: 

456 if ( 

457 frame < self._min_frame 

458 # Only check upper limit on frames if additional seek operations 

459 # are not required to do so 

460 or ( 

461 not (hasattr(self, "_n_frames") and self._n_frames is None) 

462 and frame >= getattr(self, "n_frames") + self._min_frame 

463 ) 

464 ): 

465 msg = "attempt to seek outside sequence" 

466 raise EOFError(msg) 

467 

468 return self.tell() != frame 

469 

470 

471class StubHandler(abc.ABC): 

472 def open(self, im: StubImageFile) -> None: 

473 pass 

474 

475 @abc.abstractmethod 

476 def load(self, im: StubImageFile) -> Image.Image: 

477 pass 

478 

479 

480class StubImageFile(ImageFile, metaclass=abc.ABCMeta): 

481 """ 

482 Base class for stub image loaders. 

483 

484 A stub loader is an image loader that can identify files of a 

485 certain format, but relies on external code to load the file. 

486 """ 

487 

488 @abc.abstractmethod 

489 def _open(self) -> None: 

490 pass 

491 

492 def load(self) -> Image.core.PixelAccess | None: 

493 loader = self._load() 

494 if loader is None: 

495 msg = f"cannot find loader for this {self.format} file" 

496 raise OSError(msg) 

497 image = loader.load(self) 

498 assert image is not None 

499 # become the other object (!) 

500 self.__class__ = image.__class__ # type: ignore[assignment] 

501 self.__dict__ = image.__dict__ 

502 return image.load() 

503 

504 @abc.abstractmethod 

505 def _load(self) -> StubHandler | None: 

506 """(Hook) Find actual image loader.""" 

507 pass 

508 

509 

510class Parser: 

511 """ 

512 Incremental image parser. This class implements the standard 

513 feed/close consumer interface. 

514 """ 

515 

516 incremental = None 

517 image: Image.Image | None = None 

518 data: bytes | None = None 

519 decoder: Image.core.ImagingDecoder | PyDecoder | None = None 

520 offset = 0 

521 finished = 0 

522 

523 def reset(self) -> None: 

524 """ 

525 (Consumer) Reset the parser. Note that you can only call this 

526 method immediately after you've created a parser; parser 

527 instances cannot be reused. 

528 """ 

529 assert self.data is None, "cannot reuse parsers" 

530 

531 def feed(self, data: bytes) -> None: 

532 """ 

533 (Consumer) Feed data to the parser. 

534 

535 :param data: A string buffer. 

536 :exception OSError: If the parser failed to parse the image file. 

537 """ 

538 # collect data 

539 

540 if self.finished: 

541 return 

542 

543 if self.data is None: 

544 self.data = data 

545 else: 

546 self.data = self.data + data 

547 

548 # parse what we have 

549 if self.decoder: 

550 if self.offset > 0: 

551 # skip header 

552 skip = min(len(self.data), self.offset) 

553 self.data = self.data[skip:] 

554 self.offset = self.offset - skip 

555 if self.offset > 0 or not self.data: 

556 return 

557 

558 n, e = self.decoder.decode(self.data) 

559 

560 if n < 0: 

561 # end of stream 

562 self.data = None 

563 self.finished = 1 

564 if e < 0: 

565 # decoding error 

566 self.image = None 

567 raise _get_oserror(e, encoder=False) 

568 else: 

569 # end of image 

570 return 

571 self.data = self.data[n:] 

572 

573 elif self.image: 

574 # if we end up here with no decoder, this file cannot 

575 # be incrementally parsed. wait until we've gotten all 

576 # available data 

577 pass 

578 

579 else: 

580 # attempt to open this file 

581 try: 

582 with io.BytesIO(self.data) as fp: 

583 im = Image.open(fp) 

584 except OSError: 

585 pass # not enough data 

586 else: 

587 flag = hasattr(im, "load_seek") or hasattr(im, "load_read") 

588 if not flag and len(im.tile) == 1: 

589 # initialize decoder 

590 im.load_prepare() 

591 d, e, o, a = im.tile[0] 

592 im.tile = [] 

593 self.decoder = Image._getdecoder(im.mode, d, a, im.decoderconfig) 

594 self.decoder.setimage(im.im, e) 

595 

596 # calculate decoder offset 

597 self.offset = o 

598 if self.offset <= len(self.data): 

599 self.data = self.data[self.offset :] 

600 self.offset = 0 

601 

602 self.image = im 

603 

604 def __enter__(self) -> Parser: 

605 return self 

606 

607 def __exit__(self, *args: object) -> None: 

608 self.close() 

609 

610 def close(self) -> Image.Image: 

611 """ 

612 (Consumer) Close the stream. 

613 

614 :returns: An image object. 

615 :exception OSError: If the parser failed to parse the image file either 

616 because it cannot be identified or cannot be 

617 decoded. 

618 """ 

619 # finish decoding 

620 if self.decoder: 

621 # get rid of what's left in the buffers 

622 self.feed(b"") 

623 self.data = self.decoder = None 

624 if not self.finished: 

625 msg = "image was incomplete" 

626 raise OSError(msg) 

627 if not self.image: 

628 msg = "cannot parse this image" 

629 raise OSError(msg) 

630 if self.data: 

631 # incremental parsing not possible; reopen the file 

632 # not that we have all data 

633 with io.BytesIO(self.data) as fp: 

634 try: 

635 self.image = Image.open(fp) 

636 finally: 

637 self.image.load() 

638 return self.image 

639 

640 

641# -------------------------------------------------------------------- 

642 

643 

644def _save(im: Image.Image, fp: IO[bytes], tile: list[_Tile], bufsize: int = 0) -> None: 

645 """Helper to save image based on tile list 

646 

647 :param im: Image object. 

648 :param fp: File object. 

649 :param tile: Tile list. 

650 :param bufsize: Optional buffer size 

651 """ 

652 

653 im.load() 

654 if not hasattr(im, "encoderconfig"): 

655 im.encoderconfig = () 

656 tile.sort(key=_tilesort) 

657 # FIXME: make MAXBLOCK a configuration parameter 

658 # It would be great if we could have the encoder specify what it needs 

659 # But, it would need at least the image size in most cases. RawEncode is 

660 # a tricky case. 

661 bufsize = max(MAXBLOCK, bufsize, im.size[0] * 4) # see RawEncode.c 

662 try: 

663 fh = fp.fileno() 

664 fp.flush() 

665 _encode_tile(im, fp, tile, bufsize, fh) 

666 except (AttributeError, io.UnsupportedOperation) as exc: 

667 _encode_tile(im, fp, tile, bufsize, None, exc) 

668 if hasattr(fp, "flush"): 

669 fp.flush() 

670 

671 

672def _encode_tile( 

673 im: Image.Image, 

674 fp: IO[bytes], 

675 tile: list[_Tile], 

676 bufsize: int, 

677 fh: int | None, 

678 exc: BaseException | None = None, 

679) -> None: 

680 for encoder_name, extents, offset, args in tile: 

681 if offset > 0: 

682 fp.seek(offset) 

683 encoder = Image._getencoder(im.mode, encoder_name, args, im.encoderconfig) 

684 try: 

685 encoder.setimage(im.im, extents) 

686 if encoder.pushes_fd: 

687 encoder.setfd(fp) 

688 errcode = encoder.encode_to_pyfd()[1] 

689 else: 

690 if exc: 

691 # compress to Python file-compatible object 

692 while True: 

693 errcode, data = encoder.encode(bufsize)[1:] 

694 fp.write(data) 

695 if errcode: 

696 break 

697 else: 

698 # slight speedup: compress to real file object 

699 assert fh is not None 

700 errcode = encoder.encode_to_file(fh, bufsize) 

701 if errcode < 0: 

702 raise _get_oserror(errcode, encoder=True) from exc 

703 finally: 

704 encoder.cleanup() 

705 

706 

707def _safe_read(fp: IO[bytes], size: int) -> bytes: 

708 """ 

709 Reads large blocks in a safe way. Unlike fp.read(n), this function 

710 doesn't trust the user. If the requested size is larger than 

711 SAFEBLOCK, the file is read block by block. 

712 

713 :param fp: File handle. Must implement a <b>read</b> method. 

714 :param size: Number of bytes to read. 

715 :returns: A string containing <i>size</i> bytes of data. 

716 

717 Raises an OSError if the file is truncated and the read cannot be completed 

718 

719 """ 

720 if size <= 0: 

721 return b"" 

722 if size <= SAFEBLOCK: 

723 data = fp.read(size) 

724 if len(data) < size: 

725 msg = "Truncated File Read" 

726 raise OSError(msg) 

727 return data 

728 blocks: list[bytes] = [] 

729 remaining_size = size 

730 while remaining_size > 0: 

731 block = fp.read(min(remaining_size, SAFEBLOCK)) 

732 if not block: 

733 break 

734 blocks.append(block) 

735 remaining_size -= len(block) 

736 if sum(len(block) for block in blocks) < size: 

737 msg = "Truncated File Read" 

738 raise OSError(msg) 

739 return b"".join(blocks) 

740 

741 

742class PyCodecState: 

743 def __init__(self) -> None: 

744 self.xsize = 0 

745 self.ysize = 0 

746 self.xoff = 0 

747 self.yoff = 0 

748 

749 def extents(self) -> tuple[int, int, int, int]: 

750 return self.xoff, self.yoff, self.xoff + self.xsize, self.yoff + self.ysize 

751 

752 

753class PyCodec: 

754 fd: IO[bytes] | None 

755 

756 def __init__(self, mode: str, *args: Any) -> None: 

757 self.im: Image.core.ImagingCore | None = None 

758 self.state = PyCodecState() 

759 self.fd = None 

760 self.mode = mode 

761 self.init(args) 

762 

763 def init(self, args: tuple[Any, ...]) -> None: 

764 """ 

765 Override to perform codec specific initialization 

766 

767 :param args: Tuple of arg items from the tile entry 

768 :returns: None 

769 """ 

770 self.args = args 

771 

772 def cleanup(self) -> None: 

773 """ 

774 Override to perform codec specific cleanup 

775 

776 :returns: None 

777 """ 

778 pass 

779 

780 def setfd(self, fd: IO[bytes]) -> None: 

781 """ 

782 Called from ImageFile to set the Python file-like object 

783 

784 :param fd: A Python file-like object 

785 :returns: None 

786 """ 

787 self.fd = fd 

788 

789 def setimage( 

790 self, 

791 im: Image.core.ImagingCore, 

792 extents: tuple[int, int, int, int] | None = None, 

793 ) -> None: 

794 """ 

795 Called from ImageFile to set the core output image for the codec 

796 

797 :param im: A core image object 

798 :param extents: a 4 tuple of (x0, y0, x1, y1) defining the rectangle 

799 for this tile 

800 :returns: None 

801 """ 

802 

803 # following c code 

804 self.im = im 

805 

806 if extents: 

807 x0, y0, x1, y1 = extents 

808 

809 if x0 < 0 or y0 < 0 or x1 > self.im.size[0] or y1 > self.im.size[1]: 

810 msg = "Tile cannot extend outside image" 

811 raise ValueError(msg) 

812 

813 self.state.xoff = x0 

814 self.state.yoff = y0 

815 self.state.xsize = x1 - x0 

816 self.state.ysize = y1 - y0 

817 else: 

818 self.state.xsize, self.state.ysize = self.im.size 

819 

820 if self.state.xsize <= 0 or self.state.ysize <= 0: 

821 msg = "Size must be positive" 

822 raise ValueError(msg) 

823 

824 

825class PyDecoder(PyCodec): 

826 """ 

827 Python implementation of a format decoder. Override this class and 

828 add the decoding logic in the :meth:`decode` method. 

829 

830 See :ref:`Writing Your Own File Codec in Python<file-codecs-py>` 

831 """ 

832 

833 _pulls_fd = False 

834 

835 @property 

836 def pulls_fd(self) -> bool: 

837 return self._pulls_fd 

838 

839 def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]: 

840 """ 

841 Override to perform the decoding process. 

842 

843 :param buffer: A bytes object with the data to be decoded. 

844 :returns: A tuple of ``(bytes consumed, errcode)``. 

845 If finished with decoding return -1 for the bytes consumed. 

846 Err codes are from :data:`.ImageFile.ERRORS`. 

847 """ 

848 msg = "unavailable in base decoder" 

849 raise NotImplementedError(msg) 

850 

851 def set_as_raw( 

852 self, data: bytes, rawmode: str | None = None, extra: tuple[Any, ...] = () 

853 ) -> None: 

854 """ 

855 Convenience method to set the internal image from a stream of raw data 

856 

857 :param data: Bytes to be set 

858 :param rawmode: The rawmode to be used for the decoder. 

859 If not specified, it will default to the mode of the image 

860 :param extra: Extra arguments for the decoder. 

861 :returns: None 

862 """ 

863 

864 if not rawmode: 

865 rawmode = self.mode 

866 d = Image._getdecoder(self.mode, "raw", rawmode, extra) 

867 assert self.im is not None 

868 d.setimage(self.im, self.state.extents()) 

869 s = d.decode(data) 

870 

871 if s[0] >= 0: 

872 msg = "not enough image data" 

873 raise ValueError(msg) 

874 if s[1] != 0: 

875 msg = "cannot decode image data" 

876 raise ValueError(msg) 

877 

878 

879class PyEncoder(PyCodec): 

880 """ 

881 Python implementation of a format encoder. Override this class and 

882 add the decoding logic in the :meth:`encode` method. 

883 

884 See :ref:`Writing Your Own File Codec in Python<file-codecs-py>` 

885 """ 

886 

887 _pushes_fd = False 

888 

889 @property 

890 def pushes_fd(self) -> bool: 

891 return self._pushes_fd 

892 

893 def encode(self, bufsize: int) -> tuple[int, int, bytes]: 

894 """ 

895 Override to perform the encoding process. 

896 

897 :param bufsize: Buffer size. 

898 :returns: A tuple of ``(bytes encoded, errcode, bytes)``. 

899 If finished with encoding return 1 for the error code. 

900 Err codes are from :data:`.ImageFile.ERRORS`. 

901 """ 

902 msg = "unavailable in base encoder" 

903 raise NotImplementedError(msg) 

904 

905 def encode_to_pyfd(self) -> tuple[int, int]: 

906 """ 

907 If ``pushes_fd`` is ``True``, then this method will be used, 

908 and ``encode()`` will only be called once. 

909 

910 :returns: A tuple of ``(bytes consumed, errcode)``. 

911 Err codes are from :data:`.ImageFile.ERRORS`. 

912 """ 

913 if not self.pushes_fd: 

914 return 0, -8 # bad configuration 

915 bytes_consumed, errcode, data = self.encode(0) 

916 if data: 

917 assert self.fd is not None 

918 self.fd.write(data) 

919 return bytes_consumed, errcode 

920 

921 def encode_to_file(self, fh: int, bufsize: int) -> int: 

922 """ 

923 :param fh: File handle. 

924 :param bufsize: Buffer size. 

925 

926 :returns: If finished successfully, return 0. 

927 Otherwise, return an error code. Err codes are from 

928 :data:`.ImageFile.ERRORS`. 

929 """ 

930 errcode = 0 

931 while errcode == 0: 

932 status, errcode, buf = self.encode(bufsize) 

933 if status > 0: 

934 os.write(fh, buf[status:]) 

935 return errcode