Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/ImageFile.py: 57%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# The Python Imaging Library.
3# $Id$
4#
5# base class for image file handlers
6#
7# history:
8# 1995-09-09 fl Created
9# 1996-03-11 fl Fixed load mechanism.
10# 1996-04-15 fl Added pcx/xbm decoders.
11# 1996-04-30 fl Added encoders.
12# 1996-12-14 fl Added load helpers
13# 1997-01-11 fl Use encode_to_file where possible
14# 1997-08-27 fl Flush output in _save
15# 1998-03-05 fl Use memory mapping for some modes
16# 1999-02-04 fl Use memory mapping also for "I;16" and "I;16B"
17# 1999-05-31 fl Added image parser
18# 2000-10-12 fl Set readonly flag on memory-mapped images
19# 2002-03-20 fl Use better messages for common decoder errors
20# 2003-04-21 fl Fall back on mmap/map_buffer if map is not available
21# 2003-10-30 fl Added StubImageFile class
22# 2004-02-25 fl Made incremental parser more robust
23#
24# Copyright (c) 1997-2004 by Secret Labs AB
25# Copyright (c) 1995-2004 by Fredrik Lundh
26#
27# See the README file for information on usage and redistribution.
28#
29from __future__ import annotations
31import abc
32import io
33import itertools
34import logging
35import os
36import struct
37from typing import IO, Any, NamedTuple, cast
39from . import ExifTags, Image
40from ._util import DeferredError, is_path
42TYPE_CHECKING = False
43if TYPE_CHECKING:
44 from ._typing import StrOrBytesPath
46logger = logging.getLogger(__name__)
MAXBLOCK = 64 * 1024
"""
By default, Pillow processes image data in blocks. This helps to prevent excessive use
of resources. Codecs may disable this behaviour with ``_pulls_fd`` or ``_pushes_fd``.

When reading an image, this is the number of bytes to read at once.

When writing an image, this is the number of bytes to write at once.
If the image width times 4 is greater, then that will be used instead.
Plugins may also set a greater number.

User code may set this to another number.
"""

# Largest amount that _safe_read() requests from a file in a single read call.
SAFEBLOCK = 1024 * 1024

LOAD_TRUNCATED_IMAGES = False
"""Whether or not to load truncated image files. User code may change this."""

ERRORS: dict[int, str] = {
    -1: "image buffer overrun error",
    -2: "decoding error",
    -3: "unknown error",
    -8: "bad configuration",
    -9: "out of memory error",
}
"""
Dict of known error codes returned from :meth:`.PyDecoder.decode`,
:meth:`.PyEncoder.encode` :meth:`.PyEncoder.encode_to_pyfd` and
:meth:`.PyEncoder.encode_to_file`.
"""
81#
82# --------------------------------------------------------------------
83# Helpers
def _get_oserror(error: int, *, encoder: bool) -> OSError:
    """
    Build the :py:exc:`OSError` that describes a codec error code.

    :param error: Error code reported by a codec.
    :param encoder: ``True`` if the code comes from an encoder (the message
       then talks about writing), ``False`` for a decoder (reading).
    :returns: The exception object. It is returned, not raised.
    """
    role, action = ("encoder", "writing") if encoder else ("decoder", "reading")
    try:
        description = Image.core.getcodecstatus(error)
    except AttributeError:
        # the core module does not offer getcodecstatus; use the local table
        description = ERRORS.get(error)
    if not description:
        # unknown code: fall back to a generic description
        description = f"{role} error {error}"
    return OSError(description + f" when {action} image file")
