Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/edns.py: 41%
246 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3# Copyright (C) 2009-2017 Nominum, Inc.
4#
5# Permission to use, copy, modify, and distribute this software and its
6# documentation for any purpose with or without fee is hereby granted,
7# provided that the above copyright notice and this permission notice
8# appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18"""EDNS Options"""
20import math
21import socket
22import struct
23from typing import Any, Dict, Optional, Union
25import dns.enum
26import dns.inet
27import dns.rdata
28import dns.wire
class OptionType(dns.enum.IntEnum):
    """Numeric codes of the EDNS option types this module knows by name.

    ``ECS`` and ``EDE`` have dedicated option classes in this module
    (``ECSOption`` and ``EDEOption``); the other codes are only named here.
    """

    #: NSID
    NSID = 3
    #: DAU
    DAU = 5
    #: DHU
    DHU = 6
    #: N3U
    N3U = 7
    #: ECS (client-subnet)
    ECS = 8
    #: EXPIRE
    EXPIRE = 9
    #: COOKIE
    COOKIE = 10
    #: KEEPALIVE
    KEEPALIVE = 11
    #: PADDING
    PADDING = 12
    #: CHAIN
    CHAIN = 13
    #: EDE (extended-dns-error)
    EDE = 15

    @classmethod
    def _maximum(cls):
        # 65535 == 2**16 - 1.
        # NOTE(review): presumably consulted by dns.enum.IntEnum to bound the
        # values it accepts for an option type -- confirm against dns.enum.
        return 65535
class Option:
    """Base class for all EDNS option types.

    Subclasses are expected to implement ``to_wire()``, ``to_text()`` and
    ``from_wire_parser()``.  Equality and ordering are defined in terms of
    the option type and the wire-format encoding returned by ``to_wire()``.

    Because ``__eq__`` is defined without ``__hash__``, instances are not
    hashable.
    """

    def __init__(self, otype: Union[OptionType, str]):
        """Initialize an option.

        *otype*, a ``dns.edns.OptionType``, is the option type.
        """
        self.otype = OptionType.make(otype)

    def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
        """Convert an option to wire format.

        *file*, an optional file-like object.  In the subclasses defined in
        this module, when *file* is truthy the encoded bytes are written to
        it and ``None`` is returned; otherwise the bytes are returned.

        Returns a ``bytes`` or ``None``.
        """
        raise NotImplementedError  # pragma: no cover

    def to_text(self) -> str:
        """Convert an option to text format.

        ``__str__`` delegates to this method, so it is declared here to make
        it part of the abstract interface.

        Returns a ``str``.
        """
        raise NotImplementedError  # pragma: no cover

    @classmethod
    def from_wire_parser(cls, otype: OptionType, parser: "dns.wire.Parser") -> "Option":
        """Build an EDNS option object from wire format.

        *otype*, a ``dns.edns.OptionType``, is the option type.

        *parser*, a ``dns.wire.Parser``, the parser, which should be
        restricted to the option length.

        Returns a ``dns.edns.Option``.
        """
        raise NotImplementedError  # pragma: no cover

    def _is_comparable(self, other) -> bool:
        """Return ``True`` if *other* is an ``Option`` of the same option type."""
        return isinstance(other, Option) and self.otype == other.otype

    def _cmp(self, other):
        """Compare an EDNS option with another option of the same type.

        The comparison is performed on the wire-format encodings.

        Returns < 0 if < *other*, 0 if == *other*, and > 0 if > *other*.
        """
        wire = self.to_wire()
        owire = other.to_wire()
        if wire == owire:
            return 0
        if wire > owire:
            return 1
        return -1

    def __eq__(self, other):
        # Non-options and options of another type are simply "not equal".
        if not self._is_comparable(other):
            return False
        return self._cmp(other) == 0

    def __ne__(self, other):
        if not self._is_comparable(other):
            return True
        return self._cmp(other) != 0

    # Ordering is only defined between options of the same type; anything
    # else returns NotImplemented so Python can try the reflected operation
    # or raise TypeError.

    def __lt__(self, other):
        if not self._is_comparable(other):
            return NotImplemented
        return self._cmp(other) < 0

    def __le__(self, other):
        if not self._is_comparable(other):
            return NotImplemented
        return self._cmp(other) <= 0

    def __ge__(self, other):
        if not self._is_comparable(other):
            return NotImplemented
        return self._cmp(other) >= 0

    def __gt__(self, other):
        if not self._is_comparable(other):
            return NotImplemented
        return self._cmp(other) > 0

    def __str__(self):
        return self.to_text()
