Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/dns/message.py: 48%
777 statements
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:58 +0000
« prev ^ index » next coverage.py v7.3.2, created at 2023-12-08 06:58 +0000
1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
3# Copyright (C) 2001-2017 Nominum, Inc.
4#
5# Permission to use, copy, modify, and distribute this software and its
6# documentation for any purpose with or without fee is hereby granted,
7# provided that the above copyright notice and this permission notice
8# appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
18"""DNS Messages"""
20import contextlib
21import io
22import time
23from typing import Any, Dict, List, Optional, Tuple, Union
25import dns.edns
26import dns.entropy
27import dns.enum
28import dns.exception
29import dns.flags
30import dns.name
31import dns.opcode
32import dns.rcode
33import dns.rdata
34import dns.rdataclass
35import dns.rdatatype
36import dns.rdtypes.ANY.OPT
37import dns.rdtypes.ANY.TSIG
38import dns.renderer
39import dns.rrset
40import dns.tsig
41import dns.ttl
42import dns.wire
# Form error for wire data that is too short to parse.
# NOTE(review): docstring kept verbatim -- it may double as the default
# exception text; confirm in dns.exception before rewording.
class ShortHeader(dns.exception.FormError):
    """The DNS packet passed to from_wire() is too short."""
# Form error for wire data with unexpected bytes after the message.
# NOTE(review): docstring kept verbatim -- it may double as the default
# exception text; confirm in dns.exception before rewording.
class TrailingJunk(dns.exception.FormError):
    """The DNS packet passed to from_wire() has extra junk at the end of it."""
# Error for an unrecognized header field name in the text form of a message.
# NOTE(review): docstring kept verbatim -- it may double as the default
# exception text; confirm in dns.exception before rewording.
class UnknownHeaderField(dns.exception.DNSException):
    """The header field name was not recognized when converting from text
    into a message."""
# Form error for a misplaced OPT record (see
# Message._parse_special_rr_header(), which raises it).
# NOTE(review): docstring kept verbatim -- it may double as the default
# exception text; confirm in dns.exception before rewording.
class BadEDNS(dns.exception.FormError):
    """An OPT record occurred somewhere other than
    the additional data section."""
# Form error for a misplaced TSIG record (see
# Message._parse_special_rr_header(), which raises it).
# NOTE(review): docstring kept verbatim -- it may double as the default
# exception text; confirm in dns.exception before rewording.
class BadTSIG(dns.exception.FormError):
    """A TSIG record occurred somewhere other than the end of
    the additional data section."""
# Error for a received TSIG whose key is not known.
# NOTE(review): docstring kept verbatim -- it may double as the default
# exception text; confirm in dns.exception before rewording.
class UnknownTSIGKey(dns.exception.DNSException):
    """A TSIG with an unknown key was received."""
# NOTE(review): class docstring kept verbatim -- it may double as the default
# exception text; confirm in dns.exception before rewording.
class Truncated(dns.exception.DNSException):
    """The truncated flag is set."""

    # The only keyword argument this exception accepts.
    supp_kwargs = {"message"}

    # We do this as otherwise mypy complains about unexpected keyword argument
    # message
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def message(self):
        """As much of the message as could be processed.

        Returns a ``dns.message.Message``.

        Raises ``KeyError`` if the exception was created without a
        ``message`` keyword argument.
        """
        return self.kwargs["message"]
# Raised by QueryMessage.resolve_chaining() when the QR flag is not set.
# NOTE(review): docstring kept verbatim -- it may double as the default
# exception text; confirm in dns.exception before rewording.
class NotQueryResponse(dns.exception.DNSException):
    """Message is not a response to a query."""
# Raised by QueryMessage.resolve_chaining() after MAX_CHAIN CNAME hops.
# NOTE(review): docstring kept verbatim -- it may double as the default
# exception text; confirm in dns.exception before rewording.
class ChainTooLong(dns.exception.DNSException):
    """The CNAME chain is too long."""
# Raised by QueryMessage.resolve_chaining() when an NXDOMAIN response
# nevertheless contains an answer RRset.
# NOTE(review): docstring kept verbatim -- it may double as the default
# exception text; confirm in dns.exception before rewording.
class AnswerForNXDOMAIN(dns.exception.DNSException):
    """The rcode is NXDOMAIN but an answer was found."""
# Syntax error for text input that relies on a previous owner name when
# none has been seen yet.
# NOTE(review): docstring kept verbatim -- it may double as the default
# exception text; confirm in dns.exception before rewording.
class NoPreviousName(dns.exception.SyntaxError):
    """No previous name was known."""
class MessageSection(dns.enum.IntEnum):
    """Message sections"""

    # The values are the positions of the sections in ``Message.sections``.
    QUESTION = 0
    ANSWER = 1
    AUTHORITY = 2
    ADDITIONAL = 3

    @classmethod
    def _maximum(cls):
        # Largest valid section number (the value of ADDITIONAL).
        return 3
class MessageError:
    """An exception paired with the offset at which it was recorded.

    *exception*, an ``Exception``, the error that occurred.

    *offset*, an ``int``, the position associated with the error.
    """

    def __init__(self, exception: Exception, offset: int):
        self.exception, self.offset = exception, offset
# Default EDNS payload size used by Message._make_opt() and Message.use_edns().
DEFAULT_EDNS_PAYLOAD = 1232
# Maximum number of CNAME hops QueryMessage.resolve_chaining() will follow.
MAX_CHAIN = 16