97def _tilesort(t: _Tile) -> int:
98 # sort on offset
99 return t[2]
102class _Tile(NamedTuple):
103 codec_name: str
104 extents: tuple[int, int, int, int] | None
105 offset: int = 0
106 args: tuple[Any, ...] | str | None = None
109#
110# --------------------------------------------------------------------
111# ImageFile base class
class ImageFile(Image.Image):
    """Base class for image file format handlers."""

    def __init__(
        self, fp: StrOrBytesPath | IO[bytes], filename: str | bytes | None = None
    ) -> None:
        """
        Attach the image to a file and let :meth:`_open` identify it.

        :param fp: Either a path, which is opened in binary mode and closed
           again if identification fails, or an already open binary stream.
        :param filename: Name stored on the image when ``fp`` is a stream.
           Ignored when ``fp`` is a path.
        :exception SyntaxError: If :meth:`_open` raises ``IndexError``,
           ``TypeError``, ``KeyError``, ``EOFError`` or ``struct.error``, or
           if it leaves the image without a mode or with a non-positive size.
        """
        super().__init__()

        self._min_frame = 0

        self.custom_mimetype: str | None = None

        self.tile: list[_Tile] = []
        """ A list of tile descriptors """

        self.readonly = 1  # until we know better

        self.decoderconfig: tuple[Any, ...] = ()
        self.decodermaxblock = MAXBLOCK

        self.fp: IO[bytes] | None
        self._fp: IO[bytes] | DeferredError
        if is_path(fp):
            # filename
            self.fp = open(fp, "rb")
            self.filename = os.fspath(fp)
            self._exclusive_fp = True
        else:
            # stream
            self.fp = cast(IO[bytes], fp)
            self.filename = filename if filename is not None else ""
            # can be overridden
            self._exclusive_fp = False

        try:
            try:
                self._open()
            except (
                IndexError,  # end of data
                TypeError,  # end of data (ord)
                KeyError,  # unsupported mode
                EOFError,  # got header but not the first frame
                struct.error,
            ) as v:
                raise SyntaxError(v) from v

            if not self.mode or self.size[0] <= 0 or self.size[1] <= 0:
                msg = "not identified by this driver"
                raise SyntaxError(msg)
        except BaseException:
            # close the file only if we have opened it this constructor
            if self._exclusive_fp:
                self.fp.close()
            raise

    def _open(self) -> None:
        """(Hook) Identify the file. Called from the constructor."""
        pass

    # Context manager support
    def __enter__(self) -> ImageFile:
        return self

    def _close_fp(self) -> None:
        """Close ``_fp`` (when it is a distinct, live file) and then ``fp``."""
        if getattr(self, "_fp", False) and not isinstance(self._fp, DeferredError):
            if self._fp != self.fp:
                self._fp.close()
            # any later use of _fp reports the closed image
            self._fp = DeferredError(ValueError("Operation on closed image"))
        if self.fp:
            self.fp.close()

    def __exit__(self, *args: object) -> None:
        # only close files that this image owns
        if getattr(self, "_exclusive_fp", False):
            self._close_fp()
        self.fp = None

    def close(self) -> None:
        """
        Closes the file pointer, if possible.

        This operation will destroy the image core and release its memory.
        The image data will be unusable afterward.

        This function is required to close images that have multiple frames or
        have not had their file read and closed by the
        :py:meth:`~PIL.Image.Image.load` method. See :ref:`file-handling` for
        more information.
        """
        try:
            self._close_fp()
            self.fp = None
        except Exception as msg:
            # best effort: a failure to close is logged, not raised
            logger.debug("Error closing: %s", msg)

        super().close()

    def get_child_images(self) -> list[ImageFile]:
        """
        Open and load the images referenced from this image's EXIF data.

        The candidates are the IFDs listed under the ``SubIFDs`` tag and,
        if it has a ``JpegIFOffset`` entry, IFD1. The position of ``self.fp``
        is restored before returning.

        :returns: The loaded child images, in the order they were found.
        """
        child_images = []
        exif = self.getexif()
        ifds = []
        if ExifTags.Base.SubIFDs in exif:
            subifd_offsets = exif[ExifTags.Base.SubIFDs]
            if subifd_offsets:
                if not isinstance(subifd_offsets, tuple):
                    subifd_offsets = (subifd_offsets,)
                for subifd_offset in subifd_offsets:
                    ifds.append((exif._get_ifd_dict(subifd_offset), subifd_offset))
        ifd1 = exif.get_ifd(ExifTags.IFD.IFD1)
        if ifd1 and ifd1.get(ExifTags.Base.JpegIFOffset):
            assert exif._info is not None
            ifds.append((ifd1, exif._info.next))

        offset = None
        for ifd, ifd_offset in ifds:
            assert self.fp is not None
            current_offset = self.fp.tell()
            if offset is None:
                # remember where the file was, to restore it afterwards
                offset = current_offset

            fp = self.fp
            # Reset for every IFD: the check below reads this name even when
            # ifd is None, so it must neither be unbound nor carry over the
            # value found for the previous IFD.
            thumbnail_offset = None
            if ifd is not None:
                thumbnail_offset = ifd.get(ExifTags.Base.JpegIFOffset)
                if thumbnail_offset is not None:
                    thumbnail_offset += getattr(self, "_exif_offset", 0)
                    self.fp.seek(thumbnail_offset)

                    length = ifd.get(ExifTags.Base.JpegIFByteCount)
                    assert isinstance(length, int)
                    data = self.fp.read(length)
                    # the thumbnail is opened from its own in-memory copy
                    fp = io.BytesIO(data)

            with Image.open(fp) as im:
                from . import TiffImagePlugin

                if thumbnail_offset is None and isinstance(
                    im, TiffImagePlugin.TiffImageFile
                ):
                    # no embedded thumbnail: point the TIFF reader at the IFD
                    im._frame_pos = [ifd_offset]
                    im._seek(0)
                im.load()
                child_images.append(im)

        if offset is not None:
            assert self.fp is not None
            self.fp.seek(offset)
        return child_images

    def get_format_mimetype(self) -> str | None:
        """
        Return the MIME type of the image.

        :returns: ``custom_mimetype`` if set, otherwise the ``Image.MIME``
           entry for the upper-cased format, or ``None`` if the image has no
           format or the format has no entry.
        """
        if self.custom_mimetype:
            return self.custom_mimetype
        if self.format is not None:
            return Image.MIME.get(self.format.upper())
        return None

    def __getstate__(self) -> list[Any]:
        # the filename is appended to the state of the base class
        return super().__getstate__() + [self.filename]

    def __setstate__(self, state: list[Any]) -> None:
        self.tile = []
        # states without a sixth item carry no filename
        if len(state) > 5:
            self.filename = state[5]
        super().__setstate__(state)

    def verify(self) -> None:
        """Check file integrity"""

        # raise exception if something's wrong.  must be called
        # directly after open, and closes file when finished.
        if self._exclusive_fp and self.fp:
            self.fp.close()
        self.fp = None

    def load(self) -> Image.core.PixelAccess | None:
        """
        Load image data based on tile list

        :returns: The result of :py:meth:`PIL.Image.Image.load`.
        :exception OSError: If there is nothing to load, if the file is
           truncated, or if a decoder reports an error. The last two are
           suppressed when ``LOAD_TRUNCATED_IMAGES`` is set.
        :exception ValueError: If the tile offset of a memory-mappable image
           is negative.
        """

        if not self.tile and self._im is None:
            msg = "cannot load this image"
            raise OSError(msg)

        pixel = Image.Image.load(self)
        if not self.tile:
            return pixel

        self.map: mmap.mmap | None = None
        use_mmap = self.filename and len(self.tile) == 1

        assert self.fp is not None
        readonly = 0

        # look for read/seek overrides
        if hasattr(self, "load_read"):
            read = self.load_read
            # don't use mmap if there are custom read/seek functions
            use_mmap = False
        else:
            read = self.fp.read

        if hasattr(self, "load_seek"):
            seek = self.load_seek
            use_mmap = False
        else:
            seek = self.fp.seek

        if use_mmap:
            # try memory mapping
            decoder_name, extents, offset, args = self.tile[0]
            if isinstance(args, str):
                args = (args, 0, 1)
            if (
                decoder_name == "raw"
                and isinstance(args, tuple)
                and len(args) >= 3
                and args[0] == self.mode
                and args[0] in Image._MAPMODES
            ):
                if offset < 0:
                    msg = "Tile offset cannot be negative"
                    raise ValueError(msg)
                try:
                    # use mmap, if possible
                    import mmap

                    with open(self.filename) as fp:
                        self.map = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ)
                    if offset + self.size[1] * args[1] > self.map.size():
                        msg = "buffer is not large enough"
                        raise OSError(msg)
                    self.im = Image.core.map_buffer(
                        self.map, self.size, decoder_name, offset, args
                    )
                    readonly = 1
                    # After trashing self.im,
                    # we might need to reload the palette data.
                    if self.palette:
                        self.palette.dirty = 1
                except (AttributeError, OSError, ImportError):
                    # mapping failed: fall back to the decoder loop below
                    self.map = None

        self.load_prepare()
        err_code = -3  # initialize to unknown error
        if not self.map:
            # sort tiles in file order
            self.tile.sort(key=_tilesort)

            # FIXME: This is a hack to handle TIFF's JpegTables tag.
            prefix = getattr(self, "tile_prefix", b"")

            # Remove consecutive duplicates that only differ by their offset
            self.tile = [
                list(tiles)[-1]
                for _, tiles in itertools.groupby(
                    self.tile, lambda tile: (tile[0], tile[1], tile[3])
                )
            ]
            for i, (decoder_name, extents, offset, args) in enumerate(self.tile):
                seek(offset)
                decoder = Image._getdecoder(
                    self.mode, decoder_name, args, self.decoderconfig
                )
                try:
                    decoder.setimage(self.im, extents)
                    if decoder.pulls_fd:
                        # the decoder reads from the file by itself
                        decoder.setfd(self.fp)
                        err_code = decoder.decode(b"")[1]
                    else:
                        b = prefix
                        while True:
                            read_bytes = self.decodermaxblock
                            if i + 1 < len(self.tile):
                                # do not read into the following tile
                                next_offset = self.tile[i + 1].offset
                                if next_offset > offset:
                                    read_bytes = next_offset - offset
                            try:
                                s = read(read_bytes)
                            except (IndexError, struct.error) as e:
                                # truncated png/gif
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = "image file is truncated"
                                    raise OSError(msg) from e

                            if not s:  # truncated jpeg
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = (
                                        "image file is truncated "
                                        f"({len(b)} bytes not processed)"
                                    )
                                    raise OSError(msg)

                            b = b + s
                            n, err_code = decoder.decode(b)
                            if n < 0:
                                # the decoder is done with this tile
                                break
                            # keep what the decoder has not consumed yet
                            b = b[n:]
                finally:
                    # Need to cleanup here to prevent leaks
                    decoder.cleanup()

        self.tile = []
        self.readonly = readonly

        self.load_end()

        if self._exclusive_fp and self._close_exclusive_fp_after_loading:
            self.fp.close()
        self.fp = None

        if not self.map and not LOAD_TRUNCATED_IMAGES and err_code < 0:
            # still raised if decoder fails to return anything
            raise _get_oserror(err_code, encoder=False)

        return Image.Image.load(self)

    def load_prepare(self) -> None:
        """Make sure the image memory (and, for "P" images, palette) exists."""
        # create image memory if necessary
        if self._im is None:
            self.im = Image.core.new(self.mode, self.size)
        # create palette (optional)
        if self.mode == "P":
            Image.Image.load(self)

    def load_end(self) -> None:
        """(Hook) Called by :meth:`load` once all tiles have been processed."""
        # may be overridden
        pass

    # may be defined for contained formats
    # def load_seek(self, pos: int) -> None:
    #     pass

    # may be defined for blocked formats (e.g. PNG)
    # def load_read(self, read_bytes: int) -> bytes:
    #     pass

    def _seek_check(self, frame: int) -> bool:
        """
        Validate a frame number before seeking.

        :param frame: Requested frame number.
        :returns: ``True`` if ``frame`` differs from the current position.
        :exception EOFError: If ``frame`` lies outside the sequence.
        """
        if (
            frame < self._min_frame
            # Only check upper limit on frames if additional seek operations
            # are not required to do so
            or (
                not (hasattr(self, "_n_frames") and self._n_frames is None)
                and frame >= getattr(self, "n_frames") + self._min_frame
            )
        ):
            msg = "attempt to seek outside sequence"
            raise EOFError(msg)

        return self.tell() != frame