class GenericOption(Option):  # lgtm[py/missing-equals]
    """Fallback option class.

    Carries the raw payload of any EDNS option type for which this module
    has no dedicated implementation.
    """

    def __init__(self, otype: Union[OptionType, str], data: Union[bytes, str]):
        """*otype* is the option type; *data* is the option payload, which
        is normalized through ``dns.rdata.Rdata._as_bytes``.
        """
        super().__init__(otype)
        self.data = dns.rdata.Rdata._as_bytes(data, True)

    def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
        """Return the payload, or write it to *file* (if truthy) and
        return ``None``.
        """
        if not file:
            return self.data
        file.write(self.data)
        return None

    def to_text(self) -> str:
        """Return ``"Generic <type code>"``."""
        return "Generic %d" % (self.otype,)

    @classmethod
    def from_wire_parser(
        cls, otype: Union[OptionType, str], parser: "dns.wire.Parser"
    ) -> Option:
        """Build the option from everything remaining in *parser*."""
        payload = parser.get_remaining()
        return cls(otype, payload)
class ECSOption(Option):  # lgtm[py/missing-equals]
    """EDNS Client Subnet (ECS, RFC7871)"""

    def __init__(self, address: str, srclen: Optional[int] = None, scopelen: int = 0):
        """*address*, a ``str``, is the client address information.

        *srclen*, an ``int``, the source prefix length, which is the
        leftmost number of bits of the address to be used for the
        lookup.  The default is 24 for IPv4 and 56 for IPv6.

        *scopelen*, an ``int``, the scope prefix length.  This value
        must be 0 in queries, and should be set in responses.
        """

        super().__init__(OptionType.ECS)
        af = dns.inet.af_for_address(address)

        if af == socket.AF_INET6:
            self.family = 2
            if srclen is None:
                srclen = 56
            address = dns.rdata.Rdata._as_ipv6_address(address)
            srclen = dns.rdata.Rdata._as_int(srclen, 0, 128)
            scopelen = dns.rdata.Rdata._as_int(scopelen, 0, 128)
        elif af == socket.AF_INET:
            self.family = 1
            if srclen is None:
                srclen = 24
            address = dns.rdata.Rdata._as_ipv4_address(address)
            srclen = dns.rdata.Rdata._as_int(srclen, 0, 32)
            scopelen = dns.rdata.Rdata._as_int(scopelen, 0, 32)
        else:  # pragma: no cover   (this will never happen)
            raise ValueError("Bad address family")

        # Both branches above assign srclen; this only narrows the type.
        assert srclen is not None
        self.address = address
        self.srclen = srclen
        self.scopelen = scopelen

        addrdata = dns.inet.inet_pton(af, address)
        # Number of octets needed to hold srclen bits.
        nbytes = math.ceil(srclen / 8)

        # Truncate to srclen and pad to the end of the last octet needed
        # See RFC section 6
        self.addrdata = addrdata[:nbytes]
        nbits = srclen % 8
        if nbits != 0:
            # Zero the bits of the final octet that lie beyond srclen.
            # nbits != 0 implies nbytes >= 1, so there is a last octet.
            last = self.addrdata[-1] & (0xFF << (8 - nbits))
            self.addrdata = self.addrdata[:-1] + bytes([last])

    def to_text(self) -> str:
        """Return ``"ECS <address>/<srclen> scope/<scopelen>"``."""
        return "ECS {}/{} scope/{}".format(self.address, self.srclen, self.scopelen)

    @staticmethod
    def from_text(text: str) -> Option:
        """Convert a string into a `dns.edns.ECSOption`

        *text*, a `str`, the text form of the option.

        Accepted forms are ``address/srclen`` and ``address/srclen/scope``,
        each optionally preceded by ``ECS``, plus the form produced by
        `dns.edns.ECSOption.to_text()`:
        ``ECS address/srclen scope/scopelen``.

        Returns a `dns.edns.ECSOption`.

        Raises ``ValueError`` if *text* cannot be parsed.

        Examples:

        >>> import dns.edns
        >>>
        >>> # basic example
        >>> dns.edns.ECSOption.from_text('1.2.3.4/24')
        >>>
        >>> # also understands scope
        >>> dns.edns.ECSOption.from_text('1.2.3.4/24/32')
        >>>
        >>> # IPv6
        >>> dns.edns.ECSOption.from_text('2001:4b98::1/64/64')
        >>>
        >>> # optional "ECS" prefix
        >>> dns.edns.ECSOption.from_text('ECS 1.2.3.4/24/32')
        >>>
        >>> # it understands results from `dns.edns.ECSOption.to_text()`
        >>> dns.edns.ECSOption.from_text('ECS 1.2.3.4/24 scope/32')
        """
        optional_prefix = "ECS"
        scope_prefix = "scope/"
        tokens = text.split()
        if len(tokens) == 1:
            ecs_text = tokens[0]
        elif len(tokens) == 2 and tokens[0] == optional_prefix:
            ecs_text = tokens[1]
        elif (
            len(tokens) == 3
            and tokens[0] == optional_prefix
            and tokens[2].startswith(scope_prefix)
        ):
            # to_text() output: fold "addr/srclen scope/N" into "addr/srclen/N"
            # so the slash-based parsing below handles it.
            ecs_text = tokens[1] + "/" + tokens[2][len(scope_prefix):]
        else:
            raise ValueError('could not parse ECS from "{}"'.format(text))
        n_slashes = ecs_text.count("/")
        if n_slashes == 1:
            address, tsrclen = ecs_text.split("/")
            tscope = "0"
        elif n_slashes == 2:
            address, tsrclen, tscope = ecs_text.split("/")
        else:
            raise ValueError('could not parse ECS from "{}"'.format(text))
        try:
            scope = int(tscope)
        except ValueError:
            raise ValueError(
                "invalid scope " + '"{}": scope must be an integer'.format(tscope)
            )
        try:
            srclen = int(tsrclen)
        except ValueError:
            raise ValueError(
                "invalid srclen " + '"{}": srclen must be an integer'.format(tsrclen)
            )
        return ECSOption(address, srclen, scope)

    def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
        """Encode as family (16 bits), source length, scope length and the
        truncated address; write to *file* if truthy, else return the bytes.
        """
        value = (
            struct.pack("!HBB", self.family, self.srclen, self.scopelen) + self.addrdata
        )
        if file:
            file.write(value)
            return None
        else:
            return value

    @classmethod
    def from_wire_parser(
        cls, otype: Union[OptionType, str], parser: "dns.wire.Parser"
    ) -> Option:
        """Build an ``ECSOption`` from wire format.

        Raises ``ValueError`` if the address family is neither 1 (IPv4)
        nor 2 (IPv6).
        """
        family, src, scope = parser.get_struct("!HBB")
        # Only the octets covering the source prefix are on the wire.
        addrlen = math.ceil(src / 8)
        prefix = parser.get_bytes(addrlen)
        if family == 1:
            pad = 4 - addrlen
            addr = dns.ipv4.inet_ntoa(prefix + b"\x00" * pad)
        elif family == 2:
            pad = 16 - addrlen
            addr = dns.ipv6.inet_ntoa(prefix + b"\x00" * pad)
        else:
            raise ValueError("unsupported family")

        return cls(addr, src, scope)
