Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/ImageFile.py: 57%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# The Python Imaging Library.
3# $Id$
4#
5# base class for image file handlers
6#
7# history:
8# 1995-09-09 fl Created
9# 1996-03-11 fl Fixed load mechanism.
10# 1996-04-15 fl Added pcx/xbm decoders.
11# 1996-04-30 fl Added encoders.
12# 1996-12-14 fl Added load helpers
13# 1997-01-11 fl Use encode_to_file where possible
14# 1997-08-27 fl Flush output in _save
15# 1998-03-05 fl Use memory mapping for some modes
16# 1999-02-04 fl Use memory mapping also for "I;16" and "I;16B"
17# 1999-05-31 fl Added image parser
18# 2000-10-12 fl Set readonly flag on memory-mapped images
19# 2002-03-20 fl Use better messages for common decoder errors
20# 2003-04-21 fl Fall back on mmap/map_buffer if map is not available
21# 2003-10-30 fl Added StubImageFile class
22# 2004-02-25 fl Made incremental parser more robust
23#
24# Copyright (c) 1997-2004 by Secret Labs AB
25# Copyright (c) 1995-2004 by Fredrik Lundh
26#
27# See the README file for information on usage and redistribution.
28#
29from __future__ import annotations
31import abc
32import io
33import itertools
34import logging
35import os
36import struct
37from typing import IO, Any, NamedTuple, cast
39from . import ExifTags, Image
40from ._util import DeferredError, is_path
# Deliberately a plain ``False`` at runtime, so the import below is never
# executed; it exists for static type checkers only.
TYPE_CHECKING = False
if TYPE_CHECKING:
    from ._typing import StrOrBytesPath

logger = logging.getLogger(__name__)

MAXBLOCK = 65536
"""
By default, Pillow processes image data in blocks. This helps to prevent excessive use
of resources. Codecs may disable this behaviour with ``_pulls_fd`` or ``_pushes_fd``.

When reading an image, this is the number of bytes to read at once.

When writing an image, this is the number of bytes to write at once.
If the image width times 4 is greater, then that will be used instead.
Plugins may also set a greater number.

User code may set this to another number.
"""

# Largest request that _safe_read() passes to a single fp.read() call;
# anything bigger is read in pieces of at most this many bytes.
SAFEBLOCK = 1024 * 1024

LOAD_TRUNCATED_IMAGES = False
"""Whether or not to load truncated image files. User code may change this."""

ERRORS = {
    -1: "image buffer overrun error",
    -2: "decoding error",
    -3: "unknown error",
    -8: "bad configuration",
    -9: "out of memory error",
}
"""
Dict of known error codes returned from :meth:`.PyDecoder.decode`,
:meth:`.PyEncoder.encode` :meth:`.PyEncoder.encode_to_pyfd` and
:meth:`.PyEncoder.encode_to_file`.
"""
81#
82# --------------------------------------------------------------------
83# Helpers
def _get_oserror(error: int, *, encoder: bool) -> OSError:
    """
    Build the :py:exc:`OSError` describing a codec failure.

    :param error: Numeric status code reported by the codec.
    :param encoder: ``True`` when the failure happened while writing
       (encoder), ``False`` when it happened while reading (decoder).
    :returns: The exception instance. It is returned, not raised.
    """
    role, action = ("encoder", "writing") if encoder else ("decoder", "reading")
    try:
        # Ask the core module for a description first.
        description = Image.core.getcodecstatus(error)
    except AttributeError:
        # Core has no getcodecstatus: use the module-level table instead.
        description = ERRORS.get(error)
    if not description:
        description = f"{role} error {error}"
    description += f" when {action} image file"
    return OSError(description)
