Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/message.py: 48%
774 statements
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 07:09 +0000
« prev ^ index » next coverage.py v7.3.1, created at 2023-09-25 07:09 +0000
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3# Copyright (C) 2001-2017 Nominum, Inc.
4#
5# Permission to use, copy, modify, and distribute this software and its
6# documentation for any purpose with or without fee is hereby granted,
7# provided that the above copyright notice and this permission notice
8# appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18"""DNS Messages"""
20import contextlib
21import io
22import time
23from typing import Any, Dict, List, Optional, Tuple, Union
25import dns.edns
26import dns.entropy
27import dns.enum
28import dns.exception
29import dns.flags
30import dns.name
31import dns.opcode
32import dns.rcode
33import dns.rdata
34import dns.rdataclass
35import dns.rdatatype
36import dns.rdtypes.ANY.OPT
37import dns.rdtypes.ANY.TSIG
38import dns.renderer
39import dns.rrset
40import dns.tsig
41import dns.ttl
42import dns.wire
# Parsing-related exceptions.
# NOTE(review): the class docstrings below are presumably used by
# dns.exception.DNSException as the default exception message, so their
# wording is kept verbatim -- confirm before rewording any of them.


class ShortHeader(dns.exception.FormError):
    """The DNS packet passed to from_wire() is too short."""


class TrailingJunk(dns.exception.FormError):
    """The DNS packet passed to from_wire() has extra junk at the end of it."""


class UnknownHeaderField(dns.exception.DNSException):
    """The header field name was not recognized when converting from text
    into a message."""


class BadEDNS(dns.exception.FormError):
    """An OPT record occurred somewhere other than
    the additional data section."""


class BadTSIG(dns.exception.FormError):
    """A TSIG record occurred somewhere other than the end of
    the additional data section."""


class UnknownTSIGKey(dns.exception.DNSException):
    """A TSIG with an unknown key was received."""
class Truncated(dns.exception.DNSException):
    """The truncated flag is set."""

    # Keyword arguments this exception supports.
    supp_kwargs = {"message"}

    # This pass-through constructor exists only to keep mypy from complaining
    # about an unexpected keyword argument ("message") at raise sites.
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def message(self):
        """As much of the message as could be processed.

        Returns a ``dns.message.Message``.
        """
        partial = self.kwargs["message"]
        return partial
# Exceptions raised while interpreting an already-parsed message.
# NOTE(review): the class docstrings are presumably used by
# dns.exception.DNSException as the default exception message, so their
# wording is kept verbatim -- confirm before rewording any of them.


class NotQueryResponse(dns.exception.DNSException):
    """Message is not a response to a query."""


class ChainTooLong(dns.exception.DNSException):
    """The CNAME chain is too long."""


class AnswerForNXDOMAIN(dns.exception.DNSException):
    """The rcode is NXDOMAIN but an answer was found."""


class NoPreviousName(dns.exception.SyntaxError):
    """No previous name was known."""
class MessageSection(dns.enum.IntEnum):
    """Message sections"""

    QUESTION = 0
    ANSWER = 1
    AUTHORITY = 2
    ADDITIONAL = 3

    @classmethod
    def _maximum(cls):
        # The largest valid section number is the last section, ADDITIONAL.
        return int(cls.ADDITIONAL)
class MessageError:
    """An exception paired with the offset at which it was recorded.

    *exception*, the ``Exception`` that occurred.

    *offset*, an ``int``, the position associated with the error (the wire
    reader in this module passes its parser's current position).
    """

    def __init__(self, exception: Exception, offset: int):
        self.offset = offset
        self.exception = exception
# Default EDNS payload size, used by Message._make_opt() and Message.use_edns().
DEFAULT_EDNS_PAYLOAD = 1232

# Upper bound on the number of CNAMEs QueryMessage.resolve_chaining() follows
# before raising ChainTooLong.
MAX_CHAIN = 16

# Key of the RRset index kept by a Message, in the order built by
# Message.find_rrset(): (section number, name, class, type, covers, deleting).
IndexKeyType = Tuple[
    int,
    dns.name.Name,
    dns.rdataclass.RdataClass,
    dns.rdatatype.RdataType,
    Optional[dns.rdatatype.RdataType],
    Optional[dns.rdataclass.RdataClass],
]

# Maps an index key to the RRset stored under it.
IndexType = Dict[IndexKeyType, dns.rrset.RRset]