class EDECode(dns.enum.IntEnum):
    """Info codes carried by an Extended DNS Error option (see ``EDEOption``)."""

    OTHER = 0
    UNSUPPORTED_DNSKEY_ALGORITHM = 1
    UNSUPPORTED_DS_DIGEST_TYPE = 2
    STALE_ANSWER = 3
    FORGED_ANSWER = 4
    DNSSEC_INDETERMINATE = 5
    DNSSEC_BOGUS = 6
    SIGNATURE_EXPIRED = 7
    SIGNATURE_NOT_YET_VALID = 8
    DNSKEY_MISSING = 9
    RRSIGS_MISSING = 10
    NO_ZONE_KEY_BIT_SET = 11
    NSEC_MISSING = 12
    CACHED_ERROR = 13
    NOT_READY = 14
    BLOCKED = 15
    CENSORED = 16
    FILTERED = 17
    PROHIBITED = 18
    STALE_NXDOMAIN_ANSWER = 19
    NOT_AUTHORITATIVE = 20
    NOT_SUPPORTED = 21
    NO_REACHABLE_AUTHORITY = 22
    NETWORK_ERROR = 23
    INVALID_DATA = 24

    @classmethod
    def _maximum(cls):
        # 65535 == 2**16 - 1; EDEOption packs the code with struct format "!H".
        # NOTE(review): presumably consulted by dns.enum.IntEnum to bound the
        # values it accepts -- confirm against dns.enum.
        return 65535
