Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.10/site-packages/pillow-11.0.0-py3.10-linux-x86_64.egg/PIL/ImageFile.py: 25%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

407 statements  

1# 

2# The Python Imaging Library. 

3# $Id$ 

4# 

5# base class for image file handlers 

6# 

7# history: 

8# 1995-09-09 fl Created 

9# 1996-03-11 fl Fixed load mechanism. 

10# 1996-04-15 fl Added pcx/xbm decoders. 

11# 1996-04-30 fl Added encoders. 

12# 1996-12-14 fl Added load helpers 

13# 1997-01-11 fl Use encode_to_file where possible 

14# 1997-08-27 fl Flush output in _save 

15# 1998-03-05 fl Use memory mapping for some modes 

16# 1999-02-04 fl Use memory mapping also for "I;16" and "I;16B" 

17# 1999-05-31 fl Added image parser 

18# 2000-10-12 fl Set readonly flag on memory-mapped images 

19# 2002-03-20 fl Use better messages for common decoder errors 

20# 2003-04-21 fl Fall back on mmap/map_buffer if map is not available 

21# 2003-10-30 fl Added StubImageFile class 

22# 2004-02-25 fl Made incremental parser more robust 

23# 

24# Copyright (c) 1997-2004 by Secret Labs AB 

25# Copyright (c) 1995-2004 by Fredrik Lundh 

26# 

27# See the README file for information on usage and redistribution. 

28# 

29from __future__ import annotations 

30 

31import abc 

32import io 

33import itertools 

34import os 

35import struct 

36import sys 

37from typing import IO, TYPE_CHECKING, Any, NamedTuple, cast 

38 

39from . import Image 

40from ._deprecate import deprecate 

41from ._util import is_path 

42 

43if TYPE_CHECKING: 

44 from ._typing import StrOrBytesPath 

45 

# Default block size in bytes: ImageFile uses it as the default decoder read
# size (``decodermaxblock``) and _save uses it as the minimum encoder buffer.
MAXBLOCK = 64 * 1024

# _safe_read serves requests larger than this many bytes block by block.
SAFEBLOCK = 1 << 20

LOAD_TRUNCATED_IMAGES = False
"""Whether or not to load truncated image files. User code may change this."""

ERRORS = {
    -1: "image buffer overrun error",
    -2: "decoding error",
    -3: "unknown error",
    -8: "bad configuration",
    -9: "out of memory error",
}
"""
Dict of known error codes returned from :meth:`.PyDecoder.decode`,
:meth:`.PyEncoder.encode` :meth:`.PyEncoder.encode_to_pyfd` and
:meth:`.PyEncoder.encode_to_file`.
"""

65 

66 

67# 

68# -------------------------------------------------------------------- 

69# Helpers 

70 

71 

def _get_oserror(error: int, *, encoder: bool) -> OSError:
    """
    Build (but do not raise) the ``OSError`` describing a codec error code.

    :param error: Error code reported by a codec.
    :param encoder: ``True`` if the code came from an encoder (the message
       then talks about writing), ``False`` for a decoder (reading).
    :returns: An ``OSError`` instance carrying the composed message.
    """
    # Prefer the description supplied by the core module; if the core has no
    # ``getcodecstatus`` attribute, fall back on the ERRORS table.
    try:
        description = Image.core.getcodecstatus(error)
    except AttributeError:
        description = ERRORS.get(error)

    if not description:
        # Unknown code: report the raw number instead.
        role = "encoder" if encoder else "decoder"
        description = f"{role} error {error}"

    activity = "writing" if encoder else "reading"
    return OSError(description + f" when {activity} image file")

81 

82 

def raise_oserror(error: int) -> OSError:
    """
    Deprecated helper: emit a deprecation notice, then raise the ``OSError``
    that corresponds to a decoder error code.

    :param error: Error code returned by a codec's ``decode()`` method.
    :exception OSError: Always raised (unless ``deprecate`` raises first).
    """
    explanation = (
        "It is only useful for translating error codes returned by a codec's "
        "decode() method, which ImageFile already does automatically."
    )
    deprecate("raise_oserror", 12, action=explanation)
    failure = _get_oserror(error, encoder=False)
    raise failure

91 

92 

def _tilesort(t: _Tile) -> int:
    """
    Sort key for tile descriptors: the tile's offset (third item).

    :param t: A tile descriptor.
    :returns: The value stored at index 2 of the descriptor.
    """
    # Positional access, so any indexable tile descriptor is accepted.
    offset = t[2]
    return offset

