Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/ImageFile.py: 57%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# The Python Imaging Library.
3# $Id$
4#
5# base class for image file handlers
6#
7# history:
8# 1995-09-09 fl Created
9# 1996-03-11 fl Fixed load mechanism.
10# 1996-04-15 fl Added pcx/xbm decoders.
11# 1996-04-30 fl Added encoders.
12# 1996-12-14 fl Added load helpers
13# 1997-01-11 fl Use encode_to_file where possible
14# 1997-08-27 fl Flush output in _save
15# 1998-03-05 fl Use memory mapping for some modes
16# 1999-02-04 fl Use memory mapping also for "I;16" and "I;16B"
17# 1999-05-31 fl Added image parser
18# 2000-10-12 fl Set readonly flag on memory-mapped images
19# 2002-03-20 fl Use better messages for common decoder errors
20# 2003-04-21 fl Fall back on mmap/map_buffer if map is not available
21# 2003-10-30 fl Added StubImageFile class
22# 2004-02-25 fl Made incremental parser more robust
23#
24# Copyright (c) 1997-2004 by Secret Labs AB
25# Copyright (c) 1995-2004 by Fredrik Lundh
26#
27# See the README file for information on usage and redistribution.
28#
29from __future__ import annotations
31import abc
32import io
33import itertools
34import logging
35import os
36import struct
37from typing import IO, Any, NamedTuple, cast
39from . import ExifTags, Image
40from ._util import DeferredError, is_path
42TYPE_CHECKING = False
43if TYPE_CHECKING:
44 from ._typing import StrOrBytesPath
46logger = logging.getLogger(__name__)
48MAXBLOCK = 65536
49"""
50By default, Pillow processes image data in blocks. This helps to prevent excessive use
51of resources. Codecs may disable this behaviour with ``_pulls_fd`` or ``_pushes_fd``.
53When reading an image, this is the number of bytes to read at once.
55When writing an image, this is the number of bytes to write at once.
56If the image width times 4 is greater, then that will be used instead.
57Plugins may also set a greater number.
59User code may set this to another number.
60"""
62SAFEBLOCK = 1024 * 1024
64LOAD_TRUNCATED_IMAGES = False
65"""Whether or not to load truncated image files. User code may change this."""
67ERRORS = {
68 -1: "image buffer overrun error",
69 -2: "decoding error",
70 -3: "unknown error",
71 -8: "bad configuration",
72 -9: "out of memory error",
73}
74"""
75Dict of known error codes returned from :meth:`.PyDecoder.decode`,
76:meth:`.PyEncoder.encode` :meth:`.PyEncoder.encode_to_pyfd` and
77:meth:`.PyEncoder.encode_to_file`.
78"""
81#
82# --------------------------------------------------------------------
83# Helpers
86def _get_oserror(error: int, *, encoder: bool) -> OSError:
87 try:
88 msg = Image.core.getcodecstatus(error)
89 except AttributeError:
90 msg = ERRORS.get(error)
91 if not msg:
92 msg = f"{'encoder' if encoder else 'decoder'} error {error}"
93 msg += f" when {'writing' if encoder else 'reading'} image file"
94 return OSError(msg)
97def _tilesort(t: _Tile) -> int:
98 # sort on offset
99 return t[2]
102class _Tile(NamedTuple):
103 codec_name: str
104 extents: tuple[int, int, int, int] | None
105 offset: int = 0
106 args: tuple[Any, ...] | str | None = None
109#
110# --------------------------------------------------------------------
111# ImageFile base class
114class ImageFile(Image.Image):
115 """Base class for image file format handlers."""
117 def __init__(
118 self, fp: StrOrBytesPath | IO[bytes], filename: str | bytes | None = None
119 ) -> None:
120 super().__init__()
122 self._min_frame = 0
124 self.custom_mimetype: str | None = None
126 self.tile: list[_Tile] = []
127 """ A list of tile descriptors """
129 self.readonly = 1 # until we know better
131 self.decoderconfig: tuple[Any, ...] = ()
132 self.decodermaxblock = MAXBLOCK
134 self.fp: IO[bytes] | None
135 self._fp: IO[bytes] | DeferredError
136 if is_path(fp):
137 # filename
138 self.fp = open(fp, "rb")
139 self.filename = os.fspath(fp)
140 self._exclusive_fp = True
141 else:
142 # stream
143 self.fp = cast(IO[bytes], fp)
144 self.filename = filename if filename is not None else ""
145 # can be overridden
146 self._exclusive_fp = False
148 try:
149 try:
150 self._open()
151 except (
152 IndexError, # end of data
153 TypeError, # end of data (ord)
154 KeyError, # unsupported mode
155 EOFError, # got header but not the first frame
156 struct.error,
157 ) as v:
158 raise SyntaxError(v) from v
160 if not self.mode or self.size[0] <= 0 or self.size[1] <= 0:
161 msg = "not identified by this driver"
162 raise SyntaxError(msg)
163 except BaseException:
164 # close the file only if we have opened it this constructor
165 if self._exclusive_fp:
166 self.fp.close()
167 raise
169 def _open(self) -> None:
170 pass
172 # Context manager support
173 def __enter__(self) -> ImageFile:
174 return self
176 def _close_fp(self) -> None:
177 if getattr(self, "_fp", False) and not isinstance(self._fp, DeferredError):
178 if self._fp != self.fp:
179 self._fp.close()
180 self._fp = DeferredError(ValueError("Operation on closed image"))
181 if self.fp:
182 self.fp.close()
184 def __exit__(self, *args: object) -> None:
185 if getattr(self, "_exclusive_fp", False):
186 self._close_fp()
187 self.fp = None
189 def close(self) -> None:
190 """
191 Closes the file pointer, if possible.
193 This operation will destroy the image core and release its memory.
194 The image data will be unusable afterward.
196 This function is required to close images that have multiple frames or
197 have not had their file read and closed by the
198 :py:meth:`~PIL.Image.Image.load` method. See :ref:`file-handling` for
199 more information.
200 """
201 try:
202 self._close_fp()
203 self.fp = None
204 except Exception as msg:
205 logger.debug("Error closing: %s", msg)
207 super().close()
209 def get_child_images(self) -> list[ImageFile]:
210 child_images = []
211 exif = self.getexif()
212 ifds = []
213 if ExifTags.Base.SubIFDs in exif:
214 subifd_offsets = exif[ExifTags.Base.SubIFDs]
215 if subifd_offsets:
216 if not isinstance(subifd_offsets, tuple):
217 subifd_offsets = (subifd_offsets,)
218 for subifd_offset in subifd_offsets:
219 ifds.append((exif._get_ifd_dict(subifd_offset), subifd_offset))
220 ifd1 = exif.get_ifd(ExifTags.IFD.IFD1)
221 if ifd1 and ifd1.get(ExifTags.Base.JpegIFOffset):
222 assert exif._info is not None
223 ifds.append((ifd1, exif._info.next))
225 offset = None
226 for ifd, ifd_offset in ifds:
227 assert self.fp is not None
228 current_offset = self.fp.tell()
229 if offset is None:
230 offset = current_offset
232 fp = self.fp
233 if ifd is not None:
234 thumbnail_offset = ifd.get(ExifTags.Base.JpegIFOffset)
235 if thumbnail_offset is not None:
236 thumbnail_offset += getattr(self, "_exif_offset", 0)
237 self.fp.seek(thumbnail_offset)
239 length = ifd.get(ExifTags.Base.JpegIFByteCount)
240 assert isinstance(length, int)
241 data = self.fp.read(length)
242 fp = io.BytesIO(data)
244 with Image.open(fp) as im:
245 from . import TiffImagePlugin
247 if thumbnail_offset is None and isinstance(
248 im, TiffImagePlugin.TiffImageFile
249 ):
250 im._frame_pos = [ifd_offset]
251 im._seek(0)
252 im.load()
253 child_images.append(im)
255 if offset is not None:
256 assert self.fp is not None
257 self.fp.seek(offset)
258 return child_images
260 def get_format_mimetype(self) -> str | None:
261 if self.custom_mimetype:
262 return self.custom_mimetype
263 if self.format is not None:
264 return Image.MIME.get(self.format.upper())
265 return None
267 def __getstate__(self) -> list[Any]:
268 return super().__getstate__() + [self.filename]
270 def __setstate__(self, state: list[Any]) -> None:
271 self.tile = []
272 if len(state) > 5:
273 self.filename = state[5]
274 super().__setstate__(state)
276 def verify(self) -> None:
277 """Check file integrity"""
279 # raise exception if something's wrong. must be called
280 # directly after open, and closes file when finished.
281 if self._exclusive_fp and self.fp:
282 self.fp.close()
283 self.fp = None
285 def load(self) -> Image.core.PixelAccess | None:
286 """Load image data based on tile list"""
288 if not self.tile and self._im is None:
289 msg = "cannot load this image"
290 raise OSError(msg)
292 pixel = Image.Image.load(self)
293 if not self.tile:
294 return pixel
296 self.map: mmap.mmap | None = None
297 use_mmap = self.filename and len(self.tile) == 1
299 assert self.fp is not None
300 readonly = 0
302 # look for read/seek overrides
303 if hasattr(self, "load_read"):
304 read = self.load_read
305 # don't use mmap if there are custom read/seek functions
306 use_mmap = False
307 else:
308 read = self.fp.read
310 if hasattr(self, "load_seek"):
311 seek = self.load_seek
312 use_mmap = False
313 else:
314 seek = self.fp.seek
316 if use_mmap:
317 # try memory mapping
318 decoder_name, extents, offset, args = self.tile[0]
319 if isinstance(args, str):
320 args = (args, 0, 1)
321 if (
322 decoder_name == "raw"
323 and isinstance(args, tuple)
324 and len(args) >= 3
325 and args[0] == self.mode
326 and args[0] in Image._MAPMODES
327 ):
328 if offset < 0:
329 msg = "Tile offset cannot be negative"
330 raise ValueError(msg)
331 try:
332 # use mmap, if possible
333 import mmap
335 with open(self.filename) as fp:
336 self.map = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ)
337 if offset + self.size[1] * args[1] > self.map.size():
338 msg = "buffer is not large enough"
339 raise OSError(msg)
340 self.im = Image.core.map_buffer(
341 self.map, self.size, decoder_name, offset, args
342 )
343 readonly = 1
344 # After trashing self.im,
345 # we might need to reload the palette data.
346 if self.palette:
347 self.palette.dirty = 1
348 except (AttributeError, OSError, ImportError):
349 self.map = None
351 self.load_prepare()
352 err_code = -3 # initialize to unknown error
353 if not self.map:
354 # sort tiles in file order
355 self.tile.sort(key=_tilesort)
357 # FIXME: This is a hack to handle TIFF's JpegTables tag.
358 prefix = getattr(self, "tile_prefix", b"")
360 # Remove consecutive duplicates that only differ by their offset
361 self.tile = [
362 list(tiles)[-1]
363 for _, tiles in itertools.groupby(
364 self.tile, lambda tile: (tile[0], tile[1], tile[3])
365 )
366 ]
367 for i, (decoder_name, extents, offset, args) in enumerate(self.tile):
368 seek(offset)
369 decoder = Image._getdecoder(
370 self.mode, decoder_name, args, self.decoderconfig
371 )
372 try:
373 decoder.setimage(self.im, extents)
374 if decoder.pulls_fd:
375 decoder.setfd(self.fp)
376 err_code = decoder.decode(b"")[1]
377 else:
378 b = prefix
379 while True:
380 read_bytes = self.decodermaxblock
381 if i + 1 < len(self.tile):
382 next_offset = self.tile[i + 1].offset
383 if next_offset > offset:
384 read_bytes = next_offset - offset
385 try:
386 s = read(read_bytes)
387 except (IndexError, struct.error) as e:
388 # truncated png/gif
389 if LOAD_TRUNCATED_IMAGES:
390 break
391 else:
392 msg = "image file is truncated"
393 raise OSError(msg) from e
395 if not s: # truncated jpeg
396 if LOAD_TRUNCATED_IMAGES:
397 break
398 else:
399 msg = (
400 "image file is truncated "
401 f"({len(b)} bytes not processed)"
402 )
403 raise OSError(msg)
405 b = b + s
406 n, err_code = decoder.decode(b)
407 if n < 0:
408 break
409 b = b[n:]
410 finally:
411 # Need to cleanup here to prevent leaks
412 decoder.cleanup()
414 self.tile = []
415 self.readonly = readonly
417 self.load_end()
419 if self._exclusive_fp and self._close_exclusive_fp_after_loading:
420 self.fp.close()
421 self.fp = None
423 if not self.map and not LOAD_TRUNCATED_IMAGES and err_code < 0:
424 # still raised if decoder fails to return anything
425 raise _get_oserror(err_code, encoder=False)
427 return Image.Image.load(self)
429 def load_prepare(self) -> None:
430 # create image memory if necessary
431 if self._im is None:
432 self.im = Image.core.new(self.mode, self.size)
433 # create palette (optional)
434 if self.mode == "P":
435 Image.Image.load(self)
437 def load_end(self) -> None:
438 # may be overridden
439 pass
441 # may be defined for contained formats
442 # def load_seek(self, pos: int) -> None:
443 # pass
445 # may be defined for blocked formats (e.g. PNG)
446 # def load_read(self, read_bytes: int) -> bytes:
447 # pass
449 def _seek_check(self, frame: int) -> bool:
450 if (
451 frame < self._min_frame
452 # Only check upper limit on frames if additional seek operations
453 # are not required to do so
454 or (
455 not (hasattr(self, "_n_frames") and self._n_frames is None)
456 and frame >= getattr(self, "n_frames") + self._min_frame
457 )
458 ):
459 msg = "attempt to seek outside sequence"
460 raise EOFError(msg)
462 return self.tell() != frame
465class StubHandler(abc.ABC):
466 def open(self, im: StubImageFile) -> None:
467 pass
469 @abc.abstractmethod
470 def load(self, im: StubImageFile) -> Image.Image:
471 pass
474class StubImageFile(ImageFile, metaclass=abc.ABCMeta):
475 """
476 Base class for stub image loaders.
478 A stub loader is an image loader that can identify files of a
479 certain format, but relies on external code to load the file.
480 """
482 @abc.abstractmethod
483 def _open(self) -> None:
484 pass
486 def load(self) -> Image.core.PixelAccess | None:
487 loader = self._load()
488 if loader is None:
489 msg = f"cannot find loader for this {self.format} file"
490 raise OSError(msg)
491 image = loader.load(self)
492 assert image is not None
493 # become the other object (!)
494 self.__class__ = image.__class__ # type: ignore[assignment]
495 self.__dict__ = image.__dict__
496 return image.load()
498 @abc.abstractmethod
499 def _load(self) -> StubHandler | None:
500 """(Hook) Find actual image loader."""
501 pass
504class Parser:
505 """
506 Incremental image parser. This class implements the standard
507 feed/close consumer interface.
508 """
510 incremental = None
511 image: Image.Image | None = None
512 data: bytes | None = None
513 decoder: Image.core.ImagingDecoder | PyDecoder | None = None
514 offset = 0
515 finished = 0
517 def reset(self) -> None:
518 """
519 (Consumer) Reset the parser. Note that you can only call this
520 method immediately after you've created a parser; parser
521 instances cannot be reused.
522 """
523 assert self.data is None, "cannot reuse parsers"
525 def feed(self, data: bytes) -> None:
526 """
527 (Consumer) Feed data to the parser.
529 :param data: A string buffer.
530 :exception OSError: If the parser failed to parse the image file.
531 """
532 # collect data
534 if self.finished:
535 return
537 if self.data is None:
538 self.data = data
539 else:
540 self.data = self.data + data
542 # parse what we have
543 if self.decoder:
544 if self.offset > 0:
545 # skip header
546 skip = min(len(self.data), self.offset)
547 self.data = self.data[skip:]
548 self.offset = self.offset - skip
549 if self.offset > 0 or not self.data:
550 return
552 n, e = self.decoder.decode(self.data)
554 if n < 0:
555 # end of stream
556 self.data = None
557 self.finished = 1
558 if e < 0:
559 # decoding error
560 self.image = None
561 raise _get_oserror(e, encoder=False)
562 else:
563 # end of image
564 return
565 self.data = self.data[n:]
567 elif self.image:
568 # if we end up here with no decoder, this file cannot
569 # be incrementally parsed. wait until we've gotten all
570 # available data
571 pass
573 else:
574 # attempt to open this file
575 try:
576 with io.BytesIO(self.data) as fp:
577 im = Image.open(fp)
578 except OSError:
579 pass # not enough data
580 else:
581 flag = hasattr(im, "load_seek") or hasattr(im, "load_read")
582 if not flag and len(im.tile) == 1:
583 # initialize decoder
584 im.load_prepare()
585 d, e, o, a = im.tile[0]
586 im.tile = []
587 self.decoder = Image._getdecoder(im.mode, d, a, im.decoderconfig)
588 self.decoder.setimage(im.im, e)
590 # calculate decoder offset
591 self.offset = o
592 if self.offset <= len(self.data):
593 self.data = self.data[self.offset :]
594 self.offset = 0
596 self.image = im
598 def __enter__(self) -> Parser:
599 return self
601 def __exit__(self, *args: object) -> None:
602 self.close()
604 def close(self) -> Image.Image:
605 """
606 (Consumer) Close the stream.
608 :returns: An image object.
609 :exception OSError: If the parser failed to parse the image file either
610 because it cannot be identified or cannot be
611 decoded.
612 """
613 # finish decoding
614 if self.decoder:
615 # get rid of what's left in the buffers
616 self.feed(b"")
617 self.data = self.decoder = None
618 if not self.finished:
619 msg = "image was incomplete"
620 raise OSError(msg)
621 if not self.image:
622 msg = "cannot parse this image"
623 raise OSError(msg)
624 if self.data:
625 # incremental parsing not possible; reopen the file
626 # not that we have all data
627 with io.BytesIO(self.data) as fp:
628 try:
629 self.image = Image.open(fp)
630 finally:
631 self.image.load()
632 return self.image
635# --------------------------------------------------------------------
638def _save(im: Image.Image, fp: IO[bytes], tile: list[_Tile], bufsize: int = 0) -> None:
639 """Helper to save image based on tile list
641 :param im: Image object.
642 :param fp: File object.
643 :param tile: Tile list.
644 :param bufsize: Optional buffer size
645 """
647 im.load()
648 if not hasattr(im, "encoderconfig"):
649 im.encoderconfig = ()
650 tile.sort(key=_tilesort)
651 # FIXME: make MAXBLOCK a configuration parameter
652 # It would be great if we could have the encoder specify what it needs
653 # But, it would need at least the image size in most cases. RawEncode is
654 # a tricky case.
655 bufsize = max(MAXBLOCK, bufsize, im.size[0] * 4) # see RawEncode.c
656 try:
657 fh = fp.fileno()
658 fp.flush()
659 _encode_tile(im, fp, tile, bufsize, fh)
660 except (AttributeError, io.UnsupportedOperation) as exc:
661 _encode_tile(im, fp, tile, bufsize, None, exc)
662 if hasattr(fp, "flush"):
663 fp.flush()
666def _encode_tile(
667 im: Image.Image,
668 fp: IO[bytes],
669 tile: list[_Tile],
670 bufsize: int,
671 fh: int | None,
672 exc: BaseException | None = None,
673) -> None:
674 for encoder_name, extents, offset, args in tile:
675 if offset > 0:
676 fp.seek(offset)
677 encoder = Image._getencoder(im.mode, encoder_name, args, im.encoderconfig)
678 try:
679 encoder.setimage(im.im, extents)
680 if encoder.pushes_fd:
681 encoder.setfd(fp)
682 errcode = encoder.encode_to_pyfd()[1]
683 else:
684 if exc:
685 # compress to Python file-compatible object
686 while True:
687 errcode, data = encoder.encode(bufsize)[1:]
688 fp.write(data)
689 if errcode:
690 break
691 else:
692 # slight speedup: compress to real file object
693 assert fh is not None
694 errcode = encoder.encode_to_file(fh, bufsize)
695 if errcode < 0:
696 raise _get_oserror(errcode, encoder=True) from exc
697 finally:
698 encoder.cleanup()
701def _safe_read(fp: IO[bytes], size: int) -> bytes:
702 """
703 Reads large blocks in a safe way. Unlike fp.read(n), this function
704 doesn't trust the user. If the requested size is larger than
705 SAFEBLOCK, the file is read block by block.
707 :param fp: File handle. Must implement a <b>read</b> method.
708 :param size: Number of bytes to read.
709 :returns: A string containing <i>size</i> bytes of data.
711 Raises an OSError if the file is truncated and the read cannot be completed
713 """
714 if size <= 0:
715 return b""
716 if size <= SAFEBLOCK:
717 data = fp.read(size)
718 if len(data) < size:
719 msg = "Truncated File Read"
720 raise OSError(msg)
721 return data
722 blocks: list[bytes] = []
723 remaining_size = size
724 while remaining_size > 0:
725 block = fp.read(min(remaining_size, SAFEBLOCK))
726 if not block:
727 break
728 blocks.append(block)
729 remaining_size -= len(block)
730 if sum(len(block) for block in blocks) < size:
731 msg = "Truncated File Read"
732 raise OSError(msg)
733 return b"".join(blocks)
736class PyCodecState:
737 def __init__(self) -> None:
738 self.xsize = 0
739 self.ysize = 0
740 self.xoff = 0
741 self.yoff = 0
743 def extents(self) -> tuple[int, int, int, int]:
744 return self.xoff, self.yoff, self.xoff + self.xsize, self.yoff + self.ysize
747class PyCodec:
748 fd: IO[bytes] | None
750 def __init__(self, mode: str, *args: Any) -> None:
751 self.im: Image.core.ImagingCore | None = None
752 self.state = PyCodecState()
753 self.fd = None
754 self.mode = mode
755 self.init(args)
757 def init(self, args: tuple[Any, ...]) -> None:
758 """
759 Override to perform codec specific initialization
761 :param args: Tuple of arg items from the tile entry
762 :returns: None
763 """
764 self.args = args
766 def cleanup(self) -> None:
767 """
768 Override to perform codec specific cleanup
770 :returns: None
771 """
772 pass
774 def setfd(self, fd: IO[bytes]) -> None:
775 """
776 Called from ImageFile to set the Python file-like object
778 :param fd: A Python file-like object
779 :returns: None
780 """
781 self.fd = fd
783 def setimage(
784 self,
785 im: Image.core.ImagingCore,
786 extents: tuple[int, int, int, int] | None = None,
787 ) -> None:
788 """
789 Called from ImageFile to set the core output image for the codec
791 :param im: A core image object
792 :param extents: a 4 tuple of (x0, y0, x1, y1) defining the rectangle
793 for this tile
794 :returns: None
795 """
797 # following c code
798 self.im = im
800 if extents:
801 x0, y0, x1, y1 = extents
802 else:
803 x0, y0, x1, y1 = (0, 0, 0, 0)
805 if x0 == 0 and x1 == 0:
806 self.state.xsize, self.state.ysize = self.im.size
807 else:
808 self.state.xoff = x0
809 self.state.yoff = y0
810 self.state.xsize = x1 - x0
811 self.state.ysize = y1 - y0
813 if self.state.xsize <= 0 or self.state.ysize <= 0:
814 msg = "Size must be positive"
815 raise ValueError(msg)
817 if (
818 self.state.xsize + self.state.xoff > self.im.size[0]
819 or self.state.ysize + self.state.yoff > self.im.size[1]
820 ):
821 msg = "Tile cannot extend outside image"
822 raise ValueError(msg)
825class PyDecoder(PyCodec):
826 """
827 Python implementation of a format decoder. Override this class and
828 add the decoding logic in the :meth:`decode` method.
830 See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
831 """
833 _pulls_fd = False
835 @property
836 def pulls_fd(self) -> bool:
837 return self._pulls_fd
839 def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
840 """
841 Override to perform the decoding process.
843 :param buffer: A bytes object with the data to be decoded.
844 :returns: A tuple of ``(bytes consumed, errcode)``.
845 If finished with decoding return -1 for the bytes consumed.
846 Err codes are from :data:`.ImageFile.ERRORS`.
847 """
848 msg = "unavailable in base decoder"
849 raise NotImplementedError(msg)
851 def set_as_raw(
852 self, data: bytes, rawmode: str | None = None, extra: tuple[Any, ...] = ()
853 ) -> None:
854 """
855 Convenience method to set the internal image from a stream of raw data
857 :param data: Bytes to be set
858 :param rawmode: The rawmode to be used for the decoder.
859 If not specified, it will default to the mode of the image
860 :param extra: Extra arguments for the decoder.
861 :returns: None
862 """
864 if not rawmode:
865 rawmode = self.mode
866 d = Image._getdecoder(self.mode, "raw", rawmode, extra)
867 assert self.im is not None
868 d.setimage(self.im, self.state.extents())
869 s = d.decode(data)
871 if s[0] >= 0:
872 msg = "not enough image data"
873 raise ValueError(msg)
874 if s[1] != 0:
875 msg = "cannot decode image data"
876 raise ValueError(msg)
879class PyEncoder(PyCodec):
880 """
881 Python implementation of a format encoder. Override this class and
882 add the decoding logic in the :meth:`encode` method.
884 See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
885 """
887 _pushes_fd = False
889 @property
890 def pushes_fd(self) -> bool:
891 return self._pushes_fd
893 def encode(self, bufsize: int) -> tuple[int, int, bytes]:
894 """
895 Override to perform the encoding process.
897 :param bufsize: Buffer size.
898 :returns: A tuple of ``(bytes encoded, errcode, bytes)``.
899 If finished with encoding return 1 for the error code.
900 Err codes are from :data:`.ImageFile.ERRORS`.
901 """
902 msg = "unavailable in base encoder"
903 raise NotImplementedError(msg)
905 def encode_to_pyfd(self) -> tuple[int, int]:
906 """
907 If ``pushes_fd`` is ``True``, then this method will be used,
908 and ``encode()`` will only be called once.
910 :returns: A tuple of ``(bytes consumed, errcode)``.
911 Err codes are from :data:`.ImageFile.ERRORS`.
912 """
913 if not self.pushes_fd:
914 return 0, -8 # bad configuration
915 bytes_consumed, errcode, data = self.encode(0)
916 if data:
917 assert self.fd is not None
918 self.fd.write(data)
919 return bytes_consumed, errcode
921 def encode_to_file(self, fh: int, bufsize: int) -> int:
922 """
923 :param fh: File handle.
924 :param bufsize: Buffer size.
926 :returns: If finished successfully, return 0.
927 Otherwise, return an error code. Err codes are from
928 :data:`.ImageFile.ERRORS`.
929 """
930 errcode = 0
931 while errcode == 0:
932 status, errcode, buf = self.encode(bufsize)
933 if status > 0:
934 os.write(fh, buf[status:])
935 return errcode