# Key of ``Message.index`` as built by Message.find_rrset():
# (section number, owner name, rdclass, rdtype, covers, deleting).
IndexKeyType = Tuple[
    int,
    dns.name.Name,
    dns.rdataclass.RdataClass,
    dns.rdatatype.RdataType,
    Optional[dns.rdatatype.RdataType],
    Optional[dns.rdataclass.RdataClass],
]
# Maps an index key to the RRset stored under it.
IndexType = Dict[IndexKeyType, dns.rrset.RRset]
# A section may be given as a number, a name, or the section list itself.
SectionType = Union[int, str, List[dns.rrset.RRset]]
class Message:
    """A DNS message."""

    # Enum used to name and number the sections; subclasses may override it.
    _section_enum = MessageSection

    def __init__(self, id: Optional[int] = None):
        """Initialize an empty message.

        *id*, an ``int`` or ``None``, the message id.  If ``None``, an id is
        obtained from ``dns.entropy.random_16()``.
        """
        if id is None:
            self.id = dns.entropy.random_16()
        else:
            self.id = id
        self.flags = 0
        # One list of RRsets per section, indexed by section number.
        self.sections: List[List[dns.rrset.RRset]] = [[], [], [], []]
        # The OPT RRset carrying EDNS state, or None if EDNS is not in use.
        self.opt: Optional[dns.rrset.RRset] = None
        # Default max_size for to_wire() when nonzero.
        self.request_payload = 0
        # Padding block size handed to the renderer; 0 means no padding.
        self.pad = 0
        # The TSIG key selected by use_tsig().
        self.keyring: Any = None
        # The TSIG RRset, or None if the message is not signed.
        self.tsig: Optional[dns.rrset.RRset] = None
        self.request_mac = b""
        self.xfr = False
        # Default origin for to_wire() when none is passed.
        self.origin: Optional[dns.name.Name] = None
        # TSIG context saved by to_wire() for multi-message sequences.
        self.tsig_ctx: Optional[Any] = None
        # Lookup table used by find_rrset(); None disables indexing.
        self.index: IndexType = {}
        self.errors: List[MessageError] = []
        self.time = 0.0

    @property
    def question(self) -> List[dns.rrset.RRset]:
        """The question section."""
        return self.sections[0]

    @question.setter
    def question(self, v):
        self.sections[0] = v

    @property
    def answer(self) -> List[dns.rrset.RRset]:
        """The answer section."""
        return self.sections[1]

    @answer.setter
    def answer(self, v):
        self.sections[1] = v

    @property
    def authority(self) -> List[dns.rrset.RRset]:
        """The authority section."""
        return self.sections[2]

    @authority.setter
    def authority(self, v):
        self.sections[2] = v

    @property
    def additional(self) -> List[dns.rrset.RRset]:
        """The additional data section."""
        return self.sections[3]

    @additional.setter
    def additional(self, v):
        self.sections[3] = v

    def __repr__(self):
        return "<DNS message, ID " + repr(self.id) + ">"

    def __str__(self):
        return self.to_text()

    def to_text(
        self,
        origin: Optional[dns.name.Name] = None,
        relativize: bool = True,
        **kw: Any,
    ) -> str:
        """Convert the message to text.

        The *origin*, *relativize*, and any other keyword
        arguments are passed to the RRset ``to_text()`` method.

        Returns a ``str``.
        """

        s = io.StringIO()
        s.write("id %d\n" % self.id)
        s.write("opcode %s\n" % dns.opcode.to_text(self.opcode()))
        s.write("rcode %s\n" % dns.rcode.to_text(self.rcode()))
        s.write("flags %s\n" % dns.flags.to_text(self.flags))
        if self.edns >= 0:
            s.write("edns %s\n" % self.edns)
            if self.ednsflags != 0:
                s.write("eflags %s\n" % dns.flags.edns_to_text(self.ednsflags))
            s.write("payload %d\n" % self.payload)
        for opt in self.options:
            s.write("option %s\n" % opt.to_text())
        for name, which in self._section_enum.__members__.items():
            s.write(f";{name}\n")
            for rrset in self.section_from_number(which):
                s.write(rrset.to_text(origin, relativize, **kw))
                s.write("\n")
        #
        # We strip off the final \n so the caller can print the result without
        # doing weird things to get around eccentricities in Python print
        # formatting
        #
        return s.getvalue()[:-1]

    def __eq__(self, other):
        """Two messages are equal if they have the same id, the same flags,
        and the same RRsets (in any order) in every section.

        Returns a ``bool``.
        """

        if not isinstance(other, Message):
            return False
        if self.id != other.id:
            return False
        if self.flags != other.flags:
            return False
        for i, section in enumerate(self.sections):
            other_section = other.sections[i]
            # Membership is checked in both directions so that neither
            # section may contain an RRset the other lacks.
            for n in section:
                if n not in other_section:
                    return False
            for n in other_section:
                if n not in section:
                    return False
        return True

    def __ne__(self, other):
        return not self.__eq__(other)

    def is_response(self, other: "Message") -> bool:
        """Is *other*, also a ``dns.message.Message``, a response to this
        message?

        Returns a ``bool``.
        """

        if (
            other.flags & dns.flags.QR == 0
            or self.id != other.id
            or dns.opcode.from_flags(self.flags) != dns.opcode.from_flags(other.flags)
        ):
            return False
        if other.rcode() in {
            dns.rcode.FORMERR,
            dns.rcode.SERVFAIL,
            dns.rcode.NOTIMP,
            dns.rcode.REFUSED,
        }:
            # We don't check the question section in these cases if
            # the other question section is empty, even though they
            # still really ought to have a question section.
            if len(other.question) == 0:
                return True
        if dns.opcode.is_update(self.flags):
            # This is assuming the "sender doesn't include anything
            # from the update", but we don't care to check the other
            # case, which is that all the sections are returned and
            # identical.
            return True
        for n in self.question:
            if n not in other.question:
                return False
        for n in other.question:
            if n not in self.question:
                return False
        return True

    def section_number(self, section: List[dns.rrset.RRset]) -> int:
        """Return the "section number" of the specified section for use
        in indexing.

        *section* is one of the section attributes of this message.

        Raises ``ValueError`` if the section isn't known.

        Returns an ``int``.
        """

        # Sections are matched by identity, not equality, so an equal but
        # distinct list is not accepted.
        for i, our_section in enumerate(self.sections):
            if section is our_section:
                return self._section_enum(i)
        raise ValueError("unknown section")

    def section_from_number(self, number: int) -> List[dns.rrset.RRset]:
        """Return the section list associated with the specified section
        number.

        *number* is a section number `int` or the text form of a section
        name.

        Raises ``ValueError`` if the section isn't known.

        Returns a ``list``.
        """

        section = self._section_enum.make(number)
        return self.sections[section]

    def find_rrset(
        self,
        section: SectionType,
        name: dns.name.Name,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
        deleting: Optional[dns.rdataclass.RdataClass] = None,
        create: bool = False,
        force_unique: bool = False,
        idna_codec: Optional[dns.name.IDNACodec] = None,
    ) -> dns.rrset.RRset:
        """Find the RRset with the given attributes in the specified section.

        *section*, an ``int`` section number, a ``str`` section name, or one of
        the section attributes of this message.  This specifies the
        section of the message to search.  For example::

            my_message.find_rrset(my_message.answer, name, rdclass, rdtype)
            my_message.find_rrset(dns.message.ANSWER, name, rdclass, rdtype)
            my_message.find_rrset("ANSWER", name, rdclass, rdtype)

        *name*, a ``dns.name.Name`` or ``str``, the name of the RRset.

        *rdclass*, an ``int`` or ``str``, the class of the RRset.

        *rdtype*, an ``int`` or ``str``, the type of the RRset.

        *covers*, an ``int`` or ``str``, the covers value of the RRset.
        The default is ``dns.rdatatype.NONE``.

        *deleting*, an ``int``, ``str``, or ``None``, the deleting value of the
        RRset.  The default is ``None``.

        *create*, a ``bool``.  If ``True``, create the RRset if it is not found.
        The created RRset is appended to *section*.

        *force_unique*, a ``bool``.  If ``True`` and *create* is also ``True``,
        create a new RRset regardless of whether a matching RRset exists
        already.  The default is ``False``.  This is useful when creating
        DDNS Update messages, as order matters for them.

        *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
        encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
        is used.

        Raises ``KeyError`` if the RRset was not found and create was
        ``False``.

        Returns a ``dns.rrset.RRset object``.
        """

        if isinstance(section, int):
            section_number = section
            section = self.section_from_number(section_number)
        elif isinstance(section, str):
            # Use this message's own section enum (as section_from_number()
            # does) so subclasses with different section names work too.
            section_number = self._section_enum.from_text(section)
            section = self.section_from_number(section_number)
        else:
            section_number = self.section_number(section)
        if isinstance(name, str):
            name = dns.name.from_text(name, idna_codec=idna_codec)
        rdtype = dns.rdatatype.RdataType.make(rdtype)
        rdclass = dns.rdataclass.RdataClass.make(rdclass)
        covers = dns.rdatatype.RdataType.make(covers)
        if deleting is not None:
            deleting = dns.rdataclass.RdataClass.make(deleting)
        key = (section_number, name, rdclass, rdtype, covers, deleting)
        if not force_unique:
            if self.index is not None:
                # With an index, only the index is consulted; the section
                # list itself is not scanned.
                rrset = self.index.get(key)
                if rrset is not None:
                    return rrset
            else:
                for rrset in section:
                    if rrset.full_match(name, rdclass, rdtype, covers, deleting):
                        return rrset
        if not create:
            raise KeyError
        rrset = dns.rrset.RRset(name, rdclass, rdtype, covers, deleting)
        section.append(rrset)
        if self.index is not None:
            self.index[key] = rrset
        return rrset

    def get_rrset(
        self,
        section: SectionType,
        name: dns.name.Name,
        rdclass: dns.rdataclass.RdataClass,
        rdtype: dns.rdatatype.RdataType,
        covers: dns.rdatatype.RdataType = dns.rdatatype.NONE,
        deleting: Optional[dns.rdataclass.RdataClass] = None,
        create: bool = False,
        force_unique: bool = False,
        idna_codec: Optional[dns.name.IDNACodec] = None,
    ) -> Optional[dns.rrset.RRset]:
        """Get the RRset with the given attributes in the specified section.

        If the RRset is not found, None is returned.

        *section*, an ``int`` section number, a ``str`` section name, or one of
        the section attributes of this message.  This specifies the
        section of the message to search.  For example::

            my_message.get_rrset(my_message.answer, name, rdclass, rdtype)
            my_message.get_rrset(dns.message.ANSWER, name, rdclass, rdtype)
            my_message.get_rrset("ANSWER", name, rdclass, rdtype)

        *name*, a ``dns.name.Name`` or ``str``, the name of the RRset.

        *rdclass*, an ``int`` or ``str``, the class of the RRset.

        *rdtype*, an ``int`` or ``str``, the type of the RRset.

        *covers*, an ``int`` or ``str``, the covers value of the RRset.
        The default is ``dns.rdatatype.NONE``.

        *deleting*, an ``int``, ``str``, or ``None``, the deleting value of the
        RRset.  The default is ``None``.

        *create*, a ``bool``.  If ``True``, create the RRset if it is not found.
        The created RRset is appended to *section*.

        *force_unique*, a ``bool``.  If ``True`` and *create* is also ``True``,
        create a new RRset regardless of whether a matching RRset exists
        already.  The default is ``False``.  This is useful when creating
        DDNS Update messages, as order matters for them.

        *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
        encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
        is used.

        Returns a ``dns.rrset.RRset object`` or ``None``.
        """

        try:
            rrset = self.find_rrset(
                section,
                name,
                rdclass,
                rdtype,
                covers,
                deleting,
                create,
                force_unique,
                idna_codec,
            )
        except KeyError:
            rrset = None
        return rrset

    def _compute_opt_reserve(self) -> int:
        """Compute the size required for the OPT RR, padding excluded"""
        if not self.opt:
            return 0
        # 1 byte for the root name, 10 for the standard RR fields
        size = 11
        # This would be more efficient if options had a size() method, but we won't
        # worry about that for now.  We also don't worry if there is an existing padding
        # option, as it is unlikely and probably harmless, as the worst case is that we
        # may add another, and this seems to be legal.
        for option in self.opt[0].options:
            wire = option.to_wire()
            # We add 4 here to account for the option type and length
            size += len(wire) + 4
        if self.pad:
            # Padding will be added, so again add the option type and length.
            size += 4
        return size

    def _compute_tsig_reserve(self) -> int:
        """Compute the size required for the TSIG RR"""
        # This would be more efficient if TSIGs had a size method, but we won't
        # worry about that for now.  Also, we can't really cope with the potential
        # compressibility of the TSIG owner name, so we estimate with the uncompressed
        # size.  We will disable compression when TSIG and padding are both active
        # so that the padding comes out right.
        if not self.tsig:
            return 0
        f = io.BytesIO()
        self.tsig.to_wire(f)
        return len(f.getvalue())

    def to_wire(
        self,
        origin: Optional[dns.name.Name] = None,
        max_size: int = 0,
        multi: bool = False,
        tsig_ctx: Optional[Any] = None,
        prepend_length: bool = False,
        **kw: Any,
    ) -> bytes:
        """Return the message in DNS compressed wire format.

        Additional keyword arguments are passed to the RRset ``to_wire()``
        method.

        *origin*, a ``dns.name.Name`` or ``None``, the origin to be appended
        to any relative names.  If ``None``, and the message has an origin
        attribute that is not ``None``, then it will be used.

        *max_size*, an ``int``, the maximum size of the wire format
        output; default is 0, which means "the message's request
        payload, if nonzero, or 65535".  The effective value is clamped
        to the range 512..65535.

        *multi*, a ``bool``, should be set to ``True`` if this message is
        part of a multiple message sequence.

        *tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object, the
        ongoing TSIG context, used when signing zone transfers.

        *prepend_length*, a ``bool``, should be set to ``True`` if the caller
        wants the message length prepended to the message itself.  This is
        useful for messages sent over TCP, TLS (DoT), or QUIC (DoQ).

        Raises ``dns.exception.TooBig`` if *max_size* was exceeded.

        Returns a ``bytes``.
        """

        if origin is None and self.origin is not None:
            origin = self.origin
        if max_size == 0:
            if self.request_payload != 0:
                max_size = self.request_payload
            else:
                max_size = 65535
        if max_size < 512:
            max_size = 512
        elif max_size > 65535:
            max_size = 65535
        r = dns.renderer.Renderer(self.id, self.flags, max_size, origin)
        # Reserve room for the OPT and TSIG records up front so the regular
        # sections cannot consume the space they will need.
        opt_reserve = self._compute_opt_reserve()
        r.reserve(opt_reserve)
        tsig_reserve = self._compute_tsig_reserve()
        r.reserve(tsig_reserve)
        for rrset in self.question:
            r.add_question(rrset.name, rrset.rdtype, rrset.rdclass)
        for rrset in self.answer:
            r.add_rrset(dns.renderer.ANSWER, rrset, **kw)
        for rrset in self.authority:
            r.add_rrset(dns.renderer.AUTHORITY, rrset, **kw)
        for rrset in self.additional:
            r.add_rrset(dns.renderer.ADDITIONAL, rrset, **kw)
        r.release_reserved()
        if self.opt is not None:
            r.add_opt(self.opt, self.pad, opt_reserve, tsig_reserve)
        r.write_header()
        if self.tsig is not None:
            # The signature covers the wire form rendered so far; the signed
            # TSIG record then replaces the placeholder and is appended last.
            (new_tsig, ctx) = dns.tsig.sign(
                r.get_wire(),
                self.keyring,
                self.tsig[0],
                int(time.time()),
                self.request_mac,
                tsig_ctx,
                multi,
            )
            self.tsig.clear()
            self.tsig.add(new_tsig)
            r.add_rrset(dns.renderer.ADDITIONAL, self.tsig)
            r.write_header()
            if multi:
                self.tsig_ctx = ctx
        wire = r.get_wire()
        if prepend_length:
            # Two-byte big-endian length prefix.
            wire = len(wire).to_bytes(2, "big") + wire
        return wire

    @staticmethod
    def _make_tsig(
        keyname, algorithm, time_signed, fudge, mac, original_id, error, other
    ):
        """Build a TSIG RRset (class ANY, TTL 0) owned by *keyname*."""
        tsig = dns.rdtypes.ANY.TSIG.TSIG(
            dns.rdataclass.ANY,
            dns.rdatatype.TSIG,
            algorithm,
            time_signed,
            fudge,
            mac,
            original_id,
            error,
            other,
        )
        return dns.rrset.from_rdata(keyname, 0, tsig)

    def use_tsig(
        self,
        keyring: Any,
        keyname: Optional[Union[dns.name.Name, str]] = None,
        fudge: int = 300,
        original_id: Optional[int] = None,
        tsig_error: int = 0,
        other_data: bytes = b"",
        algorithm: Union[dns.name.Name, str] = dns.tsig.default_algorithm,
    ) -> None:
        """When sending, a TSIG signature using the specified key
        should be added.

        *keyring*, a ``dict``, ``callable`` or ``dns.tsig.Key``, is either
        the TSIG keyring or key to use.

        The format of a keyring dict is a mapping from TSIG key name, as
        ``dns.name.Name`` to ``dns.tsig.Key`` or a TSIG secret, a ``bytes``.
        If a ``dict`` *keyring* is specified but a *keyname* is not, the key
        used will be the first key in the *keyring*.  Note that the order of
        keys in a dictionary is not defined, so applications should supply a
        keyname when a ``dict`` keyring is used, unless they know the keyring
        contains only one key.  If a ``callable`` keyring is specified, the
        callable will be called with the message and the keyname, and is
        expected to return a key.

        *keyname*, a ``dns.name.Name``, ``str`` or ``None``, the name of
        this TSIG key to use; defaults to ``None``.  If *keyring* is a
        ``dict``, the key must be defined in it.  If *keyring* is a
        ``dns.tsig.Key``, this is ignored.

        *fudge*, an ``int``, the TSIG time fudge.

        *original_id*, an ``int``, the TSIG original id.  If ``None``,
        the message's id is used.

        *tsig_error*, an ``int``, the TSIG error code.

        *other_data*, a ``bytes``, the TSIG other data.

        *algorithm*, a ``dns.name.Name`` or ``str``, the TSIG algorithm to use.  This is
        only used if *keyring* is a ``dict``, and the key entry is a ``bytes``.
        """

        if isinstance(keyring, dns.tsig.Key):
            key = keyring
            keyname = key.name
        elif callable(keyring):
            # NOTE(review): in this branch *keyname* is passed through as
            # given (it is neither converted from ``str`` nor replaced by the
            # returned key's name) -- confirm callers always supply a Name.
            key = keyring(self, keyname)
        else:
            if isinstance(keyname, str):
                keyname = dns.name.from_text(keyname)
            if keyname is None:
                keyname = next(iter(keyring))
            key = keyring[keyname]
            if isinstance(key, bytes):
                key = dns.tsig.Key(keyname, key, algorithm)
        self.keyring = key
        if original_id is None:
            original_id = self.id
        # The placeholder TSIG carries time 0 and an all-zero MAC of the
        # algorithm's size; to_wire() replaces it with the real signature.
        self.tsig = self._make_tsig(
            keyname,
            self.keyring.algorithm,
            0,
            fudge,
            b"\x00" * dns.tsig.mac_sizes[self.keyring.algorithm],
            original_id,
            tsig_error,
            other_data,
        )

    @property
    def keyname(self) -> Optional[dns.name.Name]:
        """The owner name of the TSIG RRset, or ``None`` if there is none."""
        if self.tsig:
            return self.tsig.name
        else:
            return None

    @property
    def keyalgorithm(self) -> Optional[dns.name.Name]:
        """The TSIG algorithm, or ``None`` if there is no TSIG."""
        if self.tsig:
            return self.tsig[0].algorithm
        else:
            return None

    @property
    def mac(self) -> Optional[bytes]:
        """The TSIG MAC, or ``None`` if there is no TSIG."""
        if self.tsig:
            return self.tsig[0].mac
        else:
            return None

    @property
    def tsig_error(self) -> Optional[int]:
        """The TSIG error code, or ``None`` if there is no TSIG."""
        if self.tsig:
            return self.tsig[0].error
        else:
            return None

    @property
    def had_tsig(self) -> bool:
        """``True`` if the message has a (non-empty) TSIG RRset."""
        return bool(self.tsig)

    @staticmethod
    def _make_opt(flags=0, payload=DEFAULT_EDNS_PAYLOAD, options=None):
        """Build an OPT RRset at the root name; *flags* is stored as its TTL."""
        opt = dns.rdtypes.ANY.OPT.OPT(payload, dns.rdatatype.OPT, options or ())
        return dns.rrset.from_rdata(dns.name.root, int(flags), opt)

    def use_edns(
        self,
        edns: Optional[Union[int, bool]] = 0,
        ednsflags: int = 0,
        payload: int = DEFAULT_EDNS_PAYLOAD,
        request_payload: Optional[int] = None,
        options: Optional[List[dns.edns.Option]] = None,
        pad: int = 0,
    ) -> None:
        """Configure EDNS behavior.

        *edns*, an ``int``, is the EDNS level to use.  Specifying ``None``, ``False``,
        or ``-1`` means "do not use EDNS", and in this case the other parameters are
        ignored.  Specifying ``True`` is equivalent to specifying 0, i.e. "use EDNS0".

        *ednsflags*, an ``int``, the EDNS flag values.

        *payload*, an ``int``, is the EDNS sender's payload field, which is the maximum
        size of UDP datagram the sender can handle.  I.e. how big a response to this
        message can be.

        *request_payload*, an ``int``, is the EDNS payload size to use when sending this
        message.  If not specified, defaults to the value of *payload*.

        *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS options.

        *pad*, a non-negative ``int``.  If 0, the default, do not pad; otherwise add
        padding bytes to make the message size a multiple of *pad*.  Note that if
        padding is non-zero, an EDNS PADDING option will always be added to the
        message.
        """

        if edns is None or edns is False:
            edns = -1
        elif edns is True:
            edns = 0
        if edns < 0:
            self.opt = None
            self.request_payload = 0
        else:
            # make sure the EDNS version in ednsflags agrees with edns
            ednsflags &= 0xFF00FFFF
            ednsflags |= edns << 16
            if options is None:
                options = []
            self.opt = self._make_opt(ednsflags, payload, options)
            if request_payload is None:
                request_payload = payload
            self.request_payload = request_payload
            # NOTE(review): placed in the EDNS-enabled branch, matching the
            # documented "other parameters are ignored" -- confirm nesting.
            self.pad = pad

    @property
    def edns(self) -> int:
        """The EDNS version taken from the EDNS flags, or -1 without EDNS."""
        if self.opt:
            return (self.ednsflags & 0xFF0000) >> 16
        else:
            return -1

    @property
    def ednsflags(self) -> int:
        """The EDNS flags (the OPT RRset's TTL field), or 0 without EDNS."""
        if self.opt:
            return self.opt.ttl
        else:
            return 0

    @ednsflags.setter
    def ednsflags(self, v):
        if self.opt:
            self.opt.ttl = v
        elif v:
            # Setting nonzero flags on a message without EDNS enables EDNS.
            self.opt = self._make_opt(v)

    @property
    def payload(self) -> int:
        """The EDNS payload size, or 0 without EDNS."""
        if self.opt:
            return self.opt[0].payload
        else:
            return 0

    @property
    def options(self) -> Tuple:
        """The EDNS options, or an empty tuple without EDNS."""
        if self.opt:
            return self.opt[0].options
        else:
            return ()

    def want_dnssec(self, wanted: bool = True) -> None:
        """Enable or disable 'DNSSEC desired' flag in requests.

        *wanted*, a ``bool``.  If ``True``, then DNSSEC data is
        desired in the response, EDNS is enabled if required, and then
        the DO bit is set.  If ``False``, the DO bit is cleared if
        EDNS is enabled.
        """

        if wanted:
            self.ednsflags |= dns.flags.DO
        elif self.opt:
            self.ednsflags &= ~int(dns.flags.DO)

    def rcode(self) -> dns.rcode.Rcode:
        """Return the rcode.

        Returns a ``dns.rcode.Rcode``.
        """
        return dns.rcode.from_flags(int(self.flags), int(self.ednsflags))

    def set_rcode(self, rcode: dns.rcode.Rcode) -> None:
        """Set the rcode.

        *rcode*, a ``dns.rcode.Rcode``, is the rcode to set.
        """
        (value, evalue) = dns.rcode.to_flags(rcode)
        # The rcode is split between the low 4 bits of the header flags and
        # the top 8 bits of the EDNS flags.
        self.flags &= 0xFFF0
        self.flags |= value
        self.ednsflags &= 0x00FFFFFF
        self.ednsflags |= evalue

    def opcode(self) -> dns.opcode.Opcode:
        """Return the opcode.

        Returns a ``dns.opcode.Opcode``.
        """
        return dns.opcode.from_flags(int(self.flags))

    def set_opcode(self, opcode: dns.opcode.Opcode) -> None:
        """Set the opcode.

        *opcode*, a ``dns.opcode.Opcode``, is the opcode to set.
        """
        # Clear the opcode bits (mask 0x7800) before setting the new value.
        self.flags &= 0x87FF
        self.flags |= dns.opcode.to_flags(opcode)

    def _get_one_rr_per_rrset(self, value):
        # What the caller picked is fine.
        return value

    # pylint: disable=unused-argument

    def _parse_rr_header(self, section, name, rdclass, rdtype):
        # Returns (rdclass, rdtype, deleting, empty); this base version
        # passes the class and type through unchanged.
        return (rdclass, rdtype, None, False)

    # pylint: enable=unused-argument

    def _parse_special_rr_header(self, section, count, position, name, rdclass, rdtype):
        """Validate the placement of an OPT or TSIG record.

        Raises ``BadEDNS`` for an OPT record that is outside the additional
        section, is not owned by the root name, or follows an earlier OPT.

        Raises ``BadTSIG`` for a TSIG record that is outside the additional
        section, is not class ANY, or is not the last record (*position* is
        not ``count - 1``).

        Returns the tuple ``(rdclass, rdtype, None, False)``.
        """
        if rdtype == dns.rdatatype.OPT:
            if (
                section != MessageSection.ADDITIONAL
                or self.opt
                or name != dns.name.root
            ):
                raise BadEDNS
        elif rdtype == dns.rdatatype.TSIG:
            if (
                section != MessageSection.ADDITIONAL
                # Compare the class against the rdataclass constant (the
                # rdatatype constant of the same name has the same numeric
                # value, but is the wrong namespace).
                or rdclass != dns.rdataclass.ANY
                or position != count - 1
            ):
                raise BadTSIG
        return (rdclass, rdtype, None, False)
