Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/ImageFile.py: 56%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

470 statements  

1# 

2# The Python Imaging Library. 

3# $Id$ 

4# 

5# base class for image file handlers 

6# 

7# history: 

8# 1995-09-09 fl Created 

9# 1996-03-11 fl Fixed load mechanism. 

10# 1996-04-15 fl Added pcx/xbm decoders. 

11# 1996-04-30 fl Added encoders. 

12# 1996-12-14 fl Added load helpers 

13# 1997-01-11 fl Use encode_to_file where possible 

14# 1997-08-27 fl Flush output in _save 

15# 1998-03-05 fl Use memory mapping for some modes 

16# 1999-02-04 fl Use memory mapping also for "I;16" and "I;16B" 

17# 1999-05-31 fl Added image parser 

18# 2000-10-12 fl Set readonly flag on memory-mapped images 

19# 2002-03-20 fl Use better messages for common decoder errors 

20# 2003-04-21 fl Fall back on mmap/map_buffer if map is not available 

21# 2003-10-30 fl Added StubImageFile class 

22# 2004-02-25 fl Made incremental parser more robust 

23# 

24# Copyright (c) 1997-2004 by Secret Labs AB 

25# Copyright (c) 1995-2004 by Fredrik Lundh 

26# 

27# See the README file for information on usage and redistribution. 

28# 

29from __future__ import annotations 

30 

31import abc 

32import io 

33import itertools 

34import logging 

35import os 

36import struct 

37from typing import IO, Any, NamedTuple, cast 

38 

39from . import ExifTags, Image 

40from ._util import DeferredError, is_path 

41 

42TYPE_CHECKING = False 

43if TYPE_CHECKING: 

44 from ._typing import StrOrBytesPath 

45 

logger = logging.getLogger(__name__)

# Default value of ImageFile.decodermaxblock (bytes requested per read while
# decoding) and the lower bound of the encoder buffer size chosen in _save().
MAXBLOCK = 64 * 1024

# Largest number of bytes _safe_read() requests from a file in a single read.
SAFEBLOCK = 1024**2

LOAD_TRUNCATED_IMAGES = False
"""Whether or not to load truncated image files. User code may change this."""

ERRORS = {
    -1: "image buffer overrun error",
    -2: "decoding error",
    -3: "unknown error",
    -8: "bad configuration",
    -9: "out of memory error",
}
"""
Dict of known error codes returned from :meth:`.PyDecoder.decode`,
:meth:`.PyEncoder.encode`, :meth:`.PyEncoder.encode_to_pyfd` and
:meth:`.PyEncoder.encode_to_file`.
"""

67 

68 

69# 

70# -------------------------------------------------------------------- 

71# Helpers 

72 

73 

def _get_oserror(error: int, *, encoder: bool) -> OSError:
    """
    Build (but do not raise) an :py:exc:`OSError` describing a codec failure.

    The description comes from ``Image.core.getcodecstatus`` when the core
    module provides it, otherwise from :data:`ERRORS`; if neither yields
    text, a generic "<codec> error <code>" message is used.

    :param error: Error code reported by the codec.
    :param encoder: ``True`` if an encoder failed, ``False`` for a decoder.
    :returns: The exception instance for the caller to raise.
    """
    role, action = ("encoder", "writing") if encoder else ("decoder", "reading")
    try:
        description = Image.core.getcodecstatus(error)
    except AttributeError:
        description = ERRORS.get(error)
    if not description:
        description = f"{role} error {error}"
    return OSError(description + f" when {action} image file")

83 

84 

def _tilesort(t: _Tile) -> int:
    """Sort key for tile descriptors: the tile's offset (third field)."""
    offset = t[2]
    return offset

88 

89 

