Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/ImageFile.py: 56%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# The Python Imaging Library.
3# $Id$
4#
5# base class for image file handlers
6#
7# history:
8# 1995-09-09 fl Created
9# 1996-03-11 fl Fixed load mechanism.
10# 1996-04-15 fl Added pcx/xbm decoders.
11# 1996-04-30 fl Added encoders.
12# 1996-12-14 fl Added load helpers
13# 1997-01-11 fl Use encode_to_file where possible
14# 1997-08-27 fl Flush output in _save
15# 1998-03-05 fl Use memory mapping for some modes
16# 1999-02-04 fl Use memory mapping also for "I;16" and "I;16B"
17# 1999-05-31 fl Added image parser
18# 2000-10-12 fl Set readonly flag on memory-mapped images
19# 2002-03-20 fl Use better messages for common decoder errors
20# 2003-04-21 fl Fall back on mmap/map_buffer if map is not available
21# 2003-10-30 fl Added StubImageFile class
22# 2004-02-25 fl Made incremental parser more robust
23#
24# Copyright (c) 1997-2004 by Secret Labs AB
25# Copyright (c) 1995-2004 by Fredrik Lundh
26#
27# See the README file for information on usage and redistribution.
28#
29from __future__ import annotations
31import abc
32import io
33import itertools
34import logging
35import os
36import struct
37from typing import IO, Any, NamedTuple, cast
39from . import ExifTags, Image
40from ._util import DeferredError, is_path
42TYPE_CHECKING = False
43if TYPE_CHECKING:
44 from ._typing import StrOrBytesPath
46logger = logging.getLogger(__name__)
48MAXBLOCK = 65536
49"""
50By default, Pillow processes image data in blocks. This helps to prevent excessive use
51of resources. Codecs may disable this behaviour with ``_pulls_fd`` or ``_pushes_fd``.
53When reading an image, this is the number of bytes to read at once.
55When writing an image, this is the number of bytes to write at once.
56If the image width times 4 is greater, then that will be used instead.
57Plugins may also set a greater number.
59User code may set this to another number.
60"""
62SAFEBLOCK = 1024 * 1024
64LOAD_TRUNCATED_IMAGES = False
65"""Whether or not to load truncated image files. User code may change this."""
67ERRORS = {
68 -1: "image buffer overrun error",
69 -2: "decoding error",
70 -3: "unknown error",
71 -8: "bad configuration",
72 -9: "out of memory error",
73}
74"""
75Dict of known error codes returned from :meth:`.PyDecoder.decode`,
76:meth:`.PyEncoder.encode` :meth:`.PyEncoder.encode_to_pyfd` and
77:meth:`.PyEncoder.encode_to_file`.
78"""
81#
82# --------------------------------------------------------------------
83# Helpers
86def _get_oserror(error: int, *, encoder: bool) -> OSError:
87 try:
88 msg = Image.core.getcodecstatus(error)
89 except AttributeError:
90 msg = ERRORS.get(error)
91 if not msg:
92 msg = f"{'encoder' if encoder else 'decoder'} error {error}"
93 msg += f" when {'writing' if encoder else 'reading'} image file"
94 return OSError(msg)
97def _tilesort(t: _Tile) -> int:
98 # sort on offset
99 return t[2]
102class _Tile(NamedTuple):
103 codec_name: str
104 extents: tuple[int, int, int, int] | None
105 offset: int = 0
106 args: tuple[Any, ...] | str | None = None
109#
110# --------------------------------------------------------------------
111# ImageFile base class
114class ImageFile(Image.Image):
115 """Base class for image file format handlers."""
117 def __init__(
118 self, fp: StrOrBytesPath | IO[bytes], filename: str | bytes | None = None
119 ) -> None:
120 super().__init__()
122 self._min_frame = 0
124 self.custom_mimetype: str | None = None
126 self.tile: list[_Tile] = []
127 """ A list of tile descriptors """
129 self.readonly = 1 # until we know better
131 self.decoderconfig: tuple[Any, ...] = ()
132 self.decodermaxblock = MAXBLOCK
134 if is_path(fp):
135 # filename
136 self.fp = open(fp, "rb")
137 self.filename = os.fspath(fp)
138 self._exclusive_fp = True
139 else:
140 # stream
141 self.fp = cast(IO[bytes], fp)
142 self.filename = filename if filename is not None else ""
143 # can be overridden
144 self._exclusive_fp = False
146 try:
147 try:
148 self._open()
149 except (
150 IndexError, # end of data
151 TypeError, # end of data (ord)
152 KeyError, # unsupported mode
153 EOFError, # got header but not the first frame
154 struct.error,
155 ) as v:
156 raise SyntaxError(v) from v
158 if not self.mode or self.size[0] <= 0 or self.size[1] <= 0:
159 msg = "not identified by this driver"
160 raise SyntaxError(msg)
161 except BaseException:
162 # close the file only if we have opened it this constructor
163 if self._exclusive_fp:
164 self.fp.close()
165 raise
167 def _open(self) -> None:
168 pass
170 def _close_fp(self):
171 if getattr(self, "_fp", False) and not isinstance(self._fp, DeferredError):
172 if self._fp != self.fp:
173 self._fp.close()
174 self._fp = DeferredError(ValueError("Operation on closed image"))
175 if self.fp:
176 self.fp.close()
178 def close(self) -> None:
179 """
180 Closes the file pointer, if possible.
182 This operation will destroy the image core and release its memory.
183 The image data will be unusable afterward.
185 This function is required to close images that have multiple frames or
186 have not had their file read and closed by the
187 :py:meth:`~PIL.Image.Image.load` method. See :ref:`file-handling` for
188 more information.
189 """
190 try:
191 self._close_fp()
192 self.fp = None
193 except Exception as msg:
194 logger.debug("Error closing: %s", msg)
196 super().close()
198 def get_child_images(self) -> list[ImageFile]:
199 child_images = []
200 exif = self.getexif()
201 ifds = []
202 if ExifTags.Base.SubIFDs in exif:
203 subifd_offsets = exif[ExifTags.Base.SubIFDs]
204 if subifd_offsets:
205 if not isinstance(subifd_offsets, tuple):
206 subifd_offsets = (subifd_offsets,)
207 for subifd_offset in subifd_offsets:
208 ifds.append((exif._get_ifd_dict(subifd_offset), subifd_offset))
209 ifd1 = exif.get_ifd(ExifTags.IFD.IFD1)
210 if ifd1 and ifd1.get(ExifTags.Base.JpegIFOffset):
211 assert exif._info is not None
212 ifds.append((ifd1, exif._info.next))
214 offset = None
215 for ifd, ifd_offset in ifds:
216 assert self.fp is not None
217 current_offset = self.fp.tell()
218 if offset is None:
219 offset = current_offset
221 fp = self.fp
222 if ifd is not None:
223 thumbnail_offset = ifd.get(ExifTags.Base.JpegIFOffset)
224 if thumbnail_offset is not None:
225 thumbnail_offset += getattr(self, "_exif_offset", 0)
226 self.fp.seek(thumbnail_offset)
228 length = ifd.get(ExifTags.Base.JpegIFByteCount)
229 assert isinstance(length, int)
230 data = self.fp.read(length)
231 fp = io.BytesIO(data)
233 with Image.open(fp) as im:
234 from . import TiffImagePlugin
236 if thumbnail_offset is None and isinstance(
237 im, TiffImagePlugin.TiffImageFile
238 ):
239 im._frame_pos = [ifd_offset]
240 im._seek(0)
241 im.load()
242 child_images.append(im)
244 if offset is not None:
245 assert self.fp is not None
246 self.fp.seek(offset)
247 return child_images
249 def get_format_mimetype(self) -> str | None:
250 if self.custom_mimetype:
251 return self.custom_mimetype
252 if self.format is not None:
253 return Image.MIME.get(self.format.upper())
254 return None
256 def __getstate__(self) -> list[Any]:
257 return super().__getstate__() + [self.filename]
259 def __setstate__(self, state: list[Any]) -> None:
260 self.tile = []
261 if len(state) > 5:
262 self.filename = state[5]
263 super().__setstate__(state)
265 def verify(self) -> None:
266 """Check file integrity"""
268 # raise exception if something's wrong. must be called
269 # directly after open, and closes file when finished.
270 if self._exclusive_fp:
271 self.fp.close()
272 self.fp = None
274 def load(self) -> Image.core.PixelAccess | None:
275 """Load image data based on tile list"""
277 if not self.tile and self._im is None:
278 msg = "cannot load this image"
279 raise OSError(msg)
281 pixel = Image.Image.load(self)
282 if not self.tile:
283 return pixel
285 self.map: mmap.mmap | None = None
286 use_mmap = self.filename and len(self.tile) == 1
288 readonly = 0
290 # look for read/seek overrides
291 if hasattr(self, "load_read"):
292 read = self.load_read
293 # don't use mmap if there are custom read/seek functions
294 use_mmap = False
295 else:
296 read = self.fp.read
298 if hasattr(self, "load_seek"):
299 seek = self.load_seek
300 use_mmap = False
301 else:
302 seek = self.fp.seek
304 if use_mmap:
305 # try memory mapping
306 decoder_name, extents, offset, args = self.tile[0]
307 if isinstance(args, str):
308 args = (args, 0, 1)
309 if (
310 decoder_name == "raw"
311 and isinstance(args, tuple)
312 and len(args) >= 3
313 and args[0] == self.mode
314 and args[0] in Image._MAPMODES
315 ):
316 if offset < 0:
317 msg = "Tile offset cannot be negative"
318 raise ValueError(msg)
319 try:
320 # use mmap, if possible
321 import mmap
323 with open(self.filename) as fp:
324 self.map = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ)
325 if offset + self.size[1] * args[1] > self.map.size():
326 msg = "buffer is not large enough"
327 raise OSError(msg)
328 self.im = Image.core.map_buffer(
329 self.map, self.size, decoder_name, offset, args
330 )
331 readonly = 1
332 # After trashing self.im,
333 # we might need to reload the palette data.
334 if self.palette:
335 self.palette.dirty = 1
336 except (AttributeError, OSError, ImportError):
337 self.map = None
339 self.load_prepare()
340 err_code = -3 # initialize to unknown error
341 if not self.map:
342 # sort tiles in file order
343 self.tile.sort(key=_tilesort)
345 # FIXME: This is a hack to handle TIFF's JpegTables tag.
346 prefix = getattr(self, "tile_prefix", b"")
348 # Remove consecutive duplicates that only differ by their offset
349 self.tile = [
350 list(tiles)[-1]
351 for _, tiles in itertools.groupby(
352 self.tile, lambda tile: (tile[0], tile[1], tile[3])
353 )
354 ]
355 for i, (decoder_name, extents, offset, args) in enumerate(self.tile):
356 seek(offset)
357 decoder = Image._getdecoder(
358 self.mode, decoder_name, args, self.decoderconfig
359 )
360 try:
361 decoder.setimage(self.im, extents)
362 if decoder.pulls_fd:
363 decoder.setfd(self.fp)
364 err_code = decoder.decode(b"")[1]
365 else:
366 b = prefix
367 while True:
368 read_bytes = self.decodermaxblock
369 if i + 1 < len(self.tile):
370 next_offset = self.tile[i + 1].offset
371 if next_offset > offset:
372 read_bytes = next_offset - offset
373 try:
374 s = read(read_bytes)
375 except (IndexError, struct.error) as e:
376 # truncated png/gif
377 if LOAD_TRUNCATED_IMAGES:
378 break
379 else:
380 msg = "image file is truncated"
381 raise OSError(msg) from e
383 if not s: # truncated jpeg
384 if LOAD_TRUNCATED_IMAGES:
385 break
386 else:
387 msg = (
388 "image file is truncated "
389 f"({len(b)} bytes not processed)"
390 )
391 raise OSError(msg)
393 b = b + s
394 n, err_code = decoder.decode(b)
395 if n < 0:
396 break
397 b = b[n:]
398 finally:
399 # Need to cleanup here to prevent leaks
400 decoder.cleanup()
402 self.tile = []
403 self.readonly = readonly
405 self.load_end()
407 if self._exclusive_fp and self._close_exclusive_fp_after_loading:
408 self.fp.close()
409 self.fp = None
411 if not self.map and not LOAD_TRUNCATED_IMAGES and err_code < 0:
412 # still raised if decoder fails to return anything
413 raise _get_oserror(err_code, encoder=False)
415 return Image.Image.load(self)
417 def load_prepare(self) -> None:
418 # create image memory if necessary
419 if self._im is None:
420 self.im = Image.core.new(self.mode, self.size)
421 # create palette (optional)
422 if self.mode == "P":
423 Image.Image.load(self)
425 def load_end(self) -> None:
426 # may be overridden
427 pass
429 # may be defined for contained formats
430 # def load_seek(self, pos: int) -> None:
431 # pass
433 # may be defined for blocked formats (e.g. PNG)
434 # def load_read(self, read_bytes: int) -> bytes:
435 # pass
437 def _seek_check(self, frame: int) -> bool:
438 if (
439 frame < self._min_frame
440 # Only check upper limit on frames if additional seek operations
441 # are not required to do so
442 or (
443 not (hasattr(self, "_n_frames") and self._n_frames is None)
444 and frame >= getattr(self, "n_frames") + self._min_frame
445 )
446 ):
447 msg = "attempt to seek outside sequence"
448 raise EOFError(msg)
450 return self.tell() != frame
453class StubHandler(abc.ABC):
454 def open(self, im: StubImageFile) -> None:
455 pass
457 @abc.abstractmethod
458 def load(self, im: StubImageFile) -> Image.Image:
459 pass
462class StubImageFile(ImageFile, metaclass=abc.ABCMeta):
463 """
464 Base class for stub image loaders.
466 A stub loader is an image loader that can identify files of a
467 certain format, but relies on external code to load the file.
468 """
470 @abc.abstractmethod
471 def _open(self) -> None:
472 pass
474 def load(self) -> Image.core.PixelAccess | None:
475 loader = self._load()
476 if loader is None:
477 msg = f"cannot find loader for this {self.format} file"
478 raise OSError(msg)
479 image = loader.load(self)
480 assert image is not None
481 # become the other object (!)
482 self.__class__ = image.__class__ # type: ignore[assignment]
483 self.__dict__ = image.__dict__
484 return image.load()
486 @abc.abstractmethod
487 def _load(self) -> StubHandler | None:
488 """(Hook) Find actual image loader."""
489 pass
492class Parser:
493 """
494 Incremental image parser. This class implements the standard
495 feed/close consumer interface.
496 """
498 incremental = None
499 image: Image.Image | None = None
500 data: bytes | None = None
501 decoder: Image.core.ImagingDecoder | PyDecoder | None = None
502 offset = 0
503 finished = 0
505 def reset(self) -> None:
506 """
507 (Consumer) Reset the parser. Note that you can only call this
508 method immediately after you've created a parser; parser
509 instances cannot be reused.
510 """
511 assert self.data is None, "cannot reuse parsers"
513 def feed(self, data: bytes) -> None:
514 """
515 (Consumer) Feed data to the parser.
517 :param data: A string buffer.
518 :exception OSError: If the parser failed to parse the image file.
519 """
520 # collect data
522 if self.finished:
523 return
525 if self.data is None:
526 self.data = data
527 else:
528 self.data = self.data + data
530 # parse what we have
531 if self.decoder:
532 if self.offset > 0:
533 # skip header
534 skip = min(len(self.data), self.offset)
535 self.data = self.data[skip:]
536 self.offset = self.offset - skip
537 if self.offset > 0 or not self.data:
538 return
540 n, e = self.decoder.decode(self.data)
542 if n < 0:
543 # end of stream
544 self.data = None
545 self.finished = 1
546 if e < 0:
547 # decoding error
548 self.image = None
549 raise _get_oserror(e, encoder=False)
550 else:
551 # end of image
552 return
553 self.data = self.data[n:]
555 elif self.image:
556 # if we end up here with no decoder, this file cannot
557 # be incrementally parsed. wait until we've gotten all
558 # available data
559 pass
561 else:
562 # attempt to open this file
563 try:
564 with io.BytesIO(self.data) as fp:
565 im = Image.open(fp)
566 except OSError:
567 pass # not enough data
568 else:
569 flag = hasattr(im, "load_seek") or hasattr(im, "load_read")
570 if flag or len(im.tile) != 1:
571 # custom load code, or multiple tiles
572 self.decode = None
573 else:
574 # initialize decoder
575 im.load_prepare()
576 d, e, o, a = im.tile[0]
577 im.tile = []
578 self.decoder = Image._getdecoder(im.mode, d, a, im.decoderconfig)
579 self.decoder.setimage(im.im, e)
581 # calculate decoder offset
582 self.offset = o
583 if self.offset <= len(self.data):
584 self.data = self.data[self.offset :]
585 self.offset = 0
587 self.image = im
589 def __enter__(self) -> Parser:
590 return self
592 def __exit__(self, *args: object) -> None:
593 self.close()
595 def close(self) -> Image.Image:
596 """
597 (Consumer) Close the stream.
599 :returns: An image object.
600 :exception OSError: If the parser failed to parse the image file either
601 because it cannot be identified or cannot be
602 decoded.
603 """
604 # finish decoding
605 if self.decoder:
606 # get rid of what's left in the buffers
607 self.feed(b"")
608 self.data = self.decoder = None
609 if not self.finished:
610 msg = "image was incomplete"
611 raise OSError(msg)
612 if not self.image:
613 msg = "cannot parse this image"
614 raise OSError(msg)
615 if self.data:
616 # incremental parsing not possible; reopen the file
617 # not that we have all data
618 with io.BytesIO(self.data) as fp:
619 try:
620 self.image = Image.open(fp)
621 finally:
622 self.image.load()
623 return self.image
626# --------------------------------------------------------------------
629def _save(im: Image.Image, fp: IO[bytes], tile: list[_Tile], bufsize: int = 0) -> None:
630 """Helper to save image based on tile list
632 :param im: Image object.
633 :param fp: File object.
634 :param tile: Tile list.
635 :param bufsize: Optional buffer size
636 """
638 im.load()
639 if not hasattr(im, "encoderconfig"):
640 im.encoderconfig = ()
641 tile.sort(key=_tilesort)
642 # FIXME: make MAXBLOCK a configuration parameter
643 # It would be great if we could have the encoder specify what it needs
644 # But, it would need at least the image size in most cases. RawEncode is
645 # a tricky case.
646 bufsize = max(MAXBLOCK, bufsize, im.size[0] * 4) # see RawEncode.c
647 try:
648 fh = fp.fileno()
649 fp.flush()
650 _encode_tile(im, fp, tile, bufsize, fh)
651 except (AttributeError, io.UnsupportedOperation) as exc:
652 _encode_tile(im, fp, tile, bufsize, None, exc)
653 if hasattr(fp, "flush"):
654 fp.flush()
657def _encode_tile(
658 im: Image.Image,
659 fp: IO[bytes],
660 tile: list[_Tile],
661 bufsize: int,
662 fh: int | None,
663 exc: BaseException | None = None,
664) -> None:
665 for encoder_name, extents, offset, args in tile:
666 if offset > 0:
667 fp.seek(offset)
668 encoder = Image._getencoder(im.mode, encoder_name, args, im.encoderconfig)
669 try:
670 encoder.setimage(im.im, extents)
671 if encoder.pushes_fd:
672 encoder.setfd(fp)
673 errcode = encoder.encode_to_pyfd()[1]
674 else:
675 if exc:
676 # compress to Python file-compatible object
677 while True:
678 errcode, data = encoder.encode(bufsize)[1:]
679 fp.write(data)
680 if errcode:
681 break
682 else:
683 # slight speedup: compress to real file object
684 assert fh is not None
685 errcode = encoder.encode_to_file(fh, bufsize)
686 if errcode < 0:
687 raise _get_oserror(errcode, encoder=True) from exc
688 finally:
689 encoder.cleanup()
692def _safe_read(fp: IO[bytes], size: int) -> bytes:
693 """
694 Reads large blocks in a safe way. Unlike fp.read(n), this function
695 doesn't trust the user. If the requested size is larger than
696 SAFEBLOCK, the file is read block by block.
698 :param fp: File handle. Must implement a <b>read</b> method.
699 :param size: Number of bytes to read.
700 :returns: A string containing <i>size</i> bytes of data.
702 Raises an OSError if the file is truncated and the read cannot be completed
704 """
705 if size <= 0:
706 return b""
707 if size <= SAFEBLOCK:
708 data = fp.read(size)
709 if len(data) < size:
710 msg = "Truncated File Read"
711 raise OSError(msg)
712 return data
713 blocks: list[bytes] = []
714 remaining_size = size
715 while remaining_size > 0:
716 block = fp.read(min(remaining_size, SAFEBLOCK))
717 if not block:
718 break
719 blocks.append(block)
720 remaining_size -= len(block)
721 if sum(len(block) for block in blocks) < size:
722 msg = "Truncated File Read"
723 raise OSError(msg)
724 return b"".join(blocks)
727class PyCodecState:
728 def __init__(self) -> None:
729 self.xsize = 0
730 self.ysize = 0
731 self.xoff = 0
732 self.yoff = 0
734 def extents(self) -> tuple[int, int, int, int]:
735 return self.xoff, self.yoff, self.xoff + self.xsize, self.yoff + self.ysize
738class PyCodec:
739 fd: IO[bytes] | None
741 def __init__(self, mode: str, *args: Any) -> None:
742 self.im: Image.core.ImagingCore | None = None
743 self.state = PyCodecState()
744 self.fd = None
745 self.mode = mode
746 self.init(args)
748 def init(self, args: tuple[Any, ...]) -> None:
749 """
750 Override to perform codec specific initialization
752 :param args: Tuple of arg items from the tile entry
753 :returns: None
754 """
755 self.args = args
757 def cleanup(self) -> None:
758 """
759 Override to perform codec specific cleanup
761 :returns: None
762 """
763 pass
765 def setfd(self, fd: IO[bytes]) -> None:
766 """
767 Called from ImageFile to set the Python file-like object
769 :param fd: A Python file-like object
770 :returns: None
771 """
772 self.fd = fd
774 def setimage(
775 self,
776 im: Image.core.ImagingCore,
777 extents: tuple[int, int, int, int] | None = None,
778 ) -> None:
779 """
780 Called from ImageFile to set the core output image for the codec
782 :param im: A core image object
783 :param extents: a 4 tuple of (x0, y0, x1, y1) defining the rectangle
784 for this tile
785 :returns: None
786 """
788 # following c code
789 self.im = im
791 if extents:
792 (x0, y0, x1, y1) = extents
793 else:
794 (x0, y0, x1, y1) = (0, 0, 0, 0)
796 if x0 == 0 and x1 == 0:
797 self.state.xsize, self.state.ysize = self.im.size
798 else:
799 self.state.xoff = x0
800 self.state.yoff = y0
801 self.state.xsize = x1 - x0
802 self.state.ysize = y1 - y0
804 if self.state.xsize <= 0 or self.state.ysize <= 0:
805 msg = "Size cannot be negative"
806 raise ValueError(msg)
808 if (
809 self.state.xsize + self.state.xoff > self.im.size[0]
810 or self.state.ysize + self.state.yoff > self.im.size[1]
811 ):
812 msg = "Tile cannot extend outside image"
813 raise ValueError(msg)
816class PyDecoder(PyCodec):
817 """
818 Python implementation of a format decoder. Override this class and
819 add the decoding logic in the :meth:`decode` method.
821 See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
822 """
824 _pulls_fd = False
826 @property
827 def pulls_fd(self) -> bool:
828 return self._pulls_fd
830 def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
831 """
832 Override to perform the decoding process.
834 :param buffer: A bytes object with the data to be decoded.
835 :returns: A tuple of ``(bytes consumed, errcode)``.
836 If finished with decoding return -1 for the bytes consumed.
837 Err codes are from :data:`.ImageFile.ERRORS`.
838 """
839 msg = "unavailable in base decoder"
840 raise NotImplementedError(msg)
842 def set_as_raw(
843 self, data: bytes, rawmode: str | None = None, extra: tuple[Any, ...] = ()
844 ) -> None:
845 """
846 Convenience method to set the internal image from a stream of raw data
848 :param data: Bytes to be set
849 :param rawmode: The rawmode to be used for the decoder.
850 If not specified, it will default to the mode of the image
851 :param extra: Extra arguments for the decoder.
852 :returns: None
853 """
855 if not rawmode:
856 rawmode = self.mode
857 d = Image._getdecoder(self.mode, "raw", rawmode, extra)
858 assert self.im is not None
859 d.setimage(self.im, self.state.extents())
860 s = d.decode(data)
862 if s[0] >= 0:
863 msg = "not enough image data"
864 raise ValueError(msg)
865 if s[1] != 0:
866 msg = "cannot decode image data"
867 raise ValueError(msg)
870class PyEncoder(PyCodec):
871 """
872 Python implementation of a format encoder. Override this class and
873 add the decoding logic in the :meth:`encode` method.
875 See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
876 """
878 _pushes_fd = False
880 @property
881 def pushes_fd(self) -> bool:
882 return self._pushes_fd
884 def encode(self, bufsize: int) -> tuple[int, int, bytes]:
885 """
886 Override to perform the encoding process.
888 :param bufsize: Buffer size.
889 :returns: A tuple of ``(bytes encoded, errcode, bytes)``.
890 If finished with encoding return 1 for the error code.
891 Err codes are from :data:`.ImageFile.ERRORS`.
892 """
893 msg = "unavailable in base encoder"
894 raise NotImplementedError(msg)
896 def encode_to_pyfd(self) -> tuple[int, int]:
897 """
898 If ``pushes_fd`` is ``True``, then this method will be used,
899 and ``encode()`` will only be called once.
901 :returns: A tuple of ``(bytes consumed, errcode)``.
902 Err codes are from :data:`.ImageFile.ERRORS`.
903 """
904 if not self.pushes_fd:
905 return 0, -8 # bad configuration
906 bytes_consumed, errcode, data = self.encode(0)
907 if data:
908 assert self.fd is not None
909 self.fd.write(data)
910 return bytes_consumed, errcode
912 def encode_to_file(self, fh: int, bufsize: int) -> int:
913 """
914 :param fh: File handle.
915 :param bufsize: Buffer size.
917 :returns: If finished successfully, return 0.
918 Otherwise, return an error code. Err codes are from
919 :data:`.ImageFile.ERRORS`.
920 """
921 errcode = 0
922 while errcode == 0:
923 status, errcode, buf = self.encode(bufsize)
924 if status > 0:
925 os.write(fh, buf[status:])
926 return errcode