Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/edns.py: 41%

246 statements  

« prev     ^ index     » next       coverage.py v7.4.1, created at 2024-02-02 06:07 +0000

1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license 

2 

3# Copyright (C) 2009-2017 Nominum, Inc. 

4# 

5# Permission to use, copy, modify, and distribute this software and its 

6# documentation for any purpose with or without fee is hereby granted, 

7# provided that the above copyright notice and this permission notice 

8# appear in all copies. 

9# 

10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES 

11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF 

12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR 

13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES 

14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN 

15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT 

16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. 

17 

18"""EDNS Options""" 

19 

20import math 

21import socket 

22import struct 

23from typing import Any, Dict, Optional, Union 

24 

25import dns.enum 

26import dns.inet 

27import dns.rdata 

28import dns.wire 

29 

30 

class OptionType(dns.enum.IntEnum):
    """Numeric codes identifying the EDNS option types known to this module."""

    #: NSID
    NSID = 3
    #: DAU
    DAU = 5
    #: DHU
    DHU = 6
    #: N3U
    N3U = 7
    #: ECS (client-subnet)
    ECS = 8
    #: EXPIRE
    EXPIRE = 9
    #: COOKIE
    COOKIE = 10
    #: KEEPALIVE
    KEEPALIVE = 11
    #: PADDING
    PADDING = 12
    #: CHAIN
    CHAIN = 13
    #: EDE (extended-dns-error)
    EDE = 15

    @classmethod
    def _maximum(cls):
        """Return the largest option code value, 65535 (16-bit unsigned max).

        NOTE(review): presumably consulted by ``dns.enum.IntEnum`` when
        validating values -- confirm against that base class.
        """
        return 0xFFFF

58 

59 

class Option:

    """Base class for all EDNS option types.

    Subclasses provide ``to_wire()``, ``to_text()`` and
    ``from_wire_parser()``.  Equality and ordering are derived from the
    wire encoding and are only defined between options of the same type.
    """

    def __init__(self, otype: Union[OptionType, str]):
        """Initialize an option.

        *otype*, a ``dns.edns.OptionType``, is the option type.
        """
        self.otype = OptionType.make(otype)

    def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
        """Convert an option to wire format.

        *file*, an optional file-like object.  The subclasses in this
        module write the encoding to *file* and return ``None`` when it
        is supplied, and return the encoding otherwise.

        Returns a ``bytes`` or ``None``.

        """
        raise NotImplementedError  # pragma: no cover

    def to_text(self) -> str:
        """Convert an option to text format.

        ``__str__`` delegates to this method, so it is declared here to
        make the contract explicit; subclasses must override it.

        Returns a ``str``.
        """
        raise NotImplementedError  # pragma: no cover

    @classmethod
    def from_wire_parser(cls, otype: OptionType, parser: "dns.wire.Parser") -> "Option":
        """Build an EDNS option object from wire format.

        *otype*, a ``dns.edns.OptionType``, is the option type.

        *parser*, a ``dns.wire.Parser``, the parser, which should be
        restricted to the option length.

        Returns a ``dns.edns.Option``.
        """
        raise NotImplementedError  # pragma: no cover

    def _cmp(self, other):
        """Compare an EDNS option with another option of the same type.

        The comparison is performed on the wire encodings of both options.

        Returns < 0 if < *other*, 0 if == *other*, and > 0 if > *other*.
        """
        wire = self.to_wire()
        owire = other.to_wire()
        if wire == owire:
            return 0
        if wire > owire:
            return 1
        return -1

    def __eq__(self, other):
        # Options of different classes or different types are never equal.
        if not isinstance(other, Option):
            return False
        if self.otype != other.otype:
            return False
        return self._cmp(other) == 0

    def __ne__(self, other):
        if not isinstance(other, Option):
            return True
        if self.otype != other.otype:
            return True
        return self._cmp(other) != 0

    # Ordering is undefined across option types, so the rich comparisons
    # return NotImplemented in that case and let Python raise TypeError.

    def __lt__(self, other):
        if not isinstance(other, Option) or self.otype != other.otype:
            return NotImplemented
        return self._cmp(other) < 0

    def __le__(self, other):
        if not isinstance(other, Option) or self.otype != other.otype:
            return NotImplemented
        return self._cmp(other) <= 0

    def __ge__(self, other):
        if not isinstance(other, Option) or self.otype != other.otype:
            return NotImplemented
        return self._cmp(other) >= 0

    def __gt__(self, other):
        if not isinstance(other, Option) or self.otype != other.otype:
            return NotImplemented
        return self._cmp(other) > 0

    def __str__(self):
        return self.to_text()

141 

142 