class _Tile(NamedTuple):
    """Tile descriptor: which codec handles which region, and where it starts."""

    # Name of the codec handed to Image._getdecoder / Image._getencoder.
    codec_name: str
    # Region (x0, y0, x1, y1) covered by the tile; may be None.
    extents: tuple[int, int, int, int] | None
    # Position in the file that is seeked to before the tile is processed.
    offset: int = 0
    # Extra arguments forwarded to the codec.
    args: tuple[Any, ...] | str | None = None

95 

96 

97# 

98# -------------------------------------------------------------------- 

99# ImageFile base class 

100 

101 

class ImageFile(Image.Image):
    """Base class for image file format handlers."""

    def __init__(
        self, fp: StrOrBytesPath | IO[bytes], filename: str | bytes | None = None
    ) -> None:
        """
        Attach to ``fp`` and let the format driver identify it via ``_open()``.

        :param fp: Either a path, which is opened here in binary mode, or an
           already open binary stream, which is used as given.
        :param filename: Name recorded in ``self.filename`` when ``fp`` is a
           stream; ignored when ``fp`` is a path.
        :exception SyntaxError: If ``_open()`` fails with one of the "bad
           data" exceptions listed below, or leaves the image without a mode
           or with a non-positive size.
        """
        super().__init__()

        self._min_frame = 0

        self.custom_mimetype: str | None = None

        self.tile: list[_Tile] = []
        """ A list of tile descriptors """

        self.readonly = 1  # until we know better

        self.decoderconfig: tuple[Any, ...] = ()
        self.decodermaxblock = MAXBLOCK

        if is_path(fp):
            # filename
            self.fp = open(fp, "rb")
            self.filename = os.fspath(fp)
            self._exclusive_fp = True
        else:
            # stream
            self.fp = cast(IO[bytes], fp)
            self.filename = filename if filename is not None else ""
            # can be overridden
            self._exclusive_fp = False

        try:
            try:
                self._open()
            except (
                IndexError,  # end of data
                TypeError,  # end of data (ord)
                KeyError,  # unsupported mode
                EOFError,  # got header but not the first frame
                struct.error,
            ) as v:
                raise SyntaxError(v) from v

            if not self.mode or self.size[0] <= 0 or self.size[1] <= 0:
                msg = "not identified by this driver"
                raise SyntaxError(msg)
        except BaseException:
            # close the file only if we have opened it this constructor
            if self._exclusive_fp:
                self.fp.close()
            raise

    def _open(self) -> None:
        """(Hook) Identify the file; the base implementation does nothing."""
        pass

    def _close_fp(self) -> None:
        """Close ``self._fp`` (if set and distinct from ``self.fp``) and ``self.fp``."""
        if getattr(self, "_fp", False) and not isinstance(self._fp, DeferredError):
            if self._fp != self.fp:
                self._fp.close()
            # Later use of _fp raises instead of touching a closed file.
            self._fp = DeferredError(ValueError("Operation on closed image"))
        if self.fp:
            self.fp.close()

    def close(self) -> None:
        """
        Closes the file pointer, if possible.

        This operation will destroy the image core and release its memory.
        The image data will be unusable afterward.

        This function is required to close images that have multiple frames or
        have not had their file read and closed by the
        :py:meth:`~PIL.Image.Image.load` method. See :ref:`file-handling` for
        more information.
        """
        try:
            self._close_fp()
            self.fp = None
        except Exception as msg:
            # Best effort: a failure to close is logged, not raised.
            logger.debug("Error closing: %s", msg)

        super().close()

    def get_child_images(self) -> list[ImageFile]:
        """
        Load the images referenced from this image's EXIF data.

        One image is produced per SubIFD entry, plus one for IFD1 when it
        carries a ``JpegIFOffset``. The position of ``self.fp`` is restored
        before returning.

        :returns: The list of loaded child images.
        """
        child_images = []
        exif = self.getexif()
        ifds = []
        if ExifTags.Base.SubIFDs in exif:
            subifd_offsets = exif[ExifTags.Base.SubIFDs]
            if subifd_offsets:
                if not isinstance(subifd_offsets, tuple):
                    subifd_offsets = (subifd_offsets,)
                for subifd_offset in subifd_offsets:
                    ifds.append((exif._get_ifd_dict(subifd_offset), subifd_offset))
        ifd1 = exif.get_ifd(ExifTags.IFD.IFD1)
        if ifd1 and ifd1.get(ExifTags.Base.JpegIFOffset):
            assert exif._info is not None
            ifds.append((ifd1, exif._info.next))

        offset = None
        for ifd, ifd_offset in ifds:
            assert self.fp is not None
            current_offset = self.fp.tell()
            if offset is None:
                # remember where the file was so it can be restored afterwards
                offset = current_offset

            fp = self.fp
            # Reset on every pass: without this, an IFD that is None would
            # leave the name unbound (first pass) or carry over the value
            # from the previous IFD (later passes).
            thumbnail_offset = None
            if ifd is not None:
                thumbnail_offset = ifd.get(ExifTags.Base.JpegIFOffset)
                if thumbnail_offset is not None:
                    thumbnail_offset += getattr(self, "_exif_offset", 0)
                    self.fp.seek(thumbnail_offset)

                    length = ifd.get(ExifTags.Base.JpegIFByteCount)
                    assert isinstance(length, int)
                    data = self.fp.read(length)
                    fp = io.BytesIO(data)

            with Image.open(fp) as im:
                from . import TiffImagePlugin

                if thumbnail_offset is None and isinstance(
                    im, TiffImagePlugin.TiffImageFile
                ):
                    # no embedded JPEG: point the TIFF reader at this IFD
                    im._frame_pos = [ifd_offset]
                    im._seek(0)
                im.load()
                child_images.append(im)

        if offset is not None:
            assert self.fp is not None
            self.fp.seek(offset)
        return child_images

    def get_format_mimetype(self) -> str | None:
        """Return ``custom_mimetype`` if set, else the MIME type registered for
        ``self.format``, else ``None``."""
        if self.custom_mimetype:
            return self.custom_mimetype
        if self.format is not None:
            return Image.MIME.get(self.format.upper())
        return None

    def __getstate__(self) -> list[Any]:
        """Extend the base pickling state with the filename."""
        return super().__getstate__() + [self.filename]

    def __setstate__(self, state: list[Any]) -> None:
        """Restore pickled state; the filename is the optional sixth item."""
        self.tile = []
        if len(state) > 5:
            self.filename = state[5]
        super().__setstate__(state)

    def verify(self) -> None:
        """Check file integrity"""

        # raise exception if something's wrong.  must be called
        # directly after open, and closes file when finished.
        if self._exclusive_fp:
            self.fp.close()
        self.fp = None

    def load(self) -> Image.core.PixelAccess | None:
        """
        Load image data based on tile list

        A single "raw" tile of a file opened by name is memory-mapped when
        possible; otherwise each tile is run through its decoder. Afterwards
        the tile list is emptied and ``self.fp`` is reset to ``None``.

        :returns: The result of :py:meth:`PIL.Image.Image.load`.
        :exception OSError: If there is nothing to load, the file is truncated
           (and :data:`LOAD_TRUNCATED_IMAGES` is false), or decoding ends with
           a negative error code.
        """

        if not self.tile and self._im is None:
            msg = "cannot load this image"
            raise OSError(msg)

        pixel = Image.Image.load(self)
        if not self.tile:
            return pixel

        self.map: mmap.mmap | None = None
        use_mmap = self.filename and len(self.tile) == 1

        readonly = 0

        # look for read/seek overrides
        if hasattr(self, "load_read"):
            read = self.load_read
            # don't use mmap if there are custom read/seek functions
            use_mmap = False
        else:
            read = self.fp.read

        if hasattr(self, "load_seek"):
            seek = self.load_seek
            use_mmap = False
        else:
            seek = self.fp.seek

        if use_mmap:
            # try memory mapping
            decoder_name, extents, offset, args = self.tile[0]
            if isinstance(args, str):
                args = (args, 0, 1)
            if (
                decoder_name == "raw"
                and isinstance(args, tuple)
                and len(args) >= 3
                and args[0] == self.mode
                and args[0] in Image._MAPMODES
            ):
                try:
                    # use mmap, if possible
                    import mmap

                    with open(self.filename) as fp:
                        self.map = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ)
                    if offset + self.size[1] * args[1] > self.map.size():
                        msg = "buffer is not large enough"
                        raise OSError(msg)
                    self.im = Image.core.map_buffer(
                        self.map, self.size, decoder_name, offset, args
                    )
                    readonly = 1
                    # After trashing self.im,
                    # we might need to reload the palette data.
                    if self.palette:
                        self.palette.dirty = 1
                except (AttributeError, OSError, ImportError):
                    # mapping failed: fall back to the decoder loop below
                    self.map = None

        self.load_prepare()
        err_code = -3  # initialize to unknown error
        if not self.map:
            # sort tiles in file order
            self.tile.sort(key=_tilesort)

            # FIXME: This is a hack to handle TIFF's JpegTables tag.
            prefix = getattr(self, "tile_prefix", b"")

            # Remove consecutive duplicates that only differ by their offset
            self.tile = [
                list(tiles)[-1]
                for _, tiles in itertools.groupby(
                    self.tile, lambda tile: (tile[0], tile[1], tile[3])
                )
            ]
            for i, (decoder_name, extents, offset, args) in enumerate(self.tile):
                seek(offset)
                decoder = Image._getdecoder(
                    self.mode, decoder_name, args, self.decoderconfig
                )
                try:
                    decoder.setimage(self.im, extents)
                    if decoder.pulls_fd:
                        # the decoder reads from the file object itself
                        decoder.setfd(self.fp)
                        err_code = decoder.decode(b"")[1]
                    else:
                        b = prefix
                        while True:
                            read_bytes = self.decodermaxblock
                            if i + 1 < len(self.tile):
                                # do not read past the start of the next tile
                                next_offset = self.tile[i + 1].offset
                                if next_offset > offset:
                                    read_bytes = next_offset - offset
                            try:
                                s = read(read_bytes)
                            except (IndexError, struct.error) as e:
                                # truncated png/gif
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = "image file is truncated"
                                    raise OSError(msg) from e

                            if not s:  # truncated jpeg
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = (
                                        "image file is truncated "
                                        f"({len(b)} bytes not processed)"
                                    )
                                    raise OSError(msg)

                            b = b + s
                            n, err_code = decoder.decode(b)
                            if n < 0:
                                break
                            # keep only the bytes the decoder has not consumed
                            b = b[n:]
                finally:
                    # Need to cleanup here to prevent leaks
                    decoder.cleanup()

        self.tile = []
        self.readonly = readonly

        self.load_end()

        if self._exclusive_fp and self._close_exclusive_fp_after_loading:
            self.fp.close()
        self.fp = None

        if not self.map and not LOAD_TRUNCATED_IMAGES and err_code < 0:
            # still raised if decoder fails to return anything
            raise _get_oserror(err_code, encoder=False)

        return Image.Image.load(self)

    def load_prepare(self) -> None:
        """Allocate the core image if it does not exist yet."""
        # create image memory if necessary
        if self._im is None:
            self.im = Image.core.new(self.mode, self.size)
        # create palette (optional)
        if self.mode == "P":
            Image.Image.load(self)

    def load_end(self) -> None:
        """(Hook) Called by :meth:`load` after the tile list has been cleared."""
        # may be overridden
        pass

    # may be defined for contained formats
    # def load_seek(self, pos: int) -> None:
    #     pass

    # may be defined for blocked formats (e.g. PNG)
    # def load_read(self, read_bytes: int) -> bytes:
    #     pass

    def _seek_check(self, frame: int) -> bool:
        """
        Validate a frame number for seeking.

        :param frame: Requested frame number.
        :returns: ``True`` if ``frame`` differs from the current position.
        :exception EOFError: If ``frame`` is outside the sequence.
        """
        if (
            frame < self._min_frame
            # Only check upper limit on frames if additional seek operations
            # are not required to do so
            or (
                not (hasattr(self, "_n_frames") and self._n_frames is None)
                and frame >= getattr(self, "n_frames") + self._min_frame
            )
        ):
            msg = "attempt to seek outside sequence"
            raise EOFError(msg)

        return self.tell() != frame