96 

97 

class _Tile(NamedTuple):
    """
    Descriptor for one tile of image data.

    The four fields are unpacked positionally elsewhere in this module as
    ``(codec name, extents, offset, args)``.
    """

    # Name of the codec that handles this tile.
    codec_name: str
    # Four-integer rectangle for the tile, or None.
    extents: tuple[int, int, int, int] | None
    # Position passed to ``seek`` before the tile is processed.
    offset: int
    # Extra arguments handed to the codec factory.
    args: tuple[Any, ...] | str | None

103 

104 

105# 

106# -------------------------------------------------------------------- 

107# ImageFile base class 

108 

109 

class ImageFile(Image.Image):
    """Base class for image file format handlers."""

    def __init__(
        self, fp: StrOrBytesPath | IO[bytes], filename: str | bytes | None = None
    ) -> None:
        """
        Set up the file handle and let the format handler identify the image.

        :param fp: Either a path, which is opened here in binary mode, or an
           already open binary stream.
        :param filename: Name stored in ``self.filename`` when ``fp`` is a
           stream (an empty string is stored if it is ``None``). Not used
           when ``fp`` is a path.
        :exception SyntaxError: If ``_open()`` raises one of the exceptions
           listed below, or if afterwards the mode is unset or either size
           dimension is not positive.
        """
        super().__init__()

        self._min_frame = 0

        self.custom_mimetype: str | None = None

        self.tile: list[_Tile] = []
        """ A list of tile descriptors, or ``None`` """

        self.readonly = 1  # until we know better

        self.decoderconfig: tuple[Any, ...] = ()
        # Maximum number of bytes requested per read in load().
        self.decodermaxblock = MAXBLOCK

        if is_path(fp):
            # filename
            self.fp = open(fp, "rb")
            self.filename = os.path.realpath(os.fspath(fp))
            # We opened the file, so we are responsible for closing it.
            self._exclusive_fp = True
        else:
            # stream
            self.fp = cast(IO[bytes], fp)
            self.filename = filename if filename is not None else ""
            # can be overridden
            self._exclusive_fp = False

        try:
            try:
                self._open()
            except (
                IndexError,  # end of data
                TypeError,  # end of data (ord)
                KeyError,  # unsupported mode
                EOFError,  # got header but not the first frame
                struct.error,
            ) as v:
                # Report these parsing failures uniformly as SyntaxError.
                raise SyntaxError(v) from v

            if not self.mode or self.size[0] <= 0 or self.size[1] <= 0:
                msg = "not identified by this driver"
                raise SyntaxError(msg)
        except BaseException:
            # close the file only if we have opened it this constructor
            if self._exclusive_fp:
                self.fp.close()
            raise

    def _open(self) -> None:
        """Hook called by ``__init__``; this base implementation does nothing."""
        pass

    def get_format_mimetype(self) -> str | None:
        """
        Return the MIME type for this image.

        :returns: ``custom_mimetype`` if it is set; otherwise the
           ``Image.MIME`` entry for the upper-cased format (``None`` if there
           is no entry); ``None`` if the format is ``None``.
        """
        if self.custom_mimetype:
            return self.custom_mimetype
        if self.format is not None:
            return Image.MIME.get(self.format.upper())
        return None

    def __setstate__(self, state: list[Any]) -> None:
        """Reset the tile list, then delegate to the parent ``__setstate__``."""
        self.tile = []
        super().__setstate__(state)

    def verify(self) -> None:
        """Check file integrity"""

        # raise exception if something's wrong.  must be called
        # directly after open, and closes file when finished.
        if self._exclusive_fp:
            self.fp.close()
        # The handle is dropped even when it was not ours to close.
        self.fp = None

    def load(self) -> Image.core.PixelAccess | None:
        """
        Load image data based on tile list

        :returns: The result of ``Image.Image.load``.
        :exception OSError: If there is neither a tile list nor image memory,
           if the data ends early while ``LOAD_TRUNCATED_IMAGES`` is false,
           or if the last decoder error code is negative (again only when
           ``LOAD_TRUNCATED_IMAGES`` is false and no memory map was used).
        """

        if not self.tile and self._im is None:
            msg = "cannot load this image"
            raise OSError(msg)

        pixel = Image.Image.load(self)
        if not self.tile:
            # Nothing left to decode.
            return pixel

        self.map: mmap.mmap | None = None
        # Memory mapping is only considered for a named file with one tile.
        use_mmap = self.filename and len(self.tile) == 1
        # As of pypy 2.1.0, memory mapping was failing here.
        use_mmap = use_mmap and not hasattr(sys, "pypy_version_info")

        readonly = 0

        # look for read/seek overrides
        if hasattr(self, "load_read"):
            read = self.load_read
            # don't use mmap if there are custom read/seek functions
            use_mmap = False
        else:
            read = self.fp.read

        if hasattr(self, "load_seek"):
            seek = self.load_seek
            use_mmap = False
        else:
            seek = self.fp.seek

        if use_mmap:
            # try memory mapping
            decoder_name, extents, offset, args = self.tile[0]
            if isinstance(args, str):
                # Expand a bare mode string into the three-item argument form.
                args = (args, 0, 1)
            if (
                decoder_name == "raw"
                and isinstance(args, tuple)
                and len(args) >= 3
                and args[0] == self.mode
                and args[0] in Image._MAPMODES
            ):
                try:
                    # use mmap, if possible
                    import mmap

                    # The file object is only needed for its descriptor.
                    with open(self.filename) as fp:
                        self.map = mmap.mmap(fp.fileno(), 0, access=mmap.ACCESS_READ)
                    # NOTE(review): args[1] is presumably the row stride in
                    # bytes - confirm against the raw decoder arguments.
                    if offset + self.size[1] * args[1] > self.map.size():
                        msg = "buffer is not large enough"
                        raise OSError(msg)
                    self.im = Image.core.map_buffer(
                        self.map, self.size, decoder_name, offset, args
                    )
                    readonly = 1
                    # After trashing self.im,
                    # we might need to reload the palette data.
                    if self.palette:
                        self.palette.dirty = 1
                except (AttributeError, OSError, ImportError):
                    # Mapping failed; fall back on the decoder loop below.
                    self.map = None

        self.load_prepare()
        err_code = -3  # initialize to unknown error
        if not self.map:
            # sort tiles in file order
            self.tile.sort(key=_tilesort)

            # FIXME: This is a hack to handle TIFF's JpegTables tag.
            prefix = getattr(self, "tile_prefix", b"")

            # Remove consecutive duplicates that only differ by their offset
            # (the last tile of each run is the one that is kept)
            self.tile = [
                list(tiles)[-1]
                for _, tiles in itertools.groupby(
                    self.tile, lambda tile: (tile[0], tile[1], tile[3])
                )
            ]
            for decoder_name, extents, offset, args in self.tile:
                seek(offset)
                decoder = Image._getdecoder(
                    self.mode, decoder_name, args, self.decoderconfig
                )
                try:
                    decoder.setimage(self.im, extents)
                    if decoder.pulls_fd:
                        # The decoder is given the file object and is called
                        # once with an empty buffer.
                        decoder.setfd(self.fp)
                        err_code = decoder.decode(b"")[1]
                    else:
                        # b holds the bytes not yet consumed by the decoder.
                        b = prefix
                        while True:
                            try:
                                s = read(self.decodermaxblock)
                            except (IndexError, struct.error) as e:
                                # truncated png/gif
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = "image file is truncated"
                                    raise OSError(msg) from e

                            if not s:  # truncated jpeg
                                if LOAD_TRUNCATED_IMAGES:
                                    break
                                else:
                                    msg = (
                                        "image file is truncated "
                                        f"({len(b)} bytes not processed)"
                                    )
                                    raise OSError(msg)

                            b = b + s
                            n, err_code = decoder.decode(b)
                            if n < 0:
                                # A negative count ends the loop for this tile.
                                break
                            # Drop the bytes the decoder consumed.
                            b = b[n:]
                finally:
                    # Need to cleanup here to prevent leaks
                    decoder.cleanup()

        self.tile = []
        self.readonly = readonly

        self.load_end()

        if self._exclusive_fp and self._close_exclusive_fp_after_loading:
            self.fp.close()
        # The handle is dropped whether or not it was closed above.
        self.fp = None

        if not self.map and not LOAD_TRUNCATED_IMAGES and err_code < 0:
            # still raised if decoder fails to return anything
            raise _get_oserror(err_code, encoder=False)

        return Image.Image.load(self)

    def load_prepare(self) -> None:
        """Allocate image memory if there is none yet; for mode "P", also run
        ``Image.Image.load``."""
        # create image memory if necessary
        if self._im is None:
            self.im = Image.core.new(self.mode, self.size)
        # create palette (optional)
        if self.mode == "P":
            Image.Image.load(self)

    def load_end(self) -> None:
        """Hook called by ``load()`` after decoding; does nothing here."""
        # may be overridden
        pass

    # may be defined for contained formats
    # def load_seek(self, pos: int) -> None:
    #     pass

    # may be defined for blocked formats (e.g. PNG)
    # def load_read(self, read_bytes: int) -> bytes:
    #     pass

    def _seek_check(self, frame: int) -> bool:
        """
        Validate a frame number before seeking.

        :param frame: Requested frame number.
        :returns: ``True`` if ``frame`` differs from ``self.tell()``.
        :exception EOFError: If ``frame`` is below ``_min_frame``, or at or
           beyond ``n_frames + _min_frame`` (the upper bound is skipped when
           ``_n_frames`` exists and is ``None``).
        """
        if (
            frame < self._min_frame
            # Only check upper limit on frames if additional seek operations
            # are not required to do so
            or (
                not (hasattr(self, "_n_frames") and self._n_frames is None)
                and frame >= getattr(self, "n_frames") + self._min_frame
            )
        ):
            msg = "attempt to seek outside sequence"
            raise EOFError(msg)

        return self.tell() != frame

