Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/ImageFile.py: 56%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# The Python Imaging Library.
3# $Id$
4#
5# base class for image file handlers
6#
7# history:
8# 1995-09-09 fl Created
9# 1996-03-11 fl Fixed load mechanism.
10# 1996-04-15 fl Added pcx/xbm decoders.
11# 1996-04-30 fl Added encoders.
12# 1996-12-14 fl Added load helpers
13# 1997-01-11 fl Use encode_to_file where possible
14# 1997-08-27 fl Flush output in _save
15# 1998-03-05 fl Use memory mapping for some modes
16# 1999-02-04 fl Use memory mapping also for "I;16" and "I;16B"
17# 1999-05-31 fl Added image parser
18# 2000-10-12 fl Set readonly flag on memory-mapped images
19# 2002-03-20 fl Use better messages for common decoder errors
20# 2003-04-21 fl Fall back on mmap/map_buffer if map is not available
21# 2003-10-30 fl Added StubImageFile class
22# 2004-02-25 fl Made incremental parser more robust
23#
24# Copyright (c) 1997-2004 by Secret Labs AB
25# Copyright (c) 1995-2004 by Fredrik Lundh
26#
27# See the README file for information on usage and redistribution.
28#
29from __future__ import annotations
31import abc
32import io
33import itertools
34import logging
35import os
36import struct
37from typing import IO, Any, NamedTuple, cast
39from . import ExifTags, Image
40from ._util import DeferredError, is_path
42TYPE_CHECKING = False
43if TYPE_CHECKING:
44 from ._typing import StrOrBytesPath
logger = logging.getLogger(__name__)

MAXBLOCK = 64 * 1024
"""
By default, Pillow processes image data in blocks. This helps to prevent excessive use
of resources. Codecs may disable this behaviour with ``_pulls_fd`` or ``_pushes_fd``.

When reading an image, this is the number of bytes to read at once.

When writing an image, this is the number of bytes to write at once.
If the image width times 4 is greater, then that will be used instead.
Plugins may also set a greater number.

User code may set this to another number.
"""

# Largest amount requested from a file object in one call by ``_safe_read``.
SAFEBLOCK = 1 << 20

LOAD_TRUNCATED_IMAGES = False
"""Whether or not to load truncated image files. User code may change this."""

ERRORS = {
    -1: "image buffer overrun error",
    -2: "decoding error",
    -3: "unknown error",
    -8: "bad configuration",
    -9: "out of memory error",
}
"""
Dict of known error codes returned from :meth:`.PyDecoder.decode`,
:meth:`.PyEncoder.encode`, :meth:`.PyEncoder.encode_to_pyfd` and
:meth:`.PyEncoder.encode_to_file`.
"""
81#
82# --------------------------------------------------------------------
83# Helpers
def _get_oserror(error: int, *, encoder: bool) -> OSError:
    """
    Build the :py:exc:`OSError` that reports a codec failure.

    :param error: Error code reported by the codec.
    :param encoder: ``True`` if the failure happened while writing,
       ``False`` if it happened while reading.
    :returns: The exception instance; the caller is expected to raise it.
    """
    role, action = ("encoder", "writing") if encoder else ("decoder", "reading")
    try:
        description = Image.core.getcodecstatus(error)
    except AttributeError:
        # No status lookup available on the core module; use the local table
        description = ERRORS.get(error)
    if not description:
        # Unknown code: report the raw number instead
        description = f"{role} error {error}"
    return OSError(description + f" when {action} image file")
