Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/ImageFile.py: 56%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# The Python Imaging Library.
3# $Id$
4#
5# base class for image file handlers
6#
7# history:
8# 1995-09-09 fl Created
9# 1996-03-11 fl Fixed load mechanism.
10# 1996-04-15 fl Added pcx/xbm decoders.
11# 1996-04-30 fl Added encoders.
12# 1996-12-14 fl Added load helpers
13# 1997-01-11 fl Use encode_to_file where possible
14# 1997-08-27 fl Flush output in _save
15# 1998-03-05 fl Use memory mapping for some modes
16# 1999-02-04 fl Use memory mapping also for "I;16" and "I;16B"
17# 1999-05-31 fl Added image parser
18# 2000-10-12 fl Set readonly flag on memory-mapped images
19# 2002-03-20 fl Use better messages for common decoder errors
20# 2003-04-21 fl Fall back on mmap/map_buffer if map is not available
21# 2003-10-30 fl Added StubImageFile class
22# 2004-02-25 fl Made incremental parser more robust
23#
24# Copyright (c) 1997-2004 by Secret Labs AB
25# Copyright (c) 1995-2004 by Fredrik Lundh
26#
27# See the README file for information on usage and redistribution.
28#
29from __future__ import annotations
31import abc
32import io
33import itertools
34import logging
35import os
36import struct
37from typing import IO, Any, NamedTuple, cast
39from . import ExifTags, Image
40from ._deprecate import deprecate
41from ._util import DeferredError, is_path
43TYPE_CHECKING = False
44if TYPE_CHECKING:
45 from ._typing import StrOrBytesPath
logger = logging.getLogger(__name__)

# Default number of bytes requested per read while decoding
# (copied to ``ImageFile.decodermaxblock``) and the lower bound for the
# encoder buffer size used by ``_save``.
MAXBLOCK = 65536

# Largest number of bytes ``_safe_read`` requests from a file in one call.
SAFEBLOCK = 1024 * 1024

LOAD_TRUNCATED_IMAGES = False
"""Whether or not to load truncated image files. User code may change this."""

ERRORS = {
    -1: "image buffer overrun error",
    -2: "decoding error",
    -3: "unknown error",
    -8: "bad configuration",
    -9: "out of memory error",
}
"""
Dict of known error codes returned from :meth:`.PyDecoder.decode`,
:meth:`.PyEncoder.encode`, :meth:`.PyEncoder.encode_to_pyfd` and
:meth:`.PyEncoder.encode_to_file`.
"""
70#
71# --------------------------------------------------------------------
72# Helpers
def _get_oserror(error: int, *, encoder: bool) -> OSError:
    """
    Build (but do not raise) the ``OSError`` describing a codec error code.

    :param error: Error code reported by a codec.
    :param encoder: ``True`` if the code came from an encoder (the message
       then talks about writing), ``False`` for a decoder (reading).
    :returns: An ``OSError`` instance carrying the message.
    """
    if encoder:
        role, activity = "encoder", "writing"
    else:
        role, activity = "decoder", "reading"

    # Prefer the description supplied by the C core; fall back to the
    # ERRORS table when the core does not provide getcodecstatus.
    try:
        description = Image.core.getcodecstatus(error)
    except AttributeError:
        description = ERRORS.get(error)
    if not description:
        description = f"{role} error {error}"
    description += f" when {activity} image file"
    return OSError(description)
def raise_oserror(error: int) -> OSError:
    """
    Deprecated: emit a deprecation notice, then raise the decoder
    ``OSError`` for the given codec error code.

    :param error: Error code returned by a codec's decode() method.
    :exception OSError: Always raised (after the deprecation notice).
    """
    advice = (
        "It is only useful for translating error codes returned by a codec's "
        "decode() method, which ImageFile already does automatically."
    )
    deprecate("raise_oserror", 12, action=advice)
    raise _get_oserror(error, encoder=False)