class ChainingResult:
    """The value returned by dns.message.QueryMessage.resolve_chaining().

    ``canonical_name`` is the name reached once all chaining has been
    applied (whenever ``answer`` is not ``None`` it is the same name as
    ``answer.name``).

    ``answer`` is the answer RRset, or ``None`` if there isn't one.

    ``minimum_ttl`` is the TTL to use if caching the data: the smallest
    of all the CNAME TTLs and either the answer TTL, if an answer exists,
    or the SOA TTL and SOA minimum values for negative answers.

    ``cnames`` is the list of CNAME RRsets that were followed to reach
    the canonical name.
    """

    def __init__(
        self,
        canonical_name: dns.name.Name,
        answer: Optional[dns.rrset.RRset],
        minimum_ttl: int,
        cnames: List[dns.rrset.RRset],
    ):
        self.cnames = cnames
        self.minimum_ttl = minimum_ttl
        self.answer = answer
        self.canonical_name = canonical_name
class QueryMessage(Message):
    def resolve_chaining(self) -> ChainingResult:
        """Walk the CNAME chain of this response to locate the answer RRset.

        Raises ``dns.message.NotQueryResponse`` if the message is not
        a response.

        Raises ``dns.message.ChainTooLong`` if the CNAME chain is too long.

        Raises ``dns.message.AnswerForNXDOMAIN`` if the rcode is NXDOMAIN
        but an answer was found.

        Raises ``dns.exception.FormError`` if the question count is not 1.

        Returns a ChainingResult object.
        """
        if self.flags & dns.flags.QR == 0:
            raise NotQueryResponse
        if len(self.question) != 1:
            raise dns.exception.FormError
        question = self.question[0]
        rdclass = question.rdclass
        rdtype = question.rdtype
        qname = question.name
        min_ttl = dns.ttl.MAX_TTL
        answer = None
        cnames = []
        # Every pass either ends the search (break) or follows exactly one
        # CNAME, so running out of passes means the chain is too long.
        for _ in range(MAX_CHAIN):
            try:
                answer = self.find_rrset(self.answer, qname, rdclass, rdtype)
                min_ttl = min(min_ttl, answer.ttl)
                break
            except KeyError:
                pass
            if rdtype == dns.rdatatype.CNAME:
                # A CNAME was asked for and is absent; there is no chain
                # to follow.
                break
            try:
                crrset = self.find_rrset(
                    self.answer, qname, rdclass, dns.rdatatype.CNAME
                )
                cnames.append(crrset)
                min_ttl = min(min_ttl, crrset.ttl)
                # Continue from the target of the first CNAME rdata.
                for rd in crrset:
                    qname = rd.target
                    break
            except KeyError:
                # No CNAME for this name; exit the chaining loop.
                break
        else:
            raise ChainTooLong
        if self.rcode() == dns.rcode.NXDOMAIN and answer is not None:
            raise AnswerForNXDOMAIN
        if answer is None:
            # Further minimize the TTL with NCACHE: look for an SOA RR whose
            # owner name is qname or one of its superdomains.
            auname = qname
            while True:
                try:
                    srrset = self.find_rrset(
                        self.authority, auname, rdclass, dns.rdatatype.SOA
                    )
                    min_ttl = min(min_ttl, srrset.ttl, srrset[0].minimum)
                    break
                except KeyError:
                    pass
                try:
                    auname = auname.parent()
                except dns.name.NoParent:
                    break
        return ChainingResult(qname, answer, min_ttl, cnames)

    def canonical_name(self) -> dns.name.Name:
        """Return the canonical name of the first name in the question
        section, as determined by ``resolve_chaining()``.

        Raises ``dns.message.NotQueryResponse`` if the message is not
        a response.

        Raises ``dns.message.ChainTooLong`` if the CNAME chain is too long.

        Raises ``dns.message.AnswerForNXDOMAIN`` if the rcode is NXDOMAIN
        but an answer was found.

        Raises ``dns.exception.FormError`` if the question count is not 1.
        """
        chain = self.resolve_chaining()
        return chain.canonical_name