436 

437 

class StubHandler(abc.ABC):
    """Interface of the external handler a :class:`StubImageFile` relies on."""

    def open(self, im: StubImageFile) -> None:
        """Optional hook; the default implementation does nothing."""
        return None

    @abc.abstractmethod
    def load(self, im: StubImageFile) -> Image.Image:
        """Return the loaded image for the stub ``im``; must be overridden."""

445 

446 

class StubImageFile(ImageFile, metaclass=abc.ABCMeta):
    """
    Base class for stub image loaders.

    A stub loader is an image loader that can identify files of a
    certain format, but relies on external code to load the file.
    """

    @abc.abstractmethod
    def _open(self) -> None:
        """(Hook) Identify the file; must be provided by subclasses."""

    def load(self) -> Image.core.PixelAccess | None:
        """
        Load the image through the handler returned by :meth:`_load`.

        On success this object takes over the class and instance dictionary
        of the image the handler produced.

        :exception OSError: If :meth:`_load` returns ``None``.
        """
        handler = self._load()
        if handler is None:
            msg = f"cannot find loader for this {self.format} file"
            raise OSError(msg)
        loaded = handler.load(self)
        assert loaded is not None
        # become the other object (!)
        self.__class__ = loaded.__class__  # type: ignore[assignment]
        self.__dict__ = loaded.__dict__
        return loaded.load()

    @abc.abstractmethod
    def _load(self) -> StubHandler | None:
        """(Hook) Find actual image loader."""