# A section may be identified by number, by name, or by the section list itself.
SectionType = Union[int, str, List[dns.rrset.RRset]]
140class Message:
141 """A DNS message."""
143 _section_enum = MessageSection
145 def __init__(self, id: Optional[int] = None):
146 if id is None:
147 self.id = dns.entropy.random_16()
148 else:
149 self.id = id
150 self.flags = 0
151 self.sections: List[List[dns.rrset.RRset]] = [[], [], [], []]
152 self.opt: Optional[dns.rrset.RRset] = None
153 self.request_payload = 0
154 self.pad = 0
155 self.keyring: Any = None
156 self.tsig: Optional[dns.rrset.RRset] = None
157 self.request_mac = b""
158 self.xfr = False
159 self.origin: Optional[dns.name.Name] = None
160 self.tsig_ctx: Optional[Any] = None
161 self.index: IndexType = {}
162 self.errors: List[MessageError] = []
163 self.time = 0.0
165 @property
166 def question(self) -> List[dns.rrset.RRset]:
167 """The question section."""
168 return self.sections[0]
170 @question.setter
171 def question(self, v):
172 self.sections[0] = v
174 @property
175 def answer(self) -> List[dns.rrset.RRset]:
176 """The answer section."""
177 return self.sections[1]
179 @answer.setter
180 def answer(self, v):
181 self.sections[1] = v
183 @property
184 def authority(self) -> List[dns.rrset.RRset]:
185 """The authority section."""
186 return self.sections[2]
188 @authority.setter
189 def authority(self, v):
190 self.sections[2] = v
192 @property
193 def additional(self) -> List[dns.rrset.RRset]:
194 """The additional data section."""
195 return self.sections[3]
197 @additional.setter
198 def additional(self, v):
199 self.sections[3] = v
201 def __repr__(self):
202 return "<DNS message, ID " + repr(self.id) + ">"
204 def __str__(self):
205 return self.to_text()
207 def to_text(
208 self,
209 origin: Optional[dns.name.Name] = None,
210 relativize: bool = True,
211 **kw: Dict[str, Any],
212 ) -> str:
213 """Convert the message to text.
215 The *origin*, *relativize*, and any other keyword
216 arguments are passed to the RRset ``to_wire()`` method.
218 Returns a ``str``.
219 """
221 s = io.StringIO()
222 s.write("id %d\n" % self.id)
223 s.write("opcode %s\n" % dns.opcode.to_text(self.opcode()))
224 s.write("rcode %s\n" % dns.rcode.to_text(self.rcode()))
225 s.write("flags %s\n" % dns.flags.to_text(self.flags))
226 if self.edns >= 0:
227 s.write("edns %s\n" % self.edns)
228 if self.ednsflags != 0:
229 s.write("eflags %s\n" % dns.flags.edns_to_text(self.ednsflags))
230 s.write("payload %d\n" % self.payload)
231 for opt in self.options:
232 s.write("option %s\n" % opt.to_text())
233 for name, which in self._section_enum.__members__.items():
234 s.write(f";{name}\n")
235 for rrset in self.section_from_number(which):
236 s.write(rrset.to_text(origin, relativize, **kw))
237 s.write("\n")
238 #
239 # We strip off the final \n so the caller can print the result without
240 # doing weird things to get around eccentricities in Python print
241 # formatting
242 #
243 return s.getvalue()[:-1]
245 def __eq__(self, other):
246 """Two messages are equal if they have the same content in the
247 header, question, answer, and authority sections.
249 Returns a ``bool``.
250 """
252 if not isinstance(other, Message):
253 return False
254 if self.id != other.id:
255 return False
256 if self.flags != other.flags:
257 return False
258 for i, section in enumerate(self.sections):
259 other_section = other.sections[i]
260 for n in section:
261 if n not in other_section:
262 return False
263 for n in other_section:
264 if n not in section:
265 return False
266 return True
268 def __ne__(self, other):
269 return not self.__eq__(other)
271 def is_response(self, other: "Message") -> bool:
272 """Is *other*, also a ``dns.message.Message``, a response to this
273 message?
275 Returns a ``bool``.
276 """
278 if (
279 other.flags & dns.flags.QR == 0
280 or self.id != other.id
281 or dns.opcode.from_flags(self.flags) != dns.opcode.from_flags(other.flags)
282 ):
283 return False
284 if other.rcode() in {
285 dns.rcode.FORMERR,
286 dns.rcode.SERVFAIL,
287 dns.rcode.NOTIMP,
288 dns.rcode.REFUSED,
289 }:
290 # We don't check the question section in these cases if
291 # the other question section is empty, even though they
292 # still really ought to have a question section.
293 if len(other.question) == 0:
294 return True
295 if dns.opcode.is_update(self.flags):
296 # This is assuming the "sender doesn't include anything
297 # from the update", but we don't care to check the other
298 # case, which is that all the sections are returned and
299 # identical.
300 return True
301 for n in self.question:
302 if n not in other.question:
303 return False
304 for n in other.question:
305 if n not in self.question:
306 return False
307 return True
309 def section_number(self, section: List[dns.rrset.RRset]) -> int:
310 """Return the "section number" of the specified section for use
311 in indexing.
313 *section* is one of the section attributes of this message.
315 Raises ``ValueError`` if the section isn't known.
317 Returns an ``int``.
318 """
320 for i, our_section in enumerate(self.sections):
321 if section is our_section:
322 return self._section_enum(i)
323 raise ValueError("unknown section")
325 def section_from_number(self, number: int) -> List[dns.rrset.RRset]:
326 """Return the section list associated with the specified section
327 number.
329 *number* is a section number `int` or the text form of a section
330 name.
332 Raises ``ValueError`` if the section isn't known.
334 Returns a ``list``.
335 """
337 section = self._section_enum.make(number)
338 return self.sections[section]
340 def find_rrset(
341 self,
342 section: SectionType,
343 name: dns.name.Name,
344 rdclass: dns.rdataclass.RdataClass,
345 rdtype: dns.rdatatype.RdataType,
346 covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
347 deleting: Optional[dns.rdataclass.RdataClass] = None,
348 create: bool = False,
349 force_unique: bool = False,
350 idna_codec: Optional[dns.name.IDNACodec] = None,
351 ) -> dns.rrset.RRset:
352 """Find the RRset with the given attributes in the specified section.
354 *section*, an ``int`` section number, a ``str`` section name, or one of
355 the section attributes of this message. This specifies the
356 the section of the message to search. For example::
358 my_message.find_rrset(my_message.answer, name, rdclass, rdtype)
359 my_message.find_rrset(dns.message.ANSWER, name, rdclass, rdtype)
360 my_message.find_rrset("ANSWER", name, rdclass, rdtype)
362 *name*, a ``dns.name.Name`` or ``str``, the name of the RRset.
364 *rdclass*, an ``int`` or ``str``, the class of the RRset.
366 *rdtype*, an ``int`` or ``str``, the type of the RRset.
368 *covers*, an ``int`` or ``str``, the covers value of the RRset.
369 The default is ``dns.rdatatype.NONE``.
371 *deleting*, an ``int``, ``str``, or ``None``, the deleting value of the
372 RRset. The default is ``None``.
374 *create*, a ``bool``. If ``True``, create the RRset if it is not found.
375 The created RRset is appended to *section*.
377 *force_unique*, a ``bool``. If ``True`` and *create* is also ``True``,
378 create a new RRset regardless of whether a matching RRset exists
379 already. The default is ``False``. This is useful when creating
380 DDNS Update messages, as order matters for them.
382 *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
383 encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder
384 is used.
386 Raises ``KeyError`` if the RRset was not found and create was
387 ``False``.
389 Returns a ``dns.rrset.RRset object``.
390 """
392 if isinstance(section, int):
393 section_number = section
394 section = self.section_from_number(section_number)
395 elif isinstance(section, str):
396 section_number = MessageSection.from_text(section)
397 section = self.section_from_number(section_number)
398 else:
399 section_number = self.section_number(section)
400 if isinstance(name, str):
401 name = dns.name.from_text(name, idna_codec=idna_codec)
402 rdtype = dns.rdatatype.RdataType.make(rdtype)
403 rdclass = dns.rdataclass.RdataClass.make(rdclass)
404 covers = dns.rdatatype.RdataType.make(covers)
405 if deleting is not None:
406 deleting = dns.rdataclass.RdataClass.make(deleting)
407 key = (section_number, name, rdclass, rdtype, covers, deleting)
408 if not force_unique:
409 if self.index is not None:
410 rrset = self.index.get(key)
411 if rrset is not None:
412 return rrset
413 else:
414 for rrset in section:
415 if rrset.full_match(name, rdclass, rdtype, covers, deleting):
416 return rrset
417 if not create:
418 raise KeyError
419 rrset = dns.rrset.RRset(name, rdclass, rdtype, covers, deleting)
420 section.append(rrset)
421 if self.index is not None:
422 self.index[key] = rrset
423 return rrset
425 def get_rrset(
426 self,
427 section: SectionType,
428 name: dns.name.Name,
429 rdclass: dns.rdataclass.RdataClass,
430 rdtype: dns.rdatatype.RdataType,
431 covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
432 deleting: Optional[dns.rdataclass.RdataClass] = None,
433 create: bool = False,
434 force_unique: bool = False,
435 idna_codec: Optional[dns.name.IDNACodec] = None,
436 ) -> Optional[dns.rrset.RRset]:
437 """Get the RRset with the given attributes in the specified section.
439 If the RRset is not found, None is returned.
441 *section*, an ``int`` section number, a ``str`` section name, or one of
442 the section attributes of this message. This specifies the
443 the section of the message to search. For example::
445 my_message.get_rrset(my_message.answer, name, rdclass, rdtype)
446 my_message.get_rrset(dns.message.ANSWER, name, rdclass, rdtype)
447 my_message.get_rrset("ANSWER", name, rdclass, rdtype)
449 *name*, a ``dns.name.Name`` or ``str``, the name of the RRset.
451 *rdclass*, an ``int`` or ``str``, the class of the RRset.
453 *rdtype*, an ``int`` or ``str``, the type of the RRset.
455 *covers*, an ``int`` or ``str``, the covers value of the RRset.
456 The default is ``dns.rdatatype.NONE``.
458 *deleting*, an ``int``, ``str``, or ``None``, the deleting value of the
459 RRset. The default is ``None``.
461 *create*, a ``bool``. If ``True``, create the RRset if it is not found.
462 The created RRset is appended to *section*.
464 *force_unique*, a ``bool``. If ``True`` and *create* is also ``True``,
465 create a new RRset regardless of whether a matching RRset exists
466 already. The default is ``False``. This is useful when creating
467 DDNS Update messages, as order matters for them.
469 *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
470 encoder/decoder. If ``None``, the default IDNA 2003 encoder/decoder
471 is used.
473 Returns a ``dns.rrset.RRset object`` or ``None``.
474 """
476 try:
477 rrset = self.find_rrset(
478 section,
479 name,
480 rdclass,
481 rdtype,
482 covers,
483 deleting,
484 create,
485 force_unique,
486 idna_codec,
487 )
488 except KeyError:
489 rrset = None
490 return rrset
492 def _compute_opt_reserve(self) -> int:
493 """Compute the size required for the OPT RR, padding excluded"""
494 if not self.opt:
495 return 0
496 # 1 byte for the root name, 10 for the standard RR fields
497 size = 11
498 # This would be more efficient if options had a size() method, but we won't
499 # worry about that for now. We also don't worry if there is an existing padding
500 # option, as it is unlikely and probably harmless, as the worst case is that we
501 # may add another, and this seems to be legal.
502 for option in self.opt[0].options:
503 wire = option.to_wire()
504 # We add 4 here to account for the option type and length
505 size += len(wire) + 4
506 if self.pad:
507 # Padding will be added, so again add the option type and length.
508 size += 4
509 return size
511 def _compute_tsig_reserve(self) -> int:
512 """Compute the size required for the TSIG RR"""
513 # This would be more efficient if TSIGs had a size method, but we won't
514 # worry about for now. Also, we can't really cope with the potential
515 # compressibility of the TSIG owner name, so we estimate with the uncompressed
516 # size. We will disable compression when TSIG and padding are both is active
517 # so that the padding comes out right.
518 if not self.tsig:
519 return 0
520 f = io.BytesIO()
521 self.tsig.to_wire(f)
522 return len(f.getvalue())
524 def to_wire(
525 self,
526 origin: Optional[dns.name.Name] = None,
527 max_size: int = 0,
528 multi: bool = False,
529 tsig_ctx: Optional[Any] = None,
530 **kw: Dict[str, Any],
531 ) -> bytes:
532 """Return a string containing the message in DNS compressed wire
533 format.
535 Additional keyword arguments are passed to the RRset ``to_wire()``
536 method.
538 *origin*, a ``dns.name.Name`` or ``None``, the origin to be appended
539 to any relative names. If ``None``, and the message has an origin
540 attribute that is not ``None``, then it will be used.
542 *max_size*, an ``int``, the maximum size of the wire format
543 output; default is 0, which means "the message's request
544 payload, if nonzero, or 65535".
546 *multi*, a ``bool``, should be set to ``True`` if this message is
547 part of a multiple message sequence.
549 *tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object, the
550 ongoing TSIG context, used when signing zone transfers.
552 Raises ``dns.exception.TooBig`` if *max_size* was exceeded.
554 Returns a ``bytes``.
555 """
557 if origin is None and self.origin is not None:
558 origin = self.origin
559 if max_size == 0:
560 if self.request_payload != 0:
561 max_size = self.request_payload
562 else:
563 max_size = 65535
564 if max_size < 512:
565 max_size = 512
566 elif max_size > 65535:
567 max_size = 65535
568 r = dns.renderer.Renderer(self.id, self.flags, max_size, origin)
569 opt_reserve = self._compute_opt_reserve()
570 r.reserve(opt_reserve)
571 tsig_reserve = self._compute_tsig_reserve()
572 r.reserve(tsig_reserve)
573 for rrset in self.question:
574 r.add_question(rrset.name, rrset.rdtype, rrset.rdclass)
575 for rrset in self.answer:
576 r.add_rrset(dns.renderer.ANSWER, rrset, **kw)
577 for rrset in self.authority:
578 r.add_rrset(dns.renderer.AUTHORITY, rrset, **kw)
579 for rrset in self.additional:
580 r.add_rrset(dns.renderer.ADDITIONAL, rrset, **kw)
581 r.release_reserved()
582 if self.opt is not None:
583 r.add_opt(self.opt, self.pad, opt_reserve, tsig_reserve)
584 r.write_header()
585 if self.tsig is not None:
586 (new_tsig, ctx) = dns.tsig.sign(
587 r.get_wire(),
588 self.keyring,
589 self.tsig[0],
590 int(time.time()),
591 self.request_mac,
592 tsig_ctx,
593 multi,
594 )
595 self.tsig.clear()
596 self.tsig.add(new_tsig)
597 r.add_rrset(dns.renderer.ADDITIONAL, self.tsig)
598 r.write_header()
599 if multi:
600 self.tsig_ctx = ctx
601 return r.get_wire()
603 @staticmethod
604 def _make_tsig(
605 keyname, algorithm, time_signed, fudge, mac, original_id, error, other
606 ):
607 tsig = dns.rdtypes.ANY.TSIG.TSIG(
608 dns.rdataclass.ANY,
609 dns.rdatatype.TSIG,
610 algorithm,
611 time_signed,
612 fudge,
613 mac,
614 original_id,
615 error,
616 other,
617 )
618 return dns.rrset.from_rdata(keyname, 0, tsig)
620 def use_tsig(
621 self,
622 keyring: Any,
623 keyname: Optional[Union[dns.name.Name, str]] = None,
624 fudge: int = 300,
625 original_id: Optional[int] = None,
626 tsig_error: int = 0,
627 other_data: bytes = b"",
628 algorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm,
629 ) -> None:
630 """When sending, a TSIG signature using the specified key
631 should be added.
633 *key*, a ``dns.tsig.Key`` is the key to use. If a key is specified,
634 the *keyring* and *algorithm* fields are not used.
636 *keyring*, a ``dict``, ``callable`` or ``dns.tsig.Key``, is either
637 the TSIG keyring or key to use.
639 The format of a keyring dict is a mapping from TSIG key name, as
640 ``dns.name.Name`` to ``dns.tsig.Key`` or a TSIG secret, a ``bytes``.
641 If a ``dict`` *keyring* is specified but a *keyname* is not, the key
642 used will be the first key in the *keyring*. Note that the order of
643 keys in a dictionary is not defined, so applications should supply a
644 keyname when a ``dict`` keyring is used, unless they know the keyring
645 contains only one key. If a ``callable`` keyring is specified, the
646 callable will be called with the message and the keyname, and is
647 expected to return a key.
649 *keyname*, a ``dns.name.Name``, ``str`` or ``None``, the name of
650 this TSIG key to use; defaults to ``None``. If *keyring* is a
651 ``dict``, the key must be defined in it. If *keyring* is a
652 ``dns.tsig.Key``, this is ignored.
654 *fudge*, an ``int``, the TSIG time fudge.
656 *original_id*, an ``int``, the TSIG original id. If ``None``,
657 the message's id is used.
659 *tsig_error*, an ``int``, the TSIG error code.
661 *other_data*, a ``bytes``, the TSIG other data.
663 *algorithm*, a ``dns.name.Name`` or ``str``, the TSIG algorithm to use. This is
664 only used if *keyring* is a ``dict``, and the key entry is a ``bytes``.
665 """
667 if isinstance(keyring, dns.tsig.Key):
668 key = keyring
669 keyname = key.name
670 elif callable(keyring):
671 key = keyring(self, keyname)
672 else:
673 if isinstance(keyname, str):
674 keyname = dns.name.from_text(keyname)
675 if keyname is None:
676 keyname = next(iter(keyring))
677 key = keyring[keyname]
678 if isinstance(key, bytes):
679 key = dns.tsig.Key(keyname, key, algorithm)
680 self.keyring = key
681 if original_id is None:
682 original_id = self.id
683 self.tsig = self._make_tsig(
684 keyname,
685 self.keyring.algorithm,
686 0,
687 fudge,
688 b"\x00" * dns.tsig.mac_sizes[self.keyring.algorithm],
689 original_id,
690 tsig_error,
691 other_data,
692 )
694 @property
695 def keyname(self) -> Optional[dns.name.Name]:
696 if self.tsig:
697 return self.tsig.name
698 else:
699 return None
701 @property
702 def keyalgorithm(self) -> Optional[dns.name.Name]:
703 if self.tsig:
704 return self.tsig[0].algorithm
705 else:
706 return None
708 @property
709 def mac(self) -> Optional[bytes]:
710 if self.tsig:
711 return self.tsig[0].mac
712 else:
713 return None
715 @property
716 def tsig_error(self) -> Optional[int]:
717 if self.tsig:
718 return self.tsig[0].error
719 else:
720 return None
722 @property
723 def had_tsig(self) -> bool:
724 return bool(self.tsig)
726 @staticmethod
727 def _make_opt(flags=0, payload=DEFAULT_EDNS_PAYLOAD, options=None):
728 opt = dns.rdtypes.ANY.OPT.OPT(payload, dns.rdatatype.OPT, options or ())
729 return dns.rrset.from_rdata(dns.name.root, int(flags), opt)
731 def use_edns(
732 self,
733 edns: Optional[Union[int, bool]] = 0,
734 ednsflags: int = 0,
735 payload: int = DEFAULT_EDNS_PAYLOAD,
736 request_payload: Optional[int] = None,
737 options: Optional[List[dns.edns.Option]] = None,
738 pad: int = 0,
739 ) -> None:
740 """Configure EDNS behavior.
742 *edns*, an ``int``, is the EDNS level to use. Specifying ``None``, ``False``,
743 or ``-1`` means "do not use EDNS", and in this case the other parameters are
744 ignored. Specifying ``True`` is equivalent to specifying 0, i.e. "use EDNS0".
746 *ednsflags*, an ``int``, the EDNS flag values.
748 *payload*, an ``int``, is the EDNS sender's payload field, which is the maximum
749 size of UDP datagram the sender can handle. I.e. how big a response to this
750 message can be.
752 *request_payload*, an ``int``, is the EDNS payload size to use when sending this
753 message. If not specified, defaults to the value of *payload*.
755 *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS options.
757 *pad*, a non-negative ``int``. If 0, the default, do not pad; otherwise add
758 padding bytes to make the message size a multiple of *pad*. Note that if
759 padding is non-zero, an EDNS PADDING option will always be added to the
760 message.
761 """
763 if edns is None or edns is False:
764 edns = -1
765 elif edns is True:
766 edns = 0
767 if edns < 0:
768 self.opt = None
769 self.request_payload = 0
770 else:
771 # make sure the EDNS version in ednsflags agrees with edns
772 ednsflags &= 0xFF00FFFF
773 ednsflags |= edns << 16
774 if options is None:
775 options = []
776 self.opt = self._make_opt(ednsflags, payload, options)
777 if request_payload is None:
778 request_payload = payload
779 self.request_payload = request_payload
780 self.pad = pad
782 @property
783 def edns(self) -> int:
784 if self.opt:
785 return (self.ednsflags & 0xFF0000) >> 16
786 else:
787 return -1
789 @property
790 def ednsflags(self) -> int:
791 if self.opt:
792 return self.opt.ttl
793 else:
794 return 0
796 @ednsflags.setter
797 def ednsflags(self, v):
798 if self.opt:
799 self.opt.ttl = v
800 elif v:
801 self.opt = self._make_opt(v)
803 @property
804 def payload(self) -> int:
805 if self.opt:
806 return self.opt[0].payload
807 else:
808 return 0
810 @property
811 def options(self) -> Tuple:
812 if self.opt:
813 return self.opt[0].options
814 else:
815 return ()
817 def want_dnssec(self, wanted: bool = True) -> None:
818 """Enable or disable 'DNSSEC desired' flag in requests.
820 *wanted*, a ``bool``. If ``True``, then DNSSEC data is
821 desired in the response, EDNS is enabled if required, and then
822 the DO bit is set. If ``False``, the DO bit is cleared if
823 EDNS is enabled.
824 """
826 if wanted:
827 self.ednsflags |= dns.flags.DO
828 elif self.opt:
829 self.ednsflags &= ~dns.flags.DO
831 def rcode(self) -> dns.rcode.Rcode:
832 """Return the rcode.
834 Returns a ``dns.rcode.Rcode``.
835 """
836 return dns.rcode.from_flags(int(self.flags), int(self.ednsflags))
838 def set_rcode(self, rcode: dns.rcode.Rcode) -> None:
839 """Set the rcode.
841 *rcode*, a ``dns.rcode.Rcode``, is the rcode to set.
842 """
843 (value, evalue) = dns.rcode.to_flags(rcode)
844 self.flags &= 0xFFF0
845 self.flags |= value
846 self.ednsflags &= 0x00FFFFFF
847 self.ednsflags |= evalue
849 def opcode(self) -> dns.opcode.Opcode:
850 """Return the opcode.
852 Returns a ``dns.opcode.Opcode``.
853 """
854 return dns.opcode.from_flags(int(self.flags))
856 def set_opcode(self, opcode: dns.opcode.Opcode) -> None:
857 """Set the opcode.
859 *opcode*, a ``dns.opcode.Opcode``, is the opcode to set.
860 """
861 self.flags &= 0x87FF
862 self.flags |= dns.opcode.to_flags(opcode)
864 def _get_one_rr_per_rrset(self, value):
865 # What the caller picked is fine.
866 return value
868 # pylint: disable=unused-argument
870 def _parse_rr_header(self, section, name, rdclass, rdtype):
871 return (rdclass, rdtype, None, False)
873 # pylint: enable=unused-argument
875 def _parse_special_rr_header(self, section, count, position, name, rdclass, rdtype):
876 if rdtype == dns.rdatatype.OPT:
877 if (
878 section != MessageSection.ADDITIONAL
879 or self.opt
880 or name != dns.name.root
881 ):
882 raise BadEDNS
883 elif rdtype == dns.rdatatype.TSIG:
884 if (
885 section != MessageSection.ADDITIONAL
886 or rdclass != dns.rdatatype.ANY
887 or position != count - 1
888 ):
889 raise BadTSIG
890 return (rdclass, rdtype, None, False)
class ChainingResult:
    """The result of a call to dns.message.QueryMessage.resolve_chaining().

    The ``answer`` attribute is the answer RRSet, or ``None`` if it doesn't
    exist.

    The ``canonical_name`` attribute is the canonical name after all
    chaining has been applied (this is the same name as ``answer.name`` in
    cases where ``answer`` is not ``None``).

    The ``minimum_ttl`` attribute is the minimum TTL, i.e. the TTL to
    use if caching the data.  It is the smallest of all the CNAME TTLs
    and either the answer TTL if it exists or the SOA TTL and SOA
    minimum values for negative answers.

    The ``cnames`` attribute is a list of all the CNAME RRSets followed to
    get to the canonical name.
    """

    def __init__(
        self,
        canonical_name: dns.name.Name,
        answer: Optional[dns.rrset.RRset],
        minimum_ttl: int,
        cnames: List[dns.rrset.RRset],
    ):
        self.answer = answer
        self.canonical_name = canonical_name
        self.cnames = cnames
        self.minimum_ttl = minimum_ttl