357 

358 

class StubHandler:
    """
    Handler that a :class:`StubImageFile` delegates to.

    ``StubImageFile.load`` calls :meth:`load` on the handler returned by its
    ``_load`` hook.
    """

    def open(self, im: StubImageFile) -> None:
        """Hook receiving the stub image; the base implementation does nothing."""

    @abc.abstractmethod
    def load(self, im: StubImageFile) -> Image.Image:
        """Produce the image for ``im``; the base implementation does nothing
        and returns ``None``."""

366 

367 

class StubImageFile(ImageFile):
    """
    Base class for stub image loaders.

    A stub loader is an image loader that can identify files of a
    certain format, but relies on external code to load the file.
    """

    def _open(self) -> None:
        """Identification hook; concrete stub classes have to provide it."""
        raise NotImplementedError("StubImageFile subclass must implement _open")

    def load(self) -> Image.core.PixelAccess | None:
        """
        Load the image through the handler returned by :meth:`_load`.

        :returns: Whatever ``load()`` of the handler-produced image returns.
        :exception OSError: If :meth:`_load` returns ``None``.
        """
        handler = self._load()
        if handler is None:
            raise OSError(f"cannot find loader for this {self.format} file")

        loaded = handler.load(self)
        assert loaded is not None
        # become the other object (!): take over its class and its state
        self.__class__ = loaded.__class__  # type: ignore[assignment]
        self.__dict__ = loaded.__dict__
        return loaded.load()

    def _load(self) -> StubHandler | None:
        """(Hook) Find actual image loader."""
        raise NotImplementedError("StubImageFile subclass must implement _load")

