Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/rdata.py: 59%
379 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 07:09 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 07:09 +0000
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3# Copyright (C) 2001-2017 Nominum, Inc.
4#
5# Permission to use, copy, modify, and distribute this software and its
6# documentation for any purpose with or without fee is hereby granted,
7# provided that the above copyright notice and this permission notice
8# appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18"""DNS rdata."""
20import base64
21import binascii
22import inspect
23import io
24import itertools
25import random
26from importlib import import_module
27from typing import Any, Dict, Optional, Tuple, Union
29import dns.exception
30import dns.immutable
31import dns.ipv4
32import dns.ipv6
33import dns.name
34import dns.rdataclass
35import dns.rdatatype
36import dns.tokenizer
37import dns.ttl
38import dns.wire
40_chunksize = 32
42# We currently allow comparisons for rdata with relative names for backwards
43# compatibility, but in the future we will not, as these kinds of comparisons
44# can lead to subtle bugs if code is not carefully written.
45#
46# This switch allows the future behavior to be turned on so code can be
47# tested with it.
48_allow_relative_comparisons = True
51class NoRelativeRdataOrdering(dns.exception.DNSException):
52 """An attempt was made to do an ordered comparison of one or more
53 rdata with relative names. The only reliable way of sorting rdata
54 is to use non-relativized rdata.
56 """
59def _wordbreak(data, chunksize=_chunksize, separator=b" "):
60 """Break a binary string into chunks of chunksize characters separated by
61 a space.
62 """
64 if not chunksize:
65 return data.decode()
66 return separator.join(
67 [data[i : i + chunksize] for i in range(0, len(data), chunksize)]
68 ).decode()
71# pylint: disable=unused-argument
74def _hexify(data, chunksize=_chunksize, separator=b" ", **kw):
75 """Convert a binary string into its hex encoding, broken up into chunks
76 of chunksize characters separated by a separator.
77 """
79 return _wordbreak(binascii.hexlify(data), chunksize, separator)
82def _base64ify(data, chunksize=_chunksize, separator=b" ", **kw):
83 """Convert a binary string into its base64 encoding, broken up into chunks
84 of chunksize characters separated by a separator.
85 """
87 return _wordbreak(base64.b64encode(data), chunksize, separator)
90# pylint: enable=unused-argument
92__escaped = b'"\\'
95def _escapify(qstring):
96 """Escape the characters in a quoted string which need it."""
98 if isinstance(qstring, str):
99 qstring = qstring.encode()
100 if not isinstance(qstring, bytearray):
101 qstring = bytearray(qstring)
103 text = ""
104 for c in qstring:
105 if c in __escaped:
106 text += "\\" + chr(c)
107 elif c >= 0x20 and c < 0x7F:
108 text += chr(c)
109 else:
110 text += "\\%03d" % c
111 return text
114def _truncate_bitmap(what):
115 """Determine the index of greatest byte that isn't all zeros, and
116 return the bitmap that contains all the bytes less than that index.
117 """
119 for i in range(len(what) - 1, -1, -1):
120 if what[i] != 0:
121 return what[0 : i + 1]
122 return what[0:1]
125# So we don't have to edit all the rdata classes...
126_constify = dns.immutable.constify
129@dns.immutable.immutable
130class Rdata:
131 """Base class for all DNS rdata types."""
133 __slots__ = ["rdclass", "rdtype", "rdcomment"]
135 def __init__(self, rdclass, rdtype):
136 """Initialize an rdata.
138 *rdclass*, an ``int`` is the rdataclass of the Rdata.
140 *rdtype*, an ``int`` is the rdatatype of the Rdata.
141 """
143 self.rdclass = self._as_rdataclass(rdclass)
144 self.rdtype = self._as_rdatatype(rdtype)
145 self.rdcomment = None
147 def _get_all_slots(self):
148 return itertools.chain.from_iterable(
149 getattr(cls, "__slots__", []) for cls in self.__class__.__mro__
150 )
152 def __getstate__(self):
153 # We used to try to do a tuple of all slots here, but it
154 # doesn't work as self._all_slots isn't available at
155 # __setstate__() time. Before that we tried to store a tuple
156 # of __slots__, but that didn't work as it didn't store the
157 # slots defined by ancestors. This older way didn't fail
158 # outright, but ended up with partially broken objects, e.g.
159 # if you unpickled an A RR it wouldn't have rdclass and rdtype
160 # attributes, and would compare badly.
161 state = {}
162 for slot in self._get_all_slots():
163 state[slot] = getattr(self, slot)
164 return state
166 def __setstate__(self, state):
167 for slot, val in state.items():
168 object.__setattr__(self, slot, val)
169 if not hasattr(self, "rdcomment"):
170 # Pickled rdata from 2.0.x might not have a rdcomment, so add
171 # it if needed.
172 object.__setattr__(self, "rdcomment", None)
174 def covers(self) -> dns.rdatatype.RdataType:
175 """Return the type a Rdata covers.
177 DNS SIG/RRSIG rdatas apply to a specific type; this type is
178 returned by the covers() function. If the rdata type is not
179 SIG or RRSIG, dns.rdatatype.NONE is returned. This is useful when
180 creating rdatasets, allowing the rdataset to contain only RRSIGs
181 of a particular type, e.g. RRSIG(NS).
183 Returns a ``dns.rdatatype.RdataType``.
184 """
186 return dns.rdatatype.NONE
188 def extended_rdatatype(self) -> int:
189 """Return a 32-bit type value, the least significant 16 bits of
190 which are the ordinary DNS type, and the upper 16 bits of which are
191 the "covered" type, if any.
193 Returns an ``int``.
194 """
196 return self.covers() << 16 | self.rdtype
198 def to_text(
199 self,
200 origin: Optional[dns.name.Name] = None,
201 relativize: bool = True,
202 **kw: Dict[str, Any]
203 ) -> str:
204 """Convert an rdata to text format.
206 Returns a ``str``.
207 """
209 raise NotImplementedError # pragma: no cover
211 def _to_wire(
212 self,
213 file: Optional[Any],
214 compress: Optional[dns.name.CompressType] = None,
215 origin: Optional[dns.name.Name] = None,
216 canonicalize: bool = False,
217 ) -> bytes:
218 raise NotImplementedError # pragma: no cover
220 def to_wire(
221 self,
222 file: Optional[Any] = None,
223 compress: Optional[dns.name.CompressType] = None,
224 origin: Optional[dns.name.Name] = None,
225 canonicalize: bool = False,
226 ) -> bytes:
227 """Convert an rdata to wire format.
229 Returns a ``bytes`` or ``None``.
230 """
232 if file:
233 return self._to_wire(file, compress, origin, canonicalize)
234 else:
235 f = io.BytesIO()
236 self._to_wire(f, compress, origin, canonicalize)
237 return f.getvalue()
239 def to_generic(
240 self, origin: Optional[dns.name.Name] = None
241 ) -> "dns.rdata.GenericRdata":
242 """Creates a dns.rdata.GenericRdata equivalent of this rdata.
244 Returns a ``dns.rdata.GenericRdata``.
245 """
246 return dns.rdata.GenericRdata(
247 self.rdclass, self.rdtype, self.to_wire(origin=origin)
248 )
250 def to_digestable(self, origin: Optional[dns.name.Name] = None) -> bytes:
251 """Convert rdata to a format suitable for digesting in hashes. This
252 is also the DNSSEC canonical form.
254 Returns a ``bytes``.
255 """
257 return self.to_wire(origin=origin, canonicalize=True)
259 def __repr__(self):
260 covers = self.covers()
261 if covers == dns.rdatatype.NONE:
262 ctext = ""
263 else:
264 ctext = "(" + dns.rdatatype.to_text(covers) + ")"
265 return (
266 "<DNS "
267 + dns.rdataclass.to_text(self.rdclass)
268 + " "
269 + dns.rdatatype.to_text(self.rdtype)
270 + ctext
271 + " rdata: "
272 + str(self)
273 + ">"
274 )
276 def __str__(self):
277 return self.to_text()
279 def _cmp(self, other):
280 """Compare an rdata with another rdata of the same rdtype and
281 rdclass.
283 For rdata with only absolute names:
284 Return < 0 if self < other in the DNSSEC ordering, 0 if self
285 == other, and > 0 if self > other.
286 For rdata with at least one relative names:
287 The rdata sorts before any rdata with only absolute names.
288 When compared with another relative rdata, all names are
289 made absolute as if they were relative to the root, as the
290 proper origin is not available. While this creates a stable
291 ordering, it is NOT guaranteed to be the DNSSEC ordering.
292 In the future, all ordering comparisons for rdata with
293 relative names will be disallowed.
294 """
295 try:
296 our = self.to_digestable()
297 our_relative = False
298 except dns.name.NeedAbsoluteNameOrOrigin:
299 if _allow_relative_comparisons:
300 our = self.to_digestable(dns.name.root)
301 our_relative = True
302 try:
303 their = other.to_digestable()
304 their_relative = False
305 except dns.name.NeedAbsoluteNameOrOrigin:
306 if _allow_relative_comparisons:
307 their = other.to_digestable(dns.name.root)
308 their_relative = True
309 if _allow_relative_comparisons:
310 if our_relative != their_relative:
311 # For the purpose of comparison, all rdata with at least one
312 # relative name is less than an rdata with only absolute names.
313 if our_relative:
314 return -1
315 else:
316 return 1
317 elif our_relative or their_relative:
318 raise NoRelativeRdataOrdering
319 if our == their:
320 return 0
321 elif our > their:
322 return 1
323 else:
324 return -1
326 def __eq__(self, other):
327 if not isinstance(other, Rdata):
328 return False
329 if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
330 return False
331 our_relative = False
332 their_relative = False
333 try:
334 our = self.to_digestable()
335 except dns.name.NeedAbsoluteNameOrOrigin:
336 our = self.to_digestable(dns.name.root)
337 our_relative = True
338 try:
339 their = other.to_digestable()
340 except dns.name.NeedAbsoluteNameOrOrigin:
341 their = other.to_digestable(dns.name.root)
342 their_relative = True
343 if our_relative != their_relative:
344 return False
345 return our == their
347 def __ne__(self, other):
348 if not isinstance(other, Rdata):
349 return True
350 if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
351 return True
352 return not self.__eq__(other)
354 def __lt__(self, other):
355 if (
356 not isinstance(other, Rdata)
357 or self.rdclass != other.rdclass
358 or self.rdtype != other.rdtype
359 ):
360 return NotImplemented
361 return self._cmp(other) < 0
363 def __le__(self, other):
364 if (
365 not isinstance(other, Rdata)
366 or self.rdclass != other.rdclass
367 or self.rdtype != other.rdtype
368 ):
369 return NotImplemented
370 return self._cmp(other) <= 0
372 def __ge__(self, other):
373 if (
374 not isinstance(other, Rdata)
375 or self.rdclass != other.rdclass
376 or self.rdtype != other.rdtype
377 ):
378 return NotImplemented
379 return self._cmp(other) >= 0
381 def __gt__(self, other):
382 if (
383 not isinstance(other, Rdata)
384 or self.rdclass != other.rdclass
385 or self.rdtype != other.rdtype
386 ):
387 return NotImplemented
388 return self._cmp(other) > 0
390 def __hash__(self):
391 return hash(self.to_digestable(dns.name.root))
393 @classmethod
394 def from_text(
395 cls,
396 rdclass: dns.rdataclass.RdataClass,
397 rdtype: dns.rdatatype.RdataType,
398 tok: dns.tokenizer.Tokenizer,
399 origin: Optional[dns.name.Name] = None,
400 relativize: bool = True,
401 relativize_to: Optional[dns.name.Name] = None,
402 ) -> "Rdata":
403 raise NotImplementedError # pragma: no cover
405 @classmethod
406 def from_wire_parser(
407 cls,
408 rdclass: dns.rdataclass.RdataClass,
409 rdtype: dns.rdatatype.RdataType,
410 parser: dns.wire.Parser,
411 origin: Optional[dns.name.Name] = None,
412 ) -> "Rdata":
413 raise NotImplementedError # pragma: no cover
415 def replace(self, **kwargs: Any) -> "Rdata":
416 """
417 Create a new Rdata instance based on the instance replace was
418 invoked on. It is possible to pass different parameters to
419 override the corresponding properties of the base Rdata.
421 Any field specific to the Rdata type can be replaced, but the
422 *rdtype* and *rdclass* fields cannot.
424 Returns an instance of the same Rdata subclass as *self*.
425 """
427 # Get the constructor parameters.
428 parameters = inspect.signature(self.__init__).parameters # type: ignore
430 # Ensure that all of the arguments correspond to valid fields.
431 # Don't allow rdclass or rdtype to be changed, though.
432 for key in kwargs:
433 if key == "rdcomment":
434 continue
435 if key not in parameters:
436 raise AttributeError(
437 "'{}' object has no attribute '{}'".format(
438 self.__class__.__name__, key
439 )
440 )
441 if key in ("rdclass", "rdtype"):
442 raise AttributeError(
443 "Cannot overwrite '{}' attribute '{}'".format(
444 self.__class__.__name__, key
445 )
446 )
448 # Construct the parameter list. For each field, use the value in
449 # kwargs if present, and the current value otherwise.
450 args = (kwargs.get(key, getattr(self, key)) for key in parameters)
452 # Create, validate, and return the new object.
453 rd = self.__class__(*args)
454 # The comment is not set in the constructor, so give it special
455 # handling.
456 rdcomment = kwargs.get("rdcomment", self.rdcomment)
457 if rdcomment is not None:
458 object.__setattr__(rd, "rdcomment", rdcomment)
459 return rd
461 # Type checking and conversion helpers. These are class methods as
462 # they don't touch object state and may be useful to others.
464 @classmethod
465 def _as_rdataclass(cls, value):
466 return dns.rdataclass.RdataClass.make(value)
468 @classmethod
469 def _as_rdatatype(cls, value):
470 return dns.rdatatype.RdataType.make(value)
472 @classmethod
473 def _as_bytes(
474 cls,
475 value: Any,
476 encode: bool = False,
477 max_length: Optional[int] = None,
478 empty_ok: bool = True,
479 ) -> bytes:
480 if encode and isinstance(value, str):
481 bvalue = value.encode()
482 elif isinstance(value, bytearray):
483 bvalue = bytes(value)
484 elif isinstance(value, bytes):
485 bvalue = value
486 else:
487 raise ValueError("not bytes")
488 if max_length is not None and len(bvalue) > max_length:
489 raise ValueError("too long")
490 if not empty_ok and len(bvalue) == 0:
491 raise ValueError("empty bytes not allowed")
492 return bvalue
494 @classmethod
495 def _as_name(cls, value):
496 # Note that proper name conversion (e.g. with origin and IDNA
497 # awareness) is expected to be done via from_text. This is just
498 # a simple thing for people invoking the constructor directly.
499 if isinstance(value, str):
500 return dns.name.from_text(value)
501 elif not isinstance(value, dns.name.Name):
502 raise ValueError("not a name")
503 return value
505 @classmethod
506 def _as_uint8(cls, value):
507 if not isinstance(value, int):
508 raise ValueError("not an integer")
509 if value < 0 or value > 255:
510 raise ValueError("not a uint8")
511 return value
513 @classmethod
514 def _as_uint16(cls, value):
515 if not isinstance(value, int):
516 raise ValueError("not an integer")
517 if value < 0 or value > 65535:
518 raise ValueError("not a uint16")
519 return value
521 @classmethod
522 def _as_uint32(cls, value):
523 if not isinstance(value, int):
524 raise ValueError("not an integer")
525 if value < 0 or value > 4294967295:
526 raise ValueError("not a uint32")
527 return value
529 @classmethod
530 def _as_uint48(cls, value):
531 if not isinstance(value, int):
532 raise ValueError("not an integer")
533 if value < 0 or value > 281474976710655:
534 raise ValueError("not a uint48")
535 return value
537 @classmethod
538 def _as_int(cls, value, low=None, high=None):
539 if not isinstance(value, int):
540 raise ValueError("not an integer")
541 if low is not None and value < low:
542 raise ValueError("value too small")
543 if high is not None and value > high:
544 raise ValueError("value too large")
545 return value
547 @classmethod
548 def _as_ipv4_address(cls, value):
549 if isinstance(value, str):
550 # call to check validity
551 dns.ipv4.inet_aton(value)
552 return value
553 elif isinstance(value, bytes):
554 return dns.ipv4.inet_ntoa(value)
555 else:
556 raise ValueError("not an IPv4 address")
558 @classmethod
559 def _as_ipv6_address(cls, value):
560 if isinstance(value, str):
561 # call to check validity
562 dns.ipv6.inet_aton(value)
563 return value
564 elif isinstance(value, bytes):
565 return dns.ipv6.inet_ntoa(value)
566 else:
567 raise ValueError("not an IPv6 address")
569 @classmethod
570 def _as_bool(cls, value):
571 if isinstance(value, bool):
572 return value
573 else:
574 raise ValueError("not a boolean")
576 @classmethod
577 def _as_ttl(cls, value):
578 if isinstance(value, int):
579 return cls._as_int(value, 0, dns.ttl.MAX_TTL)
580 elif isinstance(value, str):
581 return dns.ttl.from_text(value)
582 else:
583 raise ValueError("not a TTL")
585 @classmethod
586 def _as_tuple(cls, value, as_value):
587 try:
588 # For user convenience, if value is a singleton of the list
589 # element type, wrap it in a tuple.
590 return (as_value(value),)
591 except Exception:
592 # Otherwise, check each element of the iterable *value*
593 # against *as_value*.
594 return tuple(as_value(v) for v in value)
596 # Processing order
598 @classmethod
599 def _processing_order(cls, iterable):
600 items = list(iterable)
601 random.shuffle(items)
602 return items
605@dns.immutable.immutable
606class GenericRdata(Rdata):
608 """Generic Rdata Class
610 This class is used for rdata types for which we have no better
611 implementation. It implements the DNS "unknown RRs" scheme.
612 """
614 __slots__ = ["data"]
616 def __init__(self, rdclass, rdtype, data):
617 super().__init__(rdclass, rdtype)
618 self.data = data
620 def to_text(
621 self,
622 origin: Optional[dns.name.Name] = None,
623 relativize: bool = True,
624 **kw: Dict[str, Any]
625 ) -> str:
626 return r"\# %d " % len(self.data) + _hexify(self.data, **kw)
628 @classmethod
629 def from_text(
630 cls, rdclass, rdtype, tok, origin=None, relativize=True, relativize_to=None
631 ):
632 token = tok.get()
633 if not token.is_identifier() or token.value != r"\#":
634 raise dns.exception.SyntaxError(r"generic rdata does not start with \#")
635 length = tok.get_int()
636 hex = tok.concatenate_remaining_identifiers(True).encode()
637 data = binascii.unhexlify(hex)
638 if len(data) != length:
639 raise dns.exception.SyntaxError("generic rdata hex data has wrong length")
640 return cls(rdclass, rdtype, data)
642 def _to_wire(self, file, compress=None, origin=None, canonicalize=False):
643 file.write(self.data)
645 @classmethod
646 def from_wire_parser(cls, rdclass, rdtype, parser, origin=None):
647 return cls(rdclass, rdtype, parser.get_remaining())
650_rdata_classes: Dict[
651 Tuple[dns.rdataclass.RdataClass, dns.rdatatype.RdataType], Any
652] = {}
653_module_prefix = "dns.rdtypes"
656def get_rdata_class(rdclass, rdtype):
657 cls = _rdata_classes.get((rdclass, rdtype))
658 if not cls:
659 cls = _rdata_classes.get((dns.rdatatype.ANY, rdtype))
660 if not cls:
661 rdclass_text = dns.rdataclass.to_text(rdclass)
662 rdtype_text = dns.rdatatype.to_text(rdtype)
663 rdtype_text = rdtype_text.replace("-", "_")
664 try:
665 mod = import_module(
666 ".".join([_module_prefix, rdclass_text, rdtype_text])
667 )
668 cls = getattr(mod, rdtype_text)
669 _rdata_classes[(rdclass, rdtype)] = cls
670 except ImportError:
671 try:
672 mod = import_module(".".join([_module_prefix, "ANY", rdtype_text]))
673 cls = getattr(mod, rdtype_text)
674 _rdata_classes[(dns.rdataclass.ANY, rdtype)] = cls
675 _rdata_classes[(rdclass, rdtype)] = cls
676 except ImportError:
677 pass
678 if not cls:
679 cls = GenericRdata
680 _rdata_classes[(rdclass, rdtype)] = cls
681 return cls
684def from_text(
685 rdclass: Union[dns.rdataclass.RdataClass, str],
686 rdtype: Union[dns.rdatatype.RdataType, str],
687 tok: Union[dns.tokenizer.Tokenizer, str],
688 origin: Optional[dns.name.Name] = None,
689 relativize: bool = True,
690 relativize_to: Optional[dns.name.Name] = None,
691 idna_codec: Optional[dns.name.IDNACodec] = None,
692) -> Rdata:
693 """Build an rdata object from text format.
695 This function attempts to dynamically load a class which
696 implements the specified rdata class and type. If there is no
697 class-and-type-specific implementation, the GenericRdata class
698 is used.
700 Once a class is chosen, its from_text() class method is called
701 with the parameters to this function.
703 If *tok* is a ``str``, then a tokenizer is created and the string
704 is used as its input.
706 *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.
708 *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.
710 *tok*, a ``dns.tokenizer.Tokenizer`` or a ``str``.
712 *origin*, a ``dns.name.Name`` (or ``None``), the
713 origin to use for relative names.
715 *relativize*, a ``bool``. If true, name will be relativized.
717 *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use
718 when relativizing names. If not set, the *origin* value will be used.
720 *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
721 encoder/decoder to use if a tokenizer needs to be created. If
722 ``None``, the default IDNA 2003 encoder/decoder is used. If a
723 tokenizer is not created, then the codec associated with the tokenizer
724 is the one that is used.
726 Returns an instance of the chosen Rdata subclass.
728 """
729 if isinstance(tok, str):
730 tok = dns.tokenizer.Tokenizer(tok, idna_codec=idna_codec)
731 rdclass = dns.rdataclass.RdataClass.make(rdclass)
732 rdtype = dns.rdatatype.RdataType.make(rdtype)
733 cls = get_rdata_class(rdclass, rdtype)
734 with dns.exception.ExceptionWrapper(dns.exception.SyntaxError):
735 rdata = None
736 if cls != GenericRdata:
737 # peek at first token
738 token = tok.get()
739 tok.unget(token)
740 if token.is_identifier() and token.value == r"\#":
741 #
742 # Known type using the generic syntax. Extract the
743 # wire form from the generic syntax, and then run
744 # from_wire on it.
745 #
746 grdata = GenericRdata.from_text(
747 rdclass, rdtype, tok, origin, relativize, relativize_to
748 )
749 rdata = from_wire(
750 rdclass, rdtype, grdata.data, 0, len(grdata.data), origin
751 )
752 #
753 # If this comparison isn't equal, then there must have been
754 # compressed names in the wire format, which is an error,
755 # there being no reasonable context to decompress with.
756 #
757 rwire = rdata.to_wire()
758 if rwire != grdata.data:
759 raise dns.exception.SyntaxError(
760 "compressed data in "
761 "generic syntax form "
762 "of known rdatatype"
763 )
764 if rdata is None:
765 rdata = cls.from_text(
766 rdclass, rdtype, tok, origin, relativize, relativize_to
767 )
768 token = tok.get_eol_as_token()
769 if token.comment is not None:
770 object.__setattr__(rdata, "rdcomment", token.comment)
771 return rdata
774def from_wire_parser(
775 rdclass: Union[dns.rdataclass.RdataClass, str],
776 rdtype: Union[dns.rdatatype.RdataType, str],
777 parser: dns.wire.Parser,
778 origin: Optional[dns.name.Name] = None,
779) -> Rdata:
780 """Build an rdata object from wire format
782 This function attempts to dynamically load a class which
783 implements the specified rdata class and type. If there is no
784 class-and-type-specific implementation, the GenericRdata class
785 is used.
787 Once a class is chosen, its from_wire() class method is called
788 with the parameters to this function.
790 *rdclass*, a ``dns.rdataclass.RdataClass`` or ``str``, the rdataclass.
792 *rdtype*, a ``dns.rdatatype.RdataType`` or ``str``, the rdatatype.
794 *parser*, a ``dns.wire.Parser``, the parser, which should be
795 restricted to the rdata length.
797 *origin*, a ``dns.name.Name`` (or ``None``). If not ``None``,
798 then names will be relativized to this origin.
800 Returns an instance of the chosen Rdata subclass.
801 """
803 rdclass = dns.rdataclass.RdataClass.make(rdclass)
804 rdtype = dns.rdatatype.RdataType.make(rdtype)
805 cls = get_rdata_class(rdclass, rdtype)
806 with dns.exception.ExceptionWrapper(dns.exception.FormError):
807 return cls.from_wire_parser(rdclass, rdtype, parser, origin)
810def from_wire(
811 rdclass: Union[dns.rdataclass.RdataClass, str],
812 rdtype: Union[dns.rdatatype.RdataType, str],
813 wire: bytes,
814 current: int,
815 rdlen: int,
816 origin: Optional[dns.name.Name] = None,
817) -> Rdata:
818 """Build an rdata object from wire format
820 This function attempts to dynamically load a class which
821 implements the specified rdata class and type. If there is no
822 class-and-type-specific implementation, the GenericRdata class
823 is used.
825 Once a class is chosen, its from_wire() class method is called
826 with the parameters to this function.
828 *rdclass*, an ``int``, the rdataclass.
830 *rdtype*, an ``int``, the rdatatype.
832 *wire*, a ``bytes``, the wire-format message.
834 *current*, an ``int``, the offset in wire of the beginning of
835 the rdata.
837 *rdlen*, an ``int``, the length of the wire-format rdata
839 *origin*, a ``dns.name.Name`` (or ``None``). If not ``None``,
840 then names will be relativized to this origin.
842 Returns an instance of the chosen Rdata subclass.
843 """
844 parser = dns.wire.Parser(wire, current)
845 with parser.restrict_to(rdlen):
846 return from_wire_parser(rdclass, rdtype, parser, origin)
849class RdatatypeExists(dns.exception.DNSException):
850 """DNS rdatatype already exists."""
852 supp_kwargs = {"rdclass", "rdtype"}
853 fmt = (
854 "The rdata type with class {rdclass:d} and rdtype {rdtype:d} "
855 + "already exists."
856 )
859def register_type(
860 implementation: Any,
861 rdtype: int,
862 rdtype_text: str,
863 is_singleton: bool = False,
864 rdclass: dns.rdataclass.RdataClass = dns.rdataclass.IN,
865) -> None:
866 """Dynamically register a module to handle an rdatatype.
868 *implementation*, a module implementing the type in the usual dnspython
869 way.
871 *rdtype*, an ``int``, the rdatatype to register.
873 *rdtype_text*, a ``str``, the textual form of the rdatatype.
875 *is_singleton*, a ``bool``, indicating if the type is a singleton (i.e.
876 RRsets of the type can have only one member.)
878 *rdclass*, the rdataclass of the type, or ``dns.rdataclass.ANY`` if
879 it applies to all classes.
880 """
882 rdtype = dns.rdatatype.RdataType.make(rdtype)
883 existing_cls = get_rdata_class(rdclass, rdtype)
884 if existing_cls != GenericRdata or dns.rdatatype.is_metatype(rdtype):
885 raise RdatatypeExists(rdclass=rdclass, rdtype=rdtype)
886 _rdata_classes[(rdclass, rdtype)] = getattr(
887 implementation, rdtype_text.replace("-", "_")
888 )
889 dns.rdatatype.register_type(rdtype, rdtype_text, is_singleton)