97def _tilesort(t: _Tile) -> int:
98 # sort on offset
99 return t[2]
102class _Tile(NamedTuple):
103 codec_name: str
104 extents: tuple[int, int, int, int] | None
105 offset: int = 0
106 args: tuple[Any, ...] | str | None = None
109#
110# --------------------------------------------------------------------
111# ImageFile base class
class ImageFile(Image.Image):
    """Base class for image file format handlers."""

    def __init__(
        self, fp: StrOrBytesPath | IO[bytes], filename: str | bytes | None = None
    ) -> None:
        """
        Attach the file and identify the image by calling :meth:`_open`.

        :param fp: A path, which is opened here in binary mode, or an
           already open binary stream.
        :param filename: Name recorded for a stream. Not used when ``fp``
           is a path.
        :exception SyntaxError: If :meth:`_open` fails with one of the
           low-level errors caught below, or leaves the image without a
           mode or with a non-positive size.
        """
        super().__init__()

        self._min_frame = 0

        self.custom_mimetype: str | None = None

        self.tile: list[_Tile] = []
        """ A list of tile descriptors """

        self.readonly = 1  # until we know better

        self.decoderconfig: tuple[Any, ...] = ()
        self.decodermaxblock = MAXBLOCK

        if is_path(fp):
            # filename
            self.fp = open(fp, "rb")
            self.filename = os.fspath(fp)
            self._exclusive_fp = True
        else:
            # stream
            self.fp = cast(IO[bytes], fp)
            self.filename = filename if filename is not None else ""
            # can be overridden
            self._exclusive_fp = False

        try:
            try:
                self._open()
            except (
                IndexError,  # end of data
                TypeError,  # end of data (ord)
                KeyError,  # unsupported mode
                EOFError,  # got header but not the first frame
                struct.error,
            ) as v:
                raise SyntaxError(v) from v

            if not self.mode or self.size[0] <= 0 or self.size[1] <= 0:
                msg = "not identified by this driver"
                raise SyntaxError(msg)
        except BaseException:
            # close the file only if we have opened it in this constructor
            if self._exclusive_fp:
                self.fp.close()
            raise

    def _open(self) -> None:
        """Hook called by the constructor; the base version does nothing."""
        pass

    def _close_fp(self) -> None:
        """Close ``self._fp`` (when present and distinct) and ``self.fp``."""
        if getattr(self, "_fp", False) and not isinstance(self._fp, DeferredError):
            if self._fp != self.fp:
                self._fp.close()
            # Later use of the closed handle raises this stored error
            self._fp = DeferredError(ValueError("Operation on closed image"))
        if self.fp:
            self.fp.close()

    def close(self) -> None:
        """
        Closes the file pointer, if possible.

        This operation will destroy the image core and release its memory.
        The image data will be unusable afterward.

        This function is required to close images that have multiple frames or
        have not had their file read and closed by the
        :py:meth:`~PIL.Image.Image.load` method. See :ref:`file-handling` for
        more information.
        """
        try:
            self._close_fp()
            self.fp = None
        except Exception as msg:
            # Best effort: a failure to close is only logged
            logger.debug("Error closing: %s", msg)

        super().close()

    def get_child_images(self) -> list[ImageFile]:
        """
        Return the images referenced from this image's EXIF data.

        The SubIFD entries and the IFD1 thumbnail are looked up. The file
        position found before the first child is read is restored before
        returning.

        :returns: A list of loaded images, possibly empty.
        """
        child_images = []
        exif = self.getexif()
        ifds = []
        if ExifTags.Base.SubIFDs in exif:
            subifd_offsets = exif[ExifTags.Base.SubIFDs]
            if subifd_offsets:
                if not isinstance(subifd_offsets, tuple):
                    subifd_offsets = (subifd_offsets,)
                for subifd_offset in subifd_offsets:
                    ifds.append((exif._get_ifd_dict(subifd_offset), subifd_offset))
        ifd1 = exif.get_ifd(ExifTags.IFD.IFD1)
        if ifd1 and ifd1.get(ExifTags.Base.JpegIFOffset):
            assert exif._info is not None
            ifds.append((ifd1, exif._info.next))

        offset = None
        for ifd, ifd_offset in ifds:
            assert self.fp is not None
            current_offset = self.fp.tell()
            if offset is None:
                offset = current_offset

            fp = self.fp
            # Reset for every IFD, so that the check below never sees an
            # unbound name or the value left over from the previous IFD
            thumbnail_offset = None
            if ifd is not None:
                thumbnail_offset = ifd.get(ExifTags.Base.JpegIFOffset)
                if thumbnail_offset is not None:
                    thumbnail_offset += getattr(self, "_exif_offset", 0)
                    self.fp.seek(thumbnail_offset)

                    length = ifd.get(ExifTags.Base.JpegIFByteCount)
                    assert isinstance(length, int)
                    data = self.fp.read(length)
                    fp = io.BytesIO(data)

            with Image.open(fp) as im:
                from . import TiffImagePlugin

                if thumbnail_offset is None and isinstance(
                    im, TiffImagePlugin.TiffImageFile
                ):
                    # No thumbnail data: point the TIFF reader at the IFD
                    im._frame_pos = [ifd_offset]
                    im._seek(0)
                im.load()
                child_images.append(im)

        if offset is not None:
            assert self.fp is not None
            self.fp.seek(offset)
        return child_images

    def get_format_mimetype(self) -> str | None:
        """
        Return the MIME type of this image.

        ``custom_mimetype`` wins when set; otherwise the upper-cased format
        is looked up in ``Image.MIME``.

        :returns: The MIME type, or ``None`` if it is not known.
        """
        if self.custom_mimetype:
            return self.custom_mimetype
        if self.format is not None:
            return Image.MIME.get(self.format.upper())
        return None

    def __getstate__(self) -> list[Any]:
        """Return the parent's state with the filename appended."""
        return super().__getstate__() + [self.filename]

    def __setstate__(self, state: list[Any]) -> None:
        """Restore state written by :meth:`__getstate__`; the tile list is
        emptied."""
        self.tile = []
        if len(state) > 5:
            # The filename is the entry appended by __getstate__
            self.filename = state[5]
        super().__setstate__(state)

    def verify(self) -> None:
        """Check file integrity"""

        # raise exception if something's wrong.  must be called
        # directly after open, and closes file when finished.
        if self._exclusive_fp:
            self.fp.close()
        self.fp = None

    def load(self) -> Image.core.PixelAccess | None:
        """Load image data based on tile list"""

        if not self.tile and self._im is None:
            msg = "cannot load this image"
            raise OSError(msg)

        pixel = Image.Image.load(self)
        if not self.tile:
            # Nothing left to decode
            return pixel

        self.map: mmap.mmap | None = None
        use_mmap = self.filename and len(self.tile) == 1

        readonly = 0

        # look for read/seek overrides
        if hasattr(self, "load_read"):
            read = self.load_read
            # don't use mmap if there are custom read/seek functions
            use_mmap = False
        else:
            read = self.fp.read

        if hasattr(self, "load_seek"):
            seek = self.load_seek
            use_mmap = False
        else:
            seek = self.fp.seek

        if use_mmap:
            # try memory mapping
            decoder_name, extents, offset, args = self.tile[0]
            if isinstance(args, str):
                args = (args, 0, 1)
            if (
                decoder_name == "raw"
                and isinstance(args, tuple)
                and len(args) >= 3
                and args[0] == self.mode
                and args[0] in Image._MAPMODES
            ):
                try:
                    # use mmap, if possible
                    import mmap

                    with open(self.filename) as fp:
                        self.map = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ)
                    if offset + self.size[1] * args[1] > self.map.size():
                        msg = "buffer is not large enough"
                        raise OSError(msg)
                    self.im = Image.core.map_buffer(
                        self.map, self.size, decoder_name, offset, args
                    )
                    readonly = 1
                    # After trashing self.im,
                    # we might need to reload the palette data.
                    if self.palette:
                        self.palette.dirty = 1
                except (AttributeError, OSError, ImportError):
                    # Mapping failed; fall back to the decoder loop below
                    self.map = None

        self.load_prepare()
        err_code = -3  # initialize to unknown error
        if not self.map:
            # sort tiles in file order
            self.tile.sort(key=_tilesort)

            # FIXME: This is a hack to handle TIFF's JpegTables tag.
            prefix = getattr(self, "tile_prefix", b"")

            # Remove consecutive duplicates that only differ by their offset
            self.tile = [
                list(tiles)[-1]
                for _, tiles in itertools.groupby(
                    self.tile, lambda tile: (tile[0], tile[1], tile[3])
                )
            ]
            for i, (decoder_name, extents, offset, args) in enumerate(self.tile):
                seek(offset)
                decoder = Image._getdecoder(
                    self.mode, decoder_name, args, self.decoderconfig
                )
                try:
                    decoder.setimage(self.im, extents)
                    if decoder.pulls_fd:
                        # The decoder reads from the file object itself
                        decoder.setfd(self.fp)
                        err_code = decoder.decode(b"")[1]
                    else:
                        b = prefix
                        while True:
                            read_bytes = self.decodermaxblock
                            if i + 1 < len(self.tile):
                                # Do not read into the next tile's data
                                next_offset = self.tile[i + 1].offset
                                if next_offset > offset:
                                    read_bytes = next_offset - offset
                            try:
                                s = read(read_bytes)
                            except (IndexError, struct.error) as e:
                                # truncated png/gif
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = "image file is truncated"
                                    raise OSError(msg) from e

                            if not s:  # truncated jpeg
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = (
                                        "image file is truncated "
                                        f"({len(b)} bytes not processed)"
                                    )
                                    raise OSError(msg)

                            b = b + s
                            n, err_code = decoder.decode(b)
                            if n < 0:
                                break
                            # Keep the bytes the decoder did not consume
                            b = b[n:]
                finally:
                    # Need to cleanup here to prevent leaks
                    decoder.cleanup()

        self.tile = []
        self.readonly = readonly

        self.load_end()

        if self._exclusive_fp and self._close_exclusive_fp_after_loading:
            self.fp.close()
        self.fp = None

        if not self.map and not LOAD_TRUNCATED_IMAGES and err_code < 0:
            # still raised if decoder fails to return anything
            raise _get_oserror(err_code, encoder=False)

        return Image.Image.load(self)

    def load_prepare(self) -> None:
        """Make sure the image memory exists before decoding starts."""
        # create image memory if necessary
        if self._im is None:
            self.im = Image.core.new(self.mode, self.size)
        # create palette (optional)
        if self.mode == "P":
            Image.Image.load(self)

    def load_end(self) -> None:
        """Hook called by :meth:`load` once the tiles have been processed."""
        # may be overridden
        pass

    # may be defined for contained formats
    # def load_seek(self, pos: int) -> None:
    #     pass

    # may be defined for blocked formats (e.g. PNG)
    # def load_read(self, read_bytes: int) -> bytes:
    #     pass

    def _seek_check(self, frame: int) -> bool:
        """
        Validate a frame number before seeking.

        :param frame: Requested frame number.
        :returns: ``True`` if ``frame`` differs from the current position.
        :exception EOFError: If ``frame`` is outside the sequence.
        """
        if (
            frame < self._min_frame
            # Only check upper limit on frames if additional seek operations
            # are not required to do so
            or (
                not (hasattr(self, "_n_frames") and self._n_frames is None)
                and frame >= getattr(self, "n_frames") + self._min_frame
            )
        ):
            msg = "attempt to seek outside sequence"
            raise EOFError(msg)

        return self.tell() != frame