97def _tilesort(t: _Tile) -> int:
98 # sort on offset
99 return t[2]
class _Tile(NamedTuple):
    """One entry of an image's tile list, consumed by the load/save helpers."""

    # Codec name handed to Image._getdecoder() / Image._getencoder().
    codec_name: str
    # Rectangle handed to the codec's setimage(); may be None.
    extents: tuple[int, int, int, int] | None
    # Position the file is seeked to before the codec runs; also the sort key.
    offset: int = 0
    # Extra codec arguments, forwarded unchanged to the codec factory.
    args: tuple[Any, ...] | str | None = None
109#
110# --------------------------------------------------------------------
111# ImageFile base class
class ImageFile(Image.Image):
    """Base class for image file format handlers."""

    def __init__(
        self, fp: StrOrBytesPath | IO[bytes], filename: str | bytes | None = None
    ) -> None:
        """
        Open the image and let the format driver identify it.

        :param fp: A path, which is opened in binary mode and owned by this
           object, or an already open binary stream, which is not owned.
        :param filename: Name recorded in ``self.filename`` when ``fp`` is a
           stream. Ignored when ``fp`` is a path.
        :exception SyntaxError: If the driver's ``_open()`` fails with one of
           the "malformed data" exceptions listed below, or leaves the image
           without a mode or with a non-positive size.
        """
        super().__init__()

        self._min_frame = 0

        self.custom_mimetype: str | None = None

        self.tile: list[_Tile] = []
        """ A list of tile descriptors """

        self.readonly = 1  # until we know better

        self.decoderconfig: tuple[Any, ...] = ()
        self.decodermaxblock = MAXBLOCK

        self.fp: IO[bytes] | None
        self._fp: IO[bytes] | DeferredError
        if is_path(fp):
            # filename
            self.fp = open(fp, "rb")
            self.filename = os.fspath(fp)
            self._exclusive_fp = True
        else:
            # stream
            self.fp = cast(IO[bytes], fp)
            self.filename = filename if filename is not None else ""
            # can be overridden
            self._exclusive_fp = False

        try:
            try:
                self._open()

                if isinstance(self, StubImageFile):
                    if loader := self._load():
                        loader.open(self)
            except (
                IndexError,  # end of data
                TypeError,  # end of data (ord)
                KeyError,  # unsupported mode
                EOFError,  # got header but not the first frame
                struct.error,
            ) as v:
                raise SyntaxError(v) from v

            if not self.mode or self.size[0] <= 0 or self.size[1] <= 0:
                msg = "not identified by this driver"
                raise SyntaxError(msg)
        except BaseException:
            # close the file only if we have opened it this constructor
            if self._exclusive_fp:
                self.fp.close()
            raise

    def _open(self) -> None:
        """(Hook) Identify the file. The base implementation does nothing."""
        pass

    # Context manager support
    def __enter__(self) -> ImageFile:
        return self

    def _close_fp(self) -> None:
        """Close ``self._fp`` (when distinct from ``self.fp``) and ``self.fp``."""
        if getattr(self, "_fp", False) and not isinstance(self._fp, DeferredError):
            if self._fp != self.fp:
                self._fp.close()
            # Later use of _fp reports a closed image instead of failing oddly.
            self._fp = DeferredError(ValueError("Operation on closed image"))
        if self.fp:
            self.fp.close()

    def __exit__(self, *args: object) -> None:
        # Only close file objects this instance owns; always drop the reference.
        if getattr(self, "_exclusive_fp", False):
            self._close_fp()
        self.fp = None

    def close(self) -> None:
        """
        Closes the file pointer, if possible.

        This operation will destroy the image core and release its memory.
        The image data will be unusable afterward.

        This function is required to close images that have multiple frames or
        have not had their file read and closed by the
        :py:meth:`~PIL.Image.Image.load` method. See :ref:`file-handling` for
        more information.
        """
        try:
            self._close_fp()
            self.fp = None
        except Exception as msg:
            # Best effort: a failure to close must not stop the core cleanup.
            logger.debug("Error closing: %s", msg)

        super().close()

    def get_child_images(self) -> list[ImageFile]:
        """
        Load the images referenced from this image's EXIF data.

        Each EXIF SubIFD is considered, plus IFD1 when it carries a JPEG
        thumbnail offset. The file position of ``self.fp`` is restored
        before returning.

        :returns: The loaded child images; empty if none are referenced.
        """
        child_images = []
        exif = self.getexif()
        ifds = []
        if ExifTags.Base.SubIFDs in exif:
            subifd_offsets = exif[ExifTags.Base.SubIFDs]
            if subifd_offsets:
                if not isinstance(subifd_offsets, tuple):
                    subifd_offsets = (subifd_offsets,)
                ifds = [
                    (exif._get_ifd_dict(subifd_offset), subifd_offset)
                    for subifd_offset in subifd_offsets
                ]
        ifd1 = exif.get_ifd(ExifTags.IFD.IFD1)
        if ifd1 and ifd1.get(ExifTags.Base.JpegIFOffset):
            assert exif._info is not None
            ifds.append((ifd1, exif._info.next))

        offset = None
        for ifd, ifd_offset in ifds:
            assert self.fp is not None
            current_offset = self.fp.tell()
            if offset is None:
                # Remember where the caller left the file, once.
                offset = current_offset

            fp = self.fp
            # Reset for every IFD. Without this, a None ``ifd`` would either
            # hit an unbound local (first iteration) or silently reuse the
            # previous IFD's thumbnail offset (later iterations).
            thumbnail_offset = None
            if ifd is not None:
                thumbnail_offset = ifd.get(ExifTags.Base.JpegIFOffset)
                if thumbnail_offset is not None:
                    thumbnail_offset += getattr(self, "_exif_offset", 0)
                    self.fp.seek(thumbnail_offset)

                    length = ifd.get(ExifTags.Base.JpegIFByteCount)
                    assert isinstance(length, int)
                    data = self.fp.read(length)
                    # The thumbnail is opened from its own in-memory copy.
                    fp = io.BytesIO(data)

            with Image.open(fp) as im:
                from . import TiffImagePlugin

                if thumbnail_offset is None and isinstance(
                    im, TiffImagePlugin.TiffImageFile
                ):
                    # No embedded JPEG: point the TIFF reader at this IFD.
                    im._frame_pos = [ifd_offset]
                    im._seek(0)
                im.load()
                child_images.append(im)

        if offset is not None:
            assert self.fp is not None
            self.fp.seek(offset)
        return child_images

    def get_format_mimetype(self) -> str | None:
        """
        Return the MIME type of this image.

        ``custom_mimetype`` wins when set; otherwise the upper-cased format
        is looked up in ``Image.MIME``. ``None`` if neither is available.
        """
        if self.custom_mimetype:
            return self.custom_mimetype
        if self.format is not None:
            return Image.MIME.get(self.format.upper())
        return None

    def __getstate__(self) -> list[Any]:
        # Pickle state is the base class state with the filename appended.
        return super().__getstate__() + [self.filename]

    def __setstate__(self, state: list[Any]) -> None:
        self.tile = []
        # The filename is the optional sixth element added by __getstate__.
        if len(state) > 5:
            self.filename = state[5]
        super().__setstate__(state)

    def verify(self) -> None:
        """Check file integrity"""

        # raise exception if something's wrong.  must be called
        # directly after open, and closes file when finished.
        if self._exclusive_fp and self.fp:
            self.fp.close()
        self.fp = None

    def load(self) -> Image.core.PixelAccess | None:
        """
        Load image data based on tile list

        :returns: The pixel access object from :py:meth:`PIL.Image.Image.load`.
        :exception OSError: If there is nothing to load, the file is truncated
           (and ``LOAD_TRUNCATED_IMAGES`` is false), or the decoder reports an
           error.
        :exception ValueError: If a memory-mappable tile has a negative offset.
        """

        if not self.tile and self._im is None:
            msg = "cannot load this image"
            raise OSError(msg)

        pixel = Image.Image.load(self)
        if not self.tile:
            return pixel

        self.map: mmap.mmap | None = None
        use_mmap = self.filename and len(self.tile) == 1

        assert self.fp is not None
        readonly = 0

        # look for read/seek overrides
        if hasattr(self, "load_read"):
            read = self.load_read
            # don't use mmap if there are custom read/seek functions
            use_mmap = False
        else:
            read = self.fp.read

        if hasattr(self, "load_seek"):
            seek = self.load_seek
            use_mmap = False
        else:
            seek = self.fp.seek

        if use_mmap:
            # try memory mapping
            decoder_name, extents, offset, args = self.tile[0]
            if isinstance(args, str):
                args = (args, 0, 1)
            if (
                decoder_name == "raw"
                and isinstance(args, tuple)
                and len(args) >= 3
                and args[0] == self.mode
                and args[0] in Image._MAPMODES
            ):
                if offset < 0:
                    msg = "Tile offset cannot be negative"
                    raise ValueError(msg)
                try:
                    # use mmap, if possible
                    import mmap

                    with open(self.filename) as fp:
                        self.map = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ)
                    if offset + self.size[1] * args[1] > self.map.size():
                        msg = "buffer is not large enough"
                        raise OSError(msg)
                    self.im = Image.core.map_buffer(
                        self.map, self.size, decoder_name, offset, args
                    )
                    readonly = 1
                    # After trashing self.im,
                    # we might need to reload the palette data.
                    if self.palette:
                        self.palette.dirty = 1
                except (AttributeError, OSError, ImportError):
                    # Mapping failed: fall back to the decoder loop below.
                    self.map = None

        self.load_prepare()
        err_code = -3  # initialize to unknown error
        if not self.map:
            # sort tiles in file order
            self.tile.sort(key=_tilesort)

            # FIXME: This is a hack to handle TIFF's JpegTables tag.
            prefix = getattr(self, "tile_prefix", b"")

            # Remove consecutive duplicates that only differ by their offset
            self.tile = [
                list(tiles)[-1]
                for _, tiles in itertools.groupby(
                    self.tile, lambda tile: (tile[0], tile[1], tile[3])
                )
            ]
            for i, (decoder_name, extents, offset, args) in enumerate(self.tile):
                seek(offset)
                decoder = Image._getdecoder(
                    self.mode, decoder_name, args, self.decoderconfig
                )
                try:
                    decoder.setimage(self.im, extents)
                    if decoder.pulls_fd:
                        # The decoder reads from the file object by itself.
                        decoder.setfd(self.fp)
                        err_code = decoder.decode(b"")[1]
                    else:
                        b = prefix
                        while True:
                            read_bytes = self.decodermaxblock
                            if i + 1 < len(self.tile):
                                # Do not read past the start of the next tile.
                                next_offset = self.tile[i + 1].offset
                                if next_offset > offset:
                                    read_bytes = next_offset - offset
                            try:
                                s = read(read_bytes)
                            except (IndexError, struct.error) as e:
                                # truncated png/gif
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = "image file is truncated"
                                    raise OSError(msg) from e

                            if not s:  # truncated jpeg
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = (
                                        "image file is truncated "
                                        f"({len(b)} bytes not processed)"
                                    )
                                    raise OSError(msg)

                            b = b + s
                            n, err_code = decoder.decode(b)
                            if n < 0:
                                break
                            # Keep only the bytes the decoder did not consume.
                            b = b[n:]
                finally:
                    # Need to cleanup here to prevent leaks
                    decoder.cleanup()

        self.tile = []
        self.readonly = readonly

        self.load_end()

        if self._exclusive_fp and self._close_exclusive_fp_after_loading:
            self.fp.close()
        self.fp = None

        if not self.map and not LOAD_TRUNCATED_IMAGES and err_code < 0:
            # still raised if decoder fails to return anything
            raise _get_oserror(err_code, encoder=False)

        return Image.Image.load(self)

    def load_prepare(self) -> None:
        """Allocate the core image if needed and set up a "P" mode palette."""
        # create image memory if necessary
        if self._im is None:
            self.im = Image.core.new(self.mode, self.size)
        # create palette (optional)
        if self.mode == "P":
            Image.Image.load(self)

    def load_end(self) -> None:
        """(Hook) Called by :meth:`load` after the tiles have been decoded."""
        # may be overridden
        pass

    # may be defined for contained formats
    # def load_seek(self, pos: int) -> None:
    #     pass

    # may be defined for blocked formats (e.g. PNG)
    # def load_read(self, read_bytes: int) -> bytes:
    #     pass

    def _seek_check(self, frame: int) -> bool:
        """
        Validate a frame number before seeking.

        :param frame: Requested frame number.
        :returns: ``True`` if ``frame`` differs from the current position.
        :exception EOFError: If ``frame`` is outside the sequence.
        """
        if (
            frame < self._min_frame
            # Only check upper limit on frames if additional seek operations
            # are not required to do so
            or (
                not (hasattr(self, "_n_frames") and self._n_frames is None)
                and frame >= getattr(self, "n_frames") + self._min_frame
            )
        ):
            msg = "attempt to seek outside sequence"
            raise EOFError(msg)

        return self.tell() != frame