class QueryMessage(Message):
    """A ``Message`` with helpers for following CNAME chains in a response."""

    def resolve_chaining(self) -> ChainingResult:
        """Follow the CNAME chain in the response to determine the answer
        RRset.

        Raises ``dns.message.NotQueryResponse`` if the message is not
        a response.

        Raises ``dns.message.ChainTooLong`` if the CNAME chain is too long.

        Raises ``dns.message.AnswerForNXDOMAIN`` if the rcode is NXDOMAIN
        but an answer was found.

        Raises ``dns.exception.FormError`` if the question count is not 1.

        Returns a ChainingResult object.
        """
        if self.flags & dns.flags.QR == 0:
            raise NotQueryResponse
        if len(self.question) != 1:
            raise dns.exception.FormError
        question = self.question[0]
        qname = question.name
        min_ttl = dns.ttl.MAX_TTL
        answer = None
        cnames = []
        hops = 0
        while hops < MAX_CHAIN:
            # First look for the answer itself at the current name.
            try:
                answer = self.find_rrset(
                    self.answer, qname, question.rdclass, question.rdtype
                )
            except KeyError:
                pass
            else:
                min_ttl = min(min_ttl, answer.ttl)
                break
            if question.rdtype == dns.rdatatype.CNAME:
                # A CNAME was asked for and is not there; there is nothing
                # to chain through, so exit the chaining loop.
                break
            # Otherwise try to follow a CNAME at the current name.
            try:
                crrset = self.find_rrset(
                    self.answer, qname, question.rdclass, dns.rdatatype.CNAME
                )
            except KeyError:
                # Exit the chaining loop
                break
            cnames.append(crrset)
            min_ttl = min(min_ttl, crrset.ttl)
            # Continue from the target of the first rdata in the CNAME RRset.
            for rd in crrset:
                qname = rd.target
                break
            hops += 1
        if hops >= MAX_CHAIN:
            raise ChainTooLong
        if self.rcode() == dns.rcode.NXDOMAIN and answer is not None:
            raise AnswerForNXDOMAIN
        if answer is None:
            # Further minimize the TTL with NCACHE.
            auname = qname
            while True:
                # Look for an SOA RR whose owner name is a superdomain
                # of qname.
                try:
                    srrset = self.find_rrset(
                        self.authority, auname, question.rdclass, dns.rdatatype.SOA
                    )
                except KeyError:
                    try:
                        auname = auname.parent()
                    except dns.name.NoParent:
                        break
                else:
                    min_ttl = min(min_ttl, srrset.ttl, srrset[0].minimum)
                    break
        return ChainingResult(qname, answer, min_ttl, cnames)

    def canonical_name(self) -> dns.name.Name:
        """Return the canonical name of the first name in the question
        section.

        Raises ``dns.message.NotQueryResponse`` if the message is not
        a response.

        Raises ``dns.message.ChainTooLong`` if the CNAME chain is too long.

        Raises ``dns.message.AnswerForNXDOMAIN`` if the rcode is NXDOMAIN
        but an answer was found.

        Raises ``dns.exception.FormError`` if the question count is not 1.
        """
        result = self.resolve_chaining()
        return result.canonical_name
