Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/rdtypes/util.py: 28%

191 statements  

« prev     ^ index     » next       coverage.py v7.3.1, created at 2023-09-25 07:09 +0000

1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license 

2 

3# Copyright (C) 2006, 2007, 2009-2011 Nominum, Inc. 

4# 

5# Permission to use, copy, modify, and distribute this software and its 

6# documentation for any purpose with or without fee is hereby granted, 

7# provided that the above copyright notice and this permission notice 

8# appear in all copies. 

9# 

10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 

11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 

12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 

13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 

14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 

15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 

16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

17 

18import collections 

19import random 

20import struct 

21from typing import Any, List 

22 

23import dns.exception 

24import dns.ipv4 

25import dns.ipv6 

26import dns.name 

27import dns.rdata 

28 

29 

class Gateway:
    """Shared model of the IPSECKEY gateway field and the AMTRELAY relay field.

    ``type`` selects how ``gateway`` is interpreted:

    * 0 - no gateway; ``gateway`` is normalized to ``None``
    * 1 - text accepted by ``dns.ipv4.inet_aton``
    * 2 - text accepted by ``dns.ipv6.inet_aton``
    * 3 - a ``dns.name.Name``

    Subclasses set ``name`` so that error messages mention the right field.
    """

    # Field label interpolated into error messages.
    name = ""

    def __init__(self, type, gateway=None):
        """Store the type (coerced via ``Rdata._as_uint8``) and validate."""
        self.type = dns.rdata.Rdata._as_uint8(type)
        self.gateway = gateway
        self._check()

    @classmethod
    def _invalid_type(cls, gateway_type):
        """Return the message used when *gateway_type* is not supported."""
        return f"invalid {cls.name} type: {gateway_type}"

    def _check(self):
        """Validate ``self.gateway`` against ``self.type``.

        Raises the builtin ``SyntaxError`` for a bad type-0 value, a type-3
        value that is not a ``dns.name.Name``, or an unknown type.  Address
        validation errors come from the ``inet_aton`` helpers.
        """
        kind = self.type
        if kind == 0:
            if self.gateway not in (".", None):
                raise SyntaxError(f"invalid {self.name} for type 0")
            # "." and None both mean "no gateway"; store the canonical form.
            self.gateway = None
            return
        if kind == 1:
            # Called only for validation; the result is discarded.
            dns.ipv4.inet_aton(self.gateway)
            return
        if kind == 2:
            # Called only for validation; the result is discarded.
            dns.ipv6.inet_aton(self.gateway)
            return
        if kind == 3:
            if not isinstance(self.gateway, dns.name.Name):
                raise SyntaxError(f"invalid {self.name}; not a name")
            return
        raise SyntaxError(self._invalid_type(self.type))

    def to_text(self, origin=None, relativize=True):
        """Return the text form: ".", the stored address text, or the name."""
        kind = self.type
        if kind == 0:
            return "."
        if kind in (1, 2):
            return self.gateway
        if kind == 3:
            return str(self.gateway.choose_relativity(origin, relativize))
        raise ValueError(self._invalid_type(self.type))  # pragma: no cover

    @classmethod
    def from_text(
        cls, gateway_type, tok, origin=None, relativize=True, relativize_to=None
    ):
        """Read the gateway from tokenizer *tok* according to *gateway_type*.

        Types 0-2 are read as a plain string, type 3 as a name.  Any other
        type raises ``dns.exception.SyntaxError``.
        """
        if gateway_type in (0, 1, 2):
            return cls(gateway_type, tok.get_string())
        if gateway_type == 3:
            return cls(gateway_type, tok.get_name(origin, relativize, relativize_to))
        raise dns.exception.SyntaxError(
            cls._invalid_type(gateway_type)
        )  # pragma: no cover

    # pylint: disable=unused-argument
    def to_wire(self, file, compress=None, origin=None, canonicalize=False):
        """Write the wire form to *file*; type 0 writes nothing.

        ``compress`` and ``canonicalize`` are accepted but not used: names are
        written with no compression and without canonicalization.
        """
        kind = self.type
        if kind == 0:
            return
        if kind == 1:
            file.write(dns.ipv4.inet_aton(self.gateway))
            return
        if kind == 2:
            file.write(dns.ipv6.inet_aton(self.gateway))
            return
        if kind == 3:
            self.gateway.to_wire(file, None, origin, False)
            return
        raise ValueError(self._invalid_type(self.type))  # pragma: no cover

    # pylint: enable=unused-argument

    @classmethod
    def from_wire_parser(cls, gateway_type, parser, origin=None):
        """Build an instance by reading the gateway from wire *parser*.

        Reads 4 bytes for type 1, 16 bytes for type 2, a name for type 3 and
        nothing for type 0.  Any other type raises ``dns.exception.FormError``.
        """
        if gateway_type == 0:
            return cls(gateway_type, None)
        if gateway_type == 1:
            return cls(gateway_type, dns.ipv4.inet_ntoa(parser.get_bytes(4)))
        if gateway_type == 2:
            return cls(gateway_type, dns.ipv6.inet_ntoa(parser.get_bytes(16)))
        if gateway_type == 3:
            return cls(gateway_type, parser.get_name(origin))
        raise dns.exception.FormError(cls._invalid_type(gateway_type))

