Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/message.py: 47%
805 statements
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
« prev ^ index » next coverage.py v7.4.1, created at 2024-02-02 06:07 +0000
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3# Copyright (C) 2001-2017 Nominum, Inc.
4#
5# Permission to use, copy, modify, and distribute this software and its
6# documentation for any purpose with or without fee is hereby granted,
7# provided that the above copyright notice and this permission notice
8# appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18"""DNS Messages"""
20import contextlib
21import io
22import time
23from typing import Any, Dict, List, Optional, Tuple, Union
25import dns.edns
26import dns.entropy
27import dns.enum
28import dns.exception
29import dns.flags
30import dns.name
31import dns.opcode
32import dns.rcode
33import dns.rdata
34import dns.rdataclass
35import dns.rdatatype
36import dns.rdtypes.ANY.OPT
37import dns.rdtypes.ANY.TSIG
38import dns.renderer
39import dns.rrset
40import dns.tsig
41import dns.ttl
42import dns.wire
# Form error for wire data that is too short.  The raise site is outside this
# excerpt.
# NOTE(review): dnspython exceptions appear to use the class docstring as the
# default message, so the docstring text is left untouched — confirm.
class ShortHeader(dns.exception.FormError):
    """The DNS packet passed to from_wire() is too short."""
# Form error for wire data with unexpected bytes after the message.  The raise
# site is outside this excerpt.
# NOTE(review): the docstring presumably doubles as the default exception
# message, so it is kept verbatim — confirm.
class TrailingJunk(dns.exception.FormError):
    """The DNS packet passed to from_wire() has extra junk at the end of it."""
# Error for an unrecognized header field in the text form of a message.  The
# raise site is outside this excerpt.
# NOTE(review): the docstring presumably doubles as the default exception
# message, so it is kept verbatim — confirm.
class UnknownHeaderField(dns.exception.DNSException):
    """The header field name was not recognized when converting from text
    into a message."""
# Raised by Message._parse_special_rr_header() when an OPT record is outside
# the additional section, is a second OPT, or is not owned by the root name.
# NOTE(review): the docstring presumably doubles as the default exception
# message, so it is kept verbatim — confirm.
class BadEDNS(dns.exception.FormError):
    """An OPT record occurred somewhere other than
    the additional data section."""
# Raised by Message._parse_special_rr_header() when a TSIG record is outside
# the additional section, has a class other than ANY, or is not the last
# record.
# NOTE(review): the docstring presumably doubles as the default exception
# message, so it is kept verbatim — confirm.
class BadTSIG(dns.exception.FormError):
    """A TSIG record occurred somewhere other than the end of
    the additional data section."""
# Error for a TSIG whose key is not known.  The raise site is outside this
# excerpt.
# NOTE(review): the docstring presumably doubles as the default exception
# message, so it is kept verbatim — confirm.
class UnknownTSIGKey(dns.exception.DNSException):
    """A TSIG with an unknown key was received."""
# Exception carrying a (possibly partial) message under the "message" keyword.
# NOTE(review): the class docstring presumably doubles as the default
# exception message, so it is kept verbatim — confirm.
class Truncated(dns.exception.DNSException):
    """The truncated flag is set."""

    # Presumably the set of keyword names the DNSException base class accepts
    # for this exception — verify against dns.exception.
    supp_kwargs = {"message"}

    # We do this as otherwise mypy complains about an unexpected keyword
    # argument.
    # NOTE(review): the original comment named ``idna_exception`` here, which
    # is not in supp_kwargs; it presumably should read ``message``.
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def message(self):
        """As much of the message as could be processed.

        Reads the value stored under the ``"message"`` keyword argument.

        Returns a ``dns.message.Message``.
        """
        return self.kwargs["message"]
# Raised by QueryMessage.resolve_chaining() when the QR flag is not set.
# NOTE(review): the docstring presumably doubles as the default exception
# message, so it is kept verbatim — confirm.
class NotQueryResponse(dns.exception.DNSException):
    """Message is not a response to a query."""
# Raised by QueryMessage.resolve_chaining() once MAX_CHAIN CNAME hops have
# been followed.
# NOTE(review): the docstring presumably doubles as the default exception
# message, so it is kept verbatim — confirm.
class ChainTooLong(dns.exception.DNSException):
    """The CNAME chain is too long."""
# Raised by QueryMessage.resolve_chaining() when the rcode is NXDOMAIN yet an
# answer RRset was located.
# NOTE(review): the docstring presumably doubles as the default exception
# message, so it is kept verbatim — confirm.
class AnswerForNXDOMAIN(dns.exception.DNSException):
    """The rcode is NXDOMAIN but an answer was found."""
# Syntax error for text input that needs a previous name when none is known.
# The raise site is outside this excerpt.
# NOTE(review): the docstring presumably doubles as the default exception
# message, so it is kept verbatim — confirm.
class NoPreviousName(dns.exception.SyntaxError):
    """No previous name was known."""
class MessageSection(dns.enum.IntEnum):
    """Section numbers of a DNS message.

    The values are the positions of the corresponding lists inside
    ``Message.sections``.
    """

    QUESTION = 0
    ANSWER = 1
    AUTHORITY = 2
    ADDITIONAL = 3

    @classmethod
    def _maximum(cls):
        # Largest valid section number, i.e. ADDITIONAL, as a plain int.
        return int(cls.ADDITIONAL)
class MessageError:
    """Record of an exception together with an integer offset.

    *exception* is the ``Exception`` instance being recorded.

    *offset*, an ``int``, is stored alongside it.
    """

    def __init__(self, exception: Exception, offset: int):
        self.exception, self.offset = exception, offset
# Default value of the *payload* argument of Message._make_opt() and
# Message.use_edns().
DEFAULT_EDNS_PAYLOAD = 1232
# Number of CNAME hops QueryMessage.resolve_chaining() follows before raising
# ChainTooLong.
MAX_CHAIN = 16

# Shape of the keys of Message.index, as built in Message.find_rrset():
# (section number, name, rdclass, rdtype, covers, deleting).
IndexKeyType = Tuple[
    int,
    dns.name.Name,
    dns.rdataclass.RdataClass,
    dns.rdatatype.RdataType,
    Optional[dns.rdatatype.RdataType],
    Optional[dns.rdataclass.RdataClass],
]
# Mapping from index key to the RRset stored in the message.
IndexType = Dict[IndexKeyType, dns.rrset.RRset]
# Ways a caller may designate a section: number, name, or the section list
# object itself.
SectionType = Union[int, str, List[dns.rrset.RRset]]
140class Message:
141 """A DNS message."""
143 _section_enum = MessageSection
def __init__(self, id: Optional[int] = None):
    """Create an empty message.

    *id*, an ``int`` or ``None``.  When ``None`` the id is obtained from
    ``dns.entropy.random_16()``; otherwise the given value is used as is.
    """
    self.id = dns.entropy.random_16() if id is None else id
    self.flags = 0
    # One list per section; the position in this list is the section number.
    self.sections: List[List[dns.rrset.RRset]] = [[], [], [], []]
    # EDNS state (see use_edns()).
    self.opt: Optional[dns.rrset.RRset] = None
    self.request_payload = 0
    self.pad = 0
    # TSIG state (see use_tsig()).
    self.keyring: Any = None
    self.tsig: Optional[dns.rrset.RRset] = None
    self.request_mac = b""
    self.xfr = False
    self.origin: Optional[dns.name.Name] = None
    self.tsig_ctx: Optional[Any] = None
    # Lookup table used by find_rrset(); keyed by IndexKeyType tuples.
    self.index: IndexType = {}
    self.errors: List[MessageError] = []
    self.time = 0.0
@property
def question(self) -> List[dns.rrset.RRset]:
    """The question section, i.e. ``self.sections[0]``."""
    return self.sections[0]

@question.setter
def question(self, v):
    # Replace the whole question list with *v*.
    self.sections[0] = v
@property
def answer(self) -> List[dns.rrset.RRset]:
    """The answer section, i.e. ``self.sections[1]``."""
    return self.sections[1]

@answer.setter
def answer(self, v):
    # Replace the whole answer list with *v*.
    self.sections[1] = v
@property
def authority(self) -> List[dns.rrset.RRset]:
    """The authority section, i.e. ``self.sections[2]``."""
    return self.sections[2]

@authority.setter
def authority(self, v):
    # Replace the whole authority list with *v*.
    self.sections[2] = v
@property
def additional(self) -> List[dns.rrset.RRset]:
    """The additional data section, i.e. ``self.sections[3]``."""
    return self.sections[3]

@additional.setter
def additional(self, v):
    # Replace the whole additional list with *v*.
    self.sections[3] = v
def __repr__(self):
    """Return a short debugging representation containing the message id."""
    return f"<DNS message, ID {self.id!r}>"
def __str__(self):
    """Return the text form of the message, as produced by ``to_text()``."""
    text = self.to_text()
    return text