396 

397 

class Parser:
    """
    Incremental image parser.  This class implements the standard
    feed/close consumer interface.
    """

    incremental = None
    # Image identified so far, if any.
    image: Image.Image | None = None
    # Bytes received but not yet consumed.
    data: bytes | None = None
    # Decoder used for incremental decoding; stays None when the file is
    # decoded in one go by close().
    decoder: Image.core.ImagingDecoder | PyDecoder | None = None
    # Number of bytes still to skip before the decoder's data starts.
    offset = 0
    # Set to 1 once the decoder has signalled the end of the stream.
    finished = 0

    def reset(self) -> None:
        """
        (Consumer) Reset the parser.  Note that you can only call this
        method immediately after you've created a parser; parser
        instances cannot be reused.
        """
        assert self.data is None, "cannot reuse parsers"

    def feed(self, data: bytes) -> None:
        """
        (Consumer) Feed data to the parser.

        :param data: A bytes buffer.
        :exception OSError: If the parser failed to parse the image file.
        """
        # collect data

        if self.finished:
            return

        if self.data is None:
            self.data = data
        else:
            self.data = self.data + data

        # parse what we have
        if self.decoder:
            if self.offset > 0:
                # skip header
                skip = min(len(self.data), self.offset)
                self.data = self.data[skip:]
                self.offset = self.offset - skip
                if self.offset > 0 or not self.data:
                    return

            n, e = self.decoder.decode(self.data)

            if n < 0:
                # end of stream
                self.data = None
                self.finished = 1
                if e < 0:
                    # decoding error
                    self.image = None
                    raise _get_oserror(e, encoder=False)
                else:
                    # end of image
                    return
            # keep only the bytes the decoder has not consumed yet
            self.data = self.data[n:]

        elif self.image:
            # if we end up here with no decoder, this file cannot
            # be incrementally parsed.  wait until we've gotten all
            # available data
            pass

        else:
            # attempt to open this file
            try:
                with io.BytesIO(self.data) as fp:
                    im = Image.open(fp)
            except OSError:
                pass  # not enough data
            else:
                flag = hasattr(im, "load_seek") or hasattr(im, "load_read")
                if flag or len(im.tile) != 1:
                    # custom load code, or multiple tiles: no incremental
                    # decoder.  (This used to assign a stray ``decode``
                    # attribute that nothing reads.)
                    self.decoder = None
                else:
                    # initialize decoder
                    im.load_prepare()
                    d, e, o, a = im.tile[0]
                    im.tile = []
                    self.decoder = Image._getdecoder(im.mode, d, a, im.decoderconfig)
                    self.decoder.setimage(im.im, e)

                    # calculate decoder offset
                    self.offset = o
                    if self.offset <= len(self.data):
                        self.data = self.data[self.offset :]
                        self.offset = 0

                self.image = im

    def __enter__(self) -> Parser:
        """Return the parser itself for use in a ``with`` statement."""
        return self

    def __exit__(self, *args: object) -> None:
        """Call :meth:`close` when the ``with`` block ends."""
        self.close()

    def close(self) -> Image.Image:
        """
        (Consumer) Close the stream.

        :returns: An image object.
        :exception OSError: If the parser failed to parse the image file either
                            because it cannot be identified or cannot be
                            decoded.
        """
        # finish decoding
        if self.decoder:
            # get rid of what's left in the buffers
            self.feed(b"")
            self.data = self.decoder = None
            if not self.finished:
                msg = "image was incomplete"
                raise OSError(msg)
        if not self.image:
            msg = "cannot parse this image"
            raise OSError(msg)
        if self.data:
            # incremental parsing not possible; reopen the file
            # now that we have all data
            with io.BytesIO(self.data) as fp:
                try:
                    self.image = Image.open(fp)
                finally:
                    # runs whether or not the reopen succeeded
                    self.image.load()
        return self.image