class StubHandler(abc.ABC):
    """Interface of the external code that a stub image file delegates to."""

    def open(self, im: StubImageFile) -> None:
        """Optional hook; the default implementation does nothing."""
        return None

    @abc.abstractmethod
    def load(self, im: StubImageFile) -> Image.Image:
        """Load the given stub image and return the resulting image."""
class StubImageFile(ImageFile, metaclass=abc.ABCMeta):
    """
    Base class for stub image loaders.

    A stub loader is an image loader that can identify files of a
    certain format, but relies on external code to load the file.
    """

    @abc.abstractmethod
    def _open(self) -> None:
        """(Hook) Identify the file."""

    def load(self) -> Image.core.PixelAccess | None:
        """
        Load the image through the handler returned by :meth:`_load`.

        :returns: The result of calling ``load()`` on the handler's image.
        :exception OSError: If :meth:`_load` returns no handler.
        """
        handler = self._load()
        if handler is None:
            msg = f"cannot find loader for this {self.format} file"
            raise OSError(msg)
        loaded = handler.load(self)
        assert loaded is not None
        # become the other object (!)
        self.__class__ = loaded.__class__  # type: ignore[assignment]
        self.__dict__ = loaded.__dict__
        return loaded.load()

    @abc.abstractmethod
    def _load(self) -> StubHandler | None:
        """(Hook) Find actual image loader."""