def _maybe_import_update():
    """Import ``dns.update`` lazily."""
    # The import is deferred to call time to avoid a circular import.  It
    # lives in its own function because importing inside
    # _message_factory_from_opcode() would make "dns" a local symbol there,
    # and that function's first line would then fail.

    # pylint: disable=redefined-outer-name,import-outside-toplevel,unused-import
    import dns.update  # noqa: F401
def _message_factory_from_opcode(opcode):
    """Return the message class to use for *opcode*.

    UPDATE maps to ``dns.update.UpdateMessage`` (imported on demand),
    QUERY maps to ``QueryMessage``, and anything else to ``Message``.
    """
    if opcode == dns.opcode.UPDATE:
        _maybe_import_update()
        return dns.update.UpdateMessage
    if opcode == dns.opcode.QUERY:
        return QueryMessage
    return Message
class _WireReader:

    """Wire format reader.

    parser: the binary parser
    message: The message object being built
    initialize_message: Callback to set message parsing options
    question_only: Are we only reading the question?
    one_rr_per_rrset: Put each RR into its own RRset?
    keyring: TSIG keyring
    ignore_trailing: Ignore trailing junk at end of request?
    multi: Is this message part of a multi-message sequence?
    continue_on_error: try to extract as much information as possible from
    the message, accumulating MessageErrors in the *errors* attribute instead of
    raising them.
    errors: list of MessageError objects recorded while parsing (only
    populated when continue_on_error is true).
    """

    def __init__(
        self,
        wire,
        initialize_message,
        question_only=False,
        one_rr_per_rrset=False,
        ignore_trailing=False,
        keyring=None,
        multi=False,
        continue_on_error=False,
    ):
        """Wrap *wire* in a parser and remember the parsing options.

        No parsing happens here; the message object itself is created later
        by read(), once the header has been seen.
        """
        self.parser = dns.wire.Parser(wire)
        self.message = None
        self.initialize_message = initialize_message
        self.question_only = question_only
        self.one_rr_per_rrset = one_rr_per_rrset
        self.ignore_trailing = ignore_trailing
        self.keyring = keyring
        self.multi = multi
        self.continue_on_error = continue_on_error
        self.errors = []

    def _get_question(self, section_number, qcount):
        """Read the next *qcount* records from the wire data and add them to
        the question section.

        Each question is a name followed by a 16-bit type and a 16-bit class;
        an (empty) RRset is created for every question, even for duplicates
        (``force_unique=True``).
        """
        assert self.message is not None
        section = self.message.sections[section_number]
        for _ in range(qcount):
            qname = self.parser.get_name(self.message.origin)
            (rdtype, rdclass) = self.parser.get_struct("!HH")
            # Let the message class validate/convert the class and type for
            # this section; the last two returned values are not needed here.
            (rdclass, rdtype, _, _) = self.message._parse_rr_header(
                section_number, qname, rdclass, rdtype
            )
            self.message.find_rrset(
                section, qname, rdclass, rdtype, create=True, force_unique=True
            )

    def _add_error(self, e):
        """Record exception *e* together with the current parser offset."""
        self.errors.append(MessageError(e, self.parser.current))

    def _get_section(self, section_number, count):
        """Read the next *count* records from the wire data and add them to
        the specified section.

        section_number: the section of the message to which to add records
        count: the number of records to read

        OPT and TSIG records are not stored in the section; they are attached
        to the message as ``opt`` and ``tsig`` respectively.  If
        continue_on_error is set, an exception raised while processing a
        record's rdata is recorded and parsing resumes after that rdata.
        """
        assert self.message is not None
        section = self.message.sections[section_number]
        force_unique = self.one_rr_per_rrset
        for i in range(count):
            # Offset of the start of this RR; passed to TSIG validation below.
            rr_start = self.parser.current
            absolute_name = self.parser.get_name()
            if self.message.origin is not None:
                name = absolute_name.relativize(self.message.origin)
            else:
                name = absolute_name
            # Fixed RR header: type, class, TTL (32-bit) and rdata length.
            (rdtype, rdclass, ttl, rdlen) = self.parser.get_struct("!HHIH")
            if rdtype in (dns.rdatatype.OPT, dns.rdatatype.TSIG):
                # OPT and TSIG get their own header check, which also
                # receives the record's position (i of count) in the section.
                (
                    rdclass,
                    rdtype,
                    deleting,
                    empty,
                ) = self.message._parse_special_rr_header(
                    section_number, count, i, name, rdclass, rdtype
                )
            else:
                (rdclass, rdtype, deleting, empty) = self.message._parse_rr_header(
                    section_number, name, rdclass, rdtype
                )
            # Remembered so that a failed record can be skipped on error.
            rdata_start = self.parser.current
            try:
                if empty:
                    # The header said there is no rdata, so a non-zero
                    # length is a format error.
                    if rdlen > 0:
                        raise dns.exception.FormError
                    rd = None
                    covers = dns.rdatatype.NONE
                else:
                    # NOTE(review): restrict_to() presumably confines the
                    # parser to this RR's rdlen bytes -- confirm in dns.wire.
                    with self.parser.restrict_to(rdlen):
                        rd = dns.rdata.from_wire_parser(
                            rdclass, rdtype, self.parser, self.message.origin
                        )
                    covers = rd.covers()
                # In a zone transfer, once a SOA has been seen every
                # following record in this section goes in its own RRset.
                if self.message.xfr and rdtype == dns.rdatatype.SOA:
                    force_unique = True
                if rdtype == dns.rdatatype.OPT:
                    self.message.opt = dns.rrset.from_rdata(name, ttl, rd)
                elif rdtype == dns.rdatatype.TSIG:
                    if self.keyring is None:
                        raise UnknownTSIGKey("got signed message without keyring")
                    # The keyring may be a dict keyed by key name, a
                    # callable, or a single key object used as-is.
                    if isinstance(self.keyring, dict):
                        key = self.keyring.get(absolute_name)
                        if isinstance(key, bytes):
                            # Raw secret: build a Key using the algorithm
                            # named in the TSIG rdata.
                            key = dns.tsig.Key(absolute_name, key, rd.algorithm)
                    elif callable(self.keyring):
                        key = self.keyring(self.message, absolute_name)
                    else:
                        key = self.keyring
                    if key is None:
                        raise UnknownTSIGKey("key '%s' unknown" % name)
                    self.message.keyring = key
                    self.message.tsig_ctx = dns.tsig.validate(
                        self.parser.wire,
                        key,
                        absolute_name,
                        rd,
                        int(time.time()),
                        self.message.request_mac,
                        rr_start,
                        self.message.tsig_ctx,
                        self.multi,
                    )
                    self.message.tsig = dns.rrset.from_rdata(absolute_name, 0, rd)
                else:
                    rrset = self.message.find_rrset(
                        section,
                        name,
                        rdclass,
                        rdtype,
                        covers,
                        deleting,
                        True,
                        force_unique,
                    )
                    if rd is not None:
                        # A TTL with the most significant bit set is
                        # treated as zero (see RFC 2181, section 8).
                        if ttl > 0x7FFFFFFF:
                            ttl = 0
                        rrset.add(rd, ttl)
            except Exception as e:
                if self.continue_on_error:
                    # Note the failure and skip past this record's rdata so
                    # the next record can still be read.
                    self._add_error(e)
                    self.parser.seek(rdata_start + rdlen)
                else:
                    raise

    def read(self):
        """Read a wire format DNS message and build a dns.message.Message
        object.

        Raises ``ShortHeader`` if fewer than 12 octets are available.  Other
        parsing errors are raised unless continue_on_error is set, in which
        case they are recorded in *errors* and the partially built message is
        returned.
        """

        # The fixed DNS header is six 16-bit fields, i.e. 12 octets.
        if self.parser.remaining() < 12:
            raise ShortHeader
        (id, flags, qcount, ancount, aucount, adcount) = self.parser.get_struct(
            "!HHHHHH"
        )
        # The opcode in the flags selects the message class to build.
        factory = _message_factory_from_opcode(dns.opcode.from_flags(flags))
        self.message = factory(id=id)
        self.message.flags = dns.flags.Flag(flags)
        self.initialize_message(self.message)
        # The message class gets the final say on one_rr_per_rrset.
        self.one_rr_per_rrset = self.message._get_one_rr_per_rrset(
            self.one_rr_per_rrset
        )
        try:
            self._get_question(MessageSection.QUESTION, qcount)
            if self.question_only:
                return self.message
            self._get_section(MessageSection.ANSWER, ancount)
            self._get_section(MessageSection.AUTHORITY, aucount)
            self._get_section(MessageSection.ADDITIONAL, adcount)
            if not self.ignore_trailing and self.parser.remaining() != 0:
                raise TrailingJunk
            # In a multi-message sequence, a message without its own TSIG
            # is still fed into the running TSIG context.
            if self.multi and self.message.tsig_ctx and not self.message.had_tsig:
                self.message.tsig_ctx.update(self.parser.wire)
        except Exception as e:
            if self.continue_on_error:
                self._add_error(e)
            else:
                raise
        return self.message
