1import struct
2
3
def load_tzdata(key):
    """Open the TZif resource for ``key`` from the ``tzdata`` package.

    The key is split on ``"/"``. Every component except the last is appended
    to ``tzdata.zoneinfo`` to build a dotted package name, and the last
    component is the resource opened inside that package.

    Args:
        key: Time zone key, e.g. ``"America/New_York"``.

    Returns:
        The binary file object returned by ``open_binary`` for the resource.

    Raises:
        ZoneInfoNotFoundError: If no file resource can be located for ``key``.
    """
    # Imported lazily; fall back to the separately installed
    # ``importlib_resources`` module when the stdlib one is unavailable.
    try:
        import importlib.resources as importlib_resources
    except ImportError:
        import importlib_resources

    components = key.split("/")
    package_name = ".".join(["tzdata.zoneinfo"] + components[:-1])
    resource_name = components[-1]

    try:
        return importlib_resources.open_binary(package_name, resource_name)
    except (
        ImportError,
        FileNotFoundError,
        UnicodeEncodeError,
        IsADirectoryError,
    ):
        # There are four types of exception that can be raised that all amount
        # to "we cannot find this key":
        #
        # ImportError: If package_name doesn't exist (e.g. if tzdata is not
        #   installed, or if there's an error in the folder name like
        #   Amrica/New_York)
        # FileNotFoundError: If resource_name doesn't exist in the package
        #   (e.g. Europe/Krasnoy)
        # UnicodeEncodeError: If package_name or resource_name are not UTF-8,
        #   such as keys containing a surrogate character.
        # IsADirectoryError: If resource_name resolves to a directory rather
        #   than a file, i.e. the key names a folder of zones instead of a
        #   single zone.
        raise ZoneInfoNotFoundError(f"No time zone found with key {key}")