class StubHandler(abc.ABC):
    """Interface for the external code that a stub image delegates to."""

    def open(self, im: StubImageFile) -> None:
        """Hook invoked with the stub image; the default does nothing."""
        return None

    @abc.abstractmethod
    def load(self, im: StubImageFile) -> Image.Image:
        """Produce the real image for ``im``. Must be overridden."""
        return None
class StubImageFile(ImageFile, metaclass=abc.ABCMeta):
    """
    Base class for stub image loaders.

    A stub loader can recognise files of a given format but hands the
    actual loading over to external code.
    """

    @abc.abstractmethod
    def _open(self) -> None:
        """(Hook) Identify the file."""
        pass

    def load(self) -> Image.core.PixelAccess | None:
        """
        Load the image through the registered handler.

        :exception OSError: If no handler is available for this format.
        """
        handler = self._load()
        if handler is None:
            msg = f"cannot find loader for this {self.format} file"
            raise OSError(msg)
        loaded = handler.load(self)
        assert loaded is not None
        # become the other object (!)
        self.__class__ = loaded.__class__  # type: ignore[assignment]
        self.__dict__ = loaded.__dict__
        return loaded.load()

    @abc.abstractmethod
    def _load(self) -> StubHandler | None:
        """(Hook) Find actual image loader."""
        pass