113 

114 

class Bitmap:
    """A helper class for the NSEC/NSEC3/CSYNC type bitmaps.

    ``windows`` is an iterable of ``(window, bitmap)`` pairs.  ``window`` is
    the window number (the rdatatype value divided by 256) and ``bitmap`` is
    1 to 32 bytes in which bit ``0x80 >> j`` of byte ``i`` stands for
    rdatatype ``window * 256 + i * 8 + j``.

    Subclasses set ``type_name`` so that error messages name the right type.
    """

    # Label interpolated into error messages.
    type_name = ""

    def __init__(self, windows=None):
        """Store and validate *windows*.

        ``None`` (the default) means "no windows".  Raises ``ValueError`` if a
        window number is not an int, is not strictly increasing, or is above
        255, or if a bitmap is not ``bytes`` of length 1..32.
        """
        if windows is None:
            # The default used to crash in the loop below with TypeError.
            windows = []
        last_window = -1
        self.windows = windows
        for window, bitmap in self.windows:
            if not isinstance(window, int):
                raise ValueError(f"bad {self.type_name} window type")
            if window <= last_window:
                raise ValueError(f"bad {self.type_name} window order")
            # The window number is a single octet (to_wire packs it with
            # "!B"), so 255 is the largest representable value.
            if window > 255:
                raise ValueError(f"bad {self.type_name} window number")
            last_window = window
            if not isinstance(bitmap, bytes):
                raise ValueError(f"bad {self.type_name} octets type")
            if len(bitmap) == 0 or len(bitmap) > 32:
                raise ValueError(f"bad {self.type_name} octets")

    def to_text(self) -> str:
        """Return the type mnemonics of all set bits.

        Each window contributes a leading space followed by its
        space-separated mnemonics, so a non-empty result starts with a space.
        """
        pieces = []
        for window, bitmap in self.windows:
            bits = []
            for i, byte in enumerate(bitmap):
                for j in range(0, 8):
                    if byte & (0x80 >> j):
                        rdtype = window * 256 + i * 8 + j
                        bits.append(dns.rdatatype.to_text(rdtype))
            pieces.append(" " + " ".join(bits))
        return "".join(pieces)

    @classmethod
    def from_text(cls, tok: "dns.tokenizer.Tokenizer") -> "Bitmap":
        """Build a bitmap from the remaining tokens of *tok*.

        Each token is converted with ``dns.rdatatype.from_text``; a type that
        maps to 0 raises ``dns.exception.SyntaxError``.
        """
        rdtypes = []
        for token in tok.get_remaining():
            rdtype = dns.rdatatype.from_text(token.unescape().value)
            if rdtype == 0:
                raise dns.exception.SyntaxError(f"{cls.type_name} with bit 0")
            rdtypes.append(rdtype)
        return cls.from_rdtypes(rdtypes)

    @classmethod
    def from_rdtypes(cls, rdtypes: "List[dns.rdatatype.RdataType]") -> "Bitmap":
        """Build a bitmap from rdatatype values.

        The input is sorted and duplicates are dropped.  A value of 0 is
        skipped, because it equals the initial ``prior_rdtype``.  Each window
        is trimmed to end at the byte holding its highest set bit.
        """
        rdtypes = sorted(rdtypes)
        window = 0
        octets = 0
        prior_rdtype = 0
        bitmap = bytearray(b"\0" * 32)
        windows = []
        for rdtype in rdtypes:
            if rdtype == prior_rdtype:
                continue
            prior_rdtype = rdtype
            new_window = rdtype // 256
            if new_window != window:
                # Flush the finished window (if it has any bits) and start
                # a fresh all-zero bitmap for the next one.
                if octets != 0:
                    windows.append((window, bytes(bitmap[0:octets])))
                bitmap = bytearray(b"\0" * 32)
                window = new_window
            offset = rdtype % 256
            byte = offset // 8
            bit = offset % 8
            # Input is sorted, so the current byte is always the highest
            # one used so far in this window.
            octets = byte + 1
            bitmap[byte] = bitmap[byte] | (0x80 >> bit)
        if octets != 0:
            windows.append((window, bytes(bitmap[0:octets])))
        return cls(windows)

    def to_wire(self, file: Any) -> None:
        """Write each window as number octet, length octet, bitmap bytes."""
        for window, bitmap in self.windows:
            file.write(struct.pack("!BB", window, len(bitmap)))
            file.write(bitmap)

    @classmethod
    def from_wire_parser(cls, parser: "dns.wire.Parser") -> "Bitmap":
        """Read ``(window, counted bitmap)`` pairs until *parser* is empty."""
        windows = []
        while parser.remaining() > 0:
            window = parser.get_uint8()
            bitmap = parser.get_counted_bytes()
            windows.append((window, bitmap))
        return cls(windows)