96def _tilesort(t: _Tile) -> int:
97 # sort on offset
98 return t[2]
101class _Tile(NamedTuple):
102 codec_name: str
103 extents: tuple[int, int, int, int] | None
104 offset: int = 0
105 args: tuple[Any, ...] | str | None = None
108#
109# --------------------------------------------------------------------
110# ImageFile base class
class ImageFile(Image.Image):
    """Base class for image file format handlers."""

    def __init__(
        self, fp: StrOrBytesPath | IO[bytes], filename: str | bytes | None = None
    ) -> None:
        """
        Attach the image to a file and let the format handler identify it.

        :param fp: A path (opened here in binary mode) or an already open
           binary stream.
        :param filename: Name recorded for a stream; ignored when ``fp`` is
           a path.
        :exception SyntaxError: If ``_open()`` fails with one of the
           "bad data" exceptions listed below, or leaves the mode unset or
           the size non-positive.
        """
        super().__init__()

        self._min_frame = 0

        self.custom_mimetype: str | None = None

        self.tile: list[_Tile] = []
        """ A list of tile descriptors """

        self.readonly = 1  # until we know better

        self.decoderconfig: tuple[Any, ...] = ()
        self.decodermaxblock = MAXBLOCK

        if is_path(fp):
            # filename
            self.fp = open(fp, "rb")
            self.filename = os.fspath(fp)
            self._exclusive_fp = True
        else:
            # stream
            self.fp = cast(IO[bytes], fp)
            self.filename = filename if filename is not None else ""
            # can be overridden
            self._exclusive_fp = False

        try:
            try:
                self._open()
            except (
                IndexError,  # end of data
                TypeError,  # end of data (ord)
                KeyError,  # unsupported mode
                EOFError,  # got header but not the first frame
                struct.error,
            ) as v:
                raise SyntaxError(v) from v

            if not self.mode or self.size[0] <= 0 or self.size[1] <= 0:
                msg = "not identified by this driver"
                raise SyntaxError(msg)
        except BaseException:
            # close the file only if we have opened it this constructor
            if self._exclusive_fp:
                self.fp.close()
            raise

    def _open(self) -> None:
        """(Hook) Identify the file; the base implementation does nothing."""
        pass

    def _close_fp(self) -> None:
        """Close the secondary ``_fp`` handle (if any) and then ``fp``."""
        if getattr(self, "_fp", False) and not isinstance(self._fp, DeferredError):
            if self._fp != self.fp:
                self._fp.close()
            # Any later use of _fp reports that the image has been closed.
            self._fp = DeferredError(ValueError("Operation on closed image"))
        if self.fp:
            self.fp.close()

    def close(self) -> None:
        """
        Closes the file pointer, if possible.

        This operation will destroy the image core and release its memory.
        The image data will be unusable afterward.

        This function is required to close images that have multiple frames or
        have not had their file read and closed by the
        :py:meth:`~PIL.Image.Image.load` method. See :ref:`file-handling` for
        more information.
        """
        try:
            self._close_fp()
            self.fp = None
        except Exception as msg:
            # Best effort: a failure to close is logged, not propagated.
            logger.debug("Error closing: %s", msg)

        super().close()

    def get_child_images(self) -> list[ImageFile]:
        """
        Load the images referenced from this image's EXIF data.

        Candidates are the IFDs listed under the SubIFDs tag and IFD1 when it
        has a JpegIFOffset entry. The file position is restored to where it
        was before the first child was read.

        :returns: A list of loaded child images.
        """
        child_images = []
        exif = self.getexif()
        ifds = []
        if ExifTags.Base.SubIFDs in exif:
            subifd_offsets = exif[ExifTags.Base.SubIFDs]
            if subifd_offsets:
                if not isinstance(subifd_offsets, tuple):
                    subifd_offsets = (subifd_offsets,)
                for subifd_offset in subifd_offsets:
                    ifds.append((exif._get_ifd_dict(subifd_offset), subifd_offset))
        ifd1 = exif.get_ifd(ExifTags.IFD.IFD1)
        if ifd1 and ifd1.get(ExifTags.Base.JpegIFOffset):
            assert exif._info is not None
            ifds.append((ifd1, exif._info.next))

        offset = None
        for ifd, ifd_offset in ifds:
            assert self.fp is not None
            current_offset = self.fp.tell()
            if offset is None:
                # remember where to return to once all children are read
                offset = current_offset

            fp = self.fp
            # Reset for every IFD: without this the name is unbound when the
            # first ifd is None, or keeps the value of the previous iteration.
            thumbnail_offset = None
            if ifd is not None:
                thumbnail_offset = ifd.get(ExifTags.Base.JpegIFOffset)
                if thumbnail_offset is not None:
                    thumbnail_offset += getattr(self, "_exif_offset", 0)
                    self.fp.seek(thumbnail_offset)

                    length = ifd.get(ExifTags.Base.JpegIFByteCount)
                    assert isinstance(length, int)
                    data = self.fp.read(length)
                    fp = io.BytesIO(data)

            with Image.open(fp) as im:
                from . import TiffImagePlugin

                if thumbnail_offset is None and isinstance(
                    im, TiffImagePlugin.TiffImageFile
                ):
                    # no embedded JPEG: point the TIFF reader at this IFD
                    im._frame_pos = [ifd_offset]
                    im._seek(0)
                im.load()
                child_images.append(im)

        if offset is not None:
            assert self.fp is not None
            self.fp.seek(offset)
        return child_images

    def get_format_mimetype(self) -> str | None:
        """
        Return ``custom_mimetype`` if set, otherwise the ``Image.MIME`` entry
        for the upper-cased format, or ``None`` when the format is unknown.
        """
        if self.custom_mimetype:
            return self.custom_mimetype
        if self.format is not None:
            return Image.MIME.get(self.format.upper())
        return None

    def __getstate__(self) -> list[Any]:
        """Extend the base image state with the filename."""
        return super().__getstate__() + [self.filename]

    def __setstate__(self, state: list[Any]) -> None:
        """Restore state; the filename is the optional sixth state item."""
        self.tile = []
        if len(state) > 5:
            self.filename = state[5]
        super().__setstate__(state)

    def verify(self) -> None:
        """Check file integrity"""

        # raise exception if something's wrong.  must be called
        # directly after open, and closes file when finished.
        if self._exclusive_fp:
            self.fp.close()
        self.fp = None

    def load(self) -> Image.core.PixelAccess | None:
        """
        Load image data based on tile list

        :returns: The result of :py:meth:`PIL.Image.Image.load`.
        :exception OSError: If there is nothing to load, the file is
           truncated (and ``LOAD_TRUNCATED_IMAGES`` is false), or the
           decoder finishes with a negative error code.
        """

        if not self.tile and self._im is None:
            msg = "cannot load this image"
            raise OSError(msg)

        pixel = Image.Image.load(self)
        if not self.tile:
            return pixel

        self.map: mmap.mmap | None = None
        use_mmap = self.filename and len(self.tile) == 1

        readonly = 0

        # look for read/seek overrides
        if hasattr(self, "load_read"):
            read = self.load_read
            # don't use mmap if there are custom read/seek functions
            use_mmap = False
        else:
            read = self.fp.read

        if hasattr(self, "load_seek"):
            seek = self.load_seek
            use_mmap = False
        else:
            seek = self.fp.seek

        if use_mmap:
            # try memory mapping
            decoder_name, extents, offset, args = self.tile[0]
            if isinstance(args, str):
                args = (args, 0, 1)
            if (
                decoder_name == "raw"
                and isinstance(args, tuple)
                and len(args) >= 3
                and args[0] == self.mode
                and args[0] in Image._MAPMODES
            ):
                try:
                    # use mmap, if possible
                    import mmap

                    with open(self.filename) as fp:
                        self.map = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ)
                    if offset + self.size[1] * args[1] > self.map.size():
                        msg = "buffer is not large enough"
                        raise OSError(msg)
                    self.im = Image.core.map_buffer(
                        self.map, self.size, decoder_name, offset, args
                    )
                    readonly = 1
                    # After trashing self.im,
                    # we might need to reload the palette data.
                    if self.palette:
                        self.palette.dirty = 1
                except (AttributeError, OSError, ImportError):
                    # mapping failed; fall through to the regular decoder
                    self.map = None

        self.load_prepare()
        err_code = -3  # initialize to unknown error
        if not self.map:
            # sort tiles in file order
            self.tile.sort(key=_tilesort)

            # FIXME: This is a hack to handle TIFF's JpegTables tag.
            prefix = getattr(self, "tile_prefix", b"")

            # Remove consecutive duplicates that only differ by their offset
            self.tile = [
                list(tiles)[-1]
                for _, tiles in itertools.groupby(
                    self.tile, lambda tile: (tile[0], tile[1], tile[3])
                )
            ]
            for i, (decoder_name, extents, offset, args) in enumerate(self.tile):
                seek(offset)
                decoder = Image._getdecoder(
                    self.mode, decoder_name, args, self.decoderconfig
                )
                try:
                    decoder.setimage(self.im, extents)
                    if decoder.pulls_fd:
                        # the decoder reads from the file object itself
                        decoder.setfd(self.fp)
                        err_code = decoder.decode(b"")[1]
                    else:
                        b = prefix
                        while True:
                            read_bytes = self.decodermaxblock
                            if i + 1 < len(self.tile):
                                # do not read past the start of the next tile
                                next_offset = self.tile[i + 1].offset
                                if next_offset > offset:
                                    read_bytes = next_offset - offset
                            try:
                                s = read(read_bytes)
                            except (IndexError, struct.error) as e:
                                # truncated png/gif
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = "image file is truncated"
                                    raise OSError(msg) from e

                            if not s:  # truncated jpeg
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = (
                                        "image file is truncated "
                                        f"({len(b)} bytes not processed)"
                                    )
                                    raise OSError(msg)

                            b = b + s
                            n, err_code = decoder.decode(b)
                            if n < 0:
                                break
                            # keep only the bytes the decoder has not consumed
                            b = b[n:]
                finally:
                    # Need to cleanup here to prevent leaks
                    decoder.cleanup()

        self.tile = []
        self.readonly = readonly

        self.load_end()

        if self._exclusive_fp and self._close_exclusive_fp_after_loading:
            self.fp.close()
        self.fp = None

        if not self.map and not LOAD_TRUNCATED_IMAGES and err_code < 0:
            # still raised if decoder fails to return anything
            raise _get_oserror(err_code, encoder=False)

        return Image.Image.load(self)

    def load_prepare(self) -> None:
        """Allocate the core image if needed and, for "P" mode, load the palette."""
        # create image memory if necessary
        if self._im is None:
            self.im = Image.core.new(self.mode, self.size)
        # create palette (optional)
        if self.mode == "P":
            Image.Image.load(self)

    def load_end(self) -> None:
        """(Hook) Called by ``load()`` after the tiles have been decoded."""
        # may be overridden
        pass

    # may be defined for contained formats
    # def load_seek(self, pos: int) -> None:
    #     pass

    # may be defined for blocked formats (e.g. PNG)
    # def load_read(self, read_bytes: int) -> bytes:
    #     pass

    def _seek_check(self, frame: int) -> bool:
        """
        Validate a frame number before seeking.

        :param frame: Requested frame number.
        :returns: ``True`` if ``frame`` differs from the current position
           reported by ``tell()``.
        :exception EOFError: If ``frame`` lies outside the sequence.
        """
        if (
            frame < self._min_frame
            # Only check upper limit on frames if additional seek operations
            # are not required to do so
            or (
                not (hasattr(self, "_n_frames") and self._n_frames is None)
                and frame >= getattr(self, "n_frames") + self._min_frame
            )
        ):
            msg = "attempt to seek outside sequence"
            raise EOFError(msg)

        return self.tell() != frame