475 

476 

class Parser:
    """
    Incremental image parser.  This class implements the standard
    feed/close consumer interface.
    """

    incremental = None
    image: Image.Image | None = None
    data: bytes | None = None
    decoder: Image.core.ImagingDecoder | PyDecoder | None = None
    offset = 0
    finished = 0

    def reset(self) -> None:
        """
        (Consumer) Reset the parser.  Note that you can only call this
        method immediately after you've created a parser; parser
        instances cannot be reused.
        """
        assert self.data is None, "cannot reuse parsers"

    def feed(self, data: bytes) -> None:
        """
        (Consumer) Feed data to the parser.

        :param data: A string buffer.
        :exception OSError: If the parser failed to parse the image file.
        """
        # collect data

        if self.finished:
            return

        if self.data is None:
            self.data = data
        else:
            self.data = self.data + data

        # parse what we have
        if self.decoder:
            if self.offset > 0:
                # skip header
                skip = min(len(self.data), self.offset)
                self.data = self.data[skip:]
                self.offset = self.offset - skip
                if self.offset > 0 or not self.data:
                    return

            n, e = self.decoder.decode(self.data)

            if n < 0:
                # end of stream
                self.data = None
                self.finished = 1
                if e < 0:
                    # decoding error
                    self.image = None
                    raise _get_oserror(e, encoder=False)
                else:
                    # end of image
                    return
            # drop the bytes the decoder has consumed
            self.data = self.data[n:]

        elif self.image:
            # if we end up here with no decoder, this file cannot
            # be incrementally parsed.  wait until we've gotten all
            # available data
            pass

        else:
            # attempt to open this file
            try:
                with io.BytesIO(self.data) as fp:
                    im = Image.open(fp)
            except OSError:
                pass  # not enough data
            else:
                flag = hasattr(im, "load_seek") or hasattr(im, "load_read")
                if flag or len(im.tile) != 1:
                    # custom load code, or multiple tiles
                    # (was "self.decode = None", which only created a stray
                    # attribute instead of touching the decoder)
                    self.decoder = None
                else:
                    # initialize decoder
                    im.load_prepare()
                    d, e, o, a = im.tile[0]
                    im.tile = []
                    self.decoder = Image._getdecoder(im.mode, d, a, im.decoderconfig)
                    self.decoder.setimage(im.im, e)

                    # calculate decoder offset
                    self.offset = o
                    if self.offset <= len(self.data):
                        self.data = self.data[self.offset :]
                        self.offset = 0

                self.image = im

    def __enter__(self) -> Parser:
        """Return the parser itself."""
        return self

    def __exit__(self, *args: object) -> None:
        """Close the parser; errors raised by :meth:`close` propagate."""
        self.close()

    def close(self) -> Image.Image:
        """
        (Consumer) Close the stream.

        :returns: An image object.
        :exception OSError: If the parser failed to parse the image file either
                            because it cannot be identified or cannot be
                            decoded.
        """
        # finish decoding
        if self.decoder:
            # get rid of what's left in the buffers
            self.feed(b"")
            self.data = self.decoder = None
            if not self.finished:
                msg = "image was incomplete"
                raise OSError(msg)
        if not self.image:
            msg = "cannot parse this image"
            raise OSError(msg)
        if self.data:
            # incremental parsing not possible; reopen the file
            # now that we have all data
            with io.BytesIO(self.data) as fp:
                # NOTE(review): load() runs even if Image.open() raises, in
                # which case it acts on the previously opened image - confirm
                # this is intended.
                try:
                    self.image = Image.open(fp)
                finally:
                    self.image.load()
        return self.image

