Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/pendulum/tz/zoneinfo/reader.py: 29%

118 statements  

« prev     ^ index     » next       coverage.py v7.2.7, created at 2023-06-07 06:35 +0000

1import os 

2 

3from collections import namedtuple 

4from struct import unpack 

5from typing import IO 

6from typing import Any 

7from typing import Dict 

8from typing import List 

9from typing import Optional 

10from typing import Tuple 

11 

12import pytzdata 

13 

14from pytzdata.exceptions import TimezoneNotFound 

15 

16from pendulum.utils._compat import PY2 

17 

18from .exceptions import InvalidTimezone 

19from .exceptions import InvalidZoneinfoFile 

20from .posix_timezone import PosixTimezone 

21from .posix_timezone import posix_spec 

22from .timezone import Timezone 

23from .transition import Transition 

24from .transition_type import TransitionType 

25 

26 

# Lightweight record with three fields: a total UTC offset, a DST flag and an
# abbreviation index.
# NOTE(review): presumably mirrors the tuples built by Reader._parse_types;
# it is not referenced in the visible part of this module - confirm.
_offset = namedtuple(
    "offset",
    ("utc_total_offset", "is_dst", "abbr_idx"),
)

28 

# Parsed TZif header: the file version followed by the six counters that
# Reader._parse_header unpacks, in file order, from bytes 20-44 of the header.
header = namedtuple(
    "header",
    (
        "version",
        "utclocals",
        "stdwalls",
        "leaps",
        "transitions",
        "types",
        "abbr_size",
    ),
)

33 

34 