530 

531 

532# -------------------------------------------------------------------- 

533 

534 

def _save(im: Image.Image, fp: IO[bytes], tile: list[_Tile], bufsize: int = 0) -> None:
    """Helper to save image based on tile list

    The tile list is sorted in place by offset before encoding.

    :param im: Image object.
    :param fp: File object.
    :param tile: Tile list.
    :param bufsize: Optional buffer size
    """

    im.load()
    if not hasattr(im, "encoderconfig"):
        im.encoderconfig = ()
    tile.sort(key=_tilesort)

    # FIXME: make MAXBLOCK a configuration parameter
    # It would be great if we could have the encoder specify what it needs
    # But, it would need at least the image size in most cases. RawEncode is
    # a tricky case.
    row_bytes = im.size[0] * 4  # see RawEncode.c
    bufsize = max(MAXBLOCK, bufsize, row_bytes)

    try:
        # Encode straight to the file descriptor when fp has one.
        descriptor = fp.fileno()
        fp.flush()
        _encode_tile(im, fp, tile, bufsize, descriptor)
    except (AttributeError, io.UnsupportedOperation) as exc:
        # Otherwise go through fp.write(); the exception is passed along so
        # that an encoder error can be chained to it.
        _encode_tile(im, fp, tile, bufsize, None, exc)

    if hasattr(fp, "flush"):
        fp.flush()

561 

562 

