Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/PIL/ImageFile.py: 56%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1#
2# The Python Imaging Library.
3# $Id$
4#
5# base class for image file handlers
6#
7# history:
8# 1995-09-09 fl Created
9# 1996-03-11 fl Fixed load mechanism.
10# 1996-04-15 fl Added pcx/xbm decoders.
11# 1996-04-30 fl Added encoders.
12# 1996-12-14 fl Added load helpers
13# 1997-01-11 fl Use encode_to_file where possible
14# 1997-08-27 fl Flush output in _save
15# 1998-03-05 fl Use memory mapping for some modes
16# 1999-02-04 fl Use memory mapping also for "I;16" and "I;16B"
17# 1999-05-31 fl Added image parser
18# 2000-10-12 fl Set readonly flag on memory-mapped images
19# 2002-03-20 fl Use better messages for common decoder errors
20# 2003-04-21 fl Fall back on mmap/map_buffer if map is not available
21# 2003-10-30 fl Added StubImageFile class
22# 2004-02-25 fl Made incremental parser more robust
23#
24# Copyright (c) 1997-2004 by Secret Labs AB
25# Copyright (c) 1995-2004 by Fredrik Lundh
26#
27# See the README file for information on usage and redistribution.
28#
29from __future__ import annotations
31import abc
32import io
33import itertools
34import logging
35import os
36import struct
37from typing import IO, Any, NamedTuple, cast
39from . import ExifTags, Image
40from ._util import DeferredError, is_path
42TYPE_CHECKING = False
43if TYPE_CHECKING:
44 from ._typing import StrOrBytesPath
# Module-level logger; ImageFile.close() reports failures to close the
# underlying file through it at debug level.
logger = logging.getLogger(__name__)

# Default for ImageFile.decodermaxblock (bytes requested per read while
# decoding) and the lower bound of the encoder buffer size chosen in _save().
MAXBLOCK = 65536

# Largest single read issued by _safe_read(); bigger requests are split into
# blocks of at most this many bytes.
SAFEBLOCK = 1024 * 1024

# Consulted by ImageFile.load(): when true, a short read ends decoding quietly
# and a negative decoder status is not turned into an OSError.
LOAD_TRUNCATED_IMAGES = False
"""Whether or not to load truncated image files. User code may change this."""

# Fallback messages, keyed by codec status code. _get_oserror() only looks here
# when Image.core does not provide getcodecstatus().
ERRORS = {
    -1: "image buffer overrun error",
    -2: "decoding error",
    -3: "unknown error",
    -8: "bad configuration",
    -9: "out of memory error",
}
"""
Dict of known error codes returned from :meth:`.PyDecoder.decode`,
:meth:`.PyEncoder.encode` :meth:`.PyEncoder.encode_to_pyfd` and
:meth:`.PyEncoder.encode_to_file`.
"""
69#
70# --------------------------------------------------------------------
71# Helpers
def _get_oserror(error: int, *, encoder: bool) -> OSError:
    """
    Build (but do not raise) the :py:exc:`OSError` for a codec status code.

    :param error: Status code reported by an encoder or decoder.
    :param encoder: ``True`` if the code came from an encoder, ``False`` if it
       came from a decoder. Only affects the wording of the message.
    :returns: An ``OSError`` instance whose message describes the failure.
    """
    try:
        description = Image.core.getcodecstatus(error)
    except AttributeError:
        # The core module has no status lookup; use the table in this module.
        description = ERRORS.get(error)

    if not description:
        # Unknown code: fall back to a generic description.
        description = f"{'encoder' if encoder else 'decoder'} error {error}"

    return OSError(
        description + f" when {'writing' if encoder else 'reading'} image file"
    )