class Parser:
    """
    Incremental image parser.  This class implements the standard
    feed/close consumer interface.
    """

    incremental = None
    # image found in the data fed so far, once it could be identified
    image: Image.Image | None = None
    # data received but not yet consumed
    data: bytes | None = None
    # decoder fed incrementally; stays None if that is not possible
    decoder: Image.core.ImagingDecoder | PyDecoder | None = None
    # number of bytes still to skip before the tile data starts
    offset = 0
    # set to 1 once the decoder has signalled the end of the stream
    finished = 0

    def reset(self) -> None:
        """
        (Consumer) Reset the parser.  Note that you can only call this
        method immediately after you've created a parser; parser
        instances cannot be reused.
        """
        assert self.data is None, "cannot reuse parsers"

    def feed(self, data: bytes) -> None:
        """
        (Consumer) Feed data to the parser.

        :param data: A string buffer.
        :exception OSError: If the parser failed to parse the image file.
        """
        # collect data

        if self.finished:
            return

        if self.data is None:
            self.data = data
        else:
            self.data = self.data + data

        # parse what we have
        if self.decoder:
            if self.offset > 0:
                # skip header
                skip = min(len(self.data), self.offset)
                self.data = self.data[skip:]
                self.offset = self.offset - skip
                if self.offset > 0 or not self.data:
                    return

            n, e = self.decoder.decode(self.data)

            if n < 0:
                # end of stream
                self.data = None
                self.finished = 1
                if e < 0:
                    # decoding error
                    self.image = None
                    raise _get_oserror(e, encoder=False)
                else:
                    # end of image
                    return
            self.data = self.data[n:]

        elif self.image:
            # if we end up here with no decoder, this file cannot
            # be incrementally parsed.  wait until we've gotten all
            # available data
            pass

        else:
            # attempt to open this file
            try:
                with io.BytesIO(self.data) as fp:
                    im = Image.open(fp)
            except OSError:
                pass  # not enough data
            else:
                flag = hasattr(im, "load_seek") or hasattr(im, "load_read")
                if flag or len(im.tile) != 1:
                    # custom load code, or multiple tiles
                    # (this used to assign to a misspelled "decode" attribute)
                    self.decoder = None
                else:
                    # initialize decoder
                    im.load_prepare()
                    d, e, o, a = im.tile[0]
                    im.tile = []
                    self.decoder = Image._getdecoder(im.mode, d, a, im.decoderconfig)
                    self.decoder.setimage(im.im, e)

                    # calculate decoder offset
                    self.offset = o
                    if self.offset <= len(self.data):
                        self.data = self.data[self.offset :]
                        self.offset = 0

                self.image = im

    def __enter__(self) -> Parser:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

    def close(self) -> Image.Image:
        """
        (Consumer) Close the stream.

        :returns: An image object.
        :exception OSError: If the parser failed to parse the image file either
                            because it cannot be identified or cannot be
                            decoded.
        """
        # finish decoding
        if self.decoder:
            # get rid of what's left in the buffers
            self.feed(b"")
            self.data = self.decoder = None
            if not self.finished:
                msg = "image was incomplete"
                raise OSError(msg)
        if not self.image:
            msg = "cannot parse this image"
            raise OSError(msg)
        if self.data:
            # incremental parsing not possible; reopen the file
            # now that we have all data
            with io.BytesIO(self.data) as fp:
                try:
                    self.image = Image.open(fp)
                finally:
                    # NOTE(review): being in ``finally``, this also runs
                    # (on the previously stored image) if Image.open() raised
                    self.image.load()
        return self.image