609 

610 

611# -------------------------------------------------------------------- 

612 

613 

def _save(im: Image.Image, fp: IO[bytes], tile: list[_Tile], bufsize: int = 0) -> None:
    """Encode ``im`` to ``fp`` as described by a tile list.

    The tile list is sorted in place by offset. If ``fp`` exposes a file
    descriptor the tiles are encoded with it; if that attempt raises
    ``AttributeError`` or ``io.UnsupportedOperation`` the tiles are encoded
    through ``fp``'s Python-level interface instead.

    :param im: Image object.
    :param fp: File object.
    :param tile: Tile list.
    :param bufsize: Optional buffer size
    """

    im.load()
    if not hasattr(im, "encoderconfig"):
        im.encoderconfig = ()
    tile.sort(key=_tilesort)
    # FIXME: make MAXBLOCK a configuration parameter
    # It would be great if we could have the encoder specify what it needs
    # But, it would need at least the image size in most cases. RawEncode is
    # a tricky case.
    row_bytes = im.size[0] * 4  # see RawEncode.c
    block_size = max(MAXBLOCK, bufsize, row_bytes)
    try:
        descriptor = fp.fileno()
        fp.flush()
        _encode_tile(im, fp, tile, block_size, descriptor)
    except (AttributeError, io.UnsupportedOperation) as exc:
        _encode_tile(im, fp, tile, block_size, None, exc)
    if hasattr(fp, "flush"):
        fp.flush()