def from_wire(
    wire: bytes,
    keyring: Optional[Any] = None,
    request_mac: Optional[bytes] = b"",
    xfr: bool = False,
    origin: Optional[dns.name.Name] = None,
    tsig_ctx: Optional[Union[dns.tsig.HMACTSig, dns.tsig.GSSTSig]] = None,
    multi: bool = False,
    question_only: bool = False,
    one_rr_per_rrset: bool = False,
    ignore_trailing: bool = False,
    raise_on_truncation: bool = False,
    continue_on_error: bool = False,
) -> Message:
    """Parse a DNS message in wire format and return a message object.

    *wire*, a ``bytes``, the message in wire format.

    *keyring*, a ``dns.tsig.Key``, a ``dict`` or a callable, the key or
    keyring used to validate the message if it is TSIG-signed.

    *request_mac*, a ``bytes`` or ``None``.  When the message answers a
    TSIG-signed request, this is the MAC of that request.

    *xfr*, a ``bool``, ``True`` if the message is part of a zone transfer.

    *origin*, a ``dns.name.Name`` or ``None``.  For zone transfers this is
    the origin name of the zone.  If not ``None``, names are relativized to
    it.

    *tsig_ctx*, a ``dns.tsig.HMACTSig`` or ``dns.tsig.GSSTSig`` object, the
    ongoing TSIG context, used when validating zone transfers.

    *multi*, a ``bool``, ``True`` if the message belongs to a multiple
    message sequence.

    *question_only*, a ``bool``.  If ``True``, stop after the question
    section.

    *one_rr_per_rrset*, a ``bool``.  If ``True``, each RR gets its own RRset.

    *ignore_trailing*, a ``bool``.  If ``True``, octets after the end of the
    message are ignored.

    *raise_on_truncation*, a ``bool``.  If ``True``, raise an exception if
    the TC bit is set.

    *continue_on_error*, a ``bool``.  If ``True``, keep parsing after errors;
    erroneous rdata is ignored and the errors are collected as a list of
    MessageError objects in the message's ``errors`` attribute.  Recommended
    only for DNS analysis tools, or for a server's error handling path.  The
    default is ``False``.

    Raises ``dns.message.ShortHeader`` if the message is less than 12 octets
    long.

    Raises ``dns.message.TrailingJunk`` if there were octets in the message
    past the end of the proper DNS message, and *ignore_trailing* is
    ``False``.

    Raises ``dns.message.BadEDNS`` if an OPT record was in the wrong section,
    or occurred more than once.

    Raises ``dns.message.BadTSIG`` if a TSIG record was not the last record
    of the additional data section.

    Raises ``dns.message.Truncated`` if the TC flag is set and
    *raise_on_truncation* is ``True``.

    Returns a ``dns.message.Message``.
    """

    # None is accepted for request_mac purely for backwards compatibility.
    mac = b"" if request_mac is None else request_mac

    def initialize_message(message):
        # Invoked by the reader as soon as the message object exists.
        message.request_mac = mac
        message.xfr = xfr
        message.origin = origin
        message.tsig_ctx = tsig_ctx

    reader = _WireReader(
        wire,
        initialize_message,
        question_only=question_only,
        one_rr_per_rrset=one_rr_per_rrset,
        ignore_trailing=ignore_trailing,
        keyring=keyring,
        multi=multi,
        continue_on_error=continue_on_error,
    )
    try:
        parsed = reader.read()
    except dns.exception.FormError:
        # A malformed message that has TC set is reported as truncated
        # when the caller asked for that; otherwise the error propagates.
        partial = reader.message
        if partial and (partial.flags & dns.flags.TC) and raise_on_truncation:
            raise Truncated(message=partial)
        raise
    # A truncated message can also parse cleanly, so check once more.
    if parsed.flags & dns.flags.TC and raise_on_truncation:
        raise Truncated(message=parsed)
    if continue_on_error:
        parsed.errors = reader.errors

    return parsed