def to_text(
    self,
    origin: Optional[dns.name.Name] = None,
    relativize: bool = True,
    **kw: Dict[str, Any],
) -> str:
    """Render the message as text.

    *origin*, *relativize*, and any additional keyword arguments are
    forwarded unchanged to each RRset's ``to_text()`` method.

    The output starts with the header lines (id, opcode, rcode, flags),
    followed by the EDNS lines when EDNS is in use, and then one
    ``;SECTION`` heading per section with that section's RRsets.

    Returns a ``str`` with no trailing newline.
    """

    parts = [
        "id %d\n" % self.id,
        "opcode %s\n" % dns.opcode.to_text(self.opcode()),
        "rcode %s\n" % dns.rcode.to_text(self.rcode()),
        "flags %s\n" % dns.flags.to_text(self.flags),
    ]
    if self.edns >= 0:
        parts.append("edns %s\n" % self.edns)
        if self.ednsflags != 0:
            parts.append("eflags %s\n" % dns.flags.edns_to_text(self.ednsflags))
        parts.append("payload %d\n" % self.payload)
    parts.extend("option %s\n" % opt.to_text() for opt in self.options)
    for name, which in self._section_enum.__members__.items():
        parts.append(f";{name}\n")
        for rrset in self.section_from_number(which):
            parts.append(rrset.to_text(origin, relativize, **kw))
            parts.append("\n")
    # Every piece ends in "\n"; drop the last one so callers can print() the
    # result without getting an extra blank line.
    return "".join(parts)[:-1]
def __eq__(self, other):
    """Compare two messages.

    Messages are equal when *other* is a ``Message`` with the same id and
    flags and, for every section, each RRset of one message is contained
    in the same section of the other (order is not significant).

    Returns a ``bool``.
    """

    if not isinstance(other, Message):
        return False
    if self.id != other.id or self.flags != other.flags:
        return False
    for position, ours in enumerate(self.sections):
        theirs = other.sections[position]
        # Containment must hold in both directions.
        if any(rrset not in theirs for rrset in ours):
            return False
        if any(rrset not in ours for rrset in theirs):
            return False
    return True
def __ne__(self, other):
    """Return the negation of ``__eq__(other)``."""
    is_equal = self.__eq__(other)
    return not is_equal
def is_response(self, other: "Message") -> bool:
    """Decide whether *other* is a response to this message.

    *other*, a ``dns.message.Message``.

    *other* must have the QR flag set and share this message's id and
    opcode.  Beyond that, the question sections must contain the same
    RRsets, except that the comparison is skipped for updates and for
    FORMERR / SERVFAIL / NOTIMP / REFUSED responses that carry no question.

    Returns a ``bool``.
    """

    if other.flags & dns.flags.QR == 0:
        return False
    if self.id != other.id:
        return False
    if dns.opcode.from_flags(self.flags) != dns.opcode.from_flags(other.flags):
        return False
    # These error responses really ought to echo the question, but when
    # they come back with an empty question section we accept them anyway.
    if (
        other.rcode()
        in {
            dns.rcode.FORMERR,
            dns.rcode.SERVFAIL,
            dns.rcode.NOTIMP,
            dns.rcode.REFUSED,
        }
        and len(other.question) == 0
    ):
        return True
    if dns.opcode.is_update(self.flags):
        # Assumes the sender returned nothing from the update; the other
        # legal case (all sections echoed back identically) is not checked.
        return True
    if any(rrset not in other.question for rrset in self.question):
        return False
    if any(rrset not in self.question for rrset in other.question):
        return False
    return True
def section_number(self, section: List[dns.rrset.RRset]) -> int:
    """Map a section list object to its section number.

    *section* must be one of this message's own section lists; the match is
    by identity, not by equality.

    Raises ``ValueError`` if *section* is not one of them.

    Returns the matching member of this message's section enum.
    """

    position = next(
        (i for i, candidate in enumerate(self.sections) if candidate is section),
        None,
    )
    if position is None:
        raise ValueError("unknown section")
    return self._section_enum(position)
def section_from_number(self, number: int) -> List[dns.rrset.RRset]:
    """Return the section list for a section number.

    *number* is a section number ``int`` or the text form of a section
    name; it is converted with the section enum's ``make()``.

    Raises ``ValueError`` if the section isn't known.

    Returns a ``list``.
    """

    return self.sections[self._section_enum.make(number)]
def find_rrset(
    self,
    section: SectionType,
    name: dns.name.Name,
    rdclass: dns.rdataclass.RdataClass,
    rdtype: dns.rdatatype.RdataType,
    covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
    deleting: Optional[dns.rdataclass.RdataClass] = None,
    create: bool = False,
    force_unique: bool = False,
    idna_codec: Optional[dns.name.IDNACodec] = None,
) -> dns.rrset.RRset:
    """Locate (and optionally create) an RRset in one section.

    *section* selects the section to search and may be an ``int`` section
    number, a ``str`` section name, or one of this message's section
    lists::

        my_message.find_rrset(my_message.answer, name, rdclass, rdtype)
        my_message.find_rrset(dns.message.ANSWER, name, rdclass, rdtype)
        my_message.find_rrset("ANSWER", name, rdclass, rdtype)

    *name*, a ``dns.name.Name`` or ``str``, the owner name.

    *rdclass*, an ``int`` or ``str``, the class.

    *rdtype*, an ``int`` or ``str``, the type.

    *covers*, an ``int`` or ``str``, the covered type; defaults to
    ``dns.rdatatype.NONE``.

    *deleting*, an ``int``, ``str``, or ``None`` (the default), the
    deleting value.

    *create*, a ``bool``.  When ``True`` a missing RRset is created and
    appended to the section.

    *force_unique*, a ``bool``.  When ``True`` the lookup is skipped and a
    new RRset is always appended, which matters for DDNS updates where
    order is significant.

    *idna_codec*, a ``dns.name.IDNACodec`` or ``None``, used only when
    *name* is given as a ``str``.

    Raises ``KeyError`` if nothing matched and *create* is ``False``.

    Returns a ``dns.rrset.RRset``.
    """

    # Resolve *section* into both its number and its list.
    if isinstance(section, (int, str)):
        if isinstance(section, int):
            section_number = section
        else:
            section_number = self._section_enum.from_text(section)
        section = self.section_from_number(section_number)
    else:
        section_number = self.section_number(section)

    # Normalise the remaining arguments to their canonical types.
    if isinstance(name, str):
        name = dns.name.from_text(name, idna_codec=idna_codec)
    rdtype = dns.rdatatype.RdataType.make(rdtype)
    rdclass = dns.rdataclass.RdataClass.make(rdclass)
    covers = dns.rdatatype.RdataType.make(covers)
    if deleting is not None:
        deleting = dns.rdataclass.RdataClass.make(deleting)
    key = (section_number, name, rdclass, rdtype, covers, deleting)

    if not force_unique:
        if self.index is None:
            # No index available: fall back to scanning the section.
            existing = next(
                (
                    candidate
                    for candidate in section
                    if candidate.full_match(name, rdclass, rdtype, covers, deleting)
                ),
                None,
            )
        else:
            existing = self.index.get(key)
        if existing is not None:
            return existing
        if not create:
            raise KeyError

    created = dns.rrset.RRset(name, rdclass, rdtype, covers, deleting)
    section.append(created)
    if self.index is not None:
        self.index[key] = created
    return created
def get_rrset(
    self,
    section: SectionType,
    name: dns.name.Name,
    rdclass: dns.rdataclass.RdataClass,
    rdtype: dns.rdatatype.RdataType,
    covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
    deleting: Optional[dns.rdataclass.RdataClass] = None,
    create: bool = False,
    force_unique: bool = False,
    idna_codec: Optional[dns.name.IDNACodec] = None,
) -> Optional[dns.rrset.RRset]:
    """Like ``find_rrset()``, but return ``None`` instead of raising.

    All arguments are passed straight through to ``find_rrset()``:

    *section*, an ``int`` section number, a ``str`` section name, or one of
    this message's section lists, e.g.::

        my_message.get_rrset(my_message.answer, name, rdclass, rdtype)
        my_message.get_rrset(dns.message.ANSWER, name, rdclass, rdtype)
        my_message.get_rrset("ANSWER", name, rdclass, rdtype)

    *name*, a ``dns.name.Name`` or ``str``; *rdclass* and *rdtype*, each an
    ``int`` or ``str``; *covers*, an ``int`` or ``str`` defaulting to
    ``dns.rdatatype.NONE``; *deleting*, an ``int``, ``str``, or ``None``.

    *create* and *force_unique*, ``bool`` flags controlling whether a
    missing (or, with *force_unique*, any) RRset is created and appended.

    *idna_codec*, a ``dns.name.IDNACodec`` or ``None``.

    Returns a ``dns.rrset.RRset``, or ``None`` when ``find_rrset()`` raised
    ``KeyError``.
    """

    try:
        return self.find_rrset(
            section,
            name,
            rdclass,
            rdtype,
            covers,
            deleting,
            create,
            force_unique,
            idna_codec,
        )
    except KeyError:
        return None