640 

641 

def _encode_tile(
    im: Image.Image,
    fp: IO[bytes],
    tile: list[_Tile],
    bufsize: int,
    fh: int | None,
    exc: BaseException | None = None,
) -> None:
    """
    Run the encoder of every tile in ``tile`` and write its output.

    :param im: Image object being saved.
    :param fp: File object; seeked to a tile's offset when that is positive.
    :param tile: Tile list.
    :param bufsize: Buffer size passed to the encoder.
    :param fh: File descriptor, used when ``exc`` is not set.
    :param exc: When set, output is written through ``fp.write``; it is also
       chained as the cause of any encoder error raised.
    :exception OSError: If an encoder reports a negative error code.
    """
    for codec_name, box, position, params in tile:
        if position > 0:
            fp.seek(position)
        encoder = Image._getencoder(im.mode, codec_name, params, im.encoderconfig)
        try:
            encoder.setimage(im.im, box)
            if encoder.pushes_fd:
                # the encoder writes to the file object itself
                encoder.setfd(fp)
                errcode = encoder.encode_to_pyfd()[1]
            elif exc:
                # compress to Python file-compatible object
                errcode = 0
                while not errcode:
                    errcode, chunk = encoder.encode(bufsize)[1:]
                    fp.write(chunk)
            else:
                # slight speedup: compress to real file object
                assert fh is not None
                errcode = encoder.encode_to_file(fh, bufsize)
            if errcode < 0:
                raise _get_oserror(errcode, encoder=True) from exc
        finally:
            encoder.cleanup()