class _TextReader:

    """Text format reader.

    tok: the tokenizer.
    message: The message object being built.
    last_name: The most recently read name when building a message object.
    one_rr_per_rrset: Put each RR into its own RRset?
    origin: The origin for relative names
    relativize: relativize names?
    relativize_to: the origin to relativize to.

    The remaining attributes (id, edns, ednsflags, payload, rcode, opcode,
    flags) hold values collected from header lines until the message object
    is created by _make_message().
    """

    def __init__(
        self,
        text,
        idna_codec,
        one_rr_per_rrset=False,
        origin=None,
        relativize=True,
        relativize_to=None,
    ):
        """Create a tokenizer over *text* and set header defaults."""
        self.message = None
        self.tok = dns.tokenizer.Tokenizer(text, idna_codec=idna_codec)
        self.last_name = None
        self.one_rr_per_rrset = one_rr_per_rrset
        self.origin = origin
        self.relativize = relativize
        self.relativize_to = relativize_to
        self.id = None
        # -1 means EDNS has not been requested by any header line.
        self.edns = -1
        self.ednsflags = 0
        self.payload = DEFAULT_EDNS_PAYLOAD
        self.rcode = None
        self.opcode = dns.opcode.QUERY
        self.flags = 0

    def _header_line(self, _):
        """Process one line from the text format header section.

        The argument (a section number) is ignored; it exists so that all
        line handlers can be called the same way by read().

        Raises ``UnknownHeaderField`` if the field name is not recognized.
        """

        token = self.tok.get()
        what = token.value
        if what == "id":
            self.id = self.tok.get_int()
        elif what == "flags":
            # Consume flag names until a non-identifier token is seen.
            while True:
                token = self.tok.get()
                if not token.is_identifier():
                    self.tok.unget(token)
                    break
                self.flags = self.flags | dns.flags.from_text(token.value)
        elif what == "edns":
            self.edns = self.tok.get_int()
            # The EDNS version is kept in bits 16 and up of ednsflags.
            self.ednsflags = self.ednsflags | (self.edns << 16)
        elif what == "eflags":
            # Mentioning EDNS flags turns EDNS on (version 0) if it was off.
            if self.edns < 0:
                self.edns = 0
            while True:
                token = self.tok.get()
                if not token.is_identifier():
                    self.tok.unget(token)
                    break
                self.ednsflags = self.ednsflags | dns.flags.edns_from_text(token.value)
        elif what == "payload":
            self.payload = self.tok.get_int()
            # A payload size likewise turns EDNS on if it was off.
            if self.edns < 0:
                self.edns = 0
        elif what == "opcode":
            text = self.tok.get_string()
            self.opcode = dns.opcode.from_text(text)
            self.flags = self.flags | dns.opcode.to_flags(self.opcode)
        elif what == "rcode":
            text = self.tok.get_string()
            self.rcode = dns.rcode.from_text(text)
        else:
            raise UnknownHeaderField
        self.tok.get_eol()

    def _question_line(self, section_number):
        """Process one line from the text format question section.

        The line is ``[name] [class] type``.  A line that starts with
        whitespace reuses the previously read name, and a missing class
        defaults to IN.

        Raises ``NoPreviousName`` if no name is given and none was read
        before, and ``dns.exception.SyntaxError`` on malformed input.
        """

        section = self.message.sections[section_number]
        token = self.tok.get(want_leading=True)
        if not token.is_whitespace():
            self.last_name = self.tok.as_name(
                token, self.message.origin, self.relativize, self.relativize_to
            )
        name = self.last_name
        if name is None:
            raise NoPreviousName
        token = self.tok.get()
        if not token.is_identifier():
            raise dns.exception.SyntaxError
        # Class
        try:
            rdclass = dns.rdataclass.from_text(token.value)
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except dns.exception.SyntaxError:
            raise dns.exception.SyntaxError
        except Exception:
            # The token was not a class; assume IN and let the same token
            # be interpreted as the type below.
            rdclass = dns.rdataclass.IN
        # Type
        rdtype = dns.rdatatype.from_text(token.value)
        (rdclass, rdtype, _, _) = self.message._parse_rr_header(
            section_number, name, rdclass, rdtype
        )
        self.message.find_rrset(
            section, name, rdclass, rdtype, create=True, force_unique=True
        )
        self.tok.get_eol()

    def _rr_line(self, section_number):
        """Process one line from the text format answer, authority, or
        additional data sections.

        The line is ``[name] [ttl] [class] type [rdata]``.  A line that
        starts with whitespace reuses the previously read name; a missing
        TTL defaults to 0 and a missing class to IN.

        Raises ``NoPreviousName`` if no name is given and none was read
        before, ``dns.exception.UnexpectedEnd`` if rdata is required but
        missing, and ``dns.exception.SyntaxError`` on other malformed input.
        """

        section = self.message.sections[section_number]
        # Name
        token = self.tok.get(want_leading=True)
        if not token.is_whitespace():
            self.last_name = self.tok.as_name(
                token, self.message.origin, self.relativize, self.relativize_to
            )
        name = self.last_name
        if name is None:
            raise NoPreviousName
        token = self.tok.get()
        if not token.is_identifier():
            raise dns.exception.SyntaxError
        # TTL
        try:
            # Base 0 lets int() accept prefixed literals such as 0x10.
            ttl = int(token.value, 0)
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except dns.exception.SyntaxError:
            raise dns.exception.SyntaxError
        except Exception:
            # Not a number: no TTL was given, so the same token is tried
            # as the class next.
            ttl = 0
        # Class
        try:
            rdclass = dns.rdataclass.from_text(token.value)
            token = self.tok.get()
            if not token.is_identifier():
                raise dns.exception.SyntaxError
        except dns.exception.SyntaxError:
            raise dns.exception.SyntaxError
        except Exception:
            # Not a class: assume IN and treat the same token as the type.
            rdclass = dns.rdataclass.IN
        # Type
        rdtype = dns.rdatatype.from_text(token.value)
        (rdclass, rdtype, deleting, empty) = self.message._parse_rr_header(
            section_number, name, rdclass, rdtype
        )
        token = self.tok.get()
        # Whether rdata follows must agree with what the header check said.
        if empty and not token.is_eol_or_eof():
            raise dns.exception.SyntaxError
        if not empty and token.is_eol_or_eof():
            raise dns.exception.UnexpectedEnd
        if not token.is_eol_or_eof():
            self.tok.unget(token)
            rd = dns.rdata.from_text(
                rdclass,
                rdtype,
                self.tok,
                self.message.origin,
                self.relativize,
                self.relativize_to,
            )
            covers = rd.covers()
        else:
            rd = None
            covers = dns.rdatatype.NONE
        rrset = self.message.find_rrset(
            section,
            name,
            rdclass,
            rdtype,
            covers,
            deleting,
            True,
            self.one_rr_per_rrset,
        )
        if rd is not None:
            rrset.add(rd, ttl)

    def _make_message(self):
        """Create a message object from the header values read so far.

        The class is chosen from the current opcode.  EDNS is enabled only
        if a header line requested it, and the rcode and origin are applied
        only when they are truthy.
        """
        factory = _message_factory_from_opcode(self.opcode)
        message = factory(id=self.id)
        message.flags = self.flags
        if self.edns >= 0:
            message.use_edns(self.edns, self.ednsflags, self.payload)
        if self.rcode:
            message.set_rcode(self.rcode)
        if self.origin:
            message.origin = self.origin
        return message

    def read(self):
        """Read a text format DNS message and build a dns.message.Message
        object.

        Comment tokens naming a section switch the handler used for the
        following lines; lines before any section are treated as header
        lines.  Reading stops at the first end-of-line/end-of-file token
        found where a line should start (a blank line or the end of input).
        """

        line_method = self._header_line
        section_number = None
        while 1:
            # NOTE(review): the two positional True arguments presumably
            # enable leading-whitespace and comment tokens -- confirm
            # against the tokenizer's get() signature.
            token = self.tok.get(True, True)
            if token.is_eol_or_eof():
                break
            if token.is_comment():
                u = token.value.upper()
                if u == "HEADER":
                    line_method = self._header_line

                if self.message:
                    message = self.message
                else:
                    # If we don't have a message, create one with the current
                    # opcode, so that we know which section names to parse.
                    message = self._make_message()
                try:
                    section_number = message._section_enum.from_text(u)
                    # We found a section name.  If we don't have a message,
                    # use the one we just created.
                    if not self.message:
                        self.message = message
                        self.one_rr_per_rrset = message._get_one_rr_per_rrset(
                            self.one_rr_per_rrset
                        )
                    if section_number == MessageSection.QUESTION:
                        line_method = self._question_line
                    else:
                        line_method = self._rr_line
                except Exception:
                    # It's just a comment.
                    pass
                self.tok.get_eol()
                continue
            # Not a comment: hand the whole line to the current handler.
            self.tok.unget(token)
            line_method(section_number)
        # Input with no section names still yields a message built from
        # whatever header values were read.
        if not self.message:
            self.message = self._make_message()
        return self.message