class Parser:
    """
    Incremental image parser.  This class implements the standard
    feed/close consumer interface.
    """

    # Not referenced by any method of this class.
    incremental = None
    # The image, once enough data has arrived for it to be opened.
    image: Image.Image | None = None
    # Bytes received but not yet consumed.
    data: bytes | None = None
    # Decoder fed incrementally; stays None when that is not possible.
    decoder: Image.core.ImagingDecoder | PyDecoder | None = None
    # Number of bytes still to skip before the tile data starts.
    offset = 0
    # Set to 1 once the decoder has reported the end of the stream.
    finished = 0

    def reset(self) -> None:
        """
        (Consumer) Reset the parser.  Note that you can only call this
        method immediately after you've created a parser; parser
        instances cannot be reused.
        """
        assert self.data is None, "cannot reuse parsers"

    def feed(self, data: bytes) -> None:
        """
        (Consumer) Feed data to the parser.

        :param data: A bytes buffer.
        :exception OSError: If the parser failed to parse the image file.
        """
        # collect data

        if self.finished:
            return

        if self.data is None:
            self.data = data
        else:
            self.data = self.data + data

        # parse what we have
        if self.decoder:
            if self.offset > 0:
                # skip header
                skip = min(len(self.data), self.offset)
                self.data = self.data[skip:]
                self.offset = self.offset - skip
                if self.offset > 0 or not self.data:
                    return

            n, e = self.decoder.decode(self.data)

            if n < 0:
                # end of stream
                self.data = None
                self.finished = 1
                if e < 0:
                    # decoding error
                    self.image = None
                    raise _get_oserror(e, encoder=False)
                else:
                    # end of image
                    return
            # Drop the bytes the decoder consumed; keep the remainder.
            self.data = self.data[n:]

        elif self.image:
            # if we end up here with no decoder, this file cannot
            # be incrementally parsed.  wait until we've gotten all
            # available data
            pass

        else:
            # attempt to open this file
            try:
                with io.BytesIO(self.data) as fp:
                    im = Image.open(fp)
            except OSError:
                pass  # not enough data
            else:
                # Custom read/seek hooks or multiple tiles rule out
                # incremental decoding.
                flag = hasattr(im, "load_seek") or hasattr(im, "load_read")
                if not flag and len(im.tile) == 1:
                    # initialize decoder
                    im.load_prepare()
                    d, e, o, a = im.tile[0]
                    im.tile = []
                    self.decoder = Image._getdecoder(im.mode, d, a, im.decoderconfig)
                    self.decoder.setimage(im.im, e)

                    # calculate decoder offset
                    self.offset = o
                    if self.offset <= len(self.data):
                        self.data = self.data[self.offset :]
                        self.offset = 0

                self.image = im

    def __enter__(self) -> Parser:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

    def close(self) -> Image.Image:
        """
        (Consumer) Close the stream.

        :returns: An image object.
        :exception OSError: If the parser failed to parse the image file either
                            because it cannot be identified or cannot be
                            decoded.
        """
        # finish decoding
        if self.decoder:
            # get rid of what's left in the buffers
            self.feed(b"")
            self.data = self.decoder = None
            if not self.finished:
                msg = "image was incomplete"
                raise OSError(msg)
        if not self.image:
            msg = "cannot parse this image"
            raise OSError(msg)
        if self.data:
            # incremental parsing not possible; reopen the file
            # now that we have all data
            with io.BytesIO(self.data) as fp:
                try:
                    self.image = Image.open(fp)
                finally:
                    # NOTE(review): runs even when Image.open() raised, in
                    # which case self.image is still the earlier image.
                    self.image.load()
        return self.image