675 

676 

def _safe_read(fp: IO[bytes], size: int) -> bytes:
    """
    Read exactly ``size`` bytes from ``fp`` without trusting ``size``.

    Requests larger than SAFEBLOCK are read block by block, so no single
    read asks for more than SAFEBLOCK bytes.

    :param fp: File handle.  Must implement a <b>read</b> method.
    :param size: Number of bytes to read.
    :returns: A bytes object containing <i>size</i> bytes of data; empty if
       ``size`` is zero or negative.

    Raises an OSError if the file is truncated and the read cannot be completed

    """
    if size <= 0:
        return b""

    if size > SAFEBLOCK:
        chunks: list[bytes] = []
        outstanding = size
        while outstanding > 0:
            chunk = fp.read(min(outstanding, SAFEBLOCK))
            if not chunk:
                # end of file reached before everything was read
                break
            chunks.append(chunk)
            outstanding -= len(chunk)
        data = b"".join(chunks)
    else:
        data = fp.read(size)

    if len(data) < size:
        msg = "Truncated File Read"
        raise OSError(msg)
    return data

710 

711 

class PyCodecState:
    """Size and offset of the region a Python codec works on."""

    def __init__(self) -> None:
        self.xsize = self.ysize = 0
        self.xoff = self.yoff = 0

    def extents(self) -> tuple[int, int, int, int]:
        """Return the region as ``(x0, y0, x1, y1)``."""
        left, top = self.xoff, self.yoff
        return left, top, left + self.xsize, top + self.ysize

721 

722 

class PyCodec:
    """Shared base of the Python decoder and encoder classes."""

    fd: IO[bytes] | None

    def __init__(self, mode: str, *args: Any) -> None:
        """
        :param mode: Image mode, stored as ``self.mode``.
        :param args: Remaining arguments, handed to :meth:`init` as a tuple.
        """
        self.im: Image.core.ImagingCore | None = None
        self.state = PyCodecState()
        self.fd = None
        self.mode = mode
        self.init(args)

    def init(self, args: tuple[Any, ...]) -> None:
        """
        Override to perform codec specific initialization

        :param args: Tuple of arg items from the tile entry
        :returns: None
        """
        self.args = args

    def cleanup(self) -> None:
        """
        Override to perform codec specific cleanup

        :returns: None
        """
        return None

    def setfd(self, fd: IO[bytes]) -> None:
        """
        Called from ImageFile to set the Python file-like object

        :param fd: A Python file-like object
        :returns: None
        """
        self.fd = fd

    def setimage(
        self,
        im: Image.core.ImagingCore,
        extents: tuple[int, int, int, int] | None = None,
    ) -> None:
        """
        Called from ImageFile to set the core output image for the codec

        :param im: A core image object
        :param extents: a 4 tuple of (x0, y0, x1, y1) defining the rectangle
            for this tile
        :returns: None
        :exception ValueError: If the resulting size is not positive or the
            tile extends outside the image.
        """

        # following c code
        self.im = im

        x0, y0, x1, y1 = extents if extents else (0, 0, 0, 0)

        state = self.state
        if x0 == 0 and x1 == 0:
            # no horizontal extent given: use the size of the whole image
            state.xsize, state.ysize = self.im.size
        else:
            state.xoff, state.yoff = x0, y0
            state.xsize, state.ysize = x1 - x0, y1 - y0

        if state.xsize <= 0 or state.ysize <= 0:
            msg = "Size cannot be negative"
            raise ValueError(msg)

        right = state.xsize + state.xoff
        bottom = state.ysize + state.yoff
        if right > self.im.size[0] or bottom > self.im.size[1]:
            msg = "Tile cannot extend outside image"
            raise ValueError(msg)