638# --------------------------------------------------------------------
def _save(im: Image.Image, fp: IO[bytes], tile: list[_Tile], bufsize: int = 0) -> None:
    """Helper to save image based on tile list

    The tile list is sorted in place by offset before encoding.

    :param im: Image object.
    :param fp: File object.
    :param tile: Tile list.
    :param bufsize: Optional buffer size
    """

    im.load()
    if not hasattr(im, "encoderconfig"):
        im.encoderconfig = ()
    tile.sort(key=_tilesort)
    # FIXME: make MAXBLOCK a configuration parameter
    # It would be great if we could have the encoder specify what it needs
    # But, it would need at least the image size in most cases. RawEncode is
    # a tricky case.
    block_size = max(MAXBLOCK, bufsize, im.size[0] * 4)  # see RawEncode.c
    try:
        descriptor = fp.fileno()
        fp.flush()
        _encode_tile(im, fp, tile, block_size, descriptor)
    except (AttributeError, io.UnsupportedOperation) as error:
        # no usable file descriptor: encode through the Python file object
        _encode_tile(im, fp, tile, block_size, None, error)
    if hasattr(fp, "flush"):
        fp.flush()
def _encode_tile(
    im: Image.Image,
    fp: IO[bytes],
    tile: list[_Tile],
    bufsize: int,
    fh: int | None,
    exc: BaseException | None = None,
) -> None:
    """
    Encode every tile of ``im`` to ``fp``.

    :param im: Image object.
    :param fp: File object. It is seeked to each tile's offset if that
       offset is positive.
    :param tile: Tile list.
    :param bufsize: Buffer size passed on to the encoder.
    :param fh: File descriptor to encode to. Must not be ``None`` unless
       ``exc`` is given or the encoder writes to ``fp`` by itself.
    :param exc: If given, encoded data is written through ``fp.write``
       rather than to ``fh``, and any error raised here is chained to it.
    :exception OSError: If an encoder reports a negative error code.
    """
    for codec_name, extents, offset, args in tile:
        if offset > 0:
            fp.seek(offset)
        encoder = Image._getencoder(im.mode, codec_name, args, im.encoderconfig)
        try:
            encoder.setimage(im.im, extents)
            if encoder.pushes_fd:
                # the encoder writes to the file object by itself
                encoder.setfd(fp)
                errcode = encoder.encode_to_pyfd()[1]
            elif exc:
                # compress to Python file-compatible object
                while True:
                    errcode, chunk = encoder.encode(bufsize)[1:]
                    fp.write(chunk)
                    if errcode:
                        break
            else:
                # slight speedup: compress to real file object
                assert fh is not None
                errcode = encoder.encode_to_file(fh, bufsize)
            if errcode < 0:
                raise _get_oserror(errcode, encoder=True) from exc
        finally:
            encoder.cleanup()