641# --------------------------------------------------------------------
def _save(im: Image.Image, fp: IO[bytes], tile: list[_Tile], bufsize: int = 0) -> None:
    """
    Encode an image to a file object, tile by tile.

    :param im: Image object. It is loaded first, and gets an empty
       ``encoderconfig`` if it has none.
    :param fp: File object.
    :param tile: Tile list. Sorted in place by offset.
    :param bufsize: Optional buffer size. The effective size is never below
       ``MAXBLOCK`` or four times the image width.
    """

    im.load()
    if not hasattr(im, "encoderconfig"):
        im.encoderconfig = ()
    tile.sort(key=_tilesort)
    # FIXME: make MAXBLOCK a configuration parameter
    # It would be great if we could have the encoder specify what it needs
    # But, it would need at least the image size in most cases. RawEncode is
    # a tricky case.
    block_size = max(MAXBLOCK, bufsize, im.size[0] * 4)  # see RawEncode.c
    try:
        descriptor = fp.fileno()
        fp.flush()
        _encode_tile(im, fp, tile, block_size, descriptor)
    except (AttributeError, io.UnsupportedOperation) as no_descriptor:
        # No usable OS-level descriptor: encode through fp.write() instead.
        _encode_tile(im, fp, tile, block_size, None, no_descriptor)
    if hasattr(fp, "flush"):
        fp.flush()