class StubHandler(abc.ABC):
    # Interface for external code that loads files a stub plugin can
    # only identify.

    def open(self, im: StubImageFile) -> None:
        # Optional hook; the default does nothing.
        return None

    @abc.abstractmethod
    def load(self, im: StubImageFile) -> Image.Image:
        # Must be provided by subclasses: produce the loaded image for ``im``.
        ...
class StubImageFile(ImageFile, metaclass=abc.ABCMeta):
    """
    Base class for stub image loaders.

    A stub loader is an image loader that can identify files of a
    certain format, but relies on external code to load the file.
    """

    @abc.abstractmethod
    def _open(self) -> None:
        pass

    def load(self) -> Image.core.PixelAccess | None:
        # Ask the subclass for a handler, let it produce the real image,
        # then adopt that image's class and attributes.
        handler = self._load()
        if handler is None:
            msg = f"cannot find loader for this {self.format} file"
            raise OSError(msg)
        loaded = handler.load(self)
        assert loaded is not None
        # become the other object (!)
        self.__class__ = loaded.__class__  # type: ignore[assignment]
        self.__dict__ = loaded.__dict__
        return loaded.load()

    @abc.abstractmethod
    def _load(self) -> StubHandler | None:
        """(Hook) Find actual image loader."""
        pass