def _maybe_import_update():
    """Import ``dns.update`` lazily."""
    # The import happens here to avoid a circular import.  It lives in its
    # own function because doing it inside _message_factory_from_opcode()
    # would make "dns" a local symbol there, and that function's first line
    # would then fail.

    # pylint: disable=redefined-outer-name,import-outside-toplevel,unused-import
    import dns.update  # noqa: F401
def _message_factory_from_opcode(opcode):
    """Return the message class to use for *opcode*.

    QUERY maps to ``QueryMessage``, UPDATE to ``dns.update.UpdateMessage``,
    and every other opcode to ``Message``.
    """
    if opcode == dns.opcode.QUERY:
        return QueryMessage
    if opcode == dns.opcode.UPDATE:
        _maybe_import_update()
        return dns.update.UpdateMessage
    return Message
class _WireReader:
    """Wire format reader.

    parser: the binary parser
    message: The message object being built
    initialize_message: Callback to set message parsing options
    question_only: Are we only reading the question?
    one_rr_per_rrset: Put each RR into its own RRset?
    keyring: TSIG keyring
    ignore_trailing: Ignore trailing junk at end of request?
    multi: Is this message part of a multi-message sequence?
    continue_on_error: try to extract as much information as possible from
    the message, accumulating MessageErrors in the *errors* attribute instead of
    raising them.
    errors: the list of MessageError objects accumulated while reading.
    """

    def __init__(
        self,
        wire,
        initialize_message,
        question_only=False,
        one_rr_per_rrset=False,
        ignore_trailing=False,
        keyring=None,
        multi=False,
        continue_on_error=False,
    ):
        """Create a reader for the wire format data *wire*.

        The remaining arguments are stored unchanged on the instance; see the
        class docstring for their meaning.  No parsing happens until read()
        is called.
        """
        self.parser = dns.wire.Parser(wire)
        # The message is only created in read(), once the header is known.
        self.message = None
        self.initialize_message = initialize_message
        self.question_only = question_only
        self.one_rr_per_rrset = one_rr_per_rrset
        self.ignore_trailing = ignore_trailing
        self.keyring = keyring
        self.multi = multi
        self.continue_on_error = continue_on_error
        self.errors = []

    def _get_question(self, section_number, qcount):
        """Read the next *qcount* records from the wire data and add them to
        the question section.
        """
        assert self.message is not None
        section = self.message.sections[section_number]
        for _ in range(qcount):
            qname = self.parser.get_name(self.message.origin)
            (rdtype, rdclass) = self.parser.get_struct("!HH")
            # The message class gets a chance to reinterpret the class and
            # type; the last two returned values are not used for questions.
            (rdclass, rdtype, _, _) = self.message._parse_rr_header(
                section_number, qname, rdclass, rdtype
            )
            # force_unique=True: every question record gets its own rrset.
            self.message.find_rrset(
                section, qname, rdclass, rdtype, create=True, force_unique=True
            )

    def _add_error(self, e):
        """Record exception *e*, together with the parser's current offset,
        in the *errors* list.
        """
        self.errors.append(MessageError(e, self.parser.current))

    def _get_section(self, section_number, count):
        """Read the next *count* records from the wire data and add them to
        the specified section.

        section_number: the section of the message to which to add records
        count: the number of records to read

        If *continue_on_error* is set, an exception raised while processing a
        record's rdata is recorded with _add_error() and reading resumes after
        that record's rdata; otherwise the exception propagates.
        """
        assert self.message is not None
        section = self.message.sections[section_number]
        # Once set to True below (SOA in a zone transfer) this stays True for
        # the remaining records of this section.
        force_unique = self.one_rr_per_rrset
        for i in range(count):
            # Offset of the start of this RR; passed to TSIG validation.
            rr_start = self.parser.current
            absolute_name = self.parser.get_name()
            if self.message.origin is not None:
                name = absolute_name.relativize(self.message.origin)
            else:
                name = absolute_name
            (rdtype, rdclass, ttl, rdlen) = self.parser.get_struct("!HHIH")
            # OPT and TSIG headers are handled by a dedicated hook which also
            # receives the record count and the index of this record.
            if rdtype in (dns.rdatatype.OPT, dns.rdatatype.TSIG):
                (
                    rdclass,
                    rdtype,
                    deleting,
                    empty,
                ) = self.message._parse_special_rr_header(
                    section_number, count, i, name, rdclass, rdtype
                )
            else:
                (rdclass, rdtype, deleting, empty) = self.message._parse_rr_header(
                    section_number, name, rdclass, rdtype
                )
            # Remembered so that the rdata can be skipped if it is erroneous
            # and continue_on_error is set.
            rdata_start = self.parser.current
            try:
                if empty:
                    # A record flagged as empty must not carry any rdata.
                    if rdlen > 0:
                        raise dns.exception.FormError
                    rd = None
                    covers = dns.rdatatype.NONE
                else:
                    # Confine rdata parsing to exactly rdlen octets.
                    with self.parser.restrict_to(rdlen):
                        rd = dns.rdata.from_wire_parser(
                            rdclass, rdtype, self.parser, self.message.origin
                        )
                    covers = rd.covers()
                if self.message.xfr and rdtype == dns.rdatatype.SOA:
                    force_unique = True
                if rdtype == dns.rdatatype.OPT:
                    self.message.opt = dns.rrset.from_rdata(name, ttl, rd)
                elif rdtype == dns.rdatatype.TSIG:
                    if self.keyring is None:
                        raise UnknownTSIGKey("got signed message without keyring")
                    # The keyring may be a dict keyed by absolute key name, a
                    # callable taking (message, key name), or the key itself.
                    if isinstance(self.keyring, dict):
                        key = self.keyring.get(absolute_name)
                        if isinstance(key, bytes):
                            # Raw secret: wrap it in a Key using the algorithm
                            # named in the TSIG rdata.
                            key = dns.tsig.Key(absolute_name, key, rd.algorithm)
                    elif callable(self.keyring):
                        key = self.keyring(self.message, absolute_name)
                    else:
                        key = self.keyring
                    if key is None:
                        raise UnknownTSIGKey("key '%s' unknown" % name)
                    self.message.keyring = key
                    self.message.tsig_ctx = dns.tsig.validate(
                        self.parser.wire,
                        key,
                        absolute_name,
                        rd,
                        int(time.time()),
                        self.message.request_mac,
                        rr_start,
                        self.message.tsig_ctx,
                        self.multi,
                    )
                    self.message.tsig = dns.rrset.from_rdata(absolute_name, 0, rd)
                else:
                    rrset = self.message.find_rrset(
                        section,
                        name,
                        rdclass,
                        rdtype,
                        covers,
                        deleting,
                        True,
                        force_unique,
                    )
                    if rd is not None:
                        # A TTL with the most significant bit set is treated
                        # as zero (see RFC 2181, section 8).
                        if ttl > 0x7FFFFFFF:
                            ttl = 0
                        rrset.add(rd, ttl)
            except Exception as e:
                if self.continue_on_error:
                    self._add_error(e)
                    # Skip the rest of this record's rdata and carry on.
                    self.parser.seek(rdata_start + rdlen)
                else:
                    raise

    def read(self):
        """Read a wire format DNS message and build a dns.message.Message
        object.

        Raises ``ShortHeader`` if fewer than 12 octets are available for the
        header.  If *continue_on_error* is set, an exception raised while
        reading the sections is recorded with _add_error() and the message
        built so far is returned; otherwise the exception propagates.
        """

        if self.parser.remaining() < 12:
            raise ShortHeader
        (id, flags, qcount, ancount, aucount, adcount) = self.parser.get_struct(
            "!HHHHHH"
        )
        # The opcode in the flags selects the message class.
        factory = _message_factory_from_opcode(dns.opcode.from_flags(flags))
        self.message = factory(id=id)
        self.message.flags = dns.flags.Flag(flags)
        self.initialize_message(self.message)
        # The message class may adjust the requested one_rr_per_rrset setting.
        self.one_rr_per_rrset = self.message._get_one_rr_per_rrset(
            self.one_rr_per_rrset
        )
        try:
            self._get_question(MessageSection.QUESTION, qcount)
            if self.question_only:
                return self.message
            self._get_section(MessageSection.ANSWER, ancount)
            self._get_section(MessageSection.AUTHORITY, aucount)
            self._get_section(MessageSection.ADDITIONAL, adcount)
            if not self.ignore_trailing and self.parser.remaining() != 0:
                raise TrailingJunk
            # In a multi-message sequence, a message without its own TSIG is
            # still fed into the ongoing TSIG context.
            if self.multi and self.message.tsig_ctx and not self.message.had_tsig:
                self.message.tsig_ctx.update(self.parser.wire)
        except Exception as e:
            if self.continue_on_error:
                self._add_error(e)
            else:
                raise
        return self.message