85def _tilesort(t: _Tile) -> int:
86 # sort on offset
87 return t[2]
class _Tile(NamedTuple):
    """Descriptor of one tile, as stored in ``ImageFile.tile``."""

    # Codec name handed to Image._getdecoder() / Image._getencoder().
    codec_name: str
    # Rectangle passed to the codec's setimage(); PyCodec.setimage() reads it
    # as (x0, y0, x1, y1) and treats None as the whole image.
    extents: tuple[int, int, int, int] | None
    # File position that is seek()ed to before the tile is processed.
    offset: int = 0
    # Extra codec arguments, forwarded unchanged to the codec factory.
    args: tuple[Any, ...] | str | None = None
97#
98# --------------------------------------------------------------------
99# ImageFile base class
class ImageFile(Image.Image):
    """Base class for image file format handlers."""

    def __init__(
        self, fp: StrOrBytesPath | IO[bytes], filename: str | bytes | None = None
    ) -> None:
        """
        Attach ``fp`` to the image and let the subclass identify it via
        :meth:`_open`.

        :param fp: A path, which is opened here in binary mode and then owned
           by this object, or an already open binary stream, used as-is.
        :param filename: Stored in ``self.filename`` when ``fp`` is a stream;
           not used when ``fp`` is a path.
        :exception SyntaxError: If :meth:`_open` raises one of the errors
           caught below, or leaves the image without a mode or with a
           non-positive size.
        """
        super().__init__()

        self._min_frame = 0

        self.custom_mimetype: str | None = None

        self.tile: list[_Tile] = []
        """ A list of tile descriptors """

        self.readonly = 1  # until we know better

        self.decoderconfig: tuple[Any, ...] = ()
        self.decodermaxblock = MAXBLOCK

        if is_path(fp):
            # filename
            self.fp = open(fp, "rb")
            self.filename = os.fspath(fp)
            self._exclusive_fp = True
        else:
            # stream
            self.fp = cast(IO[bytes], fp)
            self.filename = filename if filename is not None else ""
            # can be overridden
            self._exclusive_fp = False

        try:
            try:
                self._open()
            except (
                IndexError,  # end of data
                TypeError,  # end of data (ord)
                KeyError,  # unsupported mode
                EOFError,  # got header but not the first frame
                struct.error,
            ) as v:
                raise SyntaxError(v) from v

            if not self.mode or self.size[0] <= 0 or self.size[1] <= 0:
                msg = "not identified by this driver"
                raise SyntaxError(msg)
        except BaseException:
            # close the file only if we have opened it this constructor
            if self._exclusive_fp:
                self.fp.close()
            raise

    def _open(self) -> None:
        """(Hook) Identify the file. The base implementation does nothing."""
        pass

    def _close_fp(self) -> None:
        """Close ``self._fp`` (when set and distinct from ``self.fp``) and ``self.fp``."""
        if getattr(self, "_fp", False) and not isinstance(self._fp, DeferredError):
            if self._fp != self.fp:
                self._fp.close()
            # Later use of the closed handle surfaces as this ValueError.
            self._fp = DeferredError(ValueError("Operation on closed image"))
        if self.fp:
            self.fp.close()

    def close(self) -> None:
        """
        Closes the file pointer, if possible.

        This operation will destroy the image core and release its memory.
        The image data will be unusable afterward.

        This function is required to close images that have multiple frames or
        have not had their file read and closed by the
        :py:meth:`~PIL.Image.Image.load` method. See :ref:`file-handling` for
        more information.
        """
        try:
            self._close_fp()
            self.fp = None
        except Exception as msg:
            # Best effort: a failure to close is logged, not raised.
            logger.debug("Error closing: %s", msg)

        super().close()

    def get_child_images(self) -> list[ImageFile]:
        """
        Open and load the images referenced from this image's EXIF data.

        Candidates are every IFD listed under the ``SubIFDs`` tag and IFD1
        when it has a ``JpegIFOffset`` entry. The position of ``self.fp`` is
        restored to where it was before the first candidate was read.

        :returns: The loaded child images, possibly an empty list.
        """
        child_images = []
        exif = self.getexif()
        ifds = []
        if ExifTags.Base.SubIFDs in exif:
            subifd_offsets = exif[ExifTags.Base.SubIFDs]
            if subifd_offsets:
                if not isinstance(subifd_offsets, tuple):
                    subifd_offsets = (subifd_offsets,)
                for subifd_offset in subifd_offsets:
                    ifds.append((exif._get_ifd_dict(subifd_offset), subifd_offset))
        ifd1 = exif.get_ifd(ExifTags.IFD.IFD1)
        if ifd1 and ifd1.get(ExifTags.Base.JpegIFOffset):
            assert exif._info is not None
            ifds.append((ifd1, exif._info.next))

        offset = None
        for ifd, ifd_offset in ifds:
            assert self.fp is not None
            current_offset = self.fp.tell()
            if offset is None:
                offset = current_offset

            fp = self.fp
            # Reset for every candidate: the branch below is skipped when ifd
            # is None, and the check after it must not see an unbound name or
            # the value left over from the previous candidate.
            thumbnail_offset = None
            if ifd is not None:
                thumbnail_offset = ifd.get(ExifTags.Base.JpegIFOffset)
                if thumbnail_offset is not None:
                    thumbnail_offset += getattr(self, "_exif_offset", 0)
                    self.fp.seek(thumbnail_offset)

                    length = ifd.get(ExifTags.Base.JpegIFByteCount)
                    assert isinstance(length, int)
                    data = self.fp.read(length)
                    fp = io.BytesIO(data)

            with Image.open(fp) as im:
                from . import TiffImagePlugin

                if thumbnail_offset is None and isinstance(
                    im, TiffImagePlugin.TiffImageFile
                ):
                    # No embedded JPEG: point the TIFF reader at this IFD.
                    im._frame_pos = [ifd_offset]
                    im._seek(0)
                im.load()
                child_images.append(im)

        if offset is not None:
            assert self.fp is not None
            self.fp.seek(offset)
        return child_images

    def get_format_mimetype(self) -> str | None:
        """
        Return ``custom_mimetype`` if set, otherwise the ``Image.MIME`` entry
        for the upper-cased format, or ``None`` when there is no format.
        """
        if self.custom_mimetype:
            return self.custom_mimetype
        if self.format is not None:
            return Image.MIME.get(self.format.upper())
        return None

    def __getstate__(self) -> list[Any]:
        """Extend the parent state with ``self.filename``."""
        return super().__getstate__() + [self.filename]

    def __setstate__(self, state: list[Any]) -> None:
        """Restore state; the filename is read from index 5 when present."""
        self.tile = []
        if len(state) > 5:
            self.filename = state[5]
        super().__setstate__(state)

    def verify(self) -> None:
        """Check file integrity"""

        # raise exception if something's wrong.  must be called
        # directly after open, and closes file when finished.
        if self._exclusive_fp:
            self.fp.close()
        self.fp = None

    def load(self) -> Image.core.PixelAccess | None:
        """
        Load image data based on tile list

        :returns: Whatever :py:meth:`PIL.Image.Image.load` returns.
        :exception OSError: If there is nothing to load, if the file is
           truncated, or if the decoder reports an error (the last two only
           while ``LOAD_TRUNCATED_IMAGES`` is false).
        """

        if not self.tile and self._im is None:
            msg = "cannot load this image"
            raise OSError(msg)

        pixel = Image.Image.load(self)
        if not self.tile:
            return pixel

        self.map: mmap.mmap | None = None
        use_mmap = self.filename and len(self.tile) == 1

        readonly = 0

        # look for read/seek overrides
        if hasattr(self, "load_read"):
            read = self.load_read
            # don't use mmap if there are custom read/seek functions
            use_mmap = False
        else:
            read = self.fp.read

        if hasattr(self, "load_seek"):
            seek = self.load_seek
            use_mmap = False
        else:
            seek = self.fp.seek

        if use_mmap:
            # try memory mapping
            decoder_name, extents, offset, args = self.tile[0]
            if isinstance(args, str):
                args = (args, 0, 1)
            if (
                decoder_name == "raw"
                and isinstance(args, tuple)
                and len(args) >= 3
                and args[0] == self.mode
                and args[0] in Image._MAPMODES
            ):
                try:
                    # use mmap, if possible
                    import mmap

                    with open(self.filename) as fp:
                        self.map = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ)
                    if offset + self.size[1] * args[1] > self.map.size():
                        msg = "buffer is not large enough"
                        raise OSError(msg)
                    self.im = Image.core.map_buffer(
                        self.map, self.size, decoder_name, offset, args
                    )
                    readonly = 1
                    # After trashing self.im,
                    # we might need to reload the palette data.
                    if self.palette:
                        self.palette.dirty = 1
                except (AttributeError, OSError, ImportError):
                    # Mapping is optional; fall back to the decoder loop.
                    self.map = None

        self.load_prepare()
        err_code = -3  # initialize to unknown error
        if not self.map:
            # sort tiles in file order
            self.tile.sort(key=_tilesort)

            # FIXME: This is a hack to handle TIFF's JpegTables tag.
            prefix = getattr(self, "tile_prefix", b"")

            # Remove consecutive duplicates that only differ by their offset
            self.tile = [
                list(tiles)[-1]
                for _, tiles in itertools.groupby(
                    self.tile, lambda tile: (tile[0], tile[1], tile[3])
                )
            ]
            for i, (decoder_name, extents, offset, args) in enumerate(self.tile):
                seek(offset)
                decoder = Image._getdecoder(
                    self.mode, decoder_name, args, self.decoderconfig
                )
                try:
                    decoder.setimage(self.im, extents)
                    if decoder.pulls_fd:
                        decoder.setfd(self.fp)
                        err_code = decoder.decode(b"")[1]
                    else:
                        b = prefix
                        while True:
                            read_bytes = self.decodermaxblock
                            if i + 1 < len(self.tile):
                                # Indexed like the sort key and the grouping
                                # key above, so every tile access in this
                                # method uses the same positional form.
                                next_offset = self.tile[i + 1][2]
                                if next_offset > offset:
                                    # Do not read into the next tile.
                                    read_bytes = next_offset - offset
                            try:
                                s = read(read_bytes)
                            except (IndexError, struct.error) as e:
                                # truncated png/gif
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = "image file is truncated"
                                    raise OSError(msg) from e

                            if not s:  # truncated jpeg
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = (
                                        "image file is truncated "
                                        f"({len(b)} bytes not processed)"
                                    )
                                    raise OSError(msg)

                            b = b + s
                            n, err_code = decoder.decode(b)
                            if n < 0:
                                break
                            # Keep what the decoder did not consume.
                            b = b[n:]
                finally:
                    # Need to cleanup here to prevent leaks
                    decoder.cleanup()

        self.tile = []
        self.readonly = readonly

        self.load_end()

        if self._exclusive_fp and self._close_exclusive_fp_after_loading:
            self.fp.close()
        self.fp = None

        if not self.map and not LOAD_TRUNCATED_IMAGES and err_code < 0:
            # still raised if decoder fails to return anything
            raise _get_oserror(err_code, encoder=False)

        return Image.Image.load(self)

    def load_prepare(self) -> None:
        """Allocate the core image if missing; for mode "P" also run the base ``load``."""
        # create image memory if necessary
        if self._im is None:
            self.im = Image.core.new(self.mode, self.size)
        # create palette (optional)
        if self.mode == "P":
            Image.Image.load(self)

    def load_end(self) -> None:
        """(Hook) Called by :meth:`load` after the tiles have been processed."""
        # may be overridden
        pass

    # may be defined for contained formats
    # def load_seek(self, pos: int) -> None:
    #     pass

    # may be defined for blocked formats (e.g. PNG)
    # def load_read(self, read_bytes: int) -> bytes:
    #     pass

    def _seek_check(self, frame: int) -> bool:
        """
        Validate a frame number before seeking.

        :param frame: Requested frame number.
        :returns: ``True`` if ``frame`` differs from the current position.
        :exception EOFError: If ``frame`` is below ``_min_frame``, or at or
           past the end of the sequence when the frame count is known.
        """
        if (
            frame < self._min_frame
            # Only check upper limit on frames if additional seek operations
            # are not required to do so
            or (
                not (hasattr(self, "_n_frames") and self._n_frames is None)
                and frame >= getattr(self, "n_frames") + self._min_frame
            )
        ):
            msg = "attempt to seek outside sequence"
            raise EOFError(msg)

        return self.tell() != frame