class Parser:
    """
    Incremental image parser.  This class implements the standard
    feed/close consumer interface.
    """

    incremental = None
    # Image identified from the data fed so far, if any.
    image: Image.Image | None = None
    # Bytes received but not yet consumed.
    data: bytes | None = None
    # Decoder used for incremental decoding, if the file allows it.
    decoder: Image.core.ImagingDecoder | PyDecoder | None = None
    # Number of bytes still to skip before the decoder's data starts.
    offset = 0
    # Set to 1 once the decoder has reported the end of the stream.
    finished = 0

    def reset(self) -> None:
        """
        (Consumer) Reset the parser.  Note that you can only call this
        method immediately after you've created a parser; parser
        instances cannot be reused.
        """
        assert self.data is None, "cannot reuse parsers"

    def feed(self, data: bytes) -> None:
        """
        (Consumer) Feed data to the parser.

        :param data: A string buffer.
        :exception OSError: If the parser failed to parse the image file.
        """
        # collect data

        if self.finished:
            return

        if self.data is None:
            self.data = data
        else:
            self.data = self.data + data

        # parse what we have
        if self.decoder:
            if self.offset > 0:
                # skip header
                skip = min(len(self.data), self.offset)
                self.data = self.data[skip:]
                self.offset = self.offset - skip
                if self.offset > 0 or not self.data:
                    return

            n, e = self.decoder.decode(self.data)

            if n < 0:
                # end of stream
                self.data = None
                self.finished = 1
                if e < 0:
                    # decoding error
                    self.image = None
                    raise _get_oserror(e, encoder=False)
                else:
                    # end of image
                    return
            self.data = self.data[n:]

        elif self.image:
            # if we end up here with no decoder, this file cannot
            # be incrementally parsed.  wait until we've gotten all
            # available data
            pass

        else:
            # attempt to open this file
            try:
                with io.BytesIO(self.data) as fp:
                    im = Image.open(fp)
            except OSError:
                pass  # not enough data
            else:
                flag = hasattr(im, "load_seek") or hasattr(im, "load_read")
                if flag or len(im.tile) != 1:
                    # custom load code, or multiple tiles: no incremental
                    # decoder (previously assigned to a misspelled
                    # ``decode`` attribute)
                    self.decoder = None
                else:
                    # initialize decoder
                    im.load_prepare()
                    d, e, o, a = im.tile[0]
                    im.tile = []
                    self.decoder = Image._getdecoder(im.mode, d, a, im.decoderconfig)
                    self.decoder.setimage(im.im, e)

                    # calculate decoder offset
                    self.offset = o
                    if self.offset <= len(self.data):
                        self.data = self.data[self.offset :]
                        self.offset = 0

                self.image = im

    def __enter__(self) -> Parser:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

    def close(self) -> Image.Image:
        """
        (Consumer) Close the stream.

        :returns: An image object.
        :exception OSError: If the parser failed to parse the image file either
                            because it cannot be identified or cannot be
                            decoded.
        """
        # finish decoding
        if self.decoder:
            # get rid of what's left in the buffers
            self.feed(b"")
            self.data = self.decoder = None
            if not self.finished:
                msg = "image was incomplete"
                raise OSError(msg)
        if not self.image:
            msg = "cannot parse this image"
            raise OSError(msg)
        if self.data:
            # incremental parsing not possible; reopen the file
            # now that we have all data
            with io.BytesIO(self.data) as fp:
                # NOTE(review): load() runs in ``finally``, so if
                # Image.open() raises it is called on the image found by
                # feed() - confirm this is intended.
                try:
                    self.image = Image.open(fp)
                finally:
                    self.image.load()
        return self.image