def section_count(self, section: SectionType) -> int:
    """Count the records in a section.

    *section*, an ``int`` section number, a ``str`` section name, or one of
    this message's section lists, e.g.::

        my_message.section_count(my_message.answer)
        my_message.section_count(dns.message.ANSWER)
        my_message.section_count("ANSWER")

    Each RRset contributes its length, with a minimum of one so that empty
    RRsets still count.  For the additional section the OPT and TSIG
    records, which are stored outside the section list, are added when set.

    Returns an ``int``.
    """

    if isinstance(section, (int, str)):
        if isinstance(section, int):
            section_number = section
        else:
            section_number = self._section_enum.from_text(section)
        section = self.section_from_number(section_number)
    else:
        section_number = self.section_number(section)

    total = 0
    for rrset in section:
        total += max(1, len(rrset))
    if section_number == MessageSection.ADDITIONAL:
        for pseudo_record in (self.opt, self.tsig):
            if pseudo_record is not None:
                total += 1
    return total
def _compute_opt_reserve(self) -> int:
    """Return the number of bytes to reserve for the OPT RR, padding excluded.

    Returns 0 when the message has no OPT record.
    """
    if not self.opt:
        return 0
    # Fixed part: 1 byte for the root owner name plus 10 bytes for the
    # standard RR fields.  Each option then costs its wire length plus 4
    # bytes for the option type and length.  Options are rendered just to
    # measure them, since they have no size() method.  An existing padding
    # option is not special-cased; at worst a second one is added later.
    reserve = 11 + sum(len(option.to_wire()) + 4 for option in self.opt[0].options)
    if self.pad:
        # A padding option will be appended: account for its type and length.
        reserve += 4
    return reserve
def _compute_tsig_reserve(self) -> int:
    """Return the number of bytes to reserve for the TSIG RR.

    Returns 0 when the message has no TSIG record.
    """
    # TSIG records have no size method, so the record is rendered into a
    # scratch buffer and measured.  The owner name is rendered uncompressed,
    # so this is an estimate; compression is disabled when TSIG and padding
    # are both active so that the padding comes out right.
    if not self.tsig:
        return 0
    scratch = io.BytesIO()
    self.tsig.to_wire(scratch)
    rendered = scratch.getvalue()
    return len(rendered)
def to_wire(
    self,
    origin: Optional[dns.name.Name] = None,
    max_size: int = 0,
    multi: bool = False,
    tsig_ctx: Optional[Any] = None,
    prepend_length: bool = False,
    prefer_truncation: bool = False,
    **kw: Dict[str, Any],
) -> bytes:
    """Render the message in DNS compressed wire format.

    *origin*, a ``dns.name.Name`` or ``None``, is appended to relative
    names.  When ``None``, the message's own ``origin`` attribute is used
    if it is set.

    *max_size*, an ``int``, caps the size of the output.  0 (the default)
    means "use the message's request payload if nonzero, else 65535".  The
    effective value is clamped to the range 512..65535.

    *multi*, a ``bool``, ``True`` when this message is part of a
    multi-message sequence.

    *tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object, the
    ongoing TSIG context, used when signing zone transfers.

    *prepend_length*, a ``bool``.  When ``True`` the two-byte big-endian
    message length is put in front of the message, as needed for TCP,
    TLS (DoT), or QUIC (DoQ).

    *prefer_truncation*, a ``bool``.  When ``True``, running out of space
    truncates the message instead of raising; the TC bit is set if that
    happens before the additional section.

    Remaining keyword arguments go to each RRset's ``to_wire()`` method.

    Raises ``dns.exception.TooBig`` if *max_size* was exceeded and
    *prefer_truncation* is ``False``.

    Returns a ``bytes``.
    """

    if origin is None and self.origin is not None:
        origin = self.origin
    if max_size == 0:
        max_size = self.request_payload if self.request_payload != 0 else 65535
    max_size = min(max(max_size, 512), 65535)

    r = dns.renderer.Renderer(self.id, self.flags, max_size, origin)
    # Hold back room for the OPT and TSIG records, which are added last.
    opt_reserve = self._compute_opt_reserve()
    r.reserve(opt_reserve)
    tsig_reserve = self._compute_tsig_reserve()
    r.reserve(tsig_reserve)
    try:
        for q in self.question:
            r.add_question(q.name, q.rdtype, q.rdclass)
        for which, rrsets in (
            (dns.renderer.ANSWER, self.answer),
            (dns.renderer.AUTHORITY, self.authority),
            (dns.renderer.ADDITIONAL, self.additional),
        ):
            for rrset in rrsets:
                r.add_rrset(which, rrset, **kw)
    except dns.exception.TooBig:
        if not prefer_truncation:
            raise
        # Losing only additional data does not count as truncation.
        if r.section < dns.renderer.ADDITIONAL:
            r.flags |= dns.flags.TC
    r.release_reserved()
    if self.opt is not None:
        r.add_opt(self.opt, self.pad, opt_reserve, tsig_reserve)
    r.write_header()
    if self.tsig is not None:
        # Sign what has been rendered so far, then append the signed TSIG.
        (new_tsig, ctx) = dns.tsig.sign(
            r.get_wire(),
            self.keyring,
            self.tsig[0],
            int(time.time()),
            self.request_mac,
            tsig_ctx,
            multi,
        )
        self.tsig.clear()
        self.tsig.add(new_tsig)
        r.add_rrset(dns.renderer.ADDITIONAL, self.tsig)
        r.write_header()
        if multi:
            self.tsig_ctx = ctx
    wire = r.get_wire()
    if not prepend_length:
        return wire
    return len(wire).to_bytes(2, "big") + wire
@staticmethod
def _make_tsig(
    keyname, algorithm, time_signed, fudge, mac, original_id, error, other
):
    """Build a single-record TSIG RRset.

    The TSIG rdata is created with class ANY from the given fields and
    wrapped in an RRset owned by *keyname* with a TTL of 0.
    """
    rdata = dns.rdtypes.ANY.TSIG.TSIG(
        dns.rdataclass.ANY,
        dns.rdatatype.TSIG,
        algorithm,
        time_signed,
        fudge,
        mac,
        original_id,
        error,
        other,
    )
    tsig_rrset = dns.rrset.from_rdata(keyname, 0, rdata)
    return tsig_rrset
def use_tsig(
    self,
    keyring: Any,
    keyname: Optional[Union[dns.name.Name, str]] = None,
    fudge: int = 300,
    original_id: Optional[int] = None,
    tsig_error: int = 0,
    other_data: bytes = b"",
    algorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm,
) -> None:
    """Arrange for a TSIG signature to be added when the message is sent.

    *keyring*, a ``dict``, ``callable`` or ``dns.tsig.Key``, is either the
    TSIG keyring or the key itself.

    A ``dict`` keyring maps TSIG key names (``dns.name.Name``) to
    ``dns.tsig.Key`` objects or to TSIG secrets (``bytes``).  If no
    *keyname* is given, the first key yielded by iterating the dict is
    used, so applications should pass a *keyname* unless the keyring holds
    exactly one key.  A ``callable`` keyring is called with the message and
    the keyname and is expected to return a key.

    *keyname*, a ``dns.name.Name``, ``str`` or ``None`` (the default), the
    name of the key to use.  With a ``dict`` keyring the key must be
    present in it.  With a ``dns.tsig.Key`` keyring this is ignored and the
    key's own name is used.

    *fudge*, an ``int``, the TSIG time fudge.

    *original_id*, an ``int``, the TSIG original id.  If ``None``, the
    message's id is used.

    *tsig_error*, an ``int``, the TSIG error code.

    *other_data*, a ``bytes``, the TSIG other data.

    *algorithm*, a ``dns.name.Name`` or ``str``, the TSIG algorithm.  Only
    used when *keyring* is a ``dict`` whose selected entry is a ``bytes``
    secret.
    """

    if isinstance(keyring, dns.tsig.Key):
        key, keyname = keyring, keyring.name
    elif callable(keyring):
        key = keyring(self, keyname)
    else:
        if isinstance(keyname, str):
            keyname = dns.name.from_text(keyname)
        elif keyname is None:
            keyname = next(iter(keyring))
        key = keyring[keyname]
        if isinstance(key, bytes):
            # A bare secret: wrap it in a Key using the requested algorithm.
            key = dns.tsig.Key(keyname, key, algorithm)
    self.keyring = key
    if original_id is None:
        original_id = self.id
    # Placeholder TSIG: time 0 and an all-zero MAC of the right size; the
    # real values are filled in when the message is signed in to_wire().
    placeholder_mac = b"\x00" * dns.tsig.mac_sizes[self.keyring.algorithm]
    self.tsig = self._make_tsig(
        keyname,
        self.keyring.algorithm,
        0,
        fudge,
        placeholder_mac,
        original_id,
        tsig_error,
        other_data,
    )