class StubHandler(abc.ABC):
    """Interface of the external handler used by :class:`StubImageFile`."""

    def open(self, im: StubImageFile) -> None:
        """Hook receiving the stub image; the base version does nothing."""
        return None

    @abc.abstractmethod
    def load(self, im: StubImageFile) -> Image.Image:
        """Load the file behind ``im``. Subclasses must implement this."""
class StubImageFile(ImageFile, metaclass=abc.ABCMeta):
    """
    Base class for stub image loaders.

    A stub loader is an image loader that can identify files of a
    certain format, but relies on external code to load the file.
    """

    @abc.abstractmethod
    def _open(self) -> None:
        """Identify the file. Subclasses must implement this."""

    def load(self) -> Image.core.PixelAccess | None:
        """
        Load the image through the handler returned by :meth:`_load`.

        :returns: Whatever ``load()`` of the handler's image returns.
        :exception OSError: If :meth:`_load` returns no handler.
        """
        handler = self._load()
        if handler is None:
            msg = f"cannot find loader for this {self.format} file"
            raise OSError(msg)
        loaded = handler.load(self)
        assert loaded is not None
        # become the other object (!)
        self.__class__ = loaded.__class__  # type: ignore[assignment]
        self.__dict__ = loaded.__dict__
        return loaded.load()

    @abc.abstractmethod
    def _load(self) -> StubHandler | None:
        """(Hook) Find actual image loader."""