class StubHandler(abc.ABC):
    """Interface of the external handlers that :class:`StubImageFile` delegates to."""

    def open(self, im: StubImageFile) -> None:
        """Optional hook; the default implementation does nothing."""

    @abc.abstractmethod
    def load(self, im: StubImageFile) -> Image.Image:
        """Return the loaded image for ``im``. Must be implemented by subclasses."""
class StubImageFile(ImageFile, metaclass=abc.ABCMeta):
    """
    Base class for stub image loaders.

    A stub loader is an image loader that can identify files of a
    certain format, but relies on external code to load the file.
    """

    @abc.abstractmethod
    def _open(self) -> None:
        """(Hook) Identify the file."""

    def load(self) -> Image.core.PixelAccess | None:
        """
        Load the image through the handler returned by :meth:`_load`.

        This object takes over the class and attributes of the image the
        handler returns.

        :exception OSError: If :meth:`_load` returns ``None``.
        """
        handler = self._load()
        if handler is None:
            msg = f"cannot find loader for this {self.format} file"
            raise OSError(msg)

        loaded = handler.load(self)
        assert loaded is not None

        # become the other object (!)
        self.__class__ = loaded.__class__  # type: ignore[assignment]
        self.__dict__ = loaded.__dict__
        return loaded.load()

    @abc.abstractmethod
    def _load(self) -> StubHandler | None:
        """(Hook) Find actual image loader."""