622# --------------------------------------------------------------------
def _save(im: Image.Image, fp: IO[bytes], tile: list[_Tile], bufsize: int = 0) -> None:
    """Encode an image to a file according to a tile list.

    :param im: Image object.
    :param fp: File object.
    :param tile: Tile list; sorted in place by offset.
    :param bufsize: Optional buffer size; raised to at least ``MAXBLOCK``
       and four times the image width.
    """

    im.load()
    if not hasattr(im, "encoderconfig"):
        im.encoderconfig = ()
    tile.sort(key=_tilesort)
    # FIXME: make MAXBLOCK a configuration parameter
    # It would be great if we could have the encoder specify what it needs
    # But, it would need at least the image size in most cases. RawEncode is
    # a tricky case.
    width = im.size[0]
    bufsize = max(MAXBLOCK, bufsize, width * 4)  # see RawEncode.c
    try:
        # Preferred route: hand the encoder a real file descriptor.
        descriptor = fp.fileno()
        fp.flush()
        _encode_tile(im, fp, tile, bufsize, descriptor)
    except (AttributeError, io.UnsupportedOperation) as fallback_cause:
        # No usable descriptor: write through the Python file object.
        _encode_tile(im, fp, tile, bufsize, None, fallback_cause)
    if hasattr(fp, "flush"):
        fp.flush()