def from_text(
    text: str,
    idna_codec: Optional[dns.name.IDNACodec] = None,
    one_rr_per_rrset: bool = False,
    origin: Optional[dns.name.Name] = None,
    relativize: bool = True,
    relativize_to: Optional[dns.name.Name] = None,
) -> Message:
    """Build a message object from its text format representation.

    Reading stops after the first blank line in the input, which makes it
    possible to read several messages from one file with
    ``dns.message.from_file()``.

    *text*, a ``str``, the text format message.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
    is used.

    *one_rr_per_rrset*, a ``bool``.  If ``True``, then each RR is put
    into its own rrset.  The default is ``False``.

    *origin*, a ``dns.name.Name`` (or ``None``), the origin to use for
    relative names.

    *relativize*, a ``bool``.  If true, names will be relativized.

    *relativize_to*, a ``dns.name.Name`` (or ``None``), the origin to use
    when relativizing names.  If not set, the *origin* value will be used.

    Raises ``dns.message.UnknownHeaderField`` if a header is unknown.

    Raises ``dns.exception.SyntaxError`` if the text is badly formed.

    Returns a ``dns.message.Message`` object.
    """

    # 'text' may also be a file object; that is deliberately left
    # undocumented because from_file() is the official file interface.

    return _TextReader(
        text, idna_codec, one_rr_per_rrset, origin, relativize, relativize_to
    ).read()