class Parser:
    """
    Incremental image parser.  This class implements the standard
    feed/close consumer interface.
    """

    incremental = None
    image: Image.Image | None = None
    data: bytes | None = None
    decoder: Image.core.ImagingDecoder | PyDecoder | None = None
    offset = 0
    finished = 0

    def reset(self) -> None:
        """
        (Consumer) Reset the parser.  Note that you can only call this
        method immediately after you've created a parser; parser
        instances cannot be reused.
        """
        assert self.data is None, "cannot reuse parsers"

    def feed(self, data: bytes) -> None:
        """
        (Consumer) Feed data to the parser.

        :param data: A bytes buffer.
        :exception OSError: If the parser failed to parse the image file.
        """
        # collect data

        if self.finished:
            return

        if self.data is None:
            self.data = data
        else:
            self.data = self.data + data

        # parse what we have
        if self.decoder:
            if self.offset > 0:
                # skip header
                skip = min(len(self.data), self.offset)
                self.data = self.data[skip:]
                self.offset = self.offset - skip
                if self.offset > 0 or not self.data:
                    return

            n, e = self.decoder.decode(self.data)

            if n < 0:
                # end of stream
                self.data = None
                self.finished = 1
                if e < 0:
                    # decoding error
                    self.image = None
                    raise _get_oserror(e, encoder=False)
                else:
                    # end of image
                    return
            self.data = self.data[n:]

        elif self.image:
            # if we end up here with no decoder, this file cannot
            # be incrementally parsed.  wait until we've gotten all
            # available data
            pass

        else:
            # attempt to open this file
            try:
                with io.BytesIO(self.data) as fp:
                    im = Image.open(fp)
            except OSError:
                pass  # not enough data
            else:
                flag = hasattr(im, "load_seek") or hasattr(im, "load_read")
                if flag or len(im.tile) != 1:
                    # custom load code, or multiple tiles
                    # (this used to assign to a misspelled "decode" attribute)
                    self.decoder = None
                else:
                    # initialize decoder
                    im.load_prepare()
                    d, e, o, a = im.tile[0]
                    im.tile = []
                    self.decoder = Image._getdecoder(im.mode, d, a, im.decoderconfig)
                    self.decoder.setimage(im.im, e)

                    # calculate decoder offset
                    self.offset = o
                    if self.offset <= len(self.data):
                        self.data = self.data[self.offset :]
                        self.offset = 0

                self.image = im

    def __enter__(self) -> Parser:
        return self

    def __exit__(self, *args: object) -> None:
        self.close()

    def close(self) -> Image.Image:
        """
        (Consumer) Close the stream.

        :returns: An image object.
        :exception OSError: If the parser failed to parse the image file either
                            because it cannot be identified or cannot be
                            decoded.
        """
        # finish decoding
        if self.decoder:
            # get rid of what's left in the buffers
            self.feed(b"")
            self.data = self.decoder = None
            if not self.finished:
                msg = "image was incomplete"
                raise OSError(msg)
        if not self.image:
            msg = "cannot parse this image"
            raise OSError(msg)
        if self.data:
            # incremental parsing not possible; reopen the file
            # now that we have all data
            with io.BytesIO(self.data) as fp:
                self.image = Image.open(fp)
                # Load while the buffer is still open. This runs only if the
                # open succeeded, so an error from Image.open() is not
                # replaced by one from loading the previously stored image.
                self.image.load()
        return self.image