def _encode_tile(
    im: Image.Image,
    fp: IO[bytes],
    tile: list[_Tile],
    bufsize: int,
    fh: int | None,
    exc: BaseException | None = None,
) -> None:
    """
    Run the encoder for every tile and write its output.

    :param im: Image object.
    :param fp: File object that receives the data.
    :param tile: Tile list.
    :param bufsize: Buffer size handed to the encoder.
    :param fh: File descriptor for direct writes, or ``None``.
    :param exc: When truthy, output goes through ``fp.write`` instead of the
       descriptor; it is also chained as the cause of a raised error.
    :exception OSError: If an encoder reports a negative error code.
    """
    for codec, box, position, codec_args in tile:
        if position > 0:
            fp.seek(position)
        encoder = Image._getencoder(im.mode, codec, codec_args, im.encoderconfig)
        try:
            encoder.setimage(im.im, box)
            if encoder.pushes_fd:
                # the encoder writes to the file object itself
                encoder.setfd(fp)
                errcode = encoder.encode_to_pyfd()[1]
            elif exc:
                # compress to Python file-compatible object
                errcode = 0
                while not errcode:
                    errcode, chunk = encoder.encode(bufsize)[1:]
                    fp.write(chunk)
            else:
                # slight speedup: compress to real file object
                assert fh is not None
                errcode = encoder.encode_to_file(fh, bufsize)
            if errcode < 0:
                raise _get_oserror(errcode, encoder=True) from exc
        finally:
            encoder.cleanup()
688def _safe_read(fp: IO[bytes], size: int) -> bytes:
689 """
690 Reads large blocks in a safe way. Unlike fp.read(n), this function
691 doesn't trust the user. If the requested size is larger than
692 SAFEBLOCK, the file is read block by block.
694 :param fp: File handle. Must implement a <b>read</b> method.
695 :param size: Number of bytes to read.
696 :returns: A string containing <i>size</i> bytes of data.
698 Raises an OSError if the file is truncated and the read cannot be completed
700 """
701 if size <= 0:
702 return b""
703 if size <= SAFEBLOCK:
704 data = fp.read(size)
705 if len(data) < size:
706 msg = "Truncated File Read"
707 raise OSError(msg)
708 return data
709 blocks: list[bytes] = []
710 remaining_size = size
711 while remaining_size > 0:
712 block = fp.read(min(remaining_size, SAFEBLOCK))
713 if not block:
714 break
715 blocks.append(block)
716 remaining_size -= len(block)
717 if sum(len(block) for block in blocks) < size:
718 msg = "Truncated File Read"
719 raise OSError(msg)
720 return b"".join(blocks)
class PyCodecState:
    # Size and offset of the region a Python codec works on.

    def __init__(self) -> None:
        self.xsize, self.ysize = 0, 0
        self.xoff, self.yoff = 0, 0

    def extents(self) -> tuple[int, int, int, int]:
        # (left, top, right, bottom) computed from offset and size
        left, top = self.xoff, self.yoff
        right = left + self.xsize
        bottom = top + self.ysize
        return left, top, right, bottom
class PyCodec:
    # Python file-like object, set through setfd().
    fd: IO[bytes] | None

    def __init__(self, mode: str, *args: Any) -> None:
        self.im: Image.core.ImagingCore | None = None
        self.state = PyCodecState()
        self.fd = None
        self.mode = mode
        # subclasses hook in here; runs after all attributes are in place
        self.init(args)

    def init(self, args: tuple[Any, ...]) -> None:
        """
        Hook for codec specific initialization; stores the arguments.

        :param args: Tuple of arg items from the tile entry
        :returns: None
        """
        self.args = args

    def cleanup(self) -> None:
        """
        Hook for codec specific cleanup; does nothing by default.

        :returns: None
        """
        pass

    def setfd(self, fd: IO[bytes]) -> None:
        """
        Called from ImageFile to hand over the Python file-like object.

        :param fd: A Python file-like object
        :returns: None
        """
        self.fd = fd

    def setimage(
        self,
        im: Image.core.ImagingCore,
        extents: tuple[int, int, int, int] | None = None,
    ) -> None:
        """
        Called from ImageFile to set the core image and the tile rectangle.

        :param im: A core image object
        :param extents: a 4 tuple of (x0, y0, x1, y1) defining the rectangle
            for this tile
        :returns: None
        :exception ValueError: If the resulting size is not positive or the
            tile extends outside the image.
        """

        # following c code
        self.im = im

        x0, y0, x1, y1 = extents if extents else (0, 0, 0, 0)

        region = self.state
        if x0 == 0 and x1 == 0:
            # no horizontal range given: cover the whole image
            region.xsize, region.ysize = self.im.size
        else:
            region.xoff, region.yoff = x0, y0
            region.xsize, region.ysize = x1 - x0, y1 - y0

        if region.xsize <= 0 or region.ysize <= 0:
            msg = "Size cannot be negative"
            raise ValueError(msg)

        too_wide = region.xsize + region.xoff > self.im.size[0]
        if too_wide or region.ysize + region.yoff > self.im.size[1]:
            msg = "Tile cannot extend outside image"
            raise ValueError(msg)