799 

800 

class PyDecoder(PyCodec):
    """
    Python implementation of a format decoder. Override this class and
    add the decoding logic in the :meth:`decode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    _pulls_fd = False

    @property
    def pulls_fd(self) -> bool:
        """Value of the class-level ``_pulls_fd`` flag."""
        return self._pulls_fd

    def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
        """
        Override to perform the decoding process.

        :param buffer: A bytes object with the data to be decoded.
        :returns: A tuple of ``(bytes consumed, errcode)``.
            If finished with decoding return -1 for the bytes consumed.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        msg = "unavailable in base decoder"
        raise NotImplementedError(msg)

    def set_as_raw(
        self, data: bytes, rawmode: str | None = None, extra: tuple[Any, ...] = ()
    ) -> None:
        """
        Convenience method to set the internal image from a stream of raw data

        :param data: Bytes to be set
        :param rawmode: The rawmode to be used for the decoder.
            If not specified, it will default to the mode of the image
        :param extra: Extra arguments for the decoder.
        :returns: None
        :exception ValueError: If the raw decoder does not signal completion
            or reports a non-zero error code.
        """

        raw_decoder = Image._getdecoder(self.mode, "raw", rawmode or self.mode, extra)
        assert self.im is not None
        raw_decoder.setimage(self.im, self.state.extents())
        status = raw_decoder.decode(data)

        # a non-negative first item means the decoder has not finished
        if status[0] >= 0:
            msg = "not enough image data"
            raise ValueError(msg)
        if status[1] != 0:
            msg = "cannot decode image data"
            raise ValueError(msg)

853 

854 

class PyEncoder(PyCodec):
    """
    Python implementation of a format encoder. Override this class and
    add the encoding logic in the :meth:`encode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    _pushes_fd = False

    @property
    def pushes_fd(self) -> bool:
        """Value of the class-level ``_pushes_fd`` flag."""
        return self._pushes_fd

    def encode(self, bufsize: int) -> tuple[int, int, bytes]:
        """
        Override to perform the encoding process.

        :param bufsize: Buffer size.
        :returns: A tuple of ``(bytes encoded, errcode, bytes)``.
            If finished with encoding return 1 for the error code.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        msg = "unavailable in base encoder"
        raise NotImplementedError(msg)

    def encode_to_pyfd(self) -> tuple[int, int]:
        """
        If ``pushes_fd`` is ``True``, then this method will be used,
        and ``encode()`` will only be called once.

        :returns: A tuple of ``(bytes consumed, errcode)``.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        if not self.pushes_fd:
            return 0, -8  # bad configuration
        consumed, errcode, payload = self.encode(0)
        if payload:
            assert self.fd is not None
            self.fd.write(payload)
        return consumed, errcode

    def encode_to_file(self, fh: int, bufsize: int) -> int:
        """
        Call :meth:`encode` repeatedly until it reports a non-zero error code.

        :param fh: File handle.
        :param bufsize: Buffer size.

        :returns: If finished successfully, return 0.
            Otherwise, return an error code. Err codes are from
            :data:`.ImageFile.ERRORS`.
        """
        while True:
            status, errcode, buf = self.encode(bufsize)
            if status > 0:
                # NOTE(review): this writes the part of buf *after* the first
                # ``status`` bytes - confirm that is the intended slice.
                os.write(fh, buf[status:])
            if errcode != 0:
                return errcode