611# --------------------------------------------------------------------
def _save(im: Image.Image, fp: IO[bytes], tile: list[_Tile], bufsize: int = 0) -> None:
    """Write an image to a file object by running the encoders in a tile list.

    The tile list is sorted in place by offset before encoding.

    :param im: Image object.
    :param fp: File object.
    :param tile: Tile list.
    :param bufsize: Optional buffer size
    """

    im.load()
    if not hasattr(im, "encoderconfig"):
        im.encoderconfig = ()
    tile.sort(key=_tilesort)

    # FIXME: make MAXBLOCK a configuration parameter
    # It would be great if we could have the encoder specify what it needs
    # But, it would need at least the image size in most cases. RawEncode is
    # a tricky case.
    block_size = max(MAXBLOCK, bufsize, im.size[0] * 4)  # see RawEncode.c

    try:
        descriptor = fp.fileno()
        fp.flush()
        _encode_tile(im, fp, tile, block_size, descriptor)
    except (AttributeError, io.UnsupportedOperation) as fallback_reason:
        # No usable file descriptor: write through the Python file object.
        _encode_tile(im, fp, tile, block_size, None, fallback_reason)

    if hasattr(fp, "flush"):
        fp.flush()
def _encode_tile(
    im: Image.Image,
    fp: IO[bytes],
    tile: list[_Tile],
    bufsize: int,
    fh: int | None,
    exc: BaseException | None = None,
) -> None:
    """
    Encode every tile of ``im`` into ``fp``.

    :param im: Image object.
    :param fp: File object; receives ``seek`` calls and, depending on the
       encoder, the encoded data.
    :param tile: Tile list.
    :param bufsize: Buffer size handed to the encoder.
    :param fh: File descriptor for ``encode_to_file``; must not be ``None``
       when ``exc`` is unset and the encoder does not push to ``fp`` itself.
    :param exc: When set, encoded data is written through ``fp.write``; it
       also becomes the cause of any error raised here.
    :exception OSError: If an encoder reports a negative error code.
    """
    for entry in tile:
        encoder_name, extents, offset, args = entry
        if offset > 0:
            fp.seek(offset)
        encoder = Image._getencoder(im.mode, encoder_name, args, im.encoderconfig)
        try:
            encoder.setimage(im.im, extents)
            if encoder.pushes_fd:
                encoder.setfd(fp)
                errcode = encoder.encode_to_pyfd()[1]
            elif exc:
                # compress to Python file-compatible object
                errcode = 0
                while not errcode:
                    errcode, data = encoder.encode(bufsize)[1:]
                    fp.write(data)
            else:
                # slight speedup: compress to real file object
                assert fh is not None
                errcode = encoder.encode_to_file(fh, bufsize)
            if errcode < 0:
                raise _get_oserror(errcode, encoder=True) from exc
        finally:
            encoder.cleanup()