class PyDecoder(PyCodec):
    """
    Python implementation of a format decoder. Override this class and
    add the decoding logic in the :meth:`decode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    _pulls_fd = False

    @property
    def pulls_fd(self) -> bool:
        # True when the decoder reads from the file object itself.
        return self._pulls_fd

    def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
        """
        Override to perform the decoding process.

        :param buffer: A bytes object with the data to be decoded.
        :returns: A tuple of ``(bytes consumed, errcode)``.
            If finished with decoding return -1 for the bytes consumed.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        msg = "unavailable in base decoder"
        raise NotImplementedError(msg)

    def set_as_raw(
        self, data: bytes, rawmode: str | None = None, extra: tuple[Any, ...] = ()
    ) -> None:
        """
        Convenience method to fill the internal image from raw data.

        :param data: Bytes to be set
        :param rawmode: The rawmode to be used for the decoder.
            If not specified, it will default to the mode of the image
        :param extra: Extra arguments for the decoder.
        :returns: None
        :exception ValueError: If there is not enough data or the raw
            decoder reports an error.
        """

        raw_decoder = Image._getdecoder(
            self.mode, "raw", rawmode or self.mode, extra
        )
        assert self.im is not None
        raw_decoder.setimage(self.im, self.state.extents())
        outcome = raw_decoder.decode(data)

        # a non-negative first item means the decoder wants more data
        if outcome[0] >= 0:
            msg = "not enough image data"
            raise ValueError(msg)
        if outcome[1] != 0:
            msg = "cannot decode image data"
            raise ValueError(msg)
class PyEncoder(PyCodec):
    """
    Python implementation of a format encoder. Override this class and
    add the encoding logic in the :meth:`encode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    _pushes_fd = False

    @property
    def pushes_fd(self) -> bool:
        """Whether the encoder writes to the file object itself."""
        return self._pushes_fd

    def encode(self, bufsize: int) -> tuple[int, int, bytes]:
        """
        Override to perform the encoding process.

        :param bufsize: Buffer size.
        :returns: A tuple of ``(bytes encoded, errcode, bytes)``.
            If finished with encoding return 1 for the error code.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        msg = "unavailable in base encoder"
        raise NotImplementedError(msg)

    def encode_to_pyfd(self) -> tuple[int, int]:
        """
        If ``pushes_fd`` is ``True``, then this method will be used,
        and ``encode()`` will only be called once.

        :returns: A tuple of ``(bytes consumed, errcode)``.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        if not self.pushes_fd:
            return 0, -8  # bad configuration
        bytes_consumed, errcode, data = self.encode(0)
        if data:
            assert self.fd is not None
            self.fd.write(data)
        return bytes_consumed, errcode

    def encode_to_file(self, fh: int, bufsize: int) -> int:
        """
        Repeatedly call ``encode()`` and write its output to a file
        descriptor until a non-zero error code is returned.

        :param fh: File handle.
        :param bufsize: Buffer size.

        :returns: If finished successfully, return 0.
            Otherwise, return an error code. Err codes are from
            :data:`.ImageFile.ERRORS`.
        """
        errcode = 0
        while errcode == 0:
            status, errcode, buf = self.encode(bufsize)
            if status > 0:
                # ``status`` is the number of bytes encoded, so the data is
                # the first ``status`` bytes of the buffer (the previous
                # ``buf[status:]`` slice wrote only what came after it).
                os.write(fh, buf[:status])
        return errcode