def from_file(
    f: Any,
    idna_codec: Optional[dns.name.IDNACodec] = None,
    one_rr_per_rrset: bool = False,
) -> Message:
    """Read the next text format message from the specified file.

    Message blocks are separated by a single blank line.

    *f*, a ``file`` or ``str``.  If *f* is text, it is treated as the
    pathname of a file to open; that file is closed again before returning.
    A file object passed in by the caller is left open.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
    is used.

    *one_rr_per_rrset*, a ``bool``.  If ``True``, then each RR is put
    into its own rrset.  The default is ``False``.

    Raises ``dns.message.UnknownHeaderField`` if a header is unknown.

    Raises ``dns.exception.SyntaxError`` if the text is badly formed.

    Returns a ``dns.message.Message`` object.
    """

    # Only a file we opened ourselves gets closed; nullcontext() hands a
    # caller-supplied file object through untouched.
    source: contextlib.AbstractContextManager = (
        open(f) if isinstance(f, str) else contextlib.nullcontext(f)
    )
    with source as stream:
        message = from_text(stream, idna_codec, one_rr_per_rrset)
    return message
def make_query(
    qname: Union[dns.name.Name, str],
    rdtype: Union[dns.rdatatype.RdataType, str],
    rdclass: Union[dns.rdataclass.RdataClass, str] = dns.rdataclass.IN,
    use_edns: Optional[Union[int, bool]] = None,
    want_dnssec: bool = False,
    ednsflags: Optional[int] = None,
    payload: Optional[int] = None,
    request_payload: Optional[int] = None,
    options: Optional[List[dns.edns.Option]] = None,
    idna_codec: Optional[dns.name.IDNACodec] = None,
    id: Optional[int] = None,
    flags: int = dns.flags.RD,
    pad: int = 0,
) -> QueryMessage:
    """Make a query message.

    The query name, type, and class may each be given either as an object
    of the appropriate type or as a string.

    By default the query gets a randomly chosen query id and has its DNS
    flags set to ``dns.flags.RD``; see *id* and *flags*.

    *qname*, a ``dns.name.Name`` or ``str``, the query name.

    *rdtype*, an ``int`` or ``str``, the desired rdata type.

    *rdclass*, an ``int`` or ``str``, the desired rdata class; the default
    is class IN.

    *use_edns*, an ``int``, ``bool`` or ``None``.  The EDNS level to use; the
    default is ``None``.  If ``None``, EDNS will be enabled only if other
    parameters (*ednsflags*, *payload*, *request_payload*, or *options*) are
    set.
    See the description of dns.message.Message.use_edns() for the possible
    values for use_edns and their meanings.

    *want_dnssec*, a ``bool``.  If ``True``, DNSSEC data is desired.

    *ednsflags*, an ``int``, the EDNS flag values.

    *payload*, an ``int``, is the EDNS sender's payload field, which is the
    maximum size of UDP datagram the sender can handle.  I.e. how big
    a response to this message can be.

    *request_payload*, an ``int``, is the EDNS payload size to use when
    sending this message.  If not specified, defaults to the value of
    *payload*.

    *options*, a list of ``dns.edns.Option`` objects or ``None``, the EDNS
    options.

    *idna_codec*, a ``dns.name.IDNACodec``, specifies the IDNA
    encoder/decoder.  If ``None``, the default IDNA 2003 encoder/decoder
    is used.

    *id*, an ``int`` or ``None``, the desired query id.  The default is
    ``None``, which generates a random query id.

    *flags*, an ``int``, the desired query flags.  The default is
    ``dns.flags.RD``.

    *pad*, a non-negative ``int``.  If 0, the default, do not pad; otherwise add
    padding bytes to make the message size a multiple of *pad*.  Note that if
    padding is non-zero, an EDNS PADDING option will always be added to the
    message.

    Returns a ``dns.message.QueryMessage``
    """

    if isinstance(qname, str):
        qname = dns.name.from_text(qname, idna_codec=idna_codec)
    rdtype = dns.rdatatype.RdataType.make(rdtype)
    rdclass = dns.rdataclass.RdataClass.make(rdclass)
    query = QueryMessage(id=id)
    query.flags = dns.flags.Flag(flags)
    query.find_rrset(
        query.question, qname, rdclass, rdtype, create=True, force_unique=True
    )
    # Forward to use_edns() only those EDNS settings the caller actually
    # supplied (i.e. that are not None).
    candidates = {
        "ednsflags": ednsflags,
        "payload": payload,
        "request_payload": request_payload,
        "options": options,
    }
    kwargs: Dict[str, Any] = {
        key: value for key, value in candidates.items() if value is not None
    }
    # Supplying any EDNS setting turns EDNS on (version 0) unless the
    # caller chose a level explicitly.
    if kwargs and use_edns is None:
        use_edns = 0
    kwargs["edns"] = use_edns
    kwargs["pad"] = pad
    query.use_edns(**kwargs)
    query.want_dnssec(want_dnssec)
    return query
def make_response(
    query: Message,
    recursion_available: bool = False,
    our_payload: int = 8192,
    fudge: int = 300,
    tsig_error: int = 0,
) -> Message:
    """Make a message which is a response for the specified query.

    The returned message is only a response skeleton: it carries the
    infrastructure a response needs (id, flags, opcode, question and, when
    the query used them, EDNS and TSIG settings) but none of the content.

    The response's question section is a shallow copy of the query's
    question section, so the query's question RRsets should not be
    changed.

    *query*, a ``dns.message.Message``, the query to respond to.

    *recursion_available*, a ``bool``, should RA be set in the response?

    *our_payload*, an ``int``, the payload size to advertise in EDNS
    responses.

    *fudge*, an ``int``, the TSIG time fudge.

    *tsig_error*, an ``int``, the TSIG error.

    Raises ``dns.exception.FormError`` if *query* has the QR flag set, i.e.
    it is not a query.

    Returns a ``dns.message.Message`` object whose specific class is
    appropriate for the query.  For example, if query is a
    ``dns.update.UpdateMessage``, response will be too.
    """

    if query.flags & dns.flags.QR:
        raise dns.exception.FormError("specified query message is not a query")
    response_class = _message_factory_from_opcode(query.opcode())
    response = response_class(id=query.id)
    # QR marks the message as a response; RD is echoed from the query.
    response_flags = dns.flags.QR | (query.flags & dns.flags.RD)
    if recursion_available:
        response_flags |= dns.flags.RA
    response.flags = response_flags
    response.set_opcode(query.opcode())
    response.question = list(query.question)
    if query.edns >= 0:
        response.use_edns(0, 0, our_payload, query.payload)
    if query.had_tsig:
        # Sign with the key the query used and remember the query's MAC.
        response.use_tsig(
            query.keyring,
            query.keyname,
            fudge,
            None,
            tsig_error,
            b"",
            query.keyalgorithm,
        )
        response.request_mac = query.mac
    return response
# Module-level aliases for the MessageSection members, so each section can
# be referred to directly by name from this module.  The region between the
# BEGIN/END markers is generated; do not edit it by hand.
### BEGIN generated MessageSection constants

QUESTION = MessageSection.QUESTION
ANSWER = MessageSection.ANSWER
AUTHORITY = MessageSection.AUTHORITY
ADDITIONAL = MessageSection.ADDITIONAL

### END generated MessageSection constants