def _encode_tile(
    im: Image.Image,
    fp: IO[bytes],
    tile: list[_Tile],
    bufsize: int,
    fh: int | None,
    exc: BaseException | None = None,
) -> None:
    """
    Encode every tile of ``im`` and write the result to ``fp``.

    :param im: Image object.
    :param fp: File object.
    :param tile: Tile list.
    :param bufsize: Buffer size passed to the encoder.
    :param fh: File descriptor for ``encode_to_file``; must not be ``None``
       when ``exc`` is not given and the encoder does not push to ``fp``.
    :param exc: When given, data is written with ``fp.write()`` instead of
       through ``fh``, and an encoder error is chained to this exception.
    :exception OSError: If an encoder reports a negative error code.
    """
    for encoder_name, extents, offset, args in tile:
        if offset > 0:
            fp.seek(offset)
        encoder = Image._getencoder(im.mode, encoder_name, args, im.encoderconfig)
        try:
            encoder.setimage(im.im, extents)
            if encoder.pushes_fd:
                # the encoder writes to fp by itself
                encoder.setfd(fp)
                errcode = encoder.encode_to_pyfd()[1]
            elif exc:
                # compress to Python file-compatible object
                while True:
                    result = encoder.encode(bufsize)
                    errcode, data = result[1:]
                    fp.write(data)
                    if errcode:
                        break
            else:
                # slight speedup: compress to real file object
                assert fh is not None
                errcode = encoder.encode_to_file(fh, bufsize)
            if errcode < 0:
                raise _get_oserror(errcode, encoder=True) from exc
        finally:
            encoder.cleanup()

596 

597 

def _safe_read(fp: IO[bytes], size: int) -> bytes:
    """
    Read exactly ``size`` bytes without issuing a single oversized read.

    Requests of up to SAFEBLOCK bytes are served by one ``fp.read`` call;
    larger requests are read in pieces of at most SAFEBLOCK bytes.

    :param fp: File handle.  Must implement a ``read`` method.
    :param size: Number of bytes to read.
    :returns: ``size`` bytes of data, or ``b""`` if ``size`` is not positive.
    :exception OSError: If the file is truncated and the read cannot be
       completed.
    """
    if size <= 0:
        return b""

    if size <= SAFEBLOCK:
        data = fp.read(size)
        if len(data) < size:
            raise OSError("Truncated File Read")
        return data

    chunks: list[bytes] = []
    outstanding = size
    while outstanding > 0:
        chunk = fp.read(min(outstanding, SAFEBLOCK))
        if not chunk:
            # end of file before everything was read
            break
        chunks.append(chunk)
        outstanding -= len(chunk)

    # outstanding == size - (total bytes collected)
    if outstanding > 0:
        raise OSError("Truncated File Read")
    return b"".join(chunks)

631 

632 

class PyCodecState:
    """Size and offset of the region a Python codec works on."""

    def __init__(self) -> None:
        """Start with a zero size and a zero offset."""
        self.xsize = self.ysize = 0
        self.xoff = self.yoff = 0

    def extents(self) -> tuple[int, int, int, int]:
        """
        :returns: ``(xoff, yoff, xoff + xsize, yoff + ysize)``.
        """
        left, top = self.xoff, self.yoff
        return left, top, left + self.xsize, top + self.ysize

642 

643 

class PyCodec:
    """Shared state and setup methods for codecs written in Python."""

    fd: IO[bytes] | None

    def __init__(self, mode: str, *args: Any) -> None:
        """
        :param mode: Stored as ``self.mode``.
        :param args: Remaining positional arguments, forwarded to
           :meth:`init` as one tuple.
        """
        self.im: Image.core.ImagingCore | None = None
        self.state = PyCodecState()
        self.fd = None
        self.mode = mode
        self.init(args)

    def init(self, args: tuple[Any, ...]) -> None:
        """
        Override to perform codec specific initialization

        :param args: Tuple of arg items from the tile entry
        :returns: None
        """
        self.args = args

    def cleanup(self) -> None:
        """
        Override to perform codec specific cleanup

        :returns: None
        """

    def setfd(self, fd: IO[bytes]) -> None:
        """
        Called from ImageFile to set the Python file-like object

        :param fd: A Python file-like object
        :returns: None
        """
        self.fd = fd

    def setimage(
        self,
        im: Image.core.ImagingCore,
        extents: tuple[int, int, int, int] | None = None,
    ) -> None:
        """
        Called from ImageFile to set the core output image for the codec

        :param im: A core image object
        :param extents: a 4 tuple of (x0, y0, x1, y1) defining the rectangle
            for this tile
        :returns: None
        :exception ValueError: If the resulting size is not positive, or the
            tile reaches beyond the image.
        """

        # following c code
        self.im = im
        state = self.state

        x0, y0, x1, y1 = extents if extents else (0, 0, 0, 0)

        if x0 == 0 and x1 == 0:
            # no horizontal extent given: cover the whole image
            state.xsize, state.ysize = self.im.size
        else:
            state.xoff = x0
            state.yoff = y0
            state.xsize = x1 - x0
            state.ysize = y1 - y0

        if state.xsize <= 0 or state.ysize <= 0:
            raise ValueError("Size cannot be negative")

        too_wide = state.xsize + state.xoff > self.im.size[0]
        if too_wide or state.ysize + state.yoff > self.im.size[1]:
            raise ValueError("Tile cannot extend outside image")