@property
def keyname(self) -> Optional[dns.name.Name]:
    """The owner name of the TSIG RRset, or ``None`` without a TSIG."""
    return self.tsig.name if self.tsig else None
@property
def keyalgorithm(self) -> Optional[dns.name.Name]:
    """The algorithm of the TSIG record, or ``None`` without a TSIG."""
    return self.tsig[0].algorithm if self.tsig else None
@property
def mac(self) -> Optional[bytes]:
    """The MAC of the TSIG record, or ``None`` without a TSIG."""
    return self.tsig[0].mac if self.tsig else None
@property
def tsig_error(self) -> Optional[int]:
    """The error field of the TSIG record, or ``None`` without a TSIG."""
    return self.tsig[0].error if self.tsig else None
@property
def had_tsig(self) -> bool:
    """``True`` when the message has a non-empty TSIG RRset."""
    return True if self.tsig else False
@staticmethod
def _make_opt(flags=0, payload=DEFAULT_EDNS_PAYLOAD, options=None):
    """Build a single-record OPT RRset.

    The RRset is owned by the root name; *flags* is stored (as an ``int``)
    in the TTL position, and a falsy *options* becomes an empty tuple.
    """
    if not options:
        options = ()
    rdata = dns.rdtypes.ANY.OPT.OPT(payload, dns.rdatatype.OPT, options)
    return dns.rrset.from_rdata(dns.name.root, int(flags), rdata)
def use_edns(
    self,
    edns: Optional[Union[int, bool]] = 0,
    ednsflags: int = 0,
    payload: int = DEFAULT_EDNS_PAYLOAD,
    request_payload: Optional[int] = None,
    options: Optional[List[dns.edns.Option]] = None,
    pad: int = 0,
) -> None:
    """Configure EDNS behavior.

    *edns*, an ``int``, is the EDNS level to use.  ``None``, ``False``, or
    any negative value means "do not use EDNS"; in that case *ednsflags*,
    *payload*, *request_payload* and *options* are ignored.  ``True`` is
    equivalent to 0, i.e. "use EDNS0".

    *ednsflags*, an ``int``, the EDNS flag values.  The version bits are
    overwritten so that they agree with *edns*.

    *payload*, an ``int``, is the EDNS sender's payload field, which is the
    maximum size of UDP datagram the sender can handle, i.e. how big a
    response to this message can be.

    *request_payload*, an ``int``, is the EDNS payload size to use when
    sending this message.  If not specified, defaults to *payload*.

    *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS
    options.

    *pad*, a non-negative ``int``.  If 0, the default, do not pad;
    otherwise add padding bytes to make the message size a multiple of
    *pad*.  Note that if padding is non-zero, an EDNS PADDING option will
    always be added to the message.

    Raises ``ValueError`` if *pad* is negative; the message is left
    unmodified in that case.
    """

    # Validate first so that a bad *pad* cannot leave the message with its
    # OPT record and request payload already replaced.
    if pad < 0:
        raise ValueError("pad must be non-negative")
    if edns is None or edns is False:
        edns = -1
    elif edns is True:
        edns = 0
    if edns < 0:
        self.opt = None
        self.request_payload = 0
    else:
        # make sure the EDNS version in ednsflags agrees with edns
        ednsflags &= 0xFF00FFFF
        ednsflags |= edns << 16
        if options is None:
            options = []
        self.opt = self._make_opt(ednsflags, payload, options)
        if request_payload is None:
            request_payload = payload
        self.request_payload = request_payload
    self.pad = pad
@property
def edns(self) -> int:
    """The EDNS version taken from bits 16-23 of ``ednsflags``, or -1
    when the message has no OPT record."""
    return (self.ednsflags & 0xFF0000) >> 16 if self.opt else -1
@property
def ednsflags(self) -> int:
    """The EDNS flags, stored in the TTL of the OPT RRset; 0 without OPT."""
    return self.opt.ttl if self.opt else 0

@ednsflags.setter
def ednsflags(self, v):
    # Without an OPT record, a zero value is a no-op, while a nonzero value
    # creates a default OPT record carrying those flags.
    if self.opt:
        self.opt.ttl = v
        return
    if v:
        self.opt = self._make_opt(v)
@property
def payload(self) -> int:
    """The payload field of the OPT record, or 0 without an OPT record."""
    return self.opt[0].payload if self.opt else 0
@property
def options(self) -> Tuple:
    """The options of the OPT record, or an empty tuple without one."""
    return self.opt[0].options if self.opt else ()
def want_dnssec(self, wanted: bool = True) -> None:
    """Turn the 'DNSSEC desired' (DO) flag on or off.

    *wanted*, a ``bool``.  If ``True``, the DO bit is set; because this
    goes through the ``ednsflags`` setter, EDNS is enabled if it was not
    already.  If ``False``, the DO bit is cleared, but only when EDNS is
    enabled.
    """

    if wanted:
        self.ednsflags |= dns.flags.DO
        return
    if self.opt:
        self.ednsflags &= ~int(dns.flags.DO)
def rcode(self) -> dns.rcode.Rcode:
    """Return the rcode, combining the header flags and the EDNS flags.

    Returns a ``dns.rcode.Rcode``.
    """
    header_flags = int(self.flags)
    extended_flags = int(self.ednsflags)
    return dns.rcode.from_flags(header_flags, extended_flags)
def set_rcode(self, rcode: dns.rcode.Rcode) -> None:
    """Set the rcode.

    *rcode*, a ``dns.rcode.Rcode``, is split by ``dns.rcode.to_flags()``
    into a header part and an EDNS part.  The low four bits of ``flags``
    and the top eight bits of ``ednsflags`` are replaced accordingly.
    """
    (value, evalue) = dns.rcode.to_flags(rcode)
    self.flags = (self.flags & 0xFFF0) | value
    self.ednsflags = (self.ednsflags & 0x00FFFFFF) | evalue
def opcode(self) -> dns.opcode.Opcode:
    """Return the opcode extracted from the header flags.

    Returns a ``dns.opcode.Opcode``.
    """
    header_flags = int(self.flags)
    return dns.opcode.from_flags(header_flags)
def set_opcode(self, opcode: dns.opcode.Opcode) -> None:
    """Set the opcode.

    *opcode*, a ``dns.opcode.Opcode``.  The bits selected by the mask
    ``0x7800`` in ``flags`` are cleared and replaced with the value from
    ``dns.opcode.to_flags()``.
    """
    self.flags = (self.flags & 0x87FF) | dns.opcode.to_flags(opcode)
def _get_one_rr_per_rrset(self, value):
    """Return *value* unchanged: the caller's choice is accepted as is."""
    return value
# pylint: disable=unused-argument

def _parse_rr_header(self, section, name, rdclass, rdtype):
    """Return the 4-tuple ``(rdclass, rdtype, None, False)``.

    *section* and *name* are accepted but not used here.
    """
    return rdclass, rdtype, None, False

# pylint: enable=unused-argument
def _parse_special_rr_header(self, section, count, position, name, rdclass, rdtype):
    """Validate the placement of an OPT or TSIG record.

    *section* is the section number the record was found in, *count* the
    number of records in that section and *position* the index of this
    record within it.

    Raises ``BadEDNS`` for an OPT record that is outside the additional
    section, that follows an OPT record already stored on the message, or
    whose owner name is not the root.

    Raises ``BadTSIG`` for a TSIG record that is outside the additional
    section, whose class is not ANY, or that is not the last record.

    Returns the 4-tuple ``(rdclass, rdtype, None, False)``.
    """
    if rdtype == dns.rdatatype.OPT:
        if (
            section != MessageSection.ADDITIONAL
            or self.opt
            or name != dns.name.root
        ):
            raise BadEDNS
    elif rdtype == dns.rdatatype.TSIG:
        if (
            section != MessageSection.ADDITIONAL
            # Compare the class against the class enum.  The original used
            # dns.rdatatype.ANY, which only worked because both ANY
            # constants share the value 255.
            or rdclass != dns.rdataclass.ANY
            or position != count - 1
        ):
            raise BadTSIG
    return (rdclass, rdtype, None, False)
class ChainingResult:
    """Outcome of ``dns.message.QueryMessage.resolve_chaining()``.

    ``canonical_name`` is the name reached after following every CNAME; when
    ``answer`` is not ``None`` it is the same as ``answer.name``.

    ``answer`` is the answer RRset, or ``None`` if there is none.

    ``minimum_ttl`` is the TTL to use when caching the result: the smallest
    of the CNAME TTLs and either the answer TTL or, for negative answers,
    the SOA TTL and SOA minimum.

    ``cnames`` lists the CNAME RRsets that were followed, in order.
    """

    def __init__(
        self,
        canonical_name: dns.name.Name,
        answer: Optional[dns.rrset.RRset],
        minimum_ttl: int,
        cnames: List[dns.rrset.RRset],
    ):
        self.canonical_name, self.answer = canonical_name, answer
        self.minimum_ttl, self.cnames = minimum_ttl, cnames