def from_wire(
    wire: bytes,
    keyring: Optional[Any] = None,
    request_mac: Optional[bytes] = b"",
    xfr: bool = False,
    origin: Optional[dns.name.Name] = None,
    tsig_ctx: Optional[Union[dns.tsig.HMACTSig, dns.tsig.GSSTSig]] = None,
    multi: bool = False,
    question_only: bool = False,
    one_rr_per_rrset: bool = False,
    ignore_trailing: bool = False,
    raise_on_truncation: bool = False,
    continue_on_error: bool = False,
) -> Message:
    """Convert a DNS wire format message into a message object.

    *wire*, a ``bytes``, the message in DNS wire format.

    *keyring*, a ``dns.tsig.Key`` or ``dict``, the key or keyring to use if the message
    is signed.

    *request_mac*, a ``bytes`` or ``None``.  If the message is a response to a
    TSIG-signed request, *request_mac* should be set to the MAC of that request.
    ``None`` is accepted for backwards compatibility and is treated as ``b""``.

    *xfr*, a ``bool``, should be set to ``True`` if this message is part of a zone
    transfer.

    *origin*, a ``dns.name.Name`` or ``None``.  If the message is part of a zone
    transfer, *origin* should be the origin name of the zone.  If not ``None``, names
    will be relativized to the origin.

    *tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object, the ongoing TSIG
    context, used when validating zone transfers.

    *multi*, a ``bool``, should be set to ``True`` if this message is part of a multiple
    message sequence.

    *question_only*, a ``bool``.  If ``True``, read only up to the end of the question
    section.

    *one_rr_per_rrset*, a ``bool``.  If ``True``, put each RR into its own RRset.

    *ignore_trailing*, a ``bool``.  If ``True``, ignore trailing junk at end of the
    message.

    *raise_on_truncation*, a ``bool``.  If ``True``, raise an exception if the TC bit is
    set.

    *continue_on_error*, a ``bool``.  If ``True``, try to continue parsing even if
    errors occur.  Erroneous rdata will be ignored.  Errors will be accumulated as a
    list of MessageError objects in the message's ``errors`` attribute.  This option is
    recommended only for DNS analysis tools, or for use in a server as part of an error
    handling path.  The default is ``False``.

    Raises ``dns.message.ShortHeader`` if the message is less than 12 octets long.

    Raises ``dns.message.TrailingJunk`` if there were octets in the message past the end
    of the proper DNS message, and *ignore_trailing* is ``False``.

    Raises ``dns.message.BadEDNS`` if an OPT record was in the wrong section, or
    occurred more than once.

    Raises ``dns.message.BadTSIG`` if a TSIG record was not the last record of the
    additional data section.

    Raises ``dns.message.Truncated`` if the TC flag is set and *raise_on_truncation* is
    ``True``.

    Returns a ``dns.message.Message``.
    """

    # None is tolerated for request_mac solely for backwards compatibility.
    mac = b"" if request_mac is None else request_mac

    def _configure(message):
        # Invoked by the reader as soon as the message object exists, before
        # any section is parsed.
        message.request_mac = mac
        message.xfr = xfr
        message.origin = origin
        message.tsig_ctx = tsig_ctx

    reader = _WireReader(
        wire,
        _configure,
        question_only=question_only,
        one_rr_per_rrset=one_rr_per_rrset,
        ignore_trailing=ignore_trailing,
        keyring=keyring,
        multi=multi,
        continue_on_error=continue_on_error,
    )
    try:
        parsed = reader.read()
    except dns.exception.FormError:
        # A form error in a message that is flagged as truncated is reported
        # as truncation when the caller asked for that.
        partial = reader.message
        if raise_on_truncation and partial and (partial.flags & dns.flags.TC):
            raise Truncated(message=partial)
        raise
    # A truncated message may also parse without any error, so the TC bit
    # has to be checked on the success path as well.
    if raise_on_truncation and parsed.flags & dns.flags.TC:
        raise Truncated(message=parsed)
    if continue_on_error:
        parsed.errors = reader.errors

    return parsed