198 

199 

200def _priority_table(items): 

201 by_priority = collections.defaultdict(list) 

202 for rdata in items: 

203 by_priority[rdata._processing_priority()].append(rdata) 

204 return by_priority 

205 

206 

def priority_processing_order(iterable):
    """Return the items of *iterable* ordered by ascending priority.

    Items are grouped by ``_processing_priority()``; groups are emitted in
    sorted key order and each group is shuffled with ``random.shuffle``.
    A single item is returned as-is without being inspected.
    """
    candidates = list(iterable)
    if len(candidates) == 1:
        return candidates
    groups = _priority_table(candidates)
    result = []
    for priority in sorted(groups.keys()):
        bucket = groups[priority]
        # Equal-priority items carry no preference, so randomize them.
        random.shuffle(bucket)
        result += bucket
    return result

218 

219 

# Stand-in weight used by weighted_processing_order() for any rdata whose
# _processing_weight() is falsy (e.g. 0), so that it still has a small,
# nonzero share in the weighted random draw.
_no_weight = 0.1

221 

222 

def weighted_processing_order(iterable):
    """Return the items of *iterable* ordered by priority, then by weight.

    Items are grouped by ``_processing_priority()`` and groups are emitted in
    sorted key order.  Within a group, items are drawn one at a time without
    replacement, each with probability proportional to its
    ``_processing_weight()``; a falsy weight is replaced by ``_no_weight``.
    A single item is returned as-is without being inspected.
    """
    candidates = list(iterable)
    if len(candidates) == 1:
        return candidates
    groups = _priority_table(candidates)
    result = []
    for priority in sorted(groups.keys()):
        pool = groups[priority]
        remaining = sum(entry._processing_weight() or _no_weight for entry in pool)
        while len(pool) > 1:
            threshold = random.uniform(0, remaining)
            # Walk the pool, subtracting weights until one exceeds what is
            # left of the threshold.  If none does, the loop variables stay
            # on the last entry, which is then the one selected.
            for index, entry in enumerate(pool):
                weight = entry._processing_weight() or _no_weight
                if weight > threshold:
                    break
                threshold -= weight
            remaining -= weight
            result.append(entry)  # pylint: disable=undefined-loop-variable
            pool.pop(index)  # pylint: disable=undefined-loop-variable
        result.append(pool[0])
    return result

244 

245 

def parse_formatted_hex(formatted, num_chunks, chunk_size, separator):
    """Parse separator-delimited hex text into bytes.

    *formatted* must consist of exactly *num_chunks* groups of *chunk_size*
    hexadecimal digits, with a single *separator* character between adjacent
    groups (for example ``"0014:4fff:ff20:ee64"`` with 4 chunks of 4 digits
    and ``":"``).  Each group contributes ``chunk_size // 2`` big-endian
    bytes to the result.

    Raises ``ValueError`` if the overall length is wrong, a group contains
    anything other than hex digits, or a separator is wrong.
    """
    if len(formatted) != num_chunks * (chunk_size + 1) - 1:
        raise ValueError("invalid formatted hex string")
    # int(text, 16) on its own also accepts signs, surrounding whitespace,
    # underscores and a "0x" prefix, so each chunk is checked explicitly.
    hex_digits = frozenset("0123456789abcdefABCDEF")
    stride = chunk_size + 1
    pieces = []
    for index in range(num_chunks):
        start = index * stride
        stop = start + chunk_size
        chunk = formatted[start:stop]
        if not chunk or not hex_digits.issuperset(chunk):
            raise ValueError("invalid formatted hex string")
        pieces.append(int(chunk, 16).to_bytes(chunk_size // 2, "big"))
        # Every chunk except the last must be followed by the separator.
        if stop < len(formatted) and formatted[stop] != separator:
            raise ValueError("invalid formatted hex string")
    return b"".join(pieces)