704def _safe_read(fp: IO[bytes], size: int) -> bytes:
705 """
706 Reads large blocks in a safe way. Unlike fp.read(n), this function
707 doesn't trust the user. If the requested size is larger than
708 SAFEBLOCK, the file is read block by block.
710 :param fp: File handle. Must implement a <b>read</b> method.
711 :param size: Number of bytes to read.
712 :returns: A string containing <i>size</i> bytes of data.
714 Raises an OSError if the file is truncated and the read cannot be completed
716 """
717 if size <= 0:
718 return b""
719 if size <= SAFEBLOCK:
720 data = fp.read(size)
721 if len(data) < size:
722 msg = "Truncated File Read"
723 raise OSError(msg)
724 return data
725 blocks: list[bytes] = []
726 remaining_size = size
727 while remaining_size > 0:
728 block = fp.read(min(remaining_size, SAFEBLOCK))
729 if not block:
730 break
731 blocks.append(block)
732 remaining_size -= len(block)
733 if sum(len(block) for block in blocks) < size:
734 msg = "Truncated File Read"
735 raise OSError(msg)
736 return b"".join(blocks)
class PyCodecState:
    """Size and offset of the region a Python codec works on."""

    def __init__(self) -> None:
        self.xsize = self.ysize = 0
        self.xoff = self.yoff = 0

    def extents(self) -> tuple[int, int, int, int]:
        """Return the region as ``(x0, y0, x1, y1)``."""
        left, top = self.xoff, self.yoff
        return left, top, left + self.xsize, top + self.ysize