class _TextReader:
    """Text format reader.

    tok: the tokenizer.
    message: The message object being built.
    last_name: The most recently read name when building a message object.
    one_rr_per_rrset: Put each RR into its own RRset?
    origin: The origin for relative names
    relativize: relativize names?
    relativize_to: the origin to relativize to.
    id, flags, opcode, rcode, edns, ednsflags, payload: header values
    collected from the header lines and applied when the message object is
    created.
    """

    def __init__(
        self,
        text,
        idna_codec,
        one_rr_per_rrset=False,
        origin=None,
        relativize=True,
        relativize_to=None,
    ):
        """Create a reader for *text* using *idna_codec* for the tokenizer.

        The remaining arguments are stored unchanged on the instance; see the
        class docstring for their meaning.  No parsing happens until read()
        is called.
        """
        # The message is created lazily, once the opcode is known.
        self.message = None
        self.tok = dns.tokenizer.Tokenizer(text, idna_codec=idna_codec)
        self.last_name = None
        self.one_rr_per_rrset = one_rr_per_rrset
        self.origin = origin
        self.relativize = relativize
        self.relativize_to = relativize_to
        self.id = None
        # A negative EDNS level means that EDNS is not in use.
        self.edns = -1
        self.ednsflags = 0
        self.payload = DEFAULT_EDNS_PAYLOAD
        self.rcode = None
        self.opcode = dns.opcode.QUERY
        self.flags = 0

    def _header_line(self, _):
        """Process one line from the text format header section.

        The argument (a section number) is ignored; it exists so that all of
        the line handlers can be called the same way.

        Raises ``UnknownHeaderField`` if the field name is not recognized.
        """

        token = self.tok.get()
        what = token.value
        if what == "id":
            self.id = self.tok.get_int()
        elif what == "flags":
            # Accumulate flag names until a non-identifier token is seen.
            while True:
                token = self.tok.get()
                if not token.is_identifier():
                    self.tok.unget(token)
                    break
                self.flags = self.flags | dns.flags.from_text(token.value)
        elif what == "edns":
            self.edns = self.tok.get_int()
            # The EDNS level is also folded into the EDNS flags word.
            self.ednsflags = self.ednsflags | (self.edns << 16)
        elif what == "eflags":
            # Mentioning EDNS flags turns EDNS on if it was off.
            if self.edns < 0:
                self.edns = 0
            while True:
                token = self.tok.get()
                if not token.is_identifier():
                    self.tok.unget(token)
                    break
                self.ednsflags = self.ednsflags | dns.flags.edns_from_text(token.value)
        elif what == "payload":
            self.payload = self.tok.get_int()
            # Mentioning a payload turns EDNS on if it was off.
            if self.edns < 0:
                self.edns = 0
        elif what == "opcode":
            text = self.tok.get_string()
            self.opcode = dns.opcode.from_text(text)
            self.flags = self.flags | dns.opcode.to_flags(self.opcode)
        elif what == "rcode":
            text = self.tok.get_string()
            self.rcode = dns.rcode.from_text(text)
        else:
            raise UnknownHeaderField
        self.tok.get_eol()

    def _question_line(self, section_number):
        """Process one line from the text format question section.

        Raises ``NoPreviousName`` if the line starts with whitespace and no
        name has been read yet, and ``dns.exception.SyntaxError`` if the line
        is badly formed.
        """

        section = self.message.sections[section_number]
        # Name: leading whitespace means "reuse the previous name".
        token = self.tok.get(want_leading=True)
        if not token.is_whitespace():
            self.last_name = self.tok.as_name(
                token, self.message.origin, self.relativize, self.relativize_to
            )
        name = self.last_name
        if name is None:
            raise NoPreviousName
        token = self.tok.get()
        if not token.is_identifier():
            raise dns.exception.SyntaxError
        # Class: optional.  If the token is not a class, class IN is assumed
        # and the same token is then interpreted as the type.
        try:
            rdclass = dns.rdataclass.from_text(token.value)
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except dns.exception.SyntaxError:
            # Re-raise unchanged so that the original message (if any) is
            # kept and the generic handler below does not swallow it.
            raise
        except Exception:
            rdclass = dns.rdataclass.IN
        # Type
        rdtype = dns.rdatatype.from_text(token.value)
        (rdclass, rdtype, _, _) = self.message._parse_rr_header(
            section_number, name, rdclass, rdtype
        )
        # force_unique=True: every question line gets its own rrset.
        self.message.find_rrset(
            section, name, rdclass, rdtype, create=True, force_unique=True
        )
        self.tok.get_eol()

    def _rr_line(self, section_number):
        """Process one line from the text format answer, authority, or
        additional data sections.

        Raises ``NoPreviousName`` if the line starts with whitespace and no
        name has been read yet, ``dns.exception.UnexpectedEnd`` if rdata is
        required but the line ends, and ``dns.exception.SyntaxError`` if the
        line is otherwise badly formed.
        """

        section = self.message.sections[section_number]
        # Name: leading whitespace means "reuse the previous name".
        token = self.tok.get(want_leading=True)
        if not token.is_whitespace():
            self.last_name = self.tok.as_name(
                token, self.message.origin, self.relativize, self.relativize_to
            )
        name = self.last_name
        if name is None:
            raise NoPreviousName
        token = self.tok.get()
        if not token.is_identifier():
            raise dns.exception.SyntaxError
        # TTL: optional.  If the token is not an integer, a TTL of 0 is
        # assumed and the same token is then interpreted as the class.
        try:
            ttl = int(token.value, 0)
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except dns.exception.SyntaxError:
            # Re-raise unchanged so that the original message (if any) is
            # kept and the generic handler below does not swallow it.
            raise
        except Exception:
            ttl = 0
        # Class: optional.  If the token is not a class, class IN is assumed
        # and the same token is then interpreted as the type.
        try:
            rdclass = dns.rdataclass.from_text(token.value)
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except dns.exception.SyntaxError:
            raise
        except Exception:
            rdclass = dns.rdataclass.IN
        # Type
        rdtype = dns.rdatatype.from_text(token.value)
        (rdclass, rdtype, deleting, empty) = self.message._parse_rr_header(
            section_number, name, rdclass, rdtype
        )
        # Rdata: must be absent for "empty" records and present otherwise.
        token = self.tok.get()
        if empty and not token.is_eol_or_eof():
            raise dns.exception.SyntaxError
        if not empty and token.is_eol_or_eof():
            raise dns.exception.UnexpectedEnd
        if not token.is_eol_or_eof():
            self.tok.unget(token)
            rd = dns.rdata.from_text(
                rdclass,
                rdtype,
                self.tok,
                self.message.origin,
                self.relativize,
                self.relativize_to,
            )
            covers = rd.covers()
        else:
            rd = None
            covers = dns.rdatatype.NONE
        rrset = self.message.find_rrset(
            section,
            name,
            rdclass,
            rdtype,
            covers,
            deleting,
            True,
            self.one_rr_per_rrset,
        )
        if rd is not None:
            rrset.add(rd, ttl)

    def _make_message(self):
        """Create a message of the class appropriate for the current opcode
        and apply the header values collected so far.
        """
        factory = _message_factory_from_opcode(self.opcode)
        message = factory(id=self.id)
        message.flags = self.flags
        if self.edns >= 0:
            message.use_edns(self.edns, self.ednsflags, self.payload)
        if self.rcode:
            message.set_rcode(self.rcode)
        if self.origin:
            message.origin = self.origin
        return message

    def read(self):
        """Read a text format DNS message and build a dns.message.Message
        object.

        Reading stops at the first end-of-line or end-of-file token found at
        the start of a line, i.e. at a blank line or the end of the input.
        """

        line_method = self._header_line
        section_number = None
        while True:
            token = self.tok.get(True, True)
            if token.is_eol_or_eof():
                break
            if token.is_comment():
                # Section boundaries are written as comments, so every
                # comment is checked for being "HEADER" or a section name.
                u = token.value.upper()
                if u == "HEADER":
                    line_method = self._header_line

                if self.message:
                    message = self.message
                else:
                    # If we don't have a message, create one with the current
                    # opcode, so that we know which section names to parse.
                    message = self._make_message()
                try:
                    section_number = message._section_enum.from_text(u)
                    # We found a section name.  If we don't have a message,
                    # use the one we just created.
                    if not self.message:
                        self.message = message
                        self.one_rr_per_rrset = message._get_one_rr_per_rrset(
                            self.one_rr_per_rrset
                        )
                    if section_number == MessageSection.QUESTION:
                        line_method = self._question_line
                    else:
                        line_method = self._rr_line
                except Exception:
                    # It's just a comment.
                    pass
                self.tok.get_eol()
                continue
            # Not a comment: hand the whole line to the current handler.
            self.tok.unget(token)
            line_method(section_number)
        if not self.message:
            # No section name was ever seen; build the message from the
            # header values alone.
            self.message = self._make_message()
        return self.message
