Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pendulum/tz/zoneinfo/reader.py: 29%
118 statements
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
« prev ^ index » next coverage.py v7.2.7, created at 2023-06-07 06:35 +0000
1import os
3from collections import namedtuple
4from struct import unpack
5from typing import IO
6from typing import Any
7from typing import Dict
8from typing import List
9from typing import Optional
10from typing import Tuple
12import pytzdata
14from pytzdata.exceptions import TimezoneNotFound
16from pendulum.utils._compat import PY2
18from .exceptions import InvalidTimezone
19from .exceptions import InvalidZoneinfoFile
20from .posix_timezone import PosixTimezone
21from .posix_timezone import posix_spec
22from .timezone import Timezone
23from .transition import Transition
24from .transition_type import TransitionType
# Lightweight record with a UTC offset, a DST flag and an abbreviation index.
# NOTE(review): not referenced by the code visible in this file -- confirm it
# is still needed before removing.
_offset = namedtuple("offset", ["utc_total_offset", "is_dst", "abbr_idx"])

# Parsed TZif header: the file version followed by the six counters that
# Reader._parse_header unpacks (in this order) from bytes 20..44 of the header.
header = namedtuple(
    "header",
    [
        "version",
        "utclocals",
        "stdwalls",
        "leaps",
        "transitions",
        "types",
        "abbr_size",
    ],
)
class Reader:
    """
    Reads compiled zoneinfo TZif files (version 1, 2 or 3).

    The version byte of a version 1 file is a NUL byte; versions 2 and 3
    use the ASCII digits "2" and "3".

    :param extend: Forwarded unchanged to ``Timezone`` as ``extended``.
    """

    def __init__(self, extend=True):  # type: (bool) -> None
        self._extend = extend

    def read_for(self, timezone):  # type: (str) -> Timezone
        """
        Read the zoneinfo structure for a given timezone name.

        :param timezone: The timezone name, resolved to a file path
            through ``pytzdata.tz_path``.

        :raises InvalidTimezone: if pytzdata does not know the name.
        """
        try:
            file_path = pytzdata.tz_path(timezone)
        except TimezoneNotFound:
            raise InvalidTimezone(timezone)

        return self.read(file_path)

    def read(self, file_path):  # type: (str) -> Timezone
        """
        Read a zoneinfo structure from the given path.

        :param file_path: The path of a zoneinfo file.

        :raises InvalidZoneinfoFile: if the path does not exist or the
            file content cannot be parsed.
        """
        if not os.path.exists(file_path):
            raise InvalidZoneinfoFile("The tzinfo file does not exist")

        with open(file_path, "rb") as fd:
            return self._parse(fd)

    def _check_read(self, fd, nbytes):  # type: (...) -> bytes
        """
        Reads the given number of bytes from the given file
        and checks that the correct number of bytes could be read.

        :raises InvalidZoneinfoFile: on a short (or empty) read.
        """
        result = fd.read(nbytes)

        # The first clause also covers a falsy non-bytes result (e.g. None)
        # before len() is applied to it.
        if (not result and nbytes > 0) or len(result) != nbytes:
            raise InvalidZoneinfoFile(
                "Expected {} bytes reading {}, "
                "but got {}".format(nbytes, fd.name, len(result) if result else 0)
            )

        if PY2:
            # Presumably so that indexing yields integers, as bytes does on
            # Python 3 -- the parsers below compare single items to ints.
            return bytearray(result)

        return result

    @staticmethod
    def _v1_data_size(hdr):  # type: (header) -> int
        """
        Size in bytes of the version 1 data block described by ``hdr``.

        Per RFC 8536 a version 1 block holds 4-byte transition times plus
        1-byte type indexes, 6-byte local time type records, the
        abbreviation table, 8-byte leap-second records (4-byte time and
        4-byte correction) and the two one-byte-per-entry indicator arrays.
        """
        return (
            hdr.transitions * 5
            + hdr.types * 6
            + hdr.abbr_size
            + hdr.leaps * 8
            + hdr.stdwalls
            + hdr.utclocals
        )

    @staticmethod
    def _v2_trailer_size(hdr):  # type: (header) -> int
        """
        Size in bytes of what follows the abbreviation table in a
        version 2+ data block: 12-byte leap-second records (8-byte time and
        4-byte correction) and the two indicator arrays.
        """
        return hdr.leaps * 12 + hdr.stdwalls + hdr.utclocals

    def _parse(self, fd):  # type: (...) -> Timezone
        """
        Parse a zoneinfo file.

        :param fd: A binary file object positioned at the start of the file.

        :raises InvalidZoneinfoFile: if a header is invalid, the two headers
            of a version 2+ file disagree, or the data is truncated.
        """
        hdr = self._parse_header(fd)

        if hdr.version in (2, 3):
            # We're skipping the entire v1 data block since
            # at least the same data will be found in the v2+ block.
            fd.seek(self._v1_data_size(hdr), 1)

            # Parse the second header
            hdr = self._parse_header(fd)

            if hdr.version not in (2, 3):
                raise InvalidZoneinfoFile(
                    "Header versions mismatch for file {}".format(fd.name)
                )

            # Parse the v2 data
            trans = self._parse_trans_64(fd, hdr.transitions)
            type_idx = self._parse_type_idx(fd, hdr.transitions)
            raw_types = self._parse_types(fd, hdr.types)
            abbrs = self._parse_abbrs(fd, hdr.abbr_size, raw_types)

            # Jump over leap seconds and indicators to reach the footer.
            fd.seek(self._v2_trailer_size(hdr), 1)

            trule = self._parse_posix_tz(fd)
        else:
            # TZFile v1
            trans = self._parse_trans_32(fd, hdr.transitions)
            type_idx = self._parse_type_idx(fd, hdr.transitions)
            raw_types = self._parse_types(fd, hdr.types)
            abbrs = self._parse_abbrs(fd, hdr.abbr_size, raw_types)
            trule = None

        types = [
            TransitionType(off, is_dst, abbrs[abbr]) for off, is_dst, abbr in raw_types
        ]

        # Each transition is handed the one built just before it.
        transitions = []
        previous = None
        for at, idx in zip(trans, type_idx):
            transition = Transition(at, types[idx], previous)
            transitions.append(transition)

            previous = transition

        if not transitions:
            # No transition in the file: use the first type from time 0.
            transitions.append(Transition(0, types[0], None))

        return Timezone(transitions, posix_rule=trule, extended=self._extend)

    def _parse_header(self, fd):  # type: (...) -> header
        """
        Read and validate one 44-byte TZif header.

        :raises InvalidZoneinfoFile: if the magic is not ``TZif`` or the
            version byte is not one of NUL, "2" or "3".
        """
        buff = self._check_read(fd, 44)

        if buff[:4] != b"TZif":
            raise InvalidZoneinfoFile(
                'The file "{}" has an invalid header.'.format(fd.name)
            )

        version = {0x00: 1, 0x32: 2, 0x33: 3}.get(buff[4])

        if version is None:
            raise InvalidZoneinfoFile(
                'The file "{}" has an invalid version.'.format(fd.name)
            )

        # Bytes 5..19 are not used; the six big-endian counters follow.
        hdr = header(version, *unpack(">6l", buff[20:44]))

        return hdr

    def _parse_trans_64(self, fd, n):  # type: (IO[Any], int) -> List[int]
        """
        Read ``n`` big-endian signed 64-bit transition times.
        """
        return [unpack(">q", self._check_read(fd, 8))[0] for _ in range(n)]

    def _parse_trans_32(self, fd, n):  # type: (IO[Any], int) -> List[int]
        """
        Read ``n`` big-endian signed 32-bit transition times.
        """
        return [unpack(">i", self._check_read(fd, 4))[0] for _ in range(n)]

    def _parse_type_idx(self, fd, n):  # type: (IO[Any], int) -> List[int]
        """
        Read ``n`` one-byte indexes, one per transition.
        """
        buff = self._check_read(fd, n)

        return list(unpack("{}B".format(n), buff))

    def _parse_types(
        self, fd, n
    ):  # type: (IO[Any], int) -> List[Tuple[Any, bool, int]]
        """
        Read ``n`` six-byte type records as
        ``(utc offset, is_dst, abbreviation index)`` tuples.
        """
        types = []

        for _ in range(n):
            buff = self._check_read(fd, 6)
            offset = unpack(">l", buff[:4])[0]
            is_dst = buff[4] == 1
            types.append((offset, is_dst, buff[5]))

        return types

    @staticmethod
    def _read_abbr(buff, idx):  # type: (bytes, int) -> str
        """
        Decode the NUL-terminated abbreviation starting at ``idx``.

        When no terminator follows ``idx`` the abbreviation runs to the end
        of the buffer (a ``find`` result of -1 must not be used as a slice
        bound, as it would silently drop the last character).
        """
        end = buff.find(b"\0", idx)
        if end == -1:
            end = len(buff)

        return buff[idx:end].decode("utf-8")

    def _parse_abbrs(
        self, fd, n, types
    ):  # type: (IO[Any], int, List[Tuple[Any, bool, int]]) -> Dict[int, str]
        """
        Read the ``n``-byte abbreviation table and return a mapping from each
        abbreviation index used in ``types`` to its decoded string.
        """
        abbrs = {}
        buff = self._check_read(fd, n)

        for _, _, idx in types:
            if idx not in abbrs:
                abbrs[idx] = self._read_abbr(buff, idx)

        return abbrs

    def _parse_posix_tz(self, fd):  # type: (...) -> Optional[PosixTimezone]
        """
        Parse the footer: everything left in the file, which must start and
        end with a newline.

        :return: ``None`` when nothing but whitespace lies between the
            newlines, otherwise the result of ``posix_spec``.

        :raises InvalidZoneinfoFile: if the newline framing is missing.
        """
        s = fd.read().decode("utf-8")

        if not s.startswith("\n") or not s.endswith("\n"):
            raise InvalidZoneinfoFile('Invalid posix rule in file "{}"'.format(fd.name))

        s = s.strip()

        if not s:
            return None

        return posix_spec(s)