class GenericOption(Option):  # lgtm[py/missing-equals]

    """Catch-all EDNS option.

    Holds the raw option payload for option types that have no dedicated
    class in this module.
    """

    def __init__(self, otype: Union[OptionType, str], data: Union[bytes, str]):
        """*otype* is the option type; *data* is the option payload.

        *data* is normalized with ``dns.rdata.Rdata._as_bytes`` before
        being stored on ``self.data``.
        """
        super().__init__(otype)
        self.data = dns.rdata.Rdata._as_bytes(data, True)

    def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
        """Return the payload, or write it to *file* and return ``None``."""
        if not file:
            return self.data
        file.write(self.data)
        return None

    def to_text(self) -> str:
        """Return ``"Generic <n>"`` where *n* is the numeric option type."""
        return "Generic %d" % self.otype

    @classmethod
    def from_wire_parser(
        cls, otype: Union[OptionType, str], parser: "dns.wire.Parser"
    ) -> Option:
        """Build the option from everything left in *parser*."""
        payload = parser.get_remaining()
        return cls(otype, payload)

170 

171 

class ECSOption(Option):  # lgtm[py/missing-equals]
    """EDNS Client Subnet (ECS, RFC7871)"""

    def __init__(self, address: str, srclen: Optional[int] = None, scopelen: int = 0):
        """*address*, a ``str``, is the client address information.

        *srclen*, an ``int``, the source prefix length, which is the
        leftmost number of bits of the address to be used for the
        lookup.  The default is 24 for IPv4 and 56 for IPv6.

        *scopelen*, an ``int``, the scope prefix length.  This value
        must be 0 in queries, and should be set in responses.
        """

        super().__init__(OptionType.ECS)
        af = dns.inet.af_for_address(address)

        # ``family`` is the numeric address family written to the wire:
        # 1 for IPv4, 2 for IPv6.  Prefix lengths are range-checked
        # against the address width of the family.
        if af == socket.AF_INET6:
            self.family = 2
            if srclen is None:
                srclen = 56
            address = dns.rdata.Rdata._as_ipv6_address(address)
            srclen = dns.rdata.Rdata._as_int(srclen, 0, 128)
            scopelen = dns.rdata.Rdata._as_int(scopelen, 0, 128)
        elif af == socket.AF_INET:
            self.family = 1
            if srclen is None:
                srclen = 24
            address = dns.rdata.Rdata._as_ipv4_address(address)
            srclen = dns.rdata.Rdata._as_int(srclen, 0, 32)
            scopelen = dns.rdata.Rdata._as_int(scopelen, 0, 32)
        else:  # pragma: no cover   (this will never happen)
            raise ValueError("Bad address family")

        assert srclen is not None
        self.address = address
        self.srclen = srclen
        self.scopelen = scopelen

        addrdata = dns.inet.inet_pton(af, address)
        nbytes = int(math.ceil(srclen / 8.0))

        # Truncate to srclen and pad to the end of the last octet needed
        # See RFC section 6
        self.addrdata = addrdata[:nbytes]
        nbits = srclen % 8
        if nbits != 0:
            # Zero the bits of the final octet that lie beyond the prefix.
            last = struct.pack("B", ord(self.addrdata[-1:]) & (0xFF << (8 - nbits)))
            self.addrdata = self.addrdata[:-1] + last

    def to_text(self) -> str:
        """Return ``"ECS <address>/<srclen> scope/<scopelen>"``."""
        return "ECS {}/{} scope/{}".format(self.address, self.srclen, self.scopelen)

    @staticmethod
    def from_text(text: str) -> Option:
        """Convert a string into a `dns.edns.ECSOption`

        *text*, a `str`, the text form of the option.

        Accepted forms are ``address/srclen`` and
        ``address/srclen/scopelen``, each optionally preceded by ``ECS``,
        plus the exact output of ``to_text()``:
        ``ECS address/srclen scope/scopelen``.

        Returns a `dns.edns.ECSOption`.

        Raises ``ValueError`` if *text* cannot be parsed or a prefix
        length is not an integer.

        Examples:

        >>> import dns.edns
        >>>
        >>> # basic example
        >>> dns.edns.ECSOption.from_text('1.2.3.4/24')
        >>>
        >>> # also understands scope
        >>> dns.edns.ECSOption.from_text('1.2.3.4/24/32')
        >>>
        >>> # IPv6
        >>> dns.edns.ECSOption.from_text('2001:4b98::1/64/64')
        >>>
        >>> # the optional "ECS" prefix is accepted
        >>> dns.edns.ECSOption.from_text('ECS 1.2.3.4/24/32')
        >>>
        >>> # it understands results from `dns.edns.ECSOption.to_text()`
        >>> dns.edns.ECSOption.from_text('ECS 1.2.3.4/24 scope/32')
        """
        optional_prefix = "ECS"
        scope_prefix = "scope/"
        parse_error = 'could not parse ECS from "{}"'.format(text)

        tokens = text.split()
        tscope: Optional[str] = None
        if len(tokens) == 1:
            ecs_text = tokens[0]
        elif len(tokens) == 2:
            if tokens[0] != optional_prefix:
                raise ValueError(parse_error)
            ecs_text = tokens[1]
        elif len(tokens) == 3:
            # The form produced by to_text(): the scope is carried in a
            # separate "scope/<n>" token rather than after a second slash.
            if tokens[0] != optional_prefix or not tokens[2].startswith(scope_prefix):
                raise ValueError(parse_error)
            ecs_text = tokens[1]
            tscope = tokens[2][len(scope_prefix):]
        else:
            raise ValueError(parse_error)

        n_slashes = ecs_text.count("/")
        if n_slashes == 1:
            address, tsrclen = ecs_text.split("/")
            if tscope is None:
                tscope = "0"
        elif n_slashes == 2 and tscope is None:
            address, tsrclen, tscope = ecs_text.split("/")
        else:
            # No prefix length at all, too many slashes, or a scope given
            # both inline and as a separate token.
            raise ValueError(parse_error)

        try:
            scope = int(tscope)
        except ValueError as exc:
            raise ValueError(
                'invalid scope "{}": scope must be an integer'.format(tscope)
            ) from exc
        try:
            srclen = int(tsrclen)
        except ValueError as exc:
            raise ValueError(
                'invalid srclen "{}": srclen must be an integer'.format(tsrclen)
            ) from exc
        return ECSOption(address, srclen, scope)

    def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
        """Encode as family (16 bits), srclen, scopelen (8 bits each), prefix.

        Writes to *file* and returns ``None`` when *file* is supplied,
        otherwise returns the encoded ``bytes``.
        """
        value = (
            struct.pack("!HBB", self.family, self.srclen, self.scopelen) + self.addrdata
        )
        if file:
            file.write(value)
            return None
        else:
            return value

    @classmethod
    def from_wire_parser(
        cls, otype: Union[OptionType, str], parser: "dns.wire.Parser"
    ) -> Option:
        """Build an ``ECSOption`` from wire format.

        Raises ``ValueError`` if the address family is neither 1 nor 2.
        """
        family, src, scope = parser.get_struct("!HBB")
        # Only the octets covering the source prefix are on the wire.
        addrlen = int(math.ceil(src / 8.0))
        prefix = parser.get_bytes(addrlen)
        # Pad the truncated prefix back out to a full-width address.
        # NOTE(review): dns.ipv4 / dns.ipv6 are not imported explicitly in
        # this module; presumably they are loaded via dns.inet -- confirm.
        if family == 1:
            pad = 4 - addrlen
            addr = dns.ipv4.inet_ntoa(prefix + b"\x00" * pad)
        elif family == 2:
            pad = 16 - addrlen
            addr = dns.ipv6.inet_ntoa(prefix + b"\x00" * pad)
        else:
            raise ValueError("unsupported family")

        return cls(addr, src, scope)