def from_text(
    text: str,
    idna_codec: Optional[dns.name.IDNACodec] = None,
    one_rr_per_rrset: bool = False,
    origin: Optional[dns.name.Name] = None,
    relativize: bool = True,
    relativize_to: Optional[dns.name.Name] = None,
) -> Message:
    """Convert the text format message into a message object.

    The reader stops after reading the first blank line in the input to
    facilitate reading multiple messages from a single file with
    ``dns.message.from_file()``.

    *text*, a ``str``, the text format message.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
    is used.

    *one_rr_per_rrset*, a ``bool``.  If ``True``, then each RR is put
    into its own rrset.  The default is ``False``.

    *origin*, a ``dns.name.Name`` (or ``None``), the
    origin to use for relative names.

    *relativize*, a ``bool``.  If true, name will be relativized.

    *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use
    when relativizing names.  If not set, the *origin* value will be used.

    Raises ``dns.message.UnknownHeaderField`` if a header is unknown.

    Raises ``dns.exception.SyntaxError`` if the text is badly formed.

    Returns a ``dns.message.Message object``
    """

    # *text* may in fact be a file object as well, but that is an
    # implementation detail which is deliberately not advertised; the
    # official file interface is from_file().
    return _TextReader(
        text,
        idna_codec,
        one_rr_per_rrset=one_rr_per_rrset,
        origin=origin,
        relativize=relativize,
        relativize_to=relativize_to,
    ).read()