class PyCodec:
    """Common base of the Python codecs: holds image, file and region state."""

    fd: IO[bytes] | None

    def __init__(self, mode: str, *args: Any) -> None:
        self.im: Image.core.ImagingCore | None = None
        self.state = PyCodecState()
        self.fd = None
        self.mode = mode
        self.init(args)

    def init(self, args: tuple[Any, ...]) -> None:
        """
        Override to perform codec specific initialization

        :param args: Tuple of arg items from the tile entry
        :returns: None
        """
        self.args = args

    def cleanup(self) -> None:
        """
        Override to perform codec specific cleanup

        :returns: None
        """
        return None

    def setfd(self, fd: IO[bytes]) -> None:
        """
        Called from ImageFile to set the Python file-like object

        :param fd: A Python file-like object
        :returns: None
        """
        self.fd = fd

    def setimage(
        self,
        im: Image.core.ImagingCore,
        extents: tuple[int, int, int, int] | None = None,
    ) -> None:
        """
        Called from ImageFile to set the core output image for the codec

        :param im: A core image object
        :param extents: a 4 tuple of (x0, y0, x1, y1) defining the rectangle
            for this tile
        :returns: None
        :exception ValueError: If the resulting region is empty or extends
            outside the image.
        """

        # following c code
        self.im = im
        region = self.state

        x0, y0, x1, y1 = extents if extents else (0, 0, 0, 0)

        if x0 == 0 and x1 == 0:
            # no horizontal extent given: take the size from the image
            region.xsize, region.ysize = self.im.size
        else:
            region.xoff, region.yoff = x0, y0
            region.xsize, region.ysize = x1 - x0, y1 - y0

        if region.xsize <= 0 or region.ysize <= 0:
            msg = "Size must be positive"
            raise ValueError(msg)

        image_width, image_height = self.im.size[0], self.im.size[1]
        if (
            region.xsize + region.xoff > image_width
            or region.ysize + region.yoff > image_height
        ):
            msg = "Tile cannot extend outside image"
            raise ValueError(msg)