720 

721 

class PyDecoder(PyCodec):
    """
    Python implementation of a format decoder. Override this class and
    add the decoding logic in the :meth:`decode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    # Subclasses set this to True to have pulls_fd report True.
    _pulls_fd = False

    @property
    def pulls_fd(self) -> bool:
        """Value of the class-level ``_pulls_fd`` flag."""
        return self._pulls_fd

    def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
        """
        Override to perform the decoding process.

        :param buffer: A bytes object with the data to be decoded.
        :returns: A tuple of ``(bytes consumed, errcode)``.
            If finished with decoding return -1 for the bytes consumed.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        raise NotImplementedError("unavailable in base decoder")

    def set_as_raw(
        self, data: bytes, rawmode: str | None = None, extra: tuple[Any, ...] = ()
    ) -> None:
        """
        Convenience method to set the internal image from a stream of raw data

        :param data: Bytes to be set
        :param rawmode: The rawmode to be used for the decoder.
            If not specified, it will default to the mode of the image
        :param extra: Extra arguments for the decoder.
        :returns: None
        :exception ValueError: If the raw decoder does not report completion
            or reports a non-zero error code.
        """

        raw_decoder = Image._getdecoder(
            self.mode, "raw", rawmode or self.mode, extra
        )
        assert self.im is not None
        raw_decoder.setimage(self.im, self.state.extents())
        outcome = raw_decoder.decode(data)

        # outcome is (bytes consumed, errcode); a non-negative count means
        # the decoder still expects more data
        if outcome[0] >= 0:
            raise ValueError("not enough image data")
        if outcome[1] != 0:
            raise ValueError("cannot decode image data")

774 

775 

class PyEncoder(PyCodec):
    """
    Python implementation of a format encoder. Override this class and
    add the encoding logic in the :meth:`encode` method.

    See :ref:`Writing Your Own File Codec in Python<file-codecs-py>`
    """

    # Subclasses set this to True to have pushes_fd report True.
    _pushes_fd = False

    @property
    def pushes_fd(self) -> bool:
        """Value of the class-level ``_pushes_fd`` flag."""
        return self._pushes_fd

    def encode(self, bufsize: int) -> tuple[int, int, bytes]:
        """
        Override to perform the encoding process.

        :param bufsize: Buffer size.
        :returns: A tuple of ``(bytes encoded, errcode, bytes)``.
            If finished with encoding return 1 for the error code.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        msg = "unavailable in base encoder"
        raise NotImplementedError(msg)

    def encode_to_pyfd(self) -> tuple[int, int]:
        """
        If ``pushes_fd`` is ``True``, then this method will be used,
        and ``encode()`` will only be called once.

        :returns: A tuple of ``(bytes consumed, errcode)``.
            Err codes are from :data:`.ImageFile.ERRORS`.
        """
        if not self.pushes_fd:
            return 0, -8  # bad configuration
        bytes_consumed, errcode, data = self.encode(0)
        if data:
            assert self.fd is not None
            self.fd.write(data)
        return bytes_consumed, errcode

    def encode_to_file(self, fh: int, bufsize: int) -> int:
        """
        Call ``encode()`` repeatedly, writing each result to a file
        descriptor, until a non-zero error code is returned.

        :param fh: File handle.
        :param bufsize: Buffer size.

        :returns: The first non-zero error code returned by ``encode()``
            (1 when encoding finished successfully). Err codes are from
            :data:`.ImageFile.ERRORS`.
        """
        errcode = 0
        while errcode == 0:
            status, errcode, buf = self.encode(bufsize)
            if status > 0:
                # ``status`` is the number of encoded bytes, so the payload
                # is the first ``status`` bytes of ``buf``.  Slicing from
                # ``status`` onwards would drop the encoded data.
                os.write(fh, buf[:status])
        return errcode