def from_file(
    f: Any,
    idna_codec: Optional[dns.name.IDNACodec] = None,
    one_rr_per_rrset: bool = False,
) -> Message:
    """Read the next text format message from the specified file.

    Message blocks are separated by a single blank line.

    *f*, a ``file`` or ``str``.  If *f* is text, it is treated as the
    pathname of a file to open; that file is closed again before returning.
    A file object passed in by the caller is left open.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
    is used.

    *one_rr_per_rrset*, a ``bool``.  If ``True``, then each RR is put
    into its own rrset.  The default is ``False``.

    Raises ``dns.message.UnknownHeaderField`` if a header is unknown.

    Raises ``dns.exception.SyntaxError`` if the text is badly formed.

    Returns a ``dns.message.Message object``
    """

    if isinstance(f, str):
        # We opened it, so we are responsible for closing it.
        with open(f) as stream:
            return from_text(stream, idna_codec, one_rr_per_rrset)
    # Already a file object: the caller retains ownership.
    return from_text(f, idna_codec, one_rr_per_rrset)
def make_query(
    qname: Union[dns.name.Name, str],
    rdtype: Union[dns.rdatatype.RdataType, str],
    rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
    use_edns: Optional[Union[int, bool]] = None,
    want_dnssec: bool = False,
    ednsflags: Optional[int] = None,
    payload: Optional[int] = None,
    request_payload: Optional[int] = None,
    options: Optional[List[dns.edns.Option]] = None,
    idna_codec: Optional[dns.name.IDNACodec] = None,
    id: Optional[int] = None,
    flags: int = dns.flags.RD,
    pad: int = 0,
) -> QueryMessage:
    """Make a query message.

    The query name, type, and class may all be specified either
    as objects of the appropriate type, or as strings.

    Unless *id* and *flags* are given, the query will have a randomly
    chosen query id, and its DNS flags will be set to dns.flags.RD.

    *qname*, a ``dns.name.Name`` or ``str``, the query name.

    *rdtype*, an ``int`` or ``str``, the desired rdata type.

    *rdclass*, an ``int`` or ``str``, the desired rdata class; the default
    is class IN.

    *use_edns*, an ``int``, ``bool`` or ``None``.  The EDNS level to use; the
    default is ``None``.  If ``None``, EDNS will be enabled only if other
    parameters (*ednsflags*, *payload*, *request_payload*, or *options*) are
    set.
    See the description of dns.message.Message.use_edns() for the possible
    values for use_edns and their meanings.

    *want_dnssec*, a ``bool``.  If ``True``, DNSSEC data is desired.

    *ednsflags*, an ``int``, the EDNS flag values.

    *payload*, an ``int``, is the EDNS sender's payload field, which is the
    maximum size of UDP datagram the sender can handle.  I.e. how big
    a response to this message can be.

    *request_payload*, an ``int``, is the EDNS payload size to use when
    sending this message.  If not specified, defaults to the value of
    *payload*.

    *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS
    options.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
    is used.

    *id*, an ``int`` or ``None``, the desired query id.  The default is
    ``None``, which generates a random query id.

    *flags*, an ``int``, the desired query flags.  The default is
    ``dns.flags.RD``.

    *pad*, a non-negative ``int``.  If 0, the default, do not pad; otherwise add
    padding bytes to make the message size a multiple of *pad*.  Note that if
    padding is non-zero, an EDNS PADDING option will always be added to the
    message.

    Returns a ``dns.message.QueryMessage``
    """

    if isinstance(qname, str):
        qname = dns.name.from_text(qname, idna_codec=idna_codec)
    rdtype = dns.rdatatype.RdataType.make(rdtype)
    rdclass = dns.rdataclass.RdataClass.make(rdclass)

    query = QueryMessage(id=id)
    query.flags = dns.flags.Flag(flags)
    query.find_rrset(
        query.question, qname, rdclass, rdtype, create=True, force_unique=True
    )

    # Forward only those EDNS settings the caller actually supplied.
    # Supplying any of them switches EDNS on (level 0) unless the caller
    # chose a level explicitly.
    supplied = (
        ("ednsflags", ednsflags),
        ("payload", payload),
        ("request_payload", request_payload),
        ("options", options),
    )
    edns_args: Dict[str, Any] = {
        key: value for (key, value) in supplied if value is not None
    }
    if edns_args and use_edns is None:
        use_edns = 0
    edns_args["edns"] = use_edns
    edns_args["pad"] = pad
    query.use_edns(**edns_args)
    query.want_dnssec(want_dnssec)
    return query
def make_response(
    query: Message,
    recursion_available: bool = False,
    our_payload: int = 8192,
    fudge: int = 300,
    tsig_error: int = 0,
) -> Message:
    """Make a message which is a response for the specified query.
    The message returned is really a response skeleton; it has all
    of the infrastructure required of a response, but none of the
    content.

    The response's question section is a shallow copy of the query's
    question section, so the query's question RRsets should not be
    changed.

    *query*, a ``dns.message.Message``, the query to respond to.

    *recursion_available*, a ``bool``, should RA be set in the response?

    *our_payload*, an ``int``, the payload size to advertise in EDNS
    responses.

    *fudge*, an ``int``, the TSIG time fudge.

    *tsig_error*, an ``int``, the TSIG error.

    Raises ``dns.exception.FormError`` if *query* has the QR flag set, i.e.
    is itself a response.

    Returns a ``dns.message.Message`` object whose specific class is
    appropriate for the query.  For example, if query is a
    ``dns.update.UpdateMessage``, response will be too.
    """

    if query.flags & dns.flags.QR:
        raise dns.exception.FormError("specified query message is not a query")

    # The response is built with the same message class as the query.
    reply = _message_factory_from_opcode(query.opcode())(id=query.id)

    # QR is always set; RD is echoed from the query; RA is set on request.
    reply_flags = dns.flags.QR | (query.flags & dns.flags.RD)
    if recursion_available:
        reply_flags |= dns.flags.RA
    reply.flags = reply_flags
    reply.set_opcode(query.opcode())
    reply.question = list(query.question)

    # Answer with EDNS only if the query used it.
    if query.edns >= 0:
        reply.use_edns(0, 0, our_payload, query.payload)

    # A signed query gets a signed response, tied to the query's MAC.
    if query.had_tsig:
        reply.use_tsig(
            query.keyring,
            query.keyname,
            fudge,
            None,
            tsig_error,
            b"",
            query.keyalgorithm,
        )
        reply.request_mac = query.mac
    return reply
# Module-level aliases for the MessageSection members, so that the section
# numbers can be referenced directly from this module's namespace.
### BEGIN generated MessageSection constants

QUESTION = MessageSection.QUESTION
ANSWER = MessageSection.ANSWER
AUTHORITY = MessageSection.AUTHORITY
ADDITIONAL = MessageSection.ADDITIONAL

### END generated MessageSection constants