class EDEOption(Option):  # lgtm[py/missing-equals]
    """Extended DNS Error (EDE, RFC8914)"""

    def __init__(self, code: Union[EDECode, str], text: Optional[str] = None):
        """*code*, a ``dns.edns.EDECode`` or ``str``, the info code of the
        extended error.

        *text*, a ``str`` or ``None``, specifying additional information about
        the error.

        Raises ``ValueError`` if *text* is neither a ``str`` nor ``None``.
        """

        super().__init__(OptionType.EDE)

        self.code = EDECode.make(code)
        if not (text is None or isinstance(text, str)):
            raise ValueError("text must be string or None")
        self.text = text

    def to_text(self) -> str:
        """Return ``"EDE <code>"``, followed by ``": <text>"`` when text is set."""
        rendered = f"EDE {self.code}"
        if self.text is None:
            return rendered
        return f"{rendered}: {self.text}"

    def to_wire(self, file: Optional[Any] = None) -> Optional[bytes]:
        """Encode the 16-bit code followed by the UTF-8 text (if any); write
        to *file* if truthy, else return the bytes.
        """
        wire = struct.pack("!H", self.code)
        if self.text is not None:
            wire = wire + self.text.encode("utf8")

        if not file:
            return wire
        file.write(wire)
        return None

    @classmethod
    def from_wire_parser(
        cls, otype: Union[OptionType, str], parser: "dns.wire.Parser"
    ) -> Option:
        """Build an ``EDEOption`` from a 16-bit code and the remaining bytes."""
        code = EDECode.make(parser.get_uint16())
        raw = parser.get_remaining()

        if not raw:
            return cls(code, None)
        if raw[-1] == 0:  # text MAY be null-terminated
            raw = raw[:-1]
        return cls(code, raw.decode("utf8"))
# Registry mapping an option type to the class that implements it.
# get_option_class() reads it and register_type() adds to it; types that are
# absent fall back to GenericOption.
_type_to_class: Dict[OptionType, Any] = {
    OptionType.ECS: ECSOption,
    OptionType.EDE: EDEOption,
}
def get_option_class(otype: OptionType) -> Any:
    """Look up the implementation class for an option type.

    Falls back to ``GenericOption`` when no more specific class has been
    registered for *otype*.
    """

    registered = _type_to_class.get(otype)
    return GenericOption if registered is None else registered
def option_from_wire_parser(
    otype: Union[OptionType, str], parser: "dns.wire.Parser"
) -> Option:
    """Decode one EDNS option from a parser.

    *otype*, a ``dns.edns.OptionType`` or ``str``, is the option type; it is
    normalized with ``OptionType.make``.

    *parser*, a ``dns.wire.Parser``, the parser, which should be
    restricted to the option length.

    Returns an instance of a subclass of ``dns.edns.Option``.
    """
    option_type = OptionType.make(otype)
    return get_option_class(option_type).from_wire_parser(option_type, parser)
def option_from_wire(
    otype: Union[OptionType, str], wire: bytes, current: int, olen: int
) -> Option:
    """Decode one EDNS option from a wire-format message.

    *otype*, an ``int``, is the option type.

    *wire*, a ``bytes``, is the wire-format message.

    *current*, an ``int``, is the offset in *wire* at which the option
    data begins.

    *olen*, an ``int``, is the length of the wire-format option data.

    Returns an instance of a subclass of ``dns.edns.Option``.
    """
    option_parser = dns.wire.Parser(wire, current)
    # Confine the parser to this option's bytes while it is decoded.
    with option_parser.restrict_to(olen):
        option = option_from_wire_parser(otype, option_parser)
    return option
def register_type(implementation: Any, otype: OptionType) -> None:
    """Associate an implementation class with an option type.

    *implementation*, a ``class``, is a subclass of ``dns.edns.Option``.

    *otype*, an ``int``, is the option type.

    Any class previously registered for *otype* is replaced.
    """

    _type_to_class.update({otype: implementation})
# Module-level aliases for the OptionType members, so that e.g.
# ``dns.edns.ECS`` can be used in place of ``dns.edns.OptionType.ECS``.
# The section between the markers is generated; do not edit it by hand.
### BEGIN generated OptionType constants

NSID = OptionType.NSID
DAU = OptionType.DAU
DHU = OptionType.DHU
N3U = OptionType.N3U
ECS = OptionType.ECS
EXPIRE = OptionType.EXPIRE
COOKIE = OptionType.COOKIE
KEEPALIVE = OptionType.KEEPALIVE
PADDING = OptionType.PADDING
CHAIN = OptionType.CHAIN
EDE = OptionType.EDE

### END generated OptionType constants