class PyDecoder(PyCodec):
    """
    Python implementation of a format decoder. Override this class and
    add the decoding logic in the :meth:`decode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    _pulls_fd = False

    @property
    def pulls_fd(self) -> bool:
        """Whether the decoder reads from the file object by itself."""
        return self._pulls_fd

    def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
        """
        Override to perform the decoding process.

        :param buffer: A bytes object with the data to be decoded.
        :returns: A tuple of ``(bytes consumed, errcode)``.
            If finished with decoding return -1 for the bytes consumed.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        msg = "unavailable in base decoder"
        raise NotImplementedError(msg)

    def set_as_raw(
        self, data: bytes, rawmode: str | None = None, extra: tuple[Any, ...] = ()
    ) -> None:
        """
        Convenience method to set the internal image from a stream of raw data

        :param data: Bytes to be set
        :param rawmode: The rawmode to be used for the decoder.
            If not specified, it will default to the mode of the image
        :param extra: Extra arguments for the decoder.
        :returns: None
        :exception ValueError: If the raw decoder wants more data or reports
            an error.
        """

        raw_decoder = Image._getdecoder(
            self.mode, "raw", rawmode if rawmode else self.mode, extra
        )
        assert self.im is not None
        raw_decoder.setimage(self.im, self.state.extents())
        outcome = raw_decoder.decode(data)

        # a non-negative first item means the decoder did not finish
        if outcome[0] >= 0:
            msg = "not enough image data"
            raise ValueError(msg)
        if outcome[1] != 0:
            msg = "cannot decode image data"
            raise ValueError(msg)
class PyEncoder(PyCodec):
    """
    Python implementation of a format encoder. Override this class and
    add the encoding logic in the :meth:`encode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    _pushes_fd = False

    @property
    def pushes_fd(self) -> bool:
        """Whether the encoder writes to the file object by itself."""
        return self._pushes_fd

    def encode(self, bufsize: int) -> tuple[int, int, bytes]:
        """
        Override to perform the encoding process.

        :param bufsize: Buffer size.
        :returns: A tuple of ``(bytes encoded, errcode, bytes)``.
            If finished with encoding return 1 for the error code.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        msg = "unavailable in base encoder"
        raise NotImplementedError(msg)

    def encode_to_pyfd(self) -> tuple[int, int]:
        """
        If ``pushes_fd`` is ``True``, then this method will be used,
        and ``encode()`` will only be called once.

        :returns: A tuple of ``(bytes consumed, errcode)``.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        if not self.pushes_fd:
            return 0, -8  # bad configuration
        bytes_consumed, errcode, data = self.encode(0)
        if data:
            assert self.fd is not None
            self.fd.write(data)
        return bytes_consumed, errcode

    def encode_to_file(self, fh: int, bufsize: int) -> int:
        """
        Call :meth:`encode` repeatedly, writing what it produces to ``fh``,
        until it returns a non-zero error code.

        :param fh: File handle.
        :param bufsize: Buffer size.

        :returns: If finished successfully, return 0.
            Otherwise, return an error code. Err codes are from
            :data:`.ImageFile.ERRORS`.
        """
        errcode = 0
        while errcode == 0:
            status, errcode, buf = self.encode(bufsize)
            if status > 0:
                # status is the number of bytes encoded, so the encoded data
                # is the first ``status`` bytes (not what follows them)
                os.write(fh, buf[:status])
        return errcode