Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/rdata.py: 47%
377 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3# Copyright (C) 2001-2017 Nominum, Inc.
4#
5# Permission to use, copy, modify, and distribute this software and its
6# documentation for any purpose with or without fee is hereby granted,
7# provided that the above copyright notice and this permission notice
8# appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18"""DNS rdata."""
20import base64
21import binascii
22import inspect
23import io
24import itertools
25import random
26from importlib import import_module
27from typing import Any, Dict, Optional, Tuple, Union
29import dns.exception
30import dns.immutable
31import dns.ipv4
32import dns.ipv6
33import dns.name
34import dns.rdataclass
35import dns.rdatatype
36import dns.tokenizer
37import dns.ttl
38import dns.wire
40_chunksize = 32
42# We currently allow comparisons for rdata with relative names for backwards
43# compatibility, but in the future we will not, as these kinds of comparisons
44# can lead to subtle bugs if code is not carefully written.
45#
46# This switch allows the future behavior to be turned on so code can be
47# tested with it.
48_allow_relative_comparisons = True
51class NoRelativeRdataOrdering(dns.exception.DNSException):
52 """An attempt was made to do an ordered comparison of one or more
53 rdata with relative names. The only reliable way of sorting rdata
54 is to use non-relativized rdata.
56 """
59def _wordbreak(data, chunksize=_chunksize, separator=b" "):
60 """Break a binary string into chunks of chunksize characters separated by
61 a space.
62 """
64 if not chunksize:
65 return data.decode()
66 return separator.join(
67 [data[i : i + chunksize] for i in range(0, len(data), chunksize)]
68 ).decode()
71# pylint: disable=unused-argument
74def _hexify(data, chunksize=_chunksize, separator=b" ", **kw):
75 """Convert a binary string into its hex encoding, broken up into chunks
76 of chunksize characters separated by a separator.
77 """
79 return _wordbreak(binascii.hexlify(data), chunksize, separator)
82def _base64ify(data, chunksize=_chunksize, separator=b" ", **kw):
83 """Convert a binary string into its base64 encoding, broken up into chunks
84 of chunksize characters separated by a separator.
85 """
87 return _wordbreak(base64.b64encode(data), chunksize, separator)
90# pylint: enable=unused-argument
92__escaped = b'"\\'
95def _escapify(qstring):
96 """Escape the characters in a quoted string which need it."""
98 if isinstance(qstring, str):
99 qstring = qstring.encode()
100 if not isinstance(qstring, bytearray):
101 qstring = bytearray(qstring)
103 text = ""
104 for c in qstring:
105 if c in __escaped:
106 text += "\\" + chr(c)
107 elif c >= 0x20 and c < 0x7F:
108 text += chr(c)
109 else:
110 text += "\\%03d" % c
111 return text
114def _truncate_bitmap(what):
115 """Determine the index of greatest byte that isn't all zeros, and
116 return the bitmap that contains all the bytes less than that index.
117 """
119 for i in range(len(what) - 1, -1, -1):
120 if what[i] != 0:
121 return what[0 : i + 1]
122 return what[0:1]
125# So we don't have to edit all the rdata classes...
126_constify = dns.immutable.constify
129@dns.immutable.immutable
130class Rdata:
131 """Base class for all DNS rdata types."""
133 __slots__ = ["rdclass", "rdtype", "rdcomment"]
135 def __init__(self, rdclass, rdtype):
136 """Initialize an rdata.
138 *rdclass*, an ``int`` is the rdataclass of the Rdata.
140 *rdtype*, an ``int`` is the rdatatype of the Rdata.
141 """
143 self.rdclass = self._as_rdataclass(rdclass)
144 self.rdtype = self._as_rdatatype(rdtype)
145 self.rdcomment = None
147 def _get_all_slots(self):
148 return itertools.chain.from_iterable(
149 getattr(cls, "__slots__", []) for cls in self.__class__.__mro__
150 )
152 def __getstate__(self):
153 # We used to try to do a tuple of all slots here, but it
154 # doesn't work as self._all_slots isn't available at
155 # __setstate__() time. Before that we tried to store a tuple
156 # of __slots__, but that didn't work as it didn't store the
157 # slots defined by ancestors. This older way didn't fail
158 # outright, but ended up with partially broken objects, e.g.
159 # if you unpickled an A RR it wouldn't have rdclass and rdtype
160 # attributes, and would compare badly.
161 state = {}
162 for slot in self._get_all_slots():
163 state[slot] = getattr(self, slot)
164 return state
166 def __setstate__(self, state):
167 for slot, val in state.items():
168 object.__setattr__(self, slot, val)
169 if not hasattr(self, "rdcomment"):
170 # Pickled rdata from 2.0.x might not have a rdcomment, so add
171 # it if needed.
172 object.__setattr__(self, "rdcomment", None)
174 def covers(self) -> dns.rdatatype.RdataType:
175 """Return the type a Rdata covers.
177 DNS SIG/RRSIG rdatas apply to a specific type; this type is
178 returned by the covers() function. If the rdata type is not
179 SIG or RRSIG, dns.rdatatype.NONE is returned. This is useful when
180 creating rdatasets, allowing the rdataset to contain only RRSIGs
181 of a particular type, e.g. RRSIG(NS).
183 Returns a ``dns.rdatatype.RdataType``.
184 """
186 return dns.rdatatype.NONE
188 def extended_rdatatype(self) -> int:
189 """Return a 32-bit type value, the least significant 16 bits of
190 which are the ordinary DNS type, and the upper 16 bits of which are
191 the "covered" type, if any.
193 Returns an ``int``.
194 """
196 return self.covers() << 16 | self.rdtype
198 def to_text(
199 self,
200 origin: Optional[dns.name.Name] = None,
201 relativize: bool = True,
202 **kw: Dict[str, Any],
203 ) -> str:
204 """Convert an rdata to text format.
206 Returns a ``str``.
207 """
209 raise NotImplementedError # pragma: no cover
211 def _to_wire(
212 self,
213 file: Optional[Any],
214 compress: Optional[dns.name.CompressType] = None,
215 origin: Optional[dns.name.Name] = None,
216 canonicalize: bool = False,
217 ) -> bytes:
218 raise NotImplementedError # pragma: no cover
220 def to_wire(
221 self,
222 file: Optional[Any] = None,
223 compress: Optional[dns.name.CompressType] = None,
224 origin: Optional[dns.name.Name] = None,
225 canonicalize: bool = False,
226 ) -> bytes:
227 """Convert an rdata to wire format.
229 Returns a ``bytes`` or ``None``.
230 """
232 if file:
233 return self._to_wire(file, compress, origin, canonicalize)
234 else:
235 f = io.BytesIO()
236 self._to_wire(f, compress, origin, canonicalize)
237 return f.getvalue()
239 def to_generic(
240 self, origin: Optional[dns.name.Name] = None
241 ) -> "dns.rdata.GenericRdata":
242 """Creates a dns.rdata.GenericRdata equivalent of this rdata.
244 Returns a ``dns.rdata.GenericRdata``.
245 """
246 return dns.rdata.GenericRdata(
247 self.rdclass, self.rdtype, self.to_wire(origin=origin)
248 )
250 def to_digestable(self, origin: Optional[dns.name.Name] = None) -> bytes:
251 """Convert rdata to a format suitable for digesting in hashes. This
252 is also the DNSSEC canonical form.
254 Returns a ``bytes``.
255 """
257 return self.to_wire(origin=origin, canonicalize=True)
259 def __repr__(self):
260 covers = self.covers()
261 if covers == dns.rdatatype.NONE:
262 ctext = ""
263 else:
264 ctext = "(" + dns.rdatatype.to_text(covers) + ")"
265 return (
266 "<DNS "
267 + dns.rdataclass.to_text(self.rdclass)
268 + " "
269 + dns.rdatatype.to_text(self.rdtype)
270 + ctext
271 + " rdata: "
272 + str(self)
273 + ">"
274 )
276 def __str__(self):
277 return self.to_text()
279 def _cmp(self, other):
280 """Compare an rdata with another rdata of the same rdtype and
281 rdclass.
283 For rdata with only absolute names:
284 Return < 0 if self < other in the DNSSEC ordering, 0 if self
285 == other, and > 0 if self > other.
286 For rdata with at least one relative names:
287 The rdata sorts before any rdata with only absolute names.
288 When compared with another relative rdata, all names are
289 made absolute as if they were relative to the root, as the
290 proper origin is not available. While this creates a stable
291 ordering, it is NOT guaranteed to be the DNSSEC ordering.
292 In the future, all ordering comparisons for rdata with
293 relative names will be disallowed.
294 """
295 try:
296 our = self.to_digestable()
297 our_relative = False
298 except dns.name.NeedAbsoluteNameOrOrigin:
299 if _allow_relative_comparisons:
300 our = self.to_digestable(dns.name.root)
301 our_relative = True
302 try:
303 their = other.to_digestable()
304 their_relative = False
305 except dns.name.NeedAbsoluteNameOrOrigin:
306 if _allow_relative_comparisons:
307 their = other.to_digestable(dns.name.root)
308 their_relative = True
309 if _allow_relative_comparisons:
310 if our_relative != their_relative:
311 # For the purpose of comparison, all rdata with at least one
312 # relative name is less than an rdata with only absolute names.
313 if our_relative:
314 return -1
315 else:
316 return 1
317 elif our_relative or their_relative:
318 raise NoRelativeRdataOrdering
319 if our == their:
320 return 0
321 elif our > their:
322 return 1
323 else:
324 return -1
326 def __eq__(self, other):
327 if not isinstance(other, Rdata):
328 return False
329 if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
330 return False
331 our_relative = False
332 their_relative = False
333 try:
334 our = self.to_digestable()
335 except dns.name.NeedAbsoluteNameOrOrigin:
336 our = self.to_digestable(dns.name.root)
337 our_relative = True
338 try:
339 their = other.to_digestable()
340 except dns.name.NeedAbsoluteNameOrOrigin:
341 their = other.to_digestable(dns.name.root)
342 their_relative = True
343 if our_relative != their_relative:
344 return False
345 return our == their
347 def __ne__(self, other):
348 if not isinstance(other, Rdata):
349 return True
350 if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
351 return True
352 return not self.__eq__(other)
354 def __lt__(self, other):
355 if (
356 not isinstance(other, Rdata)
357 or self.rdclass != other.rdclass
358 or self.rdtype != other.rdtype
359 ):
360 return NotImplemented
361 return self._cmp(other) < 0
363 def __le__(self, other):
364 if (
365 not isinstance(other, Rdata)
366 or self.rdclass != other.rdclass
367 or self.rdtype != other.rdtype
368 ):
369 return NotImplemented
370 return self._cmp(other) <= 0
372 def __ge__(self, other):
373 if (
374 not isinstance(other, Rdata)
375 or self.rdclass != other.rdclass
376 or self.rdtype != other.rdtype
377 ):
378 return NotImplemented
379 return self._cmp(other) >= 0
381 def __gt__(self, other):
382 if (
383 not isinstance(other, Rdata)
384 or self.rdclass != other.rdclass
385 or self.rdtype != other.rdtype
386 ):
387 return NotImplemented
388 return self._cmp(other) > 0
390 def __hash__(self):
391 return hash(self.to_digestable(dns.name.root))
393 @classmethod
394 def from_text(
395 cls,
396 rdclass: dns.rdataclass.RdataClass,
397 rdtype: dns.rdatatype.RdataType,
398 tok: dns.tokenizer.Tokenizer,
399 origin: Optional[dns.name.Name] = None,
400 relativize: bool = True,
401 relativize_to: Optional[dns.name.Name] = None,
402 ) -> "Rdata":
403 raise NotImplementedError # pragma: no cover
405 @classmethod
406 def from_wire_parser(
407 cls,
408 rdclass: dns.rdataclass.RdataClass,
409 rdtype: dns.rdatatype.RdataType,
410 parser: dns.wire.Parser,
411 origin: Optional[dns.name.Name] = None,
412 ) -> "Rdata":
413 raise NotImplementedError # pragma: no cover
415 def replace(self, **kwargs: Any) -> "Rdata":
416 """
417 Create a new Rdata instance based on the instance replace was
418 invoked on. It is possible to pass different parameters to
419 override the corresponding properties of the base Rdata.
421 Any field specific to the Rdata type can be replaced, but the
422 *rdtype* and *rdclass* fields cannot.
424 Returns an instance of the same Rdata subclass as *self*.
425 """
427 # Get the constructor parameters.
428 parameters = inspect.signature(self.__init__).parameters # type: ignore
430 # Ensure that all of the arguments correspond to valid fields.
431 # Don't allow rdclass or rdtype to be changed, though.
432 for key in kwargs:
433 if key == "rdcomment":
434 continue
435 if key not in parameters:
436 raise AttributeError(
437 "'{}' object has no attribute '{}'".format(
438 self.__class__.__name__, key
439 )
440 )
441 if key in ("rdclass", "rdtype"):
442 raise AttributeError(
443 "Cannot overwrite '{}' attribute '{}'".format(
444 self.__class__.__name__, key
445 )
446 )
448 # Construct the parameter list. For each field, use the value in
449 # kwargs if present, and the current value otherwise.
450 args = (kwargs.get(key, getattr(self, key)) for key in parameters)
452 # Create, validate, and return the new object.
453 rd = self.__class__(*args)
454 # The comment is not set in the constructor, so give it special
455 # handling.
456 rdcomment = kwargs.get("rdcomment", self.rdcomment)
457 if rdcomment is not None:
458 object.__setattr__(rd, "rdcomment", rdcomment)
459 return rd
461 # Type checking and conversion helpers. These are class methods as
462 # they don't touch object state and may be useful to others.
464 @classmethod
465 def _as_rdataclass(cls, value):
466 return dns.rdataclass.RdataClass.make(value)
468 @classmethod
469 def _as_rdatatype(cls, value):
470 return dns.rdatatype.RdataType.make(value)
472 @classmethod
473 def _as_bytes(
474 cls,
475 value: Any,
476 encode: bool = False,
477 max_length: Optional[int] = None,
478 empty_ok: bool = True,
479 ) -> bytes:
480 if encode and isinstance(value, str):
481 bvalue = value.encode()
482 elif isinstance(value, bytearray):
483 bvalue = bytes(value)
484 elif isinstance(value, bytes):
485 bvalue = value
486 else:
487 raise ValueError("not bytes")
488 if max_length is not None and len(bvalue) > max_length:
489 raise ValueError("too long")
490 if not empty_ok and len(bvalue) == 0:
491 raise ValueError("empty bytes not allowed")
492 return bvalue
494 @classmethod
495 def _as_name(cls, value):
496 # Note that proper name conversion (e.g. with origin and IDNA
497 # awareness) is expected to be done via from_text. This is just
498 # a simple thing for people invoking the constructor directly.
499 if isinstance(value, str):
500 return dns.name.from_text(value)
501 elif not isinstance(value, dns.name.Name):
502 raise ValueError("not a name")
503 return value
505 @classmethod
506 def _as_uint8(cls, value):
507 if not isinstance(value, int):
508 raise ValueError("not an integer")
509 if value < 0 or value > 255:
510 raise ValueError("not a uint8")
511 return value
513 @classmethod
514 def _as_uint16(cls, value):
515 if not isinstance(value, int):
516 raise ValueError("not an integer")
517 if value < 0 or value > 65535:
518 raise ValueError("not a uint16")
519 return value
521 @classmethod
522 def _as_uint32(cls, value):
523 if not isinstance(value, int):
524 raise ValueError("not an integer")
525 if value < 0 or value > 4294967295:
526 raise ValueError("not a uint32")
527 return value
529 @classmethod
530 def _as_uint48(cls, value):
531 if not isinstance(value, int):
532 raise ValueError("not an integer")
533 if value < 0 or value > 281474976710655:
534 raise ValueError("not a uint48")
535 return value
537 @classmethod
538 def _as_int(cls, value, low=None, high=None):
539 if not isinstance(value, int):
540 raise ValueError("not an integer")
541 if low is not None and value < low:
542 raise ValueError("value too small")
543 if high is not None and value > high:
544 raise ValueError("value too large")
545 return value
547 @classmethod
548 def _as_ipv4_address(cls, value):
549 if isinstance(value, str):
550 return dns.ipv4.canonicalize(value)
551 elif isinstance(value, bytes):
552 return dns.ipv4.inet_ntoa(value)
553 else:
554 raise ValueError("not an IPv4 address")
556 @classmethod
557 def _as_ipv6_address(cls, value):
558 if isinstance(value, str):
559 return dns.ipv6.canonicalize(value)
560 elif isinstance(value, bytes):
561 return dns.ipv6.inet_ntoa(value)
562 else:
563 raise ValueError("not an IPv6 address")
565 @classmethod
566 def _as_bool(cls, value):
567 if isinstance(value, bool):
568 return value
569 else:
570 raise ValueError("not a boolean")
572 @classmethod
573 def _as_ttl(cls, value):
574 if isinstance(value, int):
575 return cls._as_int(value, 0, dns.ttl.MAX_TTL)
576 elif isinstance(value, str):
577 return dns.ttl.from_text(value)
578 else:
579 raise ValueError("not a TTL")
581 @classmethod
582 def _as_tuple(cls, value, as_value):
583 try:
584 # For user convenience, if value is a singleton of the list
585 # element type, wrap it in a tuple.
586 return (as_value(value),)
587 except Exception:
588 # Otherwise, check each element of the iterable *value*
589 # against *as_value*.
590 return tuple(as_value(v) for v in value)
592 # Processing order
594 @classmethod
595 def _processing_order(cls, iterable):
596 items = list(iterable)
597 random.shuffle(items)
598 return items
601@dns.immutable.immutable
602class GenericRdata(Rdata):
604 """Generic Rdata Class
606 This class is used for rdata types for which we have no better
607 implementation. It implements the DNS "unknown RRs" scheme.
608 """
610 __slots__ = ["data"]
612 def __init__(self, rdclass, rdtype, data):
613 super().__init__(rdclass, rdtype)
614 self.data = data
616 def to_text(
617 self,
618 origin: Optional[dns.name.Name] = None,
619 relativize: bool = True,
620 **kw: Dict[str, Any],
621 ) -> str:
622 return r"\# %d " % len(self.data) + _hexify(self.data, **kw)
624 @classmethod
625 def from_text(
626 cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
627 ):
628 token = tok.get()
629 if not token.is_identifier() or token.value != r"\#":
630 raise dns.exception.SyntaxError(r"generic rdata does not start with \#")
631 length = tok.get_int()
632 hex = tok.concatenate_remaining_identifiers(True).encode()
633 data = binascii.unhexlify(hex)
634 if len(data) != length:
635 raise dns.exception.SyntaxError("generic rdata hex data has wrong length")
636 return cls(rdclass, rdtype, data)
638 def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
639 file.write(self.data)
641 @classmethod
642 def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
643 return cls(rdclass, rdtype, parser.get_remaining())
646_rdata_classes: Dict[
647 Tuple[dns.rdataclass.RdataClass, dns.rdatatype.RdataType], Any
648] = {}
649_module_prefix = "dns.rdtypes"
652def get_rdata_class(rdclass, rdtype):
653 cls = _rdata_classes.get((rdclass, rdtype))
654 if not cls:
655 cls = _rdata_classes.get((dns.rdatatype.ANY, rdtype))
656 if not cls:
657 rdclass_text = dns.rdataclass.to_text(rdclass)
658 rdtype_text = dns.rdatatype.to_text(rdtype)
659 rdtype_text = rdtype_text.replace("-", "_")
660 try:
661 mod = import_module(
662 ".".join([_module_prefix, rdclass_text, rdtype_text])
663 )
664 cls = getattr(mod, rdtype_text)
665 _rdata_classes[(rdclass, rdtype)] = cls
666 except ImportError:
667 try:
668 mod = import_module(".".join([_module_prefix, "ANY", rdtype_text]))
669 cls = getattr(mod, rdtype_text)
670 _rdata_classes[(dns.rdataclass.ANY, rdtype)] = cls
671 _rdata_classes[(rdclass, rdtype)] = cls
672 except ImportError:
673 pass
674 if not cls:
675 cls = GenericRdata
676 _rdata_classes[(rdclass, rdtype)] = cls
677 return cls
680def from_text(
681 rdclass: Union[dns.rdataclass.RdataClass, str],
682 rdtype: Union[dns.rdatatype.RdataType, str],
683 tok: Union[dns.tokenizer.Tokenizer, str],
684 origin: Optional[dns.name.Name] = None,
685 relativize: bool = True,
686 relativize_to: Optional[dns.name.Name] = None,
687 idna_codec: Optional[dns.name.IDNACodec] = None,
688) -> Rdata:
689 """Build an rdata object from text format.
691 This function attempts to dynamically load a class which
692 implements the specified rdata class and type. If there is no
693 class-and-type-specific implementation, the GenericRdata class
694 is used.
696 Once a class is chosen, its from_text() class method is called
697 with the parameters to this function.
699 If *tok* is a ``str``, then a tokenizer is created and the string
700 is used as its input.
702 *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.
704 *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.
706 *tok*, a ``dns.tokenizer.Tokenizer`` or a ``str``.
708 *origin*, a ``dns.name.Name`` (or ``None``), the
709 origin to use for relative names.
711 *relativize*, a ``bool``. If true, name will be relativized.
713 *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use
714 when relativizing names. If not set, the *origin* value will be used.
716 *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
717 encoder/decoder to use if a tokenizer needs to be created. If
718 ``None``, the default IDNA 2003 encoder/decoder is used. If a
719 tokenizer is not created, then the codec associated with the tokenizer
720 is the one that is used.
722 Returns an instance of the chosen Rdata subclass.
724 """
725 if isinstance(tok, str):
726 tok = dns.tokenizer.Tokenizer(tok, idna_codec=idna_codec)
727 rdclass = dns.rdataclass.RdataClass.make(rdclass)
728 rdtype = dns.rdatatype.RdataType.make(rdtype)
729 cls = get_rdata_class(rdclass, rdtype)
730 with dns.exception.ExceptionWrapper(dns.exception.SyntaxError):
731 rdata = None
732 if cls != GenericRdata:
733 # peek at first token
734 token = tok.get()
735 tok.unget(token)
736 if token.is_identifier() and token.value == r"\#":
737 #
738 # Known type using the generic syntax. Extract the
739 # wire form from the generic syntax, and then run
740 # from_wire on it.
741 #
742 grdata = GenericRdata.from_text(
743 rdclass, rdtype, tok, origin, relativize, relativize_to
744 )
745 rdata = from_wire(
746 rdclass, rdtype, grdata.data, 0, len(grdata.data), origin
747 )
748 #
749 # If this comparison isn't equal, then there must have been
750 # compressed names in the wire format, which is an error,
751 # there being no reasonable context to decompress with.
752 #
753 rwire = rdata.to_wire()
754 if rwire != grdata.data:
755 raise dns.exception.SyntaxError(
756 "compressed data in "
757 "generic syntax form "
758 "of known rdatatype"
759 )
760 if rdata is None:
761 rdata = cls.from_text(
762 rdclass, rdtype, tok, origin, relativize, relativize_to
763 )
764 token = tok.get_eol_as_token()
765 if token.comment is not None:
766 object.__setattr__(rdata, "rdcomment", token.comment)
767 return rdata
770def from_wire_parser(
771 rdclass: Union[dns.rdataclass.RdataClass, str],
772 rdtype: Union[dns.rdatatype.RdataType, str],
773 parser: dns.wire.Parser,
774 origin: Optional[dns.name.Name] = None,
775) -> Rdata:
776 """Build an rdata object from wire format
778 This function attempts to dynamically load a class which
779 implements the specified rdata class and type. If there is no
780 class-and-type-specific implementation, the GenericRdata class
781 is used.
783 Once a class is chosen, its from_wire() class method is called
784 with the parameters to this function.
786 *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.
788 *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.
790 *parser*, a ``dns.wire.Parser``, the parser, which should be
791 restricted to the rdata length.
793 *origin*, a ``dns.name.Name`` (or ``None``). If not ``None``,
794 then names will be relativized to this origin.
796 Returns an instance of the chosen Rdata subclass.
797 """
799 rdclass = dns.rdataclass.RdataClass.make(rdclass)
800 rdtype = dns.rdatatype.RdataType.make(rdtype)
801 cls = get_rdata_class(rdclass, rdtype)
802 with dns.exception.ExceptionWrapper(dns.exception.FormError):
803 return cls.from_wire_parser(rdclass, rdtype, parser, origin)
806def from_wire(
807 rdclass: Union[dns.rdataclass.RdataClass, str],
808 rdtype: Union[dns.rdatatype.RdataType, str],
809 wire: bytes,
810 current: int,
811 rdlen: int,
812 origin: Optional[dns.name.Name] = None,
813) -> Rdata:
814 """Build an rdata object from wire format
816 This function attempts to dynamically load a class which
817 implements the specified rdata class and type. If there is no
818 class-and-type-specific implementation, the GenericRdata class
819 is used.
821 Once a class is chosen, its from_wire() class method is called
822 with the parameters to this function.
824 *rdclass*, an ``int``, the rdataclass.
826 *rdtype*, an ``int``, the rdatatype.
828 *wire*, a ``bytes``, the wire-format message.
830 *current*, an ``int``, the offset in wire of the beginning of
831 the rdata.
833 *rdlen*, an ``int``, the length of the wire-format rdata
835 *origin*, a ``dns.name.Name`` (or ``None``). If not ``None``,
836 then names will be relativized to this origin.
838 Returns an instance of the chosen Rdata subclass.
839 """
840 parser = dns.wire.Parser(wire, current)
841 with parser.restrict_to(rdlen):
842 return from_wire_parser(rdclass, rdtype, parser, origin)
845class RdatatypeExists(dns.exception.DNSException):
846 """DNS rdatatype already exists."""
848 supp_kwargs = {"rdclass", "rdtype"}
849 fmt = (
850 "The rdata type with class {rdclass:d} and rdtype {rdtype:d} "
851 + "already exists."
852 )
855def register_type(
856 implementation: Any,
857 rdtype: int,
858 rdtype_text: str,
859 is_singleton: bool = False,
860 rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
861) -> None:
862 """Dynamically register a module to handle an rdatatype.
864 *implementation*, a module implementing the type in the usual dnspython
865 way.
867 *rdtype*, an ``int``, the rdatatype to register.
869 *rdtype_text*, a ``str``, the textual form of the rdatatype.
871 *is_singleton*, a ``bool``, indicating if the type is a singleton (i.e.
872 RRsets of the type can have only one member.)
874 *rdclass*, the rdataclass of the type, or ``dns.rdataclass.ANY`` if
875 it applies to all classes.
876 """
878 rdtype = dns.rdatatype.RdataType.make(rdtype)
879 existing_cls = get_rdata_class(rdclass, rdtype)
880 if existing_cls != GenericRdata or dns.rdatatype.is_metatype(rdtype):
881 raise RdatatypeExists(rdclass=rdclass, rdtype=rdtype)
882 _rdata_classes[(rdclass, rdtype)] = getattr(
883 implementation, rdtype_text.replace("-", "_")
884 )
885 dns.rdatatype.register_type(rdtype, rdtype_text, is_singleton)