976class QueryMessage(Message):
def resolve_chaining(self) -> ChainingResult:
    """Walk the CNAME chain of a response to find the answer RRset.

    Starting from the question name, the answer section is searched for an
    RRset of the question's type; failing that, a CNAME at the current name
    is followed and the search repeats, up to ``MAX_CHAIN`` hops.  When no
    answer is found, the authority section is searched for an SOA at the
    final name or one of its ancestors to further lower the minimum TTL.

    Raises ``dns.message.NotQueryResponse`` if the message is not a
    response.

    Raises ``dns.message.ChainTooLong`` if the CNAME chain is too long.

    Raises ``dns.message.AnswerForNXDOMAIN`` if the rcode is NXDOMAIN but
    an answer was found.

    Raises ``dns.exception.FormError`` if the question count is not 1.

    Returns a ``ChainingResult``.
    """
    if self.flags & dns.flags.QR == 0:
        raise NotQueryResponse
    if len(self.question) != 1:
        raise dns.exception.FormError
    question = self.question[0]
    qname = question.name
    min_ttl = dns.ttl.MAX_TTL
    answer = None
    cnames = []
    hops = 0
    while hops < MAX_CHAIN:
        try:
            answer = self.find_rrset(
                self.answer, qname, question.rdclass, question.rdtype
            )
            min_ttl = min(min_ttl, answer.ttl)
            break
        except KeyError:
            if question.rdtype == dns.rdatatype.CNAME:
                # The query was for the CNAME itself: nothing to chase.
                break
            try:
                crrset = self.find_rrset(
                    self.answer, qname, question.rdclass, dns.rdatatype.CNAME
                )
                cnames.append(crrset)
                min_ttl = min(min_ttl, crrset.ttl)
                # Continue from the target of the first CNAME record.
                first = next(iter(crrset), None)
                if first is not None:
                    qname = first.target
                hops += 1
            except KeyError:
                # No CNAME at this name either: the chain ends here.
                break
    if hops >= MAX_CHAIN:
        raise ChainTooLong
    if self.rcode() == dns.rcode.NXDOMAIN and answer is not None:
        raise AnswerForNXDOMAIN
    if answer is None:
        # Negative answer: further minimize the TTL with the SOA (NCACHE).
        auname = qname
        while True:
            # Try the current name, then each ancestor in turn.
            try:
                srrset = self.find_rrset(
                    self.authority, auname, question.rdclass, dns.rdatatype.SOA
                )
            except KeyError:
                try:
                    auname = auname.parent()
                except dns.name.NoParent:
                    break
            else:
                min_ttl = min(min_ttl, srrset.ttl, srrset[0].minimum)
                break
    return ChainingResult(qname, answer, min_ttl, cnames)
def canonical_name(self) -> dns.name.Name:
    """Return the canonical name of the question name.

    This is the ``canonical_name`` of the result of ``resolve_chaining()``
    and raises whatever that method raises:

    ``dns.message.NotQueryResponse`` if the message is not a response.

    ``dns.message.ChainTooLong`` if the CNAME chain is too long.

    ``dns.message.AnswerForNXDOMAIN`` if the rcode is NXDOMAIN but an
    answer was found.

    ``dns.exception.FormError`` if the question count is not 1.
    """
    chaining = self.resolve_chaining()
    return chaining.canonical_name
def _maybe_import_update():
    """Import ``dns.update`` on demand.

    The import is performed inside this helper, at call time, rather than
    at the top of the module.  Nothing is returned; the only effect is the
    import itself.
    """
    # We avoid circular imports by doing this here.  We do it in another
    # function as doing it in _message_factory_from_opcode() makes "dns"
    # a local symbol, and the first line fails :)

    # pylint: disable=redefined-outer-name,import-outside-toplevel,unused-import
    import dns.update  # noqa: F401
def _message_factory_from_opcode(opcode):
    """Return the message class that should be instantiated for *opcode*.

    ``dns.opcode.QUERY`` maps to ``QueryMessage`` and ``dns.opcode.UPDATE``
    maps to ``dns.update.UpdateMessage`` (imported lazily via
    ``_maybe_import_update()``).  Every other opcode gets the generic
    ``Message`` class.
    """
    if opcode == dns.opcode.QUERY:
        return QueryMessage
    if opcode == dns.opcode.UPDATE:
        # dns.update is only imported once it is actually needed.
        _maybe_import_update()
        return dns.update.UpdateMessage
    return Message
class _WireReader:

    """Wire format reader.

    Parses a DNS message in wire format and builds the corresponding
    message object, honoring the parsing options given to the constructor.

    parser: the binary parser
    message: The message object being built
    initialize_message: Callback to set message parsing options
    question_only: Are we only reading the question?
    one_rr_per_rrset: Put each RR into its own RRset?
    keyring: TSIG keyring; may be a dict keyed by key name, a callable taking
    the message and the key name, or a single key used as-is
    ignore_trailing: Ignore trailing junk at end of request?
    multi: Is this message part of a multi-message sequence?
    continue_on_error: try to extract as much information as possible from
    the message, accumulating MessageErrors in the *errors* attribute instead of
    raising them.
    errors: the list of MessageError objects accumulated so far
    """

    def __init__(
        self,
        wire,
        initialize_message,
        question_only=False,
        one_rr_per_rrset=False,
        ignore_trailing=False,
        keyring=None,
        multi=False,
        continue_on_error=False,
    ):
        """Create a reader over *wire*; nothing is parsed until ``read()``."""
        self.parser = dns.wire.Parser(wire)
        # The message object is only created in read(), once the header
        # (and therefore the opcode) is known.
        self.message = None
        self.initialize_message = initialize_message
        self.question_only = question_only
        self.one_rr_per_rrset = one_rr_per_rrset
        self.ignore_trailing = ignore_trailing
        self.keyring = keyring
        self.multi = multi
        self.continue_on_error = continue_on_error
        self.errors = []

    def _get_question(self, section_number, qcount):
        """Read the next *qcount* records from the wire data and add them to
        the question section.

        section_number: the section of the message to which to add records
        qcount: the number of question records to read
        """
        assert self.message is not None
        section = self.message.sections[section_number]
        for _ in range(qcount):
            qname = self.parser.get_name(self.message.origin)
            # On the wire the type field precedes the class field.
            (rdtype, rdclass) = self.parser.get_struct("!HH")
            # The message class gets a chance to validate/translate the
            # header values; only the class and type are used here.
            (rdclass, rdtype, _, _) = self.message._parse_rr_header(
                section_number, qname, rdclass, rdtype
            )
            # force_unique=True: every question record gets its own RRset.
            self.message.find_rrset(
                section, qname, rdclass, rdtype, create=True, force_unique=True
            )

    def _add_error(self, e):
        """Record exception *e* together with the parser's current offset."""
        self.errors.append(MessageError(e, self.parser.current))

    def _get_section(self, section_number, count):
        """Read the next *count* records from the wire data and add them to
        the specified section.

        section_number: the section of the message to which to add records
        count: the number of records to read

        OPT and TSIG records are not added to the section; they are stored
        on the message as its ``opt`` and ``tsig`` attributes instead.

        If *continue_on_error* is set, an exception raised while processing
        a record's rdata is recorded with ``_add_error()`` and parsing
        resumes at the end of that record; otherwise it propagates.
        """
        assert self.message is not None
        section = self.message.sections[section_number]
        force_unique = self.one_rr_per_rrset
        for i in range(count):
            # Remember where this RR starts; TSIG validation needs it.
            rr_start = self.parser.current
            absolute_name = self.parser.get_name()
            if self.message.origin is not None:
                name = absolute_name.relativize(self.message.origin)
            else:
                name = absolute_name
            (rdtype, rdclass, ttl, rdlen) = self.parser.get_struct("!HHIH")
            if rdtype in (dns.rdatatype.OPT, dns.rdatatype.TSIG):
                # OPT and TSIG headers are checked by a dedicated hook that
                # also receives the record's position within the section.
                (
                    rdclass,
                    rdtype,
                    deleting,
                    empty,
                ) = self.message._parse_special_rr_header(
                    section_number, count, i, name, rdclass, rdtype
                )
            else:
                (rdclass, rdtype, deleting, empty) = self.message._parse_rr_header(
                    section_number, name, rdclass, rdtype
                )
            # Start of the rdata; used to skip the record on error.
            rdata_start = self.parser.current
            try:
                if empty:
                    # A record flagged as empty must not carry any rdata.
                    if rdlen > 0:
                        raise dns.exception.FormError
                    rd = None
                    covers = dns.rdatatype.NONE
                else:
                    # Confine the rdata parser to this record's rdlen bytes.
                    with self.parser.restrict_to(rdlen):
                        rd = dns.rdata.from_wire_parser(
                            rdclass, rdtype, self.parser, self.message.origin
                        )
                    covers = rd.covers()
                # Once an SOA is seen in a zone transfer message, this and
                # all later records in the section get their own RRset.
                if self.message.xfr and rdtype == dns.rdatatype.SOA:
                    force_unique = True
                if rdtype == dns.rdatatype.OPT:
                    self.message.opt = dns.rrset.from_rdata(name, ttl, rd)
                elif rdtype == dns.rdatatype.TSIG:
                    if self.keyring is None:
                        raise UnknownTSIGKey("got signed message without keyring")
                    # Resolve the key according to the kind of keyring given.
                    if isinstance(self.keyring, dict):
                        key = self.keyring.get(absolute_name)
                        if isinstance(key, bytes):
                            # Raw secret: wrap it in a Key using the
                            # algorithm named by the TSIG record.
                            key = dns.tsig.Key(absolute_name, key, rd.algorithm)
                    elif callable(self.keyring):
                        key = self.keyring(self.message, absolute_name)
                    else:
                        key = self.keyring
                    if key is None:
                        raise UnknownTSIGKey("key '%s' unknown" % name)
                    self.message.keyring = key
                    self.message.tsig_ctx = dns.tsig.validate(
                        self.parser.wire,
                        key,
                        absolute_name,
                        rd,
                        int(time.time()),
                        self.message.request_mac,
                        rr_start,
                        self.message.tsig_ctx,
                        self.multi,
                    )
                    self.message.tsig = dns.rrset.from_rdata(absolute_name, 0, rd)
                else:
                    rrset = self.message.find_rrset(
                        section,
                        name,
                        rdclass,
                        rdtype,
                        covers,
                        deleting,
                        True,
                        force_unique,
                    )
                    if rd is not None:
                        # A TTL with the most significant bit set is
                        # treated as zero.
                        if ttl > 0x7FFFFFFF:
                            ttl = 0
                        rrset.add(rd, ttl)
            except Exception as e:
                if self.continue_on_error:
                    self._add_error(e)
                    # Skip the rest of this record's rdata and carry on
                    # with the next record.
                    self.parser.seek(rdata_start + rdlen)
                else:
                    raise

    def read(self):
        """Read a wire format DNS message and build a dns.message.Message
        object.

        Returns the message object.  If *question_only* is set, parsing stops
        after the question section.

        Raises ``ShortHeader`` if fewer than 12 octets are available for the
        header.

        Raises ``TrailingJunk`` if octets remain after the additional section
        and *ignore_trailing* is not set.

        If *continue_on_error* is set, an exception raised after the header
        has been processed is recorded with ``_add_error()`` instead of being
        raised, and the message built so far is returned.
        """

        if self.parser.remaining() < 12:
            raise ShortHeader
        # NOTE(review): the local name "id" shadows the builtin within this
        # method.
        (id, flags, qcount, ancount, aucount, adcount) = self.parser.get_struct(
            "!HHHHHH"
        )
        # The opcode encoded in the flags selects the message class.
        factory = _message_factory_from_opcode(dns.opcode.from_flags(flags))
        self.message = factory(id=id)
        self.message.flags = dns.flags.Flag(flags)
        self.initialize_message(self.message)
        # The message class has the final say on one_rr_per_rrset.
        self.one_rr_per_rrset = self.message._get_one_rr_per_rrset(
            self.one_rr_per_rrset
        )
        try:
            self._get_question(MessageSection.QUESTION, qcount)
            if self.question_only:
                return self.message
            self._get_section(MessageSection.ANSWER, ancount)
            self._get_section(MessageSection.AUTHORITY, aucount)
            self._get_section(MessageSection.ADDITIONAL, adcount)
            if not self.ignore_trailing and self.parser.remaining() != 0:
                raise TrailingJunk
            # In a multi-message sequence, a message that has a TSIG context
            # but no TSIG of its own is fed into that context in full.
            if self.multi and self.message.tsig_ctx and not self.message.had_tsig:
                self.message.tsig_ctx.update(self.parser.wire)
        except Exception as e:
            if self.continue_on_error:
                self._add_error(e)
            else:
                raise
        return self.message