309 

310 

class EDECode(dns.enum.IntEnum):
    """Info codes carried by the Extended DNS Error option (``EDEOption``)."""

    OTHER = 0
    UNSUPPORTED_DNSKEY_ALGORITHM = 1
    UNSUPPORTED_DS_DIGEST_TYPE = 2
    STALE_ANSWER = 3
    FORGED_ANSWER = 4
    DNSSEC_INDETERMINATE = 5
    DNSSEC_BOGUS = 6
    SIGNATURE_EXPIRED = 7
    SIGNATURE_NOT_YET_VALID = 8
    DNSKEY_MISSING = 9
    RRSIGS_MISSING = 10
    NO_ZONE_KEY_BIT_SET = 11
    NSEC_MISSING = 12
    CACHED_ERROR = 13
    NOT_READY = 14
    BLOCKED = 15
    CENSORED = 16
    FILTERED = 17
    PROHIBITED = 18
    STALE_NXDOMAIN_ANSWER = 19
    NOT_AUTHORITATIVE = 20
    NOT_SUPPORTED = 21
    NO_REACHABLE_AUTHORITY = 22
    NETWORK_ERROR = 23
    INVALID_DATA = 24

    @classmethod
    def _maximum(cls):
        """Return the largest info code value, 65535 (16-bit unsigned max).

        NOTE(review): presumably consulted by ``dns.enum.IntEnum`` when
        validating values -- confirm against that base class.
        """
        return 0xFFFF

341 

342 