677def _safe_read(fp: IO[bytes], size: int) -> bytes:
678 """
679 Reads large blocks in a safe way. Unlike fp.read(n), this function
680 doesn't trust the user. If the requested size is larger than
681 SAFEBLOCK, the file is read block by block.
683 :param fp: File handle. Must implement a <b>read</b> method.
684 :param size: Number of bytes to read.
685 :returns: A string containing <i>size</i> bytes of data.
687 Raises an OSError if the file is truncated and the read cannot be completed
689 """
690 if size <= 0:
691 return b""
692 if size <= SAFEBLOCK:
693 data = fp.read(size)
694 if len(data) < size:
695 msg = "Truncated File Read"
696 raise OSError(msg)
697 return data
698 blocks: list[bytes] = []
699 remaining_size = size
700 while remaining_size > 0:
701 block = fp.read(min(remaining_size, SAFEBLOCK))
702 if not block:
703 break
704 blocks.append(block)
705 remaining_size -= len(block)
706 if sum(len(block) for block in blocks) < size:
707 msg = "Truncated File Read"
708 raise OSError(msg)
709 return b"".join(blocks)
class PyCodecState:
    """Size and offset of the region a Python codec works on."""

    def __init__(self) -> None:
        self.xsize = self.ysize = 0
        self.xoff = self.yoff = 0

    def extents(self) -> tuple[int, int, int, int]:
        """Return the region as ``(x0, y0, x1, y1)``."""
        left, top = self.xoff, self.yoff
        return left, top, left + self.xsize, top + self.ysize