def from_wire(
    wire: bytes,
    keyring: Optional[Any] = None,
    request_mac: Optional[bytes] = b"",
    xfr: bool = False,
    origin: Optional[dns.name.Name] = None,
    tsig_ctx: Optional[Union[dns.tsig.HMACTSig, dns.tsig.GSSTSig]] = None,
    multi: bool = False,
    question_only: bool = False,
    one_rr_per_rrset: bool = False,
    ignore_trailing: bool = False,
    raise_on_truncation: bool = False,
    continue_on_error: bool = False,
) -> Message:
    """Build a message object from a DNS message in wire format.

    *wire*, a ``bytes``, the wire format message.

    *keyring*, a ``dns.tsig.Key`` or ``dict``, the key or keyring to use if the
    message is signed.

    *request_mac*, a ``bytes`` or ``None``.  If the message is a response to a
    TSIG-signed request, *request_mac* should be set to the MAC of that request.
    ``None`` is accepted for backwards compatibility and is treated as ``b""``.

    *xfr*, a ``bool``, should be set to ``True`` if this message is part of a
    zone transfer.

    *origin*, a ``dns.name.Name`` or ``None``.  If the message is part of a zone
    transfer, *origin* should be the origin name of the zone.  If not ``None``,
    names will be relativized to the origin.

    *tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object, the
    ongoing TSIG context, used when validating zone transfers.

    *multi*, a ``bool``, should be set to ``True`` if this message is part of a
    multiple message sequence.

    *question_only*, a ``bool``.  If ``True``, read only up to the end of the
    question section.

    *one_rr_per_rrset*, a ``bool``.  If ``True``, put each RR into its own
    RRset.

    *ignore_trailing*, a ``bool``.  If ``True``, ignore trailing junk at end of
    the message.

    *raise_on_truncation*, a ``bool``.  If ``True``, raise an exception if the
    TC bit is set.

    *continue_on_error*, a ``bool``.  If ``True``, try to continue parsing even
    if errors occur.  Erroneous rdata will be ignored.  Errors will be
    accumulated as a list of MessageError objects in the message's ``errors``
    attribute.  This option is recommended only for DNS analysis tools, or for
    use in a server as part of an error handling path.  The default is
    ``False``.

    Raises ``dns.message.ShortHeader`` if the message is less than 12 octets
    long.

    Raises ``dns.message.TrailingJunk`` if there were octets in the message
    past the end of the proper DNS message, and *ignore_trailing* is ``False``.

    Raises ``dns.message.BadEDNS`` if an OPT record was in the wrong section,
    or occurred more than once.

    Raises ``dns.message.BadTSIG`` if a TSIG record was not the last record of
    the additional data section.

    Raises ``dns.message.Truncated`` if the TC flag is set and
    *raise_on_truncation* is ``True``.

    Returns a ``dns.message.Message``.
    """

    # None is tolerated for request_mac purely for backwards compatibility.
    if request_mac is None:
        request_mac = b""

    def initialize_message(message):
        # Invoked by the reader as soon as the message object exists, so
        # that these settings are in place before any section is parsed.
        message.request_mac = request_mac
        message.xfr = xfr
        message.origin = origin
        message.tsig_ctx = tsig_ctx

    reader = _WireReader(
        wire=wire,
        initialize_message=initialize_message,
        question_only=question_only,
        one_rr_per_rrset=one_rr_per_rrset,
        ignore_trailing=ignore_trailing,
        keyring=keyring,
        multi=multi,
        continue_on_error=continue_on_error,
    )
    try:
        parsed = reader.read()
    except dns.exception.FormError:
        # A malformed message that has the TC bit set is reported as
        # truncation (carrying the partial message) when the caller asked
        # for that; otherwise the original error propagates.
        partial_is_truncated = reader.message and (
            reader.message.flags & dns.flags.TC
        )
        if partial_is_truncated and raise_on_truncation:
            raise Truncated(message=reader.message)
        raise
    # A truncated message can also parse without any error, so the TC bit
    # has to be checked on the success path as well.
    if raise_on_truncation and (parsed.flags & dns.flags.TC):
        raise Truncated(message=parsed)
    if continue_on_error:
        parsed.errors = reader.errors

    return parsed