class Reader:
    """
    Reads compiled zoneinfo TZif (version 1, 2 or 3) files.
    """

    def __init__(self, extend=True):  # type: (bool) -> None
        """
        :param extend: Forwarded as ``extended`` to every ``Timezone``
            built by this reader.
        """
        self._extend = extend

    def read_for(self, timezone):  # type: (str) -> Timezone
        """
        Read the zoneinfo structure for a given timezone name.

        :param timezone: The timezone name, resolved to a file path
            through ``pytzdata.tz_path``.

        :raises InvalidTimezone: If pytzdata does not know the timezone.
        """
        try:
            file_path = pytzdata.tz_path(timezone)
        except TimezoneNotFound:
            raise InvalidTimezone(timezone)

        return self.read(file_path)

    def read(self, file_path):  # type: (str) -> Timezone
        """
        Read a zoneinfo structure from the given path.

        :param file_path: The path of a zoneinfo file.

        :raises InvalidZoneinfoFile: If the path does not exist or the
            file content is not a valid TZif structure.
        """
        if not os.path.exists(file_path):
            raise InvalidZoneinfoFile("The tzinfo file does not exist")

        with open(file_path, "rb") as fd:
            return self._parse(fd)

    def _check_read(self, fd, nbytes):  # type: (...) -> bytes
        """
        Reads the given number of bytes from the given file
        and checks that the correct number of bytes could be read.

        :raises InvalidZoneinfoFile: If fewer (or more) bytes than
            requested were returned.
        """
        result = fd.read(nbytes)
        received = len(result) if result else 0

        if received != nbytes:
            raise InvalidZoneinfoFile(
                "Expected {} bytes reading {}, "
                "but got {}".format(nbytes, fd.name, received)
            )

        if PY2:
            # Indexing a bytearray yields integers on Python 2,
            # like indexing bytes does on Python 3.
            return bytearray(result)

        return result

    @staticmethod
    def _v1_block_size(hdr):  # type: (header) -> int
        """
        Size in bytes of the version 1 data block described by ``hdr``.
        """
        return (
            hdr.transitions * 5  # 4-byte time + 1-byte type index
            + hdr.types * 6  # 4-byte offset + dst flag + abbr index
            + hdr.abbr_size
            + hdr.leaps * 8  # 4-byte time + 4-byte correction
            + hdr.stdwalls
            + hdr.utclocals
        )

    @staticmethod
    def _v2_remainder_size(hdr):  # type: (header) -> int
        """
        Size in bytes of what follows the abbreviations in a version 2+
        data block: leap-second records and the two indicator arrays.
        """
        # Version 2+ leap records hold an 8-byte time + 4-byte correction.
        return hdr.leaps * 12 + hdr.stdwalls + hdr.utclocals

    def _parse(self, fd):  # type: (...) -> Timezone
        """
        Parse a zoneinfo file.

        :param fd: A binary file object positioned at the start of the
            TZif data.

        :raises InvalidZoneinfoFile: If the headers are inconsistent, no
            transition type is defined, or a transition references a
            transition type that does not exist.
        """
        hdr = self._parse_header(fd)

        if hdr.version in (2, 3):
            # We're skipping the entire v1 file since
            # at least the same data will be found in TZFile 2.
            fd.seek(self._v1_block_size(hdr), 1)

            # Parse the second header
            hdr = self._parse_header(fd)

            if hdr.version not in (2, 3):
                raise InvalidZoneinfoFile(
                    "Header versions mismatch for file {}".format(fd.name)
                )

            # Parse the v2 data
            times = self._parse_trans_64(fd, hdr.transitions)
            type_idx = self._parse_type_idx(fd, hdr.transitions)
            types = self._parse_types(fd, hdr.types)
            abbrs = self._parse_abbrs(fd, hdr.abbr_size, types)

            # Skip to the footer holding the POSIX rule.
            fd.seek(self._v2_remainder_size(hdr), 1)

            trule = self._parse_posix_tz(fd)
        else:
            # TZFile v1
            times = self._parse_trans_32(fd, hdr.transitions)
            type_idx = self._parse_type_idx(fd, hdr.transitions)
            types = self._parse_types(fd, hdr.types)
            abbrs = self._parse_abbrs(fd, hdr.abbr_size, types)
            trule = None

        types = [
            TransitionType(off, is_dst, abbrs[abbr]) for off, is_dst, abbr in types
        ]

        if not types:
            # Without at least one type no transition can be built.
            raise InvalidZoneinfoFile(
                'The file "{}" does not define any transition type.'.format(fd.name)
            )

        transitions = []
        previous = None
        for at, idx in zip(times, type_idx):
            if idx >= len(types):
                raise InvalidZoneinfoFile(
                    'The file "{}" references an unknown transition type.'.format(
                        fd.name
                    )
                )

            transition = Transition(at, types[idx], previous)
            transitions.append(transition)

            previous = transition

        if not transitions:
            # No recorded transition: fall back to a single one at
            # time 0 using the first transition type.
            transitions.append(Transition(0, types[0], None))

        return Timezone(transitions, posix_rule=trule, extended=self._extend)

    def _parse_header(self, fd):  # type: (...) -> header
        """
        Read and validate one 44-byte TZif header.

        :raises InvalidZoneinfoFile: If the magic number or the version
            byte is not recognized.
        """
        buff = self._check_read(fd, 44)

        if buff[:4] != b"TZif":
            raise InvalidZoneinfoFile(
                'The file "{}" has an invalid header.'.format(fd.name)
            )

        # The version byte is NUL for version 1, ASCII "2" or "3" otherwise.
        version = {0x00: 1, 0x32: 2, 0x33: 3}.get(buff[4])

        if version is None:
            raise InvalidZoneinfoFile(
                'The file "{}" has an invalid version.'.format(fd.name)
            )

        # Bytes 5-19 are not used here; the six counters follow.
        hdr = header(version, *unpack(">6l", buff[20:44]))

        return hdr

    def _parse_trans_64(self, fd, n):  # type: (IO[Any], int) -> List[int]
        """
        Read ``n`` big-endian signed 64-bit transition times.
        """
        return [unpack(">q", self._check_read(fd, 8))[0] for _ in range(n)]

    def _parse_trans_32(self, fd, n):  # type: (IO[Any], int) -> List[int]
        """
        Read ``n`` big-endian signed 32-bit transition times.
        """
        return [unpack(">i", self._check_read(fd, 4))[0] for _ in range(n)]

    def _parse_type_idx(self, fd, n):  # type: (IO[Any], int) -> List[int]
        """
        Read ``n`` one-byte transition type indexes.
        """
        buff = self._check_read(fd, n)

        return list(unpack("{}B".format(n), buff))

    def _parse_types(
        self, fd, n
    ):  # type: (IO[Any], int) -> List[Tuple[Any, bool, int]]
        """
        Read ``n`` six-byte transition type records.

        :return: A list of ``(utc offset, is_dst, abbreviation index)``.
        """
        types = []

        for _ in range(n):
            buff = self._check_read(fd, 6)
            offset = unpack(">l", buff[:4])[0]
            is_dst = buff[4] == 1
            types.append((offset, is_dst, buff[5]))

        return types

    def _parse_abbrs(
        self, fd, n, types
    ):  # type: (IO[Any], int, List[Tuple[Any, bool, int]]) -> Dict[int, str]
        """
        Read the ``n``-byte abbreviation table and resolve the
        abbreviation referenced by each transition type.

        :return: A mapping of abbreviation index to abbreviation.
        """
        abbrs = {}
        buff = self._check_read(fd, n)

        for _, _, idx in types:
            if idx in abbrs:
                continue

            end = buff.find(b"\0", idx)
            if end == -1:
                # No terminator: use the rest of the table rather than
                # letting the -1 silently drop the last character.
                end = len(buff)

            abbrs[idx] = buff[idx:end].decode("utf-8")

        return abbrs

    def _parse_posix_tz(self, fd):  # type: (...) -> Optional[PosixTimezone]
        """
        Read the remainder of the file as a newline-enclosed POSIX rule.

        :return: The parsed rule, or ``None`` when the rule is empty.

        :raises InvalidZoneinfoFile: If the remaining data does not both
            start and end with a newline.
        """
        s = fd.read().decode("utf-8")

        if not s.startswith("\n") or not s.endswith("\n"):
            raise InvalidZoneinfoFile('Invalid posix rule in file "{}"'.format(fd.name))

        s = s.strip()

        if not s:
            return None

        return posix_spec(s)