class PyCodec:
    """Shared base of the Python encoder and decoder classes."""

    fd: IO[bytes] | None

    def __init__(self, mode: str, *args: Any) -> None:
        self.im: Image.core.ImagingCore | None = None
        self.state = PyCodecState()
        self.fd = None
        self.mode = mode
        self.init(args)

    def init(self, args: tuple[Any, ...]) -> None:
        """
        Override to perform codec specific initialization

        :param args: Tuple of arg items from the tile entry
        :returns: None
        """
        self.args = args

    def cleanup(self) -> None:
        """
        Override to perform codec specific cleanup

        :returns: None
        """

    def setfd(self, fd: IO[bytes]) -> None:
        """
        Called from ImageFile to set the Python file-like object

        :param fd: A Python file-like object
        :returns: None
        """
        self.fd = fd

    def setimage(
        self,
        im: Image.core.ImagingCore,
        extents: tuple[int, int, int, int] | None = None,
    ) -> None:
        """
        Called from ImageFile to set the core output image for the codec

        :param im: A core image object
        :param extents: a 4 tuple of (x0, y0, x1, y1) defining the rectangle
            for this tile
        :returns: None
        :exception ValueError: If the resulting region has a non-positive
            width or height, or extends outside the image.
        """

        # following c code
        self.im = im
        state = self.state

        x0, y0, x1, y1 = extents if extents else (0, 0, 0, 0)

        if x0 == 0 and x1 == 0:
            # No horizontal range given: use the whole image.
            state.xsize, state.ysize = self.im.size
        else:
            state.xoff, state.yoff = x0, y0
            state.xsize, state.ysize = x1 - x0, y1 - y0

        if state.xsize <= 0 or state.ysize <= 0:
            msg = "Size cannot be negative"
            raise ValueError(msg)

        image_width, image_height = self.im.size[0], self.im.size[1]
        if (
            state.xsize + state.xoff > image_width
            or state.ysize + state.yoff > image_height
        ):
            msg = "Tile cannot extend outside image"
            raise ValueError(msg)