class Parser:
    """
    Incremental image parser.  This class implements the standard
    feed/close consumer interface.
    """

    incremental = None
    image: Image.Image | None = None
    data: bytes | None = None
    decoder: Image.core.ImagingDecoder | PyDecoder | None = None
    offset = 0
    finished = 0

    def reset(self) -> None:
        """
        (Consumer) Reset the parser.  Note that you can only call this
        method immediately after you've created a parser; parser
        instances cannot be reused.
        """
        assert self.data is None, "cannot reuse parsers"

    def feed(self, data: bytes) -> None:
        """
        (Consumer) Feed data to the parser.

        :param data: A string buffer.
        :exception OSError: If the parser failed to parse the image file.
        """
        # collect data

        if self.finished:
            return

        if self.data is None:
            self.data = data
        else:
            self.data = self.data + data

        # parse what we have
        if self.decoder:
            if self.offset > 0:
                # skip header
                skip = min(len(self.data), self.offset)
                self.data = self.data[skip:]
                self.offset = self.offset - skip
                if self.offset > 0 or not self.data:
                    return

            n, e = self.decoder.decode(self.data)

            if n < 0:
                # end of stream
                self.data = None
                self.finished = 1
                if e < 0:
                    # decoding error
                    self.image = None
                    raise _get_oserror(e, encoder=False)
                else:
                    # end of image
                    return
            # keep only the bytes the decoder has not consumed
            self.data = self.data[n:]

        elif self.image:
            # if we end up here with no decoder, this file cannot
            # be incrementally parsed.  wait until we've gotten all
            # available data
            pass

        else:
            # attempt to open this file
            try:
                with io.BytesIO(self.data) as fp:
                    im = Image.open(fp)
            except OSError:
                pass  # not enough data
            else:
                flag = hasattr(im, "load_seek") or hasattr(im, "load_read")
                if flag or len(im.tile) != 1:
                    # custom load code, or multiple tiles: no incremental
                    # decoder; close() reopens the file from the full data
                    self.decoder = None
                else:
                    # initialize decoder
                    im.load_prepare()
                    d, e, o, a = im.tile[0]
                    im.tile = []
                    self.decoder = Image._getdecoder(im.mode, d, a, im.decoderconfig)
                    self.decoder.setimage(im.im, e)

                    # calculate decoder offset
                    self.offset = o
                    if self.offset <= len(self.data):
                        self.data = self.data[self.offset :]
                        self.offset = 0

                self.image = im

    def __enter__(self) -> Parser:
        """Return the parser itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *args: object) -> None:
        """Close the stream when the ``with`` block ends."""
        self.close()

    def close(self) -> Image.Image:
        """
        (Consumer) Close the stream.

        :returns: An image object.
        :exception OSError: If the parser failed to parse the image file either
                            because it cannot be identified or cannot be
                            decoded.
        """
        # finish decoding
        if self.decoder:
            # get rid of what's left in the buffers
            self.feed(b"")
            self.data = self.decoder = None
            if not self.finished:
                msg = "image was incomplete"
                raise OSError(msg)
        if not self.image:
            msg = "cannot parse this image"
            raise OSError(msg)
        if self.data:
            # incremental parsing not possible; reopen the file
            # now that we have all data
            with io.BytesIO(self.data) as fp:
                try:
                    self.image = Image.open(fp)
                finally:
                    self.image.load()
        return self.image
623# --------------------------------------------------------------------
def _save(im: Image.Image, fp: IO[bytes], tile: list[_Tile], bufsize: int = 0) -> None:
    """
    Encode ``im`` into ``fp``, one tile descriptor at a time.

    :param im: Image object.
    :param fp: File object.
    :param tile: Tile list. It is sorted in place.
    :param bufsize: Optional buffer size. The value used is never smaller
       than ``MAXBLOCK`` or four times the image width.
    """
    im.load()
    if not hasattr(im, "encoderconfig"):
        im.encoderconfig = ()
    tile.sort(key=_tilesort)

    # FIXME: make MAXBLOCK a configuration parameter
    # It would be great if we could have the encoder specify what it needs
    # But, it would need at least the image size in most cases. RawEncode is
    # a tricky case.
    row_bytes = im.size[0] * 4  # see RawEncode.c
    bufsize = max(MAXBLOCK, bufsize, row_bytes)

    try:
        descriptor = fp.fileno()
        fp.flush()
        _encode_tile(im, fp, tile, bufsize, descriptor)
    except (AttributeError, io.UnsupportedOperation) as error:
        # No usable file descriptor: go through the file object instead
        _encode_tile(im, fp, tile, bufsize, None, error)

    if hasattr(fp, "flush"):
        fp.flush()
654def _encode_tile(
655 im: Image.Image,
656 fp: IO[bytes],
657 tile: list[_Tile],
658 bufsize: int,
659 fh: int | None,
660 exc: BaseException | None = None,
661) -> None:
662 for encoder_name, extents, offset, args in tile:
663 if offset > 0:
664 fp.seek(offset)
665 encoder = Image._getencoder(im.mode, encoder_name, args, im.encoderconfig)
666 try:
667 encoder.setimage(im.im, extents)
668 if encoder.pushes_fd:
669 encoder.setfd(fp)
670 errcode = encoder.encode_to_pyfd()[1]
671 else:
672 if exc:
673 # compress to Python file-compatible object
674 while True:
675 errcode, data = encoder.encode(bufsize)[1:]
676 fp.write(data)
677 if errcode:
678 break
679 else:
680 # slight speedup: compress to real file object
681 assert fh is not None
682 errcode = encoder.encode_to_file(fh, bufsize)
683 if errcode < 0:
684 raise _get_oserror(errcode, encoder=True) from exc
685 finally:
686 encoder.cleanup()
689def _safe_read(fp: IO[bytes], size: int) -> bytes:
690 """
691 Reads large blocks in a safe way. Unlike fp.read(n), this function
692 doesn't trust the user. If the requested size is larger than
693 SAFEBLOCK, the file is read block by block.
695 :param fp: File handle. Must implement a <b>read</b> method.
696 :param size: Number of bytes to read.
697 :returns: A string containing <i>size</i> bytes of data.
699 Raises an OSError if the file is truncated and the read cannot be completed
701 """
702 if size <= 0:
703 return b""
704 if size <= SAFEBLOCK:
705 data = fp.read(size)
706 if len(data) < size:
707 msg = "Truncated File Read"
708 raise OSError(msg)
709 return data
710 blocks: list[bytes] = []
711 remaining_size = size
712 while remaining_size > 0:
713 block = fp.read(min(remaining_size, SAFEBLOCK))
714 if not block:
715 break
716 blocks.append(block)
717 remaining_size -= len(block)
718 if sum(len(block) for block in blocks) < size:
719 msg = "Truncated File Read"
720 raise OSError(msg)
721 return b"".join(blocks)
class PyCodecState:
    """Size and offset of the region a Python codec works on."""

    def __init__(self) -> None:
        """Start with a zero size at offset (0, 0)."""
        self.xsize = self.ysize = 0
        self.xoff = self.yoff = 0

    def extents(self) -> tuple[int, int, int, int]:
        """Return the region as ``(x0, y0, x1, y1)``."""
        left, top = self.xoff, self.yoff
        return left, top, left + self.xsize, top + self.ysize
class PyCodec:
    """Common base of the Python decoder and encoder classes."""

    fd: IO[bytes] | None

    def __init__(self, mode: str, *args: Any) -> None:
        """
        Store ``mode``, create an empty state and call :meth:`init`.

        :param mode: Image mode.
        :param args: Arg items, passed on to :meth:`init` as a tuple.
        """
        self.im: Image.core.ImagingCore | None = None
        self.state = PyCodecState()
        self.fd = None
        self.mode = mode
        self.init(args)

    def init(self, args: tuple[Any, ...]) -> None:
        """
        Override to perform codec specific initialization

        :param args: Tuple of arg items from the tile entry
        :returns: None
        """
        self.args = args

    def cleanup(self) -> None:
        """
        Override to perform codec specific cleanup

        :returns: None
        """
        return None

    def setfd(self, fd: IO[bytes]) -> None:
        """
        Called from ImageFile to set the Python file-like object

        :param fd: A Python file-like object
        :returns: None
        """
        self.fd = fd

    def setimage(
        self,
        im: Image.core.ImagingCore,
        extents: tuple[int, int, int, int] | None = None,
    ) -> None:
        """
        Called from ImageFile to set the core output image for the codec

        :param im: A core image object
        :param extents: a 4 tuple of (x0, y0, x1, y1) defining the rectangle
            for this tile
        :returns: None
        :exception ValueError: If the tile has no area or does not fit
            inside the image.
        """

        # following c code
        self.im = im

        x0, y0, x1, y1 = extents if extents else (0, 0, 0, 0)

        state = self.state
        if x0 == 0 and x1 == 0:
            # No horizontal range given: use the size of the image
            state.xsize, state.ysize = self.im.size
        else:
            state.xoff, state.yoff = x0, y0
            state.xsize, state.ysize = x1 - x0, y1 - y0

        if state.xsize <= 0 or state.ysize <= 0:
            msg = "Size cannot be negative"
            raise ValueError(msg)

        width, height = self.im.size[0], self.im.size[1]
        if state.xsize + state.xoff > width or state.ysize + state.yoff > height:
            msg = "Tile cannot extend outside image"
            raise ValueError(msg)
class PyDecoder(PyCodec):
    """
    Python implementation of a format decoder. Override this class and
    add the decoding logic in the :meth:`decode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    _pulls_fd = False

    @property
    def pulls_fd(self) -> bool:
        """The value of the ``_pulls_fd`` class attribute."""
        return self._pulls_fd

    def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
        """
        Override to perform the decoding process.

        :param buffer: A bytes object with the data to be decoded.
        :returns: A tuple of ``(bytes consumed, errcode)``.
            If finished with decoding return -1 for the bytes consumed.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        msg = "unavailable in base decoder"
        raise NotImplementedError(msg)

    def set_as_raw(
        self, data: bytes, rawmode: str | None = None, extra: tuple[Any, ...] = ()
    ) -> None:
        """
        Convenience method to set the internal image from a stream of raw data

        :param data: Bytes to be set
        :param rawmode: The rawmode to be used for the decoder.
            If not specified, it will default to the mode of the image
        :param extra: Extra arguments for the decoder.
        :returns: None
        :exception ValueError: If the raw decoder does not finish or
            reports an error.
        """

        raw_decoder = Image._getdecoder(
            self.mode, "raw", rawmode or self.mode, extra
        )
        assert self.im is not None
        raw_decoder.setimage(self.im, self.state.extents())
        result = raw_decoder.decode(data)

        if result[0] >= 0:
            # A finished decoder reports a negative byte count
            msg = "not enough image data"
            raise ValueError(msg)
        if result[1] != 0:
            msg = "cannot decode image data"
            raise ValueError(msg)
class PyEncoder(PyCodec):
    """
    Python implementation of a format encoder. Override this class and
    add the encoding logic in the :meth:`encode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    _pushes_fd = False

    @property
    def pushes_fd(self) -> bool:
        """The value of the ``_pushes_fd`` class attribute."""
        return self._pushes_fd

    def encode(self, bufsize: int) -> tuple[int, int, bytes]:
        """
        Override to perform the encoding process.

        :param bufsize: Buffer size.
        :returns: A tuple of ``(bytes encoded, errcode, bytes)``.
            If finished with encoding return 1 for the error code.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        msg = "unavailable in base encoder"
        raise NotImplementedError(msg)

    def encode_to_pyfd(self) -> tuple[int, int]:
        """
        If ``pushes_fd`` is ``True``, then this method will be used,
        and ``encode()`` will only be called once.

        :returns: A tuple of ``(bytes consumed, errcode)``.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        if not self.pushes_fd:
            return 0, -8  # bad configuration
        bytes_consumed, errcode, data = self.encode(0)
        if data:
            assert self.fd is not None
            self.fd.write(data)
        return bytes_consumed, errcode

    def encode_to_file(self, fh: int, bufsize: int) -> int:
        """
        Call :meth:`encode` until it reports a non-zero error code, writing
        the encoded bytes of every call to ``fh``.

        :param fh: File handle.
        :param bufsize: Buffer size.

        :returns: If finished successfully, return 0.
            Otherwise, return an error code. Err codes are from
            :data:`.ImageFile.ERRORS`.
        """
        errcode = 0
        while errcode == 0:
            status, errcode, buf = self.encode(bufsize)
            if status > 0:
                # ``status`` is the number of bytes encoded: write those
                # bytes, not the part of the buffer that follows them
                os.write(fh, buf[:status])
        return errcode