class EDEOption(Option):  # lgtm[py/missing-equals]
    """Extended DNS Error (EDE, RFC8914)"""

    def __init__(self, code: Union[EDECode, str], text: Optional[str] = None):
        """*code*, a ``dns.edns.EDECode`` or ``str``, the info code of the
        extended error.

        *text*, a ``str`` or ``None``, specifying additional information about
        the error.

        Raises ``ValueError`` if *text* is neither a ``str`` nor ``None``.
        """

        super().__init__(OptionType.EDE)

        self.code = EDECode.make(code)
        if not (text is None or isinstance(text, str)):
            raise ValueError("text must be string or None")
        self.text = text

    def to_text(self) -> str:
        """Return ``"EDE <code>"``, followed by ``": <text>"`` when text is set."""
        rendered = f"EDE {self.code}"
        if self.text is None:
            return rendered
        return rendered + f": {self.text}"

    def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
        """Encode as a 16-bit info code followed by the UTF-8 text, if any.

        Writes to *file* and returns ``None`` when *file* is supplied,
        otherwise returns the encoded ``bytes``.
        """
        payload = struct.pack("!H", self.code)
        if self.text is not None:
            payload += self.text.encode("utf8")

        if not file:
            return payload
        file.write(payload)
        return None

    @classmethod
    def from_wire_parser(
        cls, otype: Union[OptionType, str], parser: "dns.wire.Parser"
    ) -> Option:
        """Build an ``EDEOption`` from wire format.

        The first 16 bits are the info code; whatever remains is the
        UTF-8 encoded extra text.  An empty remainder yields ``text=None``.
        """
        code = EDECode.make(parser.get_uint16())
        raw = parser.get_remaining()

        if not raw:
            return cls(code, None)

        if raw[-1] == 0:  # text MAY be null-terminated
            raw = raw[:-1]
        return cls(code, raw.decode("utf8"))

393 

394 

# Registry mapping an option type to the class implementing it.  Types
# missing from this mapping are handled by GenericOption; new entries are
# added with register_type().
_type_to_class: Dict[OptionType, Any] = {
    OptionType.ECS: ECSOption,
    OptionType.EDE: EDEOption,
}


def get_option_class(otype: OptionType) -> Any:
    """Look up the class implementing the option type *otype*.

    Falls back to ``GenericOption`` when no specific class is registered
    for *otype*.
    """

    registered = _type_to_class.get(otype)
    return GenericOption if registered is None else registered

412 

413 

def option_from_wire_parser(
    otype: Union[OptionType, str], parser: "dns.wire.Parser"
) -> Option:
    """Build an EDNS option object from wire format.

    *otype*, an ``int``, is the option type.

    *parser*, a ``dns.wire.Parser``, the parser, which should be
    restricted to the option length.

    The type is normalized with ``OptionType.make`` and parsing is then
    delegated to the class returned by ``get_option_class``.

    Returns an instance of a subclass of ``dns.edns.Option``.
    """
    resolved = OptionType.make(otype)
    return get_option_class(resolved).from_wire_parser(resolved, parser)

429 

430 

def option_from_wire(
    otype: Union[OptionType, str], wire: bytes, current: int, olen: int
) -> Option:
    """Build an EDNS option object from wire format.

    *otype*, an ``int``, is the option type.

    *wire*, a ``bytes``, is the wire-format message.

    *current*, an ``int``, is the offset in *wire* of the beginning
    of the rdata.

    *olen*, an ``int``, is the length of the wire-format option data

    Returns an instance of a subclass of ``dns.edns.Option``.
    """
    parser = dns.wire.Parser(wire, current)
    # Confine the parser to the option's own bytes while it is decoded.
    with parser.restrict_to(olen):
        option = option_from_wire_parser(otype, parser)
    return option

450 

451 

def register_type(implementation: Any, otype: OptionType) -> None:
    """Register the class that implements an option type.

    *implementation*, a ``class``, is a subclass of ``dns.edns.Option``.

    *otype*, an ``int``, is the option type.

    Any class previously registered for *otype* is replaced, and later
    calls to ``get_option_class(otype)`` return *implementation*.
    """

    _type_to_class[otype] = implementation

461 

462 

### BEGIN generated OptionType constants

# Module-level aliases for every OptionType member, so callers can write
# e.g. ``dns.edns.ECS`` instead of ``dns.edns.OptionType.ECS``.

NSID = OptionType.NSID
DAU = OptionType.DAU
DHU = OptionType.DHU
N3U = OptionType.N3U
ECS = OptionType.ECS
EXPIRE = OptionType.EXPIRE
COOKIE = OptionType.COOKIE
KEEPALIVE = OptionType.KEEPALIVE
PADDING = OptionType.PADDING
CHAIN = OptionType.CHAIN
EDE = OptionType.EDE

### END generated OptionType constants