def _encode_tile(
    im: Image.Image,
    fp: IO[bytes],
    tile: list[_Tile],
    bufsize: int,
    fh: int | None,
    exc: BaseException | None = None,
) -> None:
    """
    Run the encoder of every tile and write its output.

    :param im: Image being saved.
    :param fp: Destination file object.
    :param tile: Tile list.
    :param bufsize: Buffer size handed to the encoder.
    :param fh: OS-level file descriptor, used when ``exc`` is not set.
    :param exc: When set, output is written through ``fp.write()`` rather
       than the descriptor, and any encoder error is chained to it.
    :exception OSError: If an encoder returns a negative error code.
    """
    for descriptor in tile:
        encoder_name, extents, offset, args = descriptor
        if offset > 0:
            fp.seek(offset)
        encoder = Image._getencoder(im.mode, encoder_name, args, im.encoderconfig)
        try:
            encoder.setimage(im.im, extents)
            if encoder.pushes_fd:
                # The encoder writes to the file object by itself.
                encoder.setfd(fp)
                errcode = encoder.encode_to_pyfd()[1]
            elif exc:
                # compress to Python file-compatible object
                errcode = 0
                while not errcode:
                    _, errcode, data = encoder.encode(bufsize)
                    fp.write(data)
            else:
                # slight speedup: compress to real file object
                assert fh is not None
                errcode = encoder.encode_to_file(fh, bufsize)
            if errcode < 0:
                raise _get_oserror(errcode, encoder=True) from exc
        finally:
            encoder.cleanup()
