1#
2# The Python Imaging Library
3# $Id$
4#
5# FITS file handling
6#
7# Copyright (c) 1998-2003 by Fredrik Lundh
8#
9# See the README file for information on usage and redistribution.
10#
11from __future__ import annotations
12
13import gzip
14import math
15
16from . import Image, ImageFile
17
18
19def _accept(prefix: bytes) -> bool:
20 return prefix.startswith(b"SIMPLE")
21
22
class FitsImageFile(ImageFile.ImageFile):
    """Image file plugin for FITS.

    ``_open`` walks the header units, remembers the first one that describes
    an image (either plain pixel data or a ``GZIP_1`` compressed binary
    table) and configures a single tile covering the whole image.
    """

    format = "FITS"
    format_description = "FITS"

    def _open(self) -> None:
        """Read the header records and set up ``self.tile``.

        Headers are consumed as 80-byte records until the first record of a
        data unit is reached.

        :raises OSError: If the file ends while headers are still being read.
        :raises SyntaxError: If the first record is not ``SIMPLE = T``.
        :raises ValueError: If a data unit is reached but no header unit
            described an image.
        """
        assert self.fp is not None

        headers: dict[bytes, bytes] = {}
        header_in_progress = False
        decoder_name = ""
        while True:
            header = self.fp.read(80)
            if not header:
                msg = "Truncated FITS file"
                raise OSError(msg)
            keyword = header[:8].strip()
            if keyword in (b"SIMPLE", b"XTENSION"):
                header_in_progress = True
            elif headers and not header_in_progress:
                # This is now a data unit
                break
            elif keyword == b"END":
                # Seek to the end of the header unit, i.e. the next multiple
                # of 2880 bytes
                self.fp.seek(math.ceil(self.fp.tell() / 2880) * 2880)
                if not decoder_name:
                    decoder_name, offset, args = self._parse_headers(headers)

                header_in_progress = False
                continue

            if decoder_name:
                # Keep going to read past the headers
                continue

            # Drop everything from the first "/" onwards, then the leading
            # "=" value indicator
            value = header[8:].split(b"/")[0].strip()
            if value.startswith(b"="):
                value = value[1:].strip()
            # The very first record has to be "SIMPLE = T"
            if not headers and (not _accept(keyword) or value != b"T"):
                msg = "Not a FITS file"
                raise SyntaxError(msg)
            headers[keyword] = value

        if not decoder_name:
            msg = "No image data"
            raise ValueError(msg)

        # One 80-byte record of the data unit has already been read by the
        # loop above, so step back to where the data unit starts
        offset += self.fp.tell() - 80
        self.tile = [ImageFile._Tile(decoder_name, (0, 0) + self.size, offset, args)]

    @staticmethod
    def _string_value(value: bytes | None) -> bytes | None:
        """Return the text of a quoted FITS string value.

        The surrounding single quotes are removed and trailing spaces are
        dropped, so differently padded values compare equal.

        :param value: Raw header value, or None if the keyword is absent.
        :returns: The unquoted text, or None if ``value`` is None or is not
            enclosed in single quotes.
        """
        if value is None:
            return None
        if len(value) < 2 or value[:1] != b"'" or value[-1:] != b"'":
            return None
        return value[1:-1].rstrip(b" ")

    def _get_size(
        self, headers: dict[bytes, bytes], prefix: bytes
    ) -> tuple[int, int] | None:
        """Return the image size described by the ``NAXIS`` keywords.

        :param headers: Header keywords mapped to their raw values.
        :param prefix: Prepended to each keyword looked up, e.g. ``b"Z"`` to
            read ``ZNAXIS`` instead of ``NAXIS``.
        :returns: None if there are no axes, ``(1, NAXIS1)`` if there is one
            axis, otherwise ``(NAXIS1, NAXIS2)``.
        :raises KeyError: If a required keyword is missing.
        :raises ValueError: If a keyword value is not an integer.
        """
        naxis = int(headers[prefix + b"NAXIS"])
        if naxis == 0:
            return None

        if naxis == 1:
            return 1, int(headers[prefix + b"NAXIS1"])
        else:
            return int(headers[prefix + b"NAXIS1"]), int(headers[prefix + b"NAXIS2"])

    def _parse_headers(
        self, headers: dict[bytes, bytes]
    ) -> tuple[str, int, tuple[str | int, ...]]:
        """Work out how to decode the image described by ``headers``.

        Sets ``self._size`` and, for a supported ``BITPIX``, ``self._mode``.

        :param headers: Header keywords mapped to their raw values.
        :returns: ``(decoder_name, offset, args)``. ``decoder_name`` is empty
            if the header unit has no axes. ``offset`` is relative to the
            start of the data unit.
        :raises KeyError: If a required keyword is missing.
        :raises ValueError: If a numeric keyword value is not an integer.
        """
        prefix = b""
        decoder_name = "raw"
        offset = 0
        # Compare string values without their quotes and padding: trailing
        # spaces inside a FITS string value are not significant.
        if (
            self._string_value(headers.get(b"XTENSION")) == b"BINTABLE"
            and headers.get(b"ZIMAGE") == b"T"
            and self._string_value(headers[b"ZCMPTYPE"]) == b"GZIP_1"
        ):
            # Skip NAXIS1 * NAXIS2 table entries of BITPIX bits each; the
            # compressed data is read from after them
            no_prefix_size = self._get_size(headers, prefix) or (0, 0)
            number_of_bits = int(headers[b"BITPIX"])
            offset = no_prefix_size[0] * no_prefix_size[1] * (number_of_bits // 8)

            # The image itself is described by the Z-prefixed keywords
            prefix = b"Z"
            decoder_name = "fits_gzip"

        size = self._get_size(headers, prefix)
        if not size:
            return "", 0, ()

        self._size = size

        # Any other BITPIX value leaves the mode as it was
        number_of_bits = int(headers[prefix + b"BITPIX"])
        if number_of_bits == 8:
            self._mode = "L"
        elif number_of_bits == 16:
            self._mode = "I;16"
        elif number_of_bits == 32:
            self._mode = "I"
        elif number_of_bits in (-32, -64):
            self._mode = "F"

        args: tuple[str | int, ...]
        if decoder_name == "raw":
            args = (self.mode, 0, -1)
        else:
            args = (number_of_bits,)
        return decoder_name, offset, args
124
125
class FitsGzipDecoder(ImageFile.PyDecoder):
    """Decoder registered as ``fits_gzip`` for gzip-compressed FITS images.

    Everything readable from the file object is decompressed in one go and
    treated as consecutive 4-byte cells, one per pixel. The trailing bytes of
    each cell are kept and the rows are emitted in reverse order.
    """

    # decode() reads from self.fd itself and ignores its buffer argument
    _pulls_fd = True

    def decode(self, buffer: bytes | Image.SupportsArrayInterface) -> tuple[int, int]:
        """Decompress the image and hand the pixel data to ``set_as_raw``.

        :param buffer: Unused; the data is read from ``self.fd`` instead.
        :returns: Always ``(-1, 0)``.
        """
        assert self.fd is not None
        value = gzip.decompress(self.fd.read())

        # args[0] is a number of bits; at most the 4 bytes of a cell are kept.
        # NOTE(review): a negative value here gives a negative byte count and
        # therefore empty slices - confirm whether that can reach this decoder.
        bytes_per_pixel = min(self.args[0] // 8, 4)
        skipped = 4 - bytes_per_pixel
        xsize = self.state.xsize

        rows = []
        offset = 0
        for _ in range(self.state.ysize):
            row = bytearray()
            for _ in range(xsize):
                row += value[offset + skipped : offset + 4]
                offset += 4
            rows.append(row)
        # Rows are stored in the opposite vertical order to the output
        self.set_as_raw(b"".join(reversed(rows)))
        return -1, 0
144
145
146# --------------------------------------------------------------------
147# Registry
148
# Register the plugin class under its format name, passing _accept as the
# callback that checks a file prefix.
Image.register_open(FitsImageFile.format, FitsImageFile, _accept)
# "fits_gzip" is the decoder name that FitsImageFile._parse_headers returns
# for GZIP_1 compressed tables.
Image.register_decoder("fits_gzip", FitsGzipDecoder)

# File extensions registered for the FITS format.
Image.register_extensions(FitsImageFile.format, [".fit", ".fits"])