class _TextReader:

    """Text format reader.

    Parses a DNS message in text format and builds the corresponding
    message object.

    tok: the tokenizer.
    message: The message object being built.
    last_name: The most recently read name when building a message object.
    one_rr_per_rrset: Put each RR into its own RRset?
    origin: The origin for relative names
    relativize: relativize names?
    relativize_to: the origin to relativize to.
    id, flags, opcode, rcode, edns, ednsflags, payload: header values
    collected from the header lines; they are applied when the message
    object is created by ``_make_message()``.
    """

    def __init__(
        self,
        text,
        idna_codec,
        one_rr_per_rrset=False,
        origin=None,
        relativize=True,
        relativize_to=None,
    ):
        """Create a reader over *text*; nothing is parsed until ``read()``."""
        self.message = None
        self.tok = dns.tokenizer.Tokenizer(text, idna_codec=idna_codec)
        self.last_name = None
        self.one_rr_per_rrset = one_rr_per_rrset
        self.origin = origin
        self.relativize = relativize
        self.relativize_to = relativize_to
        self.id = None
        # A negative EDNS level means "EDNS not requested"; see
        # _make_message().
        self.edns = -1
        self.ednsflags = 0
        self.payload = DEFAULT_EDNS_PAYLOAD
        self.rcode = None
        self.opcode = dns.opcode.QUERY
        self.flags = 0

    def _header_line(self, _):
        """Process one line from the text format header section.

        The argument is ignored; it is accepted so that this method can be
        called the same way as the other line handlers.

        Raises ``UnknownHeaderField`` if the field name is not recognized.
        """

        token = self.tok.get()
        what = token.value
        if what == "id":
            self.id = self.tok.get_int()
        elif what == "flags":
            # Consume flag names until the first non-identifier token.
            while True:
                token = self.tok.get()
                if not token.is_identifier():
                    self.tok.unget(token)
                    break
                self.flags = self.flags | dns.flags.from_text(token.value)
        elif what == "edns":
            self.edns = self.tok.get_int()
            # The EDNS version is also folded into ednsflags, shifted left
            # by 16 bits.
            self.ednsflags = self.ednsflags | (self.edns << 16)
        elif what == "eflags":
            # Giving EDNS flags turns EDNS on (level 0) if it was off.
            if self.edns < 0:
                self.edns = 0
            while True:
                token = self.tok.get()
                if not token.is_identifier():
                    self.tok.unget(token)
                    break
                self.ednsflags = self.ednsflags | dns.flags.edns_from_text(token.value)
        elif what == "payload":
            self.payload = self.tok.get_int()
            # Giving a payload size turns EDNS on (level 0) if it was off.
            if self.edns < 0:
                self.edns = 0
        elif what == "opcode":
            text = self.tok.get_string()
            self.opcode = dns.opcode.from_text(text)
            self.flags = self.flags | dns.opcode.to_flags(self.opcode)
        elif what == "rcode":
            text = self.tok.get_string()
            self.rcode = dns.rcode.from_text(text)
        else:
            raise UnknownHeaderField
        self.tok.get_eol()

    def _question_line(self, section_number):
        """Process one line from the text format question section.

        The line holds an optional name, an optional class and a type.  A
        line starting with whitespace reuses the most recently read name.

        Raises ``NoPreviousName`` if no name is given and none was read
        before.

        Raises ``dns.exception.SyntaxError`` if an expected identifier is
        missing.
        """

        section = self.message.sections[section_number]
        token = self.tok.get(want_leading=True)
        if not token.is_whitespace():
            self.last_name = self.tok.as_name(
                token, self.message.origin, self.relativize, self.relativize_to
            )
        name = self.last_name
        if name is None:
            raise NoPreviousName
        token = self.tok.get()
        if not token.is_identifier():
            raise dns.exception.SyntaxError
        # Class
        try:
            rdclass = dns.rdataclass.from_text(token.value)
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except dns.exception.SyntaxError:
            # NOTE(review): a fresh SyntaxError is raised here rather than
            # re-raising the caught one, so any message carried by the
            # original exception is not preserved.
            raise dns.exception.SyntaxError
        except Exception:
            # The token is not a class; default to IN and let the same
            # token be interpreted as the type below.
            rdclass = dns.rdataclass.IN
        # Type
        rdtype = dns.rdatatype.from_text(token.value)
        (rdclass, rdtype, _, _) = self.message._parse_rr_header(
            section_number, name, rdclass, rdtype
        )
        # force_unique=True: every question line gets its own RRset.
        self.message.find_rrset(
            section, name, rdclass, rdtype, create=True, force_unique=True
        )
        self.tok.get_eol()

    def _rr_line(self, section_number):
        """Process one line from the text format answer, authority, or
        additional data sections.

        The line holds an optional name, an optional TTL, an optional class,
        a type and, unless the record is flagged as empty by the message's
        ``_parse_rr_header()``, the rdata.  A line starting with whitespace
        reuses the most recently read name.

        Raises ``NoPreviousName`` if no name is given and none was read
        before.

        Raises ``dns.exception.SyntaxError`` if an expected identifier is
        missing, or if rdata is present on a record that must be empty.

        Raises ``dns.exception.UnexpectedEnd`` if the rdata is missing on a
        record that must not be empty.
        """

        section = self.message.sections[section_number]
        # Name
        token = self.tok.get(want_leading=True)
        if not token.is_whitespace():
            self.last_name = self.tok.as_name(
                token, self.message.origin, self.relativize, self.relativize_to
            )
        name = self.last_name
        if name is None:
            raise NoPreviousName
        token = self.tok.get()
        if not token.is_identifier():
            raise dns.exception.SyntaxError
        # TTL
        try:
            # Base 0: the TTL may be written in any Python integer literal
            # notation.
            ttl = int(token.value, 0)
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except dns.exception.SyntaxError:
            # NOTE(review): raises a fresh SyntaxError, dropping any message
            # carried by the caught one.
            raise dns.exception.SyntaxError
        except Exception:
            # The token is not an integer; default the TTL to 0 and let the
            # same token be interpreted as the class or type below.
            ttl = 0
        # Class
        try:
            rdclass = dns.rdataclass.from_text(token.value)
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except dns.exception.SyntaxError:
            # NOTE(review): raises a fresh SyntaxError, dropping any message
            # carried by the caught one.
            raise dns.exception.SyntaxError
        except Exception:
            # The token is not a class; default to IN and let the same
            # token be interpreted as the type below.
            rdclass = dns.rdataclass.IN
        # Type
        rdtype = dns.rdatatype.from_text(token.value)
        (rdclass, rdtype, deleting, empty) = self.message._parse_rr_header(
            section_number, name, rdclass, rdtype
        )
        token = self.tok.get()
        if empty and not token.is_eol_or_eof():
            raise dns.exception.SyntaxError
        if not empty and token.is_eol_or_eof():
            raise dns.exception.UnexpectedEnd
        if not token.is_eol_or_eof():
            # Put the token back so the rdata parser sees the whole rdata.
            self.tok.unget(token)
            rd = dns.rdata.from_text(
                rdclass,
                rdtype,
                self.tok,
                self.message.origin,
                self.relativize,
                self.relativize_to,
            )
            covers = rd.covers()
        else:
            rd = None
            covers = dns.rdatatype.NONE
        rrset = self.message.find_rrset(
            section,
            name,
            rdclass,
            rdtype,
            covers,
            deleting,
            True,
            self.one_rr_per_rrset,
        )
        if rd is not None:
            rrset.add(rd, ttl)

    def _make_message(self):
        """Create a message object from the header values collected so far.

        The message class is chosen from the current opcode.  EDNS is only
        configured if the EDNS level is non-negative, and the rcode and
        origin are only applied if they are truthy.
        """
        factory = _message_factory_from_opcode(self.opcode)
        message = factory(id=self.id)
        message.flags = self.flags
        if self.edns >= 0:
            message.use_edns(self.edns, self.ednsflags, self.payload)
        if self.rcode:
            message.set_rcode(self.rcode)
        if self.origin:
            message.origin = self.origin
        return message

    def read(self):
        """Read a text format DNS message and build a dns.message.Message
        object.

        Comment lines naming ``HEADER`` or a section of the message switch
        the handler used for the lines that follow; any other comment is
        skipped.  Reading stops at the first token that is an end-of-line or
        end-of-file where a line was expected to start.

        Returns the message object; if no section name was seen, the message
        is created from the header values alone.
        """

        line_method = self._header_line
        section_number = None
        while 1:
            # presumably the two flags ask the tokenizer to report leading
            # whitespace and comments -- verify against Tokenizer.get()
            token = self.tok.get(True, True)
            if token.is_eol_or_eof():
                break
            if token.is_comment():
                u = token.value.upper()
                if u == "HEADER":
                    line_method = self._header_line

                if self.message:
                    message = self.message
                else:
                    # If we don't have a message, create one with the current
                    # opcode, so that we know which section names to parse.
                    message = self._make_message()
                try:
                    section_number = message._section_enum.from_text(u)
                    # We found a section name.  If we don't have a message,
                    # use the one we just created.
                    if not self.message:
                        self.message = message
                        self.one_rr_per_rrset = message._get_one_rr_per_rrset(
                            self.one_rr_per_rrset
                        )
                    if section_number == MessageSection.QUESTION:
                        line_method = self._question_line
                    else:
                        line_method = self._rr_line
                except Exception:
                    # It's just a comment.
                    pass
                self.tok.get_eol()
                continue
            # Not a comment: hand the whole line to the current handler.
            self.tok.unget(token)
            line_method(section_number)
        if not self.message:
            self.message = self._make_message()
        return self.message