707def _safe_read(fp: IO[bytes], size: int) -> bytes:
708 """
709 Reads large blocks in a safe way. Unlike fp.read(n), this function
710 doesn't trust the user. If the requested size is larger than
711 SAFEBLOCK, the file is read block by block.
713 :param fp: File handle. Must implement a <b>read</b> method.
714 :param size: Number of bytes to read.
715 :returns: A string containing <i>size</i> bytes of data.
717 Raises an OSError if the file is truncated and the read cannot be completed
719 """
720 if size <= 0:
721 return b""
722 if size <= SAFEBLOCK:
723 data = fp.read(size)
724 if len(data) < size:
725 msg = "Truncated File Read"
726 raise OSError(msg)
727 return data
728 blocks: list[bytes] = []
729 remaining_size = size
730 while remaining_size > 0:
731 block = fp.read(min(remaining_size, SAFEBLOCK))
732 if not block:
733 break
734 blocks.append(block)
735 remaining_size -= len(block)
736 if sum(len(block) for block in blocks) < size:
737 msg = "Truncated File Read"
738 raise OSError(msg)
739 return b"".join(blocks)
class PyCodecState:
    """Size and offset of the region a Python codec works on."""

    def __init__(self) -> None:
        self.xsize = self.ysize = 0
        self.xoff = self.yoff = 0

    def extents(self) -> tuple[int, int, int, int]:
        """Return the region as ``(x0, y0, x1, y1)``."""
        left, top = self.xoff, self.yoff
        return left, top, left + self.xsize, top + self.ysize
class PyCodec:
    """Shared base of the Python decoder and encoder classes."""

    fd: IO[bytes] | None

    def __init__(self, mode: str, *args: Any) -> None:
        self.im: Image.core.ImagingCore | None = None
        self.state = PyCodecState()
        self.fd = None
        self.mode = mode
        self.init(args)

    def init(self, args: tuple[Any, ...]) -> None:
        """
        Codec specific initialization hook; override as needed.

        :param args: Tuple of arg items from the tile entry
        :returns: None
        """
        self.args = args

    def cleanup(self) -> None:
        """
        Codec specific cleanup hook; override as needed.

        :returns: None
        """
        return None

    def setfd(self, fd: IO[bytes]) -> None:
        """
        Called from ImageFile to hand over the Python file-like object.

        :param fd: A Python file-like object
        :returns: None
        """
        self.fd = fd

    def setimage(
        self,
        im: Image.core.ImagingCore,
        extents: tuple[int, int, int, int] | None = None,
    ) -> None:
        """
        Called from ImageFile to set the core image the codec works on.

        :param im: A core image object
        :param extents: a 4 tuple of (x0, y0, x1, y1) defining the rectangle
            for this tile. Without it, the whole image is used.
        :returns: None
        :exception ValueError: If the tile reaches outside the image or has a
            non-positive width or height.
        """

        # following c code
        self.im = im
        state = self.state

        if extents:
            left, top, right, bottom = extents

            if (
                left < 0
                or top < 0
                or right > self.im.size[0]
                or bottom > self.im.size[1]
            ):
                msg = "Tile cannot extend outside image"
                raise ValueError(msg)

            state.xoff = left
            state.yoff = top
            state.xsize = right - left
            state.ysize = bottom - top
        else:
            state.xsize, state.ysize = self.im.size

        if state.xsize <= 0 or state.ysize <= 0:
            msg = "Size must be positive"
            raise ValueError(msg)