class PyDecoder(PyCodec):
    """
    Python implementation of a format decoder. Override this class and
    add the decoding logic in the :meth:`decode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    _pulls_fd = False

    @property
    def pulls_fd(self) -> bool:
        """Value of the ``_pulls_fd`` class flag."""
        return self._pulls_fd

    def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
        """
        Override to perform the decoding process.

        :param buffer: A bytes object with the data to be decoded.
        :returns: A tuple of ``(bytes consumed, errcode)``.
            If finished with decoding return -1 for the bytes consumed.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        msg = "unavailable in base decoder"
        raise NotImplementedError(msg)

    def set_as_raw(
        self, data: bytes, rawmode: str | None = None, extra: tuple[Any, ...] = ()
    ) -> None:
        """
        Convenience method to set the internal image from a stream of raw data

        :param data: Bytes to be set
        :param rawmode: The rawmode to be used for the decoder.
            If not specified, it will default to the mode of the image
        :param extra: Extra arguments for the decoder.
        :returns: None
        :exception ValueError: If the raw decoder does not report that it
            finished, or reports a non-zero error code.
        """

        raw_decoder = Image._getdecoder(
            self.mode, "raw", rawmode or self.mode, extra
        )
        assert self.im is not None
        raw_decoder.setimage(self.im, self.state.extents())
        outcome = raw_decoder.decode(data)

        if outcome[0] >= 0:
            # A non-negative count means the decoder still expects input.
            msg = "not enough image data"
            raise ValueError(msg)
        if outcome[1] != 0:
            msg = "cannot decode image data"
            raise ValueError(msg)
class PyEncoder(PyCodec):
    """
    Python implementation of a format encoder. Override this class and
    add the encoding logic in the :meth:`encode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    _pushes_fd = False

    @property
    def pushes_fd(self) -> bool:
        """Value of the ``_pushes_fd`` class flag."""
        return self._pushes_fd

    def encode(self, bufsize: int) -> tuple[int, int, bytes]:
        """
        Override to perform the encoding process.

        :param bufsize: Buffer size.
        :returns: A tuple of ``(bytes encoded, errcode, bytes)``.
            If finished with encoding return 1 for the error code.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        msg = "unavailable in base encoder"
        raise NotImplementedError(msg)

    def encode_to_pyfd(self) -> tuple[int, int]:
        """
        If ``pushes_fd`` is ``True``, then this method will be used,
        and ``encode()`` will only be called once.

        :returns: A tuple of ``(bytes consumed, errcode)``.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        if not self.pushes_fd:
            return 0, -8  # bad configuration
        bytes_consumed, errcode, data = self.encode(0)
        if data:
            assert self.fd is not None
            self.fd.write(data)
        return bytes_consumed, errcode

    def encode_to_file(self, fh: int, bufsize: int) -> int:
        """
        Call :meth:`encode` repeatedly and write each chunk to a file
        descriptor until a non-zero error code is returned.

        :param fh: File handle.
        :param bufsize: Buffer size.

        :returns: The first non-zero error code from :meth:`encode`. Err
            codes are from :data:`.ImageFile.ERRORS`.
        """
        errcode = 0
        while errcode == 0:
            status, errcode, buf = self.encode(bufsize)
            if status > 0:
                # Write the chunk that was just encoded. Slicing with
                # buf[status:] skipped exactly those bytes, so nothing was
                # written whenever status equalled len(buf). The other write
                # paths (encode_to_pyfd, _encode_tile) write the whole buffer.
                os.write(fh, buf)
        return errcode