def from_text(
    text: str,
    idna_codec: Optional[dns.name.IDNACodec] = None,
    one_rr_per_rrset: bool = False,
    origin: Optional[dns.name.Name] = None,
    relativize: bool = True,
    relativize_to: Optional[dns.name.Name] = None,
) -> Message:
    """Build a message object from a message in text format.

    Reading stops after the first blank line in the input, which makes it
    possible to read several messages from a single file with
    ``dns.message.from_file()``.

    *text*, a ``str``, the text format message.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
    is used.

    *one_rr_per_rrset*, a ``bool``.  If ``True``, then each RR is put
    into its own rrset.  The default is ``False``.

    *origin*, a ``dns.name.Name`` (or ``None``), the
    origin to use for relative names.

    *relativize*, a ``bool``.  If true, name will be relativized.

    *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use
    when relativizing names.  If not set, the *origin* value will be used.

    Raises ``dns.message.UnknownHeaderField`` if a header is unknown.

    Raises ``dns.exception.SyntaxError`` if the text is badly formed.

    Returns a ``dns.message.Message object``
    """

    # 'text' may also be a file object; that is an implementation detail
    # and deliberately not advertised.  The official file interface is
    # from_file().

    return _TextReader(
        text=text,
        idna_codec=idna_codec,
        one_rr_per_rrset=one_rr_per_rrset,
        origin=origin,
        relativize=relativize,
        relativize_to=relativize_to,
    ).read()
def from_file(
    f: Any,
    idna_codec: Optional[dns.name.IDNACodec] = None,
    one_rr_per_rrset: bool = False,
) -> Message:
    """Read the next text format message from a file.

    Message blocks are separated by a single blank line.

    *f*, a ``file`` or ``str``.  If *f* is text, it is treated as the
    pathname of a file to open; that file is closed again before
    returning.  A file object passed in by the caller is left open.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
    is used.

    *one_rr_per_rrset*, a ``bool``.  If ``True``, then each RR is put
    into its own rrset.  The default is ``False``.

    Raises ``dns.message.UnknownHeaderField`` if a header is unknown.

    Raises ``dns.exception.SyntaxError`` if the text is badly formed.

    Returns a ``dns.message.Message object``
    """

    # A pathname is opened here (and therefore closed by the with block);
    # anything else is wrapped in a do-nothing context manager.
    cm: contextlib.AbstractContextManager = (
        open(f) if isinstance(f, str) else contextlib.nullcontext(f)
    )
    with cm as stream:
        return from_text(stream, idna_codec, one_rr_per_rrset)
    assert False  # for mypy  lgtm[py/unreachable-statement]
def make_query(
    qname: Union[dns.name.Name, str],
    rdtype: Union[dns.rdatatype.RdataType, str],
    rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
    use_edns: Optional[Union[int, bool]] = None,
    want_dnssec: bool = False,
    ednsflags: Optional[int] = None,
    payload: Optional[int] = None,
    request_payload: Optional[int] = None,
    options: Optional[List[dns.edns.Option]] = None,
    idna_codec: Optional[dns.name.IDNACodec] = None,
    id: Optional[int] = None,
    flags: int = dns.flags.RD,
    pad: int = 0,
) -> QueryMessage:
    """Build a query message with a single question.

    The query name, type, and class may all be specified either
    as objects of the appropriate type, or as strings.

    Unless *id* and *flags* say otherwise, the query gets a randomly
    chosen query id and has only ``dns.flags.RD`` set.

    *qname*, a ``dns.name.Name`` or ``str``, the query name.

    *rdtype*, an ``int`` or ``str``, the desired rdata type.

    *rdclass*, an ``int`` or ``str``, the desired rdata class; the default
    is class IN.

    *use_edns*, an ``int``, ``bool`` or ``None``.  The EDNS level to use; the
    default is ``None``.  If ``None``, EDNS will be enabled only if other
    parameters (*ednsflags*, *payload*, *request_payload*, or *options*) are
    set.
    See the description of dns.message.Message.use_edns() for the possible
    values for use_edns and their meanings.

    *want_dnssec*, a ``bool``.  If ``True``, DNSSEC data is desired.

    *ednsflags*, an ``int``, the EDNS flag values.

    *payload*, an ``int``, is the EDNS sender's payload field, which is the
    maximum size of UDP datagram the sender can handle.  I.e. how big
    a response to this message can be.

    *request_payload*, an ``int``, is the EDNS payload size to use when
    sending this message.  If not specified, defaults to the value of
    *payload*.

    *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS
    options.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
    is used.

    *id*, an ``int`` or ``None``, the desired query id.  The default is
    ``None``, which generates a random query id.

    *flags*, an ``int``, the desired query flags.  The default is
    ``dns.flags.RD``.

    *pad*, a non-negative ``int``.  If 0, the default, do not pad; otherwise add
    padding bytes to make the message size a multiple of *pad*.  Note that if
    padding is non-zero, an EDNS PADDING option will always be added to the
    message.

    Returns a ``dns.message.QueryMessage``
    """

    if isinstance(qname, str):
        qname = dns.name.from_text(qname, idna_codec=idna_codec)
    rdtype = dns.rdatatype.RdataType.make(rdtype)
    rdclass = dns.rdataclass.RdataClass.make(rdclass)

    query = QueryMessage(id=id)
    query.flags = dns.flags.Flag(flags)
    query.find_rrset(
        query.question, qname, rdclass, rdtype, create=True, force_unique=True
    )

    # Only the EDNS settings the caller actually supplied are forwarded to
    # use_edns().  Supplying any of them switches EDNS on (level 0) unless
    # an explicit use_edns value was given.
    supplied = (
        ("ednsflags", ednsflags),
        ("payload", payload),
        ("request_payload", request_payload),
        ("options", options),
    )
    edns_args: Dict[str, Any] = {
        key: value for key, value in supplied if value is not None
    }
    if use_edns is None and edns_args:
        use_edns = 0
    edns_args["edns"] = use_edns
    edns_args["pad"] = pad
    query.use_edns(**edns_args)

    query.want_dnssec(want_dnssec)
    return query
def make_response(
    query: Message,
    recursion_available: bool = False,
    our_payload: int = 8192,
    fudge: int = 300,
    tsig_error: int = 0,
    pad: Optional[int] = None,
) -> Message:
    """Build a response skeleton for the specified query.

    The returned message has all of the infrastructure required of a response
    (id, flags, opcode, question, and EDNS/TSIG settings mirroring the query),
    but none of the content.

    The response's question section is a shallow copy of the query's question
    section, so the query's question RRsets should not be changed.

    *query*, a ``dns.message.Message``, the query to respond to.

    *recursion_available*, a ``bool``, should RA be set in the response?

    *our_payload*, an ``int``, the payload size to advertise in EDNS responses.

    *fudge*, an ``int``, the TSIG time fudge.

    *tsig_error*, an ``int``, the TSIG error.

    *pad*, a non-negative ``int`` or ``None``.  If 0, do not pad; if any other
    ``int``, add padding bytes to make the message size a multiple of *pad*.
    Note that if padding is non-zero, an EDNS PADDING option will always be
    added to the message.  If ``None``, the default, add padding following
    RFC 8467, namely if the request is padded, pad the response to 468,
    otherwise do not pad.  *pad* only has an effect if the query used EDNS.

    Raises ``dns.exception.FormError`` if *query* has the QR flag set, i.e. is
    itself a response.

    Returns a ``dns.message.Message`` object whose specific class is appropriate
    for the query.  For example, if query is a ``dns.update.UpdateMessage``,
    response will be too.
    """

    if query.flags & dns.flags.QR:
        raise dns.exception.FormError("specified query message is not a query")

    opcode = query.opcode()
    response = _message_factory_from_opcode(opcode)(id=query.id)

    # QR is always set; RD is copied from the query; RA is set on request.
    response_flags = dns.flags.QR | (query.flags & dns.flags.RD)
    if recursion_available:
        response_flags |= dns.flags.RA
    response.flags = response_flags
    response.set_opcode(opcode)
    response.question = list(query.question)

    if query.edns >= 0:
        if pad is None:
            # Set response padding per RFC 8467
            request_is_padded = any(
                option.otype == dns.edns.OptionType.PADDING
                for option in query.options
            )
            pad = 468 if request_is_padded else 0
        response.use_edns(0, 0, our_payload, query.payload, pad=pad)

    if query.had_tsig:
        # Sign with the query's key and algorithm, and remember the query's
        # MAC as the request MAC of the response.
        response.use_tsig(
            query.keyring,
            query.keyname,
            fudge,
            None,
            tsig_error,
            b"",
            query.keyalgorithm,
        )
        response.request_mac = query.mac

    return response
# Each MessageSection member is also bound to a module-level name, which
# makes it reachable as an attribute of this module.  The region between
# the BEGIN/END markers is marked as generated; presumably it should not be
# edited by hand -- confirm against the generating tool.

### BEGIN generated MessageSection constants

QUESTION = MessageSection.QUESTION
ANSWER = MessageSection.ANSWER
AUTHORITY = MessageSection.AUTHORITY
ADDITIONAL = MessageSection.ADDITIONAL

### END generated MessageSection constants