class PyDecoder(PyCodec):
    """
    Python implementation of a format decoder. Subclass it and put the
    decoding logic in the :meth:`decode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    _pulls_fd = False

    @property
    def pulls_fd(self) -> bool:
        """Whether the decoder reads from the file object by itself."""
        return self._pulls_fd

    def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
        """
        Override to perform the decoding process.

        :param buffer: A bytes object with the data to be decoded.
        :returns: A tuple of ``(bytes consumed, errcode)``.
            If finished with decoding return -1 for the bytes consumed.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        msg = "unavailable in base decoder"
        raise NotImplementedError(msg)

    def set_as_raw(
        self, data: bytes, rawmode: str | None = None, extra: tuple[Any, ...] = ()
    ) -> None:
        """
        Convenience method to fill the internal image from raw data.

        :param data: Bytes to be set
        :param rawmode: The rawmode to be used for the decoder.
            If not specified, it will default to the mode of the image
        :param extra: Extra arguments for the decoder.
        :returns: None
        :exception ValueError: If the data is too short or cannot be decoded.
        """

        raw_decoder = Image._getdecoder(
            self.mode, "raw", rawmode or self.mode, extra
        )
        assert self.im is not None
        raw_decoder.setimage(self.im, self.state.extents())
        result = raw_decoder.decode(data)

        # A non-negative first element means the decoder wants more data.
        if result[0] >= 0:
            msg = "not enough image data"
            raise ValueError(msg)
        if result[1] != 0:
            msg = "cannot decode image data"
            raise ValueError(msg)
class PyEncoder(PyCodec):
    """
    Python implementation of a format encoder. Override this class and
    add the encoding logic in the :meth:`encode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    _pushes_fd = False

    @property
    def pushes_fd(self) -> bool:
        """Whether the encoder writes to the file object by itself."""
        return self._pushes_fd

    def encode(self, bufsize: int) -> tuple[int, int, bytes]:
        """
        Override to perform the encoding process.

        :param bufsize: Buffer size.
        :returns: A tuple of ``(bytes encoded, errcode, bytes)``.
            If finished with encoding return 1 for the error code.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        msg = "unavailable in base encoder"
        raise NotImplementedError(msg)

    def encode_to_pyfd(self) -> tuple[int, int]:
        """
        If ``pushes_fd`` is ``True``, then this method will be used,
        and ``encode()`` will only be called once.

        :returns: A tuple of ``(bytes consumed, errcode)``.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        if not self.pushes_fd:
            return 0, -8  # bad configuration
        bytes_consumed, errcode, data = self.encode(0)
        if data:
            assert self.fd is not None
            self.fd.write(data)
        return bytes_consumed, errcode

    def encode_to_file(self, fh: int, bufsize: int) -> int:
        """
        Encode repeatedly and write each chunk to an OS-level descriptor.

        :param fh: File handle.
        :param bufsize: Buffer size.

        :returns: If finished successfully, return 0.
            Otherwise, return an error code. Err codes are from
            :data:`.ImageFile.ERRORS`.
        """
        errcode = 0
        while errcode == 0:
            status, errcode, buf = self.encode(bufsize)
            if status > 0:
                # Write the encoded chunk itself. Slicing it from ``status``
                # (the number of bytes encoded) onwards would discard it.
                os.write(fh, buf)
        return errcode