28
29
def load_data(fobj):
    """Parse the contents of a TZif file.

    Args:
        fobj: Binary file-like object positioned at the start of the TZif
            data. Only ``read(n)`` and relative ``seek(offset, 1)`` are used.

    Returns:
        A 6-tuple ``(trans_idx, trans_list_utc, utcoff, isdst, abbr, tz_str)``:

        * ``trans_idx``: tuple of ``timecnt`` one-byte indices, one per
          transition.
        * ``trans_list_utc``: tuple of ``timecnt`` transition times (32-bit
          for version 1 data, 64-bit otherwise).
        * ``utcoff``, ``isdst``: tuples with one entry per ttinfo record.
        * ``abbr``: tuple of abbreviation strings, one per ttinfo record.
        * ``tz_str``: the footer TZ string as ``bytes`` (newlines removed)
          when the parsed header reports version 2 or later, else ``None``.

    Raises:
        ValueError: If a header lacks the TZif magic (raised by
            ``_TZifHeader.from_file``), or if the version 2+ footer does not
            start with a newline or ends before its closing newline.
        struct.error: If a fixed-size section is shorter than the header
            counts require.
    """
    header = _TZifHeader.from_file(fobj)

    if header.version == 1:
        time_size = 4
        time_type = "l"
    else:
        # Version 2+ has 64-bit integer transition times
        time_size = 8
        time_type = "q"

        # Version 2+ also starts with a Version 1 header and data, which
        # we need to skip now
        skip_bytes = (
            header.timecnt * 5  # Transition times and types
            + header.typecnt * 6  # Local time type records
            + header.charcnt  # Time zone designations
            + header.leapcnt * 8  # Leap second records
            + header.isstdcnt  # Standard/wall indicators
            + header.isutcnt  # UT/local indicators
        )

        fobj.seek(skip_bytes, 1)

        # Now we need to read the second header, which is not the same
        # as the first
        header = _TZifHeader.from_file(fobj)

    typecnt = header.typecnt
    timecnt = header.timecnt
    charcnt = header.charcnt

    # The data portion starts with timecnt transitions and indices
    if timecnt:
        trans_list_utc = struct.unpack(
            f">{timecnt}{time_type}", fobj.read(timecnt * time_size)
        )
        trans_idx = struct.unpack(f">{timecnt}B", fobj.read(timecnt))
    else:
        trans_list_utc = ()
        trans_idx = ()

    # Read the ttinfo struct, (utoff, isdst, abbrind). The designation index
    # is an unsigned octet, so it is unpacked with "B"; a signed read would
    # turn indices >= 128 into negative offsets from the end of the table.
    if typecnt:
        utcoff, isdst, abbrind = zip(
            *(struct.unpack(">lbB", fobj.read(6)) for _ in range(typecnt))
        )
    else:
        utcoff = ()
        isdst = ()
        abbrind = ()

    # Now read the abbreviations. They are null-terminated strings, indexed
    # not by position in the array but by position in the unsplit
    # abbreviation string.
    abbr_vals = {}
    abbr_chars = fobj.read(charcnt)

    def get_abbr(idx):
        # Gets a string starting at idx and running until the next \x00
        #
        # We cannot pre-populate abbr_vals by splitting on \x00 because there
        # are some zones that use subsets of longer abbreviations, like so:
        #
        #  LMT\x00AHST\x00HDT\x00
        #
        # Where the idx to abbr mapping should be:
        #
        # {0: "LMT", 4: "AHST", 5: "HST", 9: "HDT"}
        if idx not in abbr_vals:
            span_end = abbr_chars.find(b"\x00", idx)
            if span_end == -1:
                # No terminator: take the rest of the table instead of
                # letting the -1 slice bound silently drop the last byte.
                span_end = len(abbr_chars)
            abbr_vals[idx] = abbr_chars[idx:span_end].decode()

        return abbr_vals[idx]

    abbr = tuple(get_abbr(idx) for idx in abbrind)

    # The remainder of the file consists of leap seconds (currently unused)
    # and the standard/wall and ut/local indicators, which are metadata we
    # don't need. In version 2 files, we need to skip the unnecessary data to
    # get at the TZ string:
    if header.version >= 2:
        # Each leap second record has size (time_size + 4)
        skip_bytes = (
            header.isutcnt
            + header.isstdcnt
            + header.leapcnt * (time_size + 4)
        )
        fobj.seek(skip_bytes, 1)

        # The footer is "\n<TZ string>\n". Validate with real exceptions so
        # the check is not stripped when Python runs with -O.
        c = fobj.read(1)
        if c != b"\n":
            raise ValueError(
                f"Invalid TZif file: expected newline before TZ string, "
                f"got {c!r}"
            )

        tz_bytes = bytearray()
        while True:
            c = fobj.read(1)
            if not c:
                # read() keeps returning b"" at EOF; without this check a
                # truncated footer would loop forever.
                raise ValueError("Invalid TZif file: unexpected end of file")
            if c == b"\n":
                break
            tz_bytes += c

        tz_str = bytes(tz_bytes)
    else:
        tz_str = None

    return trans_idx, trans_list_utc, utcoff, isdst, abbr, tz_str
131
132
class _TZifHeader:
    """Version number and six section counts read from one TZif header."""

    # Listed in the same order as the count fields appear in the header, so
    # positional construction from the unpacked values lines up.
    __slots__ = [
        "version",
        "isutcnt",
        "isstdcnt",
        "leapcnt",
        "timecnt",
        "typecnt",
        "charcnt",
    ]

    def __init__(self, *args):
        """Assign ``args`` positionally to the names in ``__slots__``."""
        field_names = self.__slots__
        assert len(field_names) == len(args)
        for position, field_name in enumerate(field_names):
            setattr(self, field_name, args[position])

    @classmethod
    def from_file(cls, stream):
        """Read one 44-byte header from ``stream`` and return it.

        Raises:
            ValueError: If the first four bytes are not ``b"TZif"``, or the
                version byte cannot be converted by ``int``.
        """
        magic = stream.read(4)
        if magic != b"TZif":
            raise ValueError("Invalid TZif file: magic not found")

        # A NUL version byte means version 1; anything else is parsed as an
        # ASCII digit.
        version_byte = stream.read(1)
        version = 1 if version_byte == b"\x00" else int(version_byte)

        # Consume the 15 bytes between the version byte and the counts.
        stream.read(15)

        # Six big-endian 32-bit counts, in slot order.
        counts = struct.unpack(">6l", stream.read(24))

        return cls(version, *counts)
168
169
class ZoneInfoNotFoundError(KeyError):
    """Raised when no time zone data can be found for a ZoneInfo key."""