Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/layers/dns.py: 48%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7DNS: Domain Name System.
8"""
10import abc
11import operator
12import itertools
13import socket
14import struct
15import time
16import warnings
18from scapy.arch import (
19 get_if_addr,
20 get_if_addr6,
21 read_nameservers,
22)
23from scapy.ansmachine import AnsweringMachine
24from scapy.base_classes import Net
25from scapy.config import conf
26from scapy.compat import orb, raw, chb, bytes_encode, plain_str
27from scapy.error import log_runtime, warning, Scapy_Exception
28from scapy.packet import Packet, bind_layers, Raw
29from scapy.fields import (
30 BitEnumField,
31 BitField,
32 ByteEnumField,
33 ByteField,
34 ConditionalField,
35 Field,
36 FieldLenField,
37 FieldListField,
38 FlagsField,
39 I,
40 IP6Field,
41 IntField,
42 MultipleTypeField,
43 PacketListField,
44 ShortEnumField,
45 ShortField,
46 StrField,
47 StrLenField,
48 UTCTimeField,
49 XStrFixedLenField,
50 XStrLenField,
51)
52from scapy.sendrecv import sr1
53from scapy.supersocket import StreamSocket
54from scapy.pton_ntop import inet_ntop, inet_pton
55from scapy.volatile import RandShort
57from scapy.layers.l2 import Ether
58from scapy.layers.inet import IP, DestIPField, IPField, UDP, TCP
60from typing import (
61 Any,
62 List,
63 Optional,
64 Tuple,
65 Type,
66 Union,
67)
70# https://www.iana.org/assignments/dns-parameters/dns-parameters.xhtml#dns-parameters-4
# Resource record TYPE codes mapped to their mnemonic.
dnstypes = {
    0: "ANY",
    1: "A", 2: "NS", 3: "MD", 4: "MF", 5: "CNAME",
    6: "SOA", 7: "MB", 8: "MG", 9: "MR", 10: "NULL",
    11: "WKS", 12: "PTR", 13: "HINFO", 14: "MINFO", 15: "MX",
    16: "TXT", 17: "RP", 18: "AFSDB", 19: "X25", 20: "ISDN",
    21: "RT", 22: "NSAP", 23: "NSAP-PTR", 24: "SIG", 25: "KEY",
    26: "PX", 27: "GPOS", 28: "AAAA", 29: "LOC", 30: "NXT",
    31: "EID", 32: "NIMLOC", 33: "SRV", 34: "ATMA", 35: "NAPTR",
    36: "KX", 37: "CERT", 38: "A6", 39: "DNAME", 40: "SINK",
    41: "OPT", 42: "APL", 43: "DS", 44: "SSHFP", 45: "IPSECKEY",
    46: "RRSIG", 47: "NSEC", 48: "DNSKEY", 49: "DHCID", 50: "NSEC3",
    51: "NSEC3PARAM", 52: "TLSA", 53: "SMIMEA",
    55: "HIP",
    56: "NINFO", 57: "RKEY", 58: "TALINK", 59: "CDS", 60: "CDNSKEY",
    61: "OPENPGPKEY", 62: "CSYNC", 63: "ZONEMD", 64: "SVCB", 65: "HTTPS",
    99: "SPF", 100: "UINFO",
    101: "UID", 102: "GID", 103: "UNSPEC", 104: "NID", 105: "L32",
    106: "L64", 107: "LP", 108: "EUI48", 109: "EUI64",
    249: "TKEY", 250: "TSIG",
    256: "URI", 257: "CAA", 258: "AVC", 259: "DOA", 260: "AMTRELAY",
    32768: "TA", 32769: "DLV",
    65535: "RESERVED",
}


# QTYPE codes: the query-only codes come first, followed by every RR TYPE.
dnsqtypes = {
    251: "IXFR", 252: "AXFR", 253: "MAILB", 254: "MAILA", 255: "ALL",
    **dnstypes,
}

# CLASS codes mapped to their mnemonic.
dnsclasses = {1: 'IN', 2: 'CS', 3: 'CH', 4: 'HS', 255: 'ANY'}
# 12/2023 from https://www.iana.org/assignments/dns-sec-alg-numbers/dns-sec-alg-numbers.xhtml  # noqa: E501
dnssecalgotypes = {
    0: "Reserved",
    1: "RSA/MD5",
    2: "Diffie-Hellman",
    3: "DSA/SHA-1",
    4: "Reserved",
    5: "RSA/SHA-1",
    6: "DSA-NSEC3-SHA1",
    7: "RSASHA1-NSEC3-SHA1",
    8: "RSA/SHA-256",
    9: "Reserved",
    10: "RSA/SHA-512",
    11: "Reserved",
    12: "GOST R 34.10-2001",
    13: "ECDSA Curve P-256 with SHA-256",
    14: "ECDSA Curve P-384 with SHA-384",
    15: "Ed25519",
    16: "Ed448",
    252: "Reserved for Indirect Keys",
    253: "Private algorithms - domain name",
    254: "Private algorithms - OID",
    255: "Reserved",
}

# 12/2023 from https://www.iana.org/assignments/ds-rr-types/ds-rr-types.xhtml
dnssecdigesttypes = {
    0: "Reserved",
    1: "SHA-1",
    2: "SHA-256",
    3: "GOST R 34.11-94",
    4: "SHA-384",
}

# 12/2023 from https://www.iana.org/assignments/dnssec-nsec3-parameters/dnssec-nsec3-parameters.xhtml  # noqa: E501
dnssecnsec3algotypes = {
    0: "Reserved",
    1: "SHA-1",
}
def dns_get_str(s, full=None, _ignore_compression=False):
    """Decode the (possibly compressed) DNS name found at the start of ``s``.

    :param s: the buffer whose first bytes hold the encoded name
    :param full: (optional) the full packet, needed to follow compression
        pointers
    :param _ignore_compression: internal use only. When set, the two bytes
        of a compression pointer are skipped instead of being followed, and
        parsing goes on right after them.

    :returns: a ``(name, remaining)`` tuple. ``name`` is the decoded name
        with every label followed by a dot (``b"."`` when no label was
        decoded), ``remaining`` holds the bytes of the original ``s`` that
        come after the encoded name.
    :raises Scapy_Exception: if a compression pointer must be followed but
        ``full`` was not provided.
    """
    max_length = len(s)
    # The result = the extracted name
    name = b""
    # Index (in the original buffer) of the first byte located after the
    # first compression pointer that was met
    after_pointer = None
    processed_pointers = []  # Used to check for decompression loops
    bytes_left = None
    _fullpacket = False  # True once s has been switched to the full packet
    pointer = 0
    while True:
        if pointer >= max_length:
            log_runtime.info(
                "DNS RR prematured end (ofs=%i, len=%i)", pointer, len(s)
            )
            break
        cur = s[pointer]  # length byte, or first byte of a jump token
        pointer += 1
        if cur & 0xc0:  # Label pointer
            if after_pointer is None:
                # after_pointer points to where the remaining bytes start,
                # as pointer will follow the jump token
                after_pointer = pointer + 1
            if _ignore_compression:
                # Step over the second byte of the jump token
                pointer += 1
                continue
            if pointer >= max_length:
                log_runtime.info(
                    "DNS incomplete jump token at (ofs=%i)", pointer
                )
                break
            if not full:
                raise Scapy_Exception("DNS message can't be compressed " +
                                      "at this point!")
            # Follow the pointer: 14-bit offset built from the low 6 bits
            # of cur and the next byte
            pointer = ((cur & ~0xc0) << 8) + s[pointer]
            if pointer in processed_pointers:
                warning("DNS decompression loop detected")
                break
            if len(processed_pointers) >= 20:
                warning("More than 20 jumps in a single DNS decompression ! "
                        "Dropping (evil packet)")
                break
            if not _fullpacket:
                # We switch our s buffer to full, so we need to remember
                # the previous context
                bytes_left = s[after_pointer:]
                s = full
                max_length = len(s)
                _fullpacket = True
            processed_pointers.append(pointer)
            continue
        elif cur > 0:  # Label
            # cur = length of the label
            name += s[pointer:pointer + cur] + b"."
            pointer += cur
        else:  # End (zero-length root label)
            break
    if after_pointer is not None:
        # Use the real end index (not the one we followed)
        pointer = after_pointer
    if bytes_left is None:
        bytes_left = s[pointer:]
    # name, remaining
    return name or b".", bytes_left
190def _is_ptr(x):
191 return b"." not in x and (
192 (x and orb(x[-1]) == 0) or
193 (len(x) >= 2 and (orb(x[-2]) & 0xc0) == 0xc0)
194 )
def dns_encode(x, check_built=False):
    """Turn a dotted name into its DNS wire representation.

    :param x: the bytes string to encode
    :param check_built: when True, a value that ``_is_ptr`` recognises as
        already encoded is returned untouched
    :returns: the encoded bytes string
    """
    if not x or x == b".":
        # Empty name or root: a single zero-length label
        return b"\x00"

    if check_built and _is_ptr(x):
        # The value has already been processed. Do not process it again
        return x

    pieces = []
    for label in x.split(b"."):
        # Labels that cannot be encoded (more than 63 bytes) are truncated
        label = label[:63]
        pieces.append(chb(len(label)) + label)
    encoded = b"".join(pieces)
    if not encoded.endswith(b"\x00"):
        encoded += b"\x00"
    return encoded
def DNSgetstr(*args, **kwargs):
    """Legacy function. Deprecated, use dns_get_str instead.

    All arguments are forwarded to dns_get_str.

    :returns: the tuple returned by dns_get_str, without its last element
    """
    warnings.warn(
        "DNSgetstr is deprecated. Use dns_get_str instead.",
        DeprecationWarning,
        # Attribute the warning to the caller rather than to this shim
        stacklevel=2,
    )
    return dns_get_str(*args, **kwargs)[:-1]
def dns_compress(pkt):
    """This function compresses a DNS packet according to compression rules.

    Works on a copy of ``pkt``: names that appear several times in the
    qd/an/ns/ar sections are replaced, from their second occurrence on, by
    a 2-byte pointer to the first occurrence.

    :param pkt: a packet containing a DNS layer
    :returns: the compressed DNS layer, stacked back on the copied
        underlayers when there are some
    :raises Scapy_Exception: if ``pkt`` has no DNS layer
    """
    if DNS not in pkt:
        raise Scapy_Exception("Can only compress DNS layers")
    pkt = pkt.copy()
    dns_pkt = pkt.getlayer(DNS)
    dns_pkt.clear_cache()
    # Raw bytes of the DNS layer, used to locate names
    build_pkt = raw(dns_pkt)

    def field_gen(dns_pkt):
        """Iterates through all DNS strings that can be compressed"""
        for lay in [dns_pkt.qd, dns_pkt.an, dns_pkt.ns, dns_pkt.ar]:
            if not lay:
                continue
            for current in lay:
                for field in current.fields_desc:
                    # MultipleTypeField is only considered for the record
                    # types listed here
                    if isinstance(field, DNSStrField) or \
                        (isinstance(field, MultipleTypeField) and
                         current.type in [2, 3, 4, 5, 12, 15, 39]):
                        # Get the associated data and store it accordingly  # noqa: E501
                        dat = current.getfieldval(field.name)
                        yield current, field.name, dat

    def possible_shortens(dat):
        """Iterates through all possible compression parts in a DNS string"""
        if dat == b".":  # we'd lose by compressing it
            return
        # Whole name first, then each suffix obtained by dropping leading
        # labels one by one
        yield dat
        for x in range(1, dat.count(b".")):
            yield dat.split(b".", x)[x]
    # Maps a name (or suffix) to a list whose first item describes its
    # first occurrence (layer, field name, pointer bytes, index + 1) and
    # whose following items are the (layer, field name) to compress
    data = {}
    for current, name, dat in field_gen(dns_pkt):
        for part in possible_shortens(dat):
            # Encode the data
            encoded = dns_encode(part, check_built=True)
            if part not in data:
                # We have no occurrence of such data, let's store it as a
                # possible pointer for future strings.
                # We get the index of the encoded data
                index = build_pkt.index(encoded)
                # The following is used to build correctly the pointer
                fb_index = ((index >> 8) | 0xc0)
                sb_index = index - (256 * (fb_index - 0xc0))
                pointer = chb(fb_index) + chb(sb_index)
                data[part] = [(current, name, pointer, index + 1)]
            else:
                # This string already exists, let's mark the current field
                # with it, so that it gets compressed
                data[part].append((current, name))
                # Replace the next occurrence found after the first one
                # with two NUL bytes.
                # NOTE(review): presumably a 2-byte placeholder standing
                # for the pointer, so that offsets computed afterwards
                # match the compressed layout - confirm.
                _in = data[part][0][3]
                build_pkt = build_pkt[:_in] + build_pkt[_in:].replace(
                    encoded,
                    b"\0\0",
                    1
                )
                # Longest match found for this field: stop trying suffixes
                break
    # Apply compression rules
    for ck in data:
        # compression_key is a DNS string
        replacements = data[ck]
        # replacements is the list of all tuples (layer, field name)
        # where this string was found
        replace_pointer = replacements.pop(0)[2]
        # replace_pointer is the packed pointer that should replace
        # those strings. Note that pop remove it from the list
        for rep in replacements:
            # setfieldval edits the value of the field in the layer
            val = rep[0].getfieldval(rep[1])
            assert val.endswith(ck)
            # Encode the labels located before the shared suffix, and drop
            # the final byte of that encoding before appending the pointer
            kept_string = dns_encode(val[:-len(ck)], check_built=True)[:-1]
            new_val = kept_string + replace_pointer
            rep[0].setfieldval(rep[1], new_val)
            # Drop any explicit rdlen; layers without one are ignored
            try:
                del rep[0].rdlen
            except AttributeError:
                pass
    # End of the compression algorithm
    # Destroy the previous DNS layer if needed
    if not isinstance(pkt, DNS) and pkt.getlayer(DNS).underlayer:
        pkt.getlayer(DNS).underlayer.remove_payload()
        return pkt / dns_pkt
    return dns_pkt
class DNSCompressedPacket(Packet):
    """
    Marker class for packets that contain DNSStrField and support
    DNS name compression.
    """

    @abc.abstractmethod
    def get_full(self):
        """To be overridden by subclasses. This default returns None."""
class DNSStrField(StrLenField):
    """
    String field holding a DNS name.

    Takes care of DNS encoding/decoding, including decompression of
    compressed names. Behaves as a StrLenField when a length_from is given.
    """

    def any2i(self, pkt, x):
        # A non-empty list is converted item by item
        if isinstance(x, list) and x:
            return [self.h2i(pkt, item) for item in x]
        return super(DNSStrField, self).any2i(pkt, x)

    def h2i(self, pkt, x):
        # Setting a DNSStrField manually (h2i) means any current compression
        # will break: drop the cached raw bytes of the compressed parent
        if (
            pkt and
            isinstance(pkt.parent, DNSCompressedPacket) and
            pkt.parent.raw_packet_cache
        ):
            pkt.parent.clear_cache()
        if not x:
            return b"."
        value = bytes_encode(x)
        # Already dot-terminated or already encoded: keep as is
        if value[-1:] == b"." or _is_ptr(value):
            return value
        return value + b"."

    def i2m(self, pkt, x):
        return dns_encode(x, check_built=True)

    def i2len(self, pkt, x):
        encoded = self.i2m(pkt, x)
        return len(encoded)

    def get_full(self, pkt):
        """Walk up parents/underlayers to the first DNSCompressedPacket and
        return its get_full() result, or None when there is none."""
        holder = pkt
        while holder and not isinstance(holder, DNSCompressedPacket):
            holder = holder.parent or holder.underlayer
        if holder:
            return holder.get_full()
        return None

    def getfield(self, pkt, s):
        trailing = b""
        if self.length_from:
            # Restrict decoding to the bytes selected by length_from
            trailing, s = super(DNSStrField, self).getfield(pkt, s)
        # Decode the compressed DNS message
        name, rest = dns_get_str(s, full=self.get_full(pkt))
        # returns (remaining, decoded)
        return rest + trailing, name
class DNSTextField(StrLenField):
    """
    Special StrLenField that handles DNS TEXT data (16)
    """

    islist = 1

    def i2h(self, pkt, x):
        return x if x else []

    def m2i(self, pkt, s):
        # RDATA contains a list of strings, each one prepended with
        # a byte containing the size of the following string.
        strings = []
        pending = s
        while pending:
            chunk_end = orb(pending[0]) + 1
            if chunk_end > len(pending):
                log_runtime.info(
                    "DNS RR TXT prematured end of character-string "
                    "(size=%i, remaining bytes=%i)", chunk_end, len(pending)
                )
            strings.append(pending[1:chunk_end])
            pending = pending[chunk_end:]
        return strings

    def any2i(self, pkt, x):
        # A lone string is wrapped into a one-element list
        return [x] if isinstance(x, (str, bytes)) else x

    def i2len(self, pkt, x):
        return len(self.i2m(pkt, x))

    def i2m(self, pkt, s):
        chunks = []
        for text in s:
            if not text:
                # Empty string: a single zero length byte
                chunks.append(b"\x00")
                continue
            data = bytes_encode(text)
            # The initial string must be split into a list of strings
            # prepended with their sizes.
            while len(data) >= 255:
                chunks.append(b"\xff" + data[:255])
                data = data[255:]
            # What is left is less than 255 bytes long
            if data:
                chunks.append(struct.pack("!B", len(data)) + data)
        return b"".join(chunks)
424# RFC 2671 - Extension Mechanisms for DNS (EDNS0)
# EDNS0 option codes mapped to their name.
edns0types = {
    0: "Reserved",
    1: "LLQ",
    2: "UL",
    3: "NSID",
    4: "Reserved",
    5: "DAU",
    6: "DHU",
    7: "N3U",
    8: "edns-client-subnet",
    10: "COOKIE",
    15: "Extended DNS Error",
}
class _EDNS0Dummy(Packet):
    name = "Dummy class that implements extract_padding()"

    def extract_padding(self, p):
        # type: (bytes) -> Tuple[bytes, Optional[bytes]]
        """Treat everything after this layer as padding, not payload.

        :param p: the bytes remaining after this layer
        :returns: ``(payload, padding)`` with an empty payload
        """
        # Empty *bytes* payload, as the type annotation declares
        return b"", p
class EDNS0TLV(_EDNS0Dummy):
    """Generic EDNS0 option: option code, length, then opaque data."""
    name = "DNS EDNS0 TLV"
    fields_desc = [
        ShortEnumField("optcode", 0, edns0types),
        FieldLenField("optlen", None, "optdata", fmt="H"),
        StrLenField("optdata", "", length_from=lambda p: p.optlen),
    ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # type: (Optional[bytes], *Any, **Any) -> Type[Packet]
        """Select the class used to dissect an option from its option code.

        Without data the generic EDNS0TLV is used; with less than two bytes
        Raw is used; otherwise EDNS0OPT_DISPATCHER is looked up, falling
        back to EDNS0TLV for unknown codes.
        """
        if _pkt is None:
            return EDNS0TLV
        if len(_pkt) < 2:
            return Raw
        (optcode,) = struct.unpack("!H", _pkt[:2])
        return EDNS0OPT_DISPATCHER.get(optcode, EDNS0TLV)
class DNSRROPT(Packet):
    """OPT pseudo resource record, carrying a list of EDNS0 options."""
    name = "DNS OPT Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 41, dnstypes),
        ShortField("rclass", 4096),
        ByteField("extrcode", 0),
        # version 0 means EDNS0
        ByteField("version", 0),
        # D0 means DNSSEC OK from RFC 3225
        BitEnumField("z", 32768, 16, {32768: "D0"}),
        FieldLenField("rdlen", None, length_of="rdata", fmt="H"),
        PacketListField("rdata", [], EDNS0TLV,
                        length_from=lambda p: p.rdlen),
    ]
472# RFC 6975 - Signaling Cryptographic Algorithm Understanding in
473# DNS Security Extensions (DNSSEC)
class EDNS0DAU(_EDNS0Dummy):
    """EDNS0 option listing algorithm codes from dnssecalgotypes."""
    name = "DNSSEC Algorithm Understood (DAU)"
    fields_desc = [
        ShortEnumField("optcode", 5, edns0types),
        FieldLenField("optlen", None, count_of="alg_code", fmt="H"),
        # One byte per algorithm code, so optlen is also the item count
        FieldListField(
            "alg_code", None,
            ByteEnumField("", 0, dnssecalgotypes),
            count_from=lambda p: p.optlen,
        ),
    ]
class EDNS0DHU(_EDNS0Dummy):
    """EDNS0 option listing digest type codes from dnssecdigesttypes."""
    name = "DS Hash Understood (DHU)"
    fields_desc = [
        ShortEnumField("optcode", 6, edns0types),
        FieldLenField("optlen", None, count_of="alg_code", fmt="H"),
        # One byte per digest code, so optlen is also the item count
        FieldListField(
            "alg_code", None,
            ByteEnumField("", 0, dnssecdigesttypes),
            count_from=lambda p: p.optlen,
        ),
    ]
class EDNS0N3U(_EDNS0Dummy):
    """EDNS0 option listing hash codes from dnssecnsec3algotypes."""
    name = "NSEC3 Hash Understood (N3U)"
    fields_desc = [
        ShortEnumField("optcode", 7, edns0types),
        FieldLenField("optlen", None, count_of="alg_code", fmt="H"),
        # One byte per hash code, so optlen is also the item count
        FieldListField(
            "alg_code", None,
            ByteEnumField("", 0, dnssecnsec3algotypes),
            count_from=lambda p: p.optlen,
        ),
    ]
502# RFC 7871 - Client Subnet in DNS Queries
class ClientSubnetv4(StrLenField):
    """Address field of the EDNS0 Client Subnet option (IPv4 flavour).

    On the wire only the leading bytes covered by the prefix length are
    present; ``length_from`` must return that prefix length in bits.
    """
    af_familly = socket.AF_INET
    af_length = 32
    af_default = b"\xc0"  # 192.0.0.0

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, I]
        # A prefix length that is not a multiple of 8 still occupies its
        # last, partially used, byte: round up, never beyond a full address
        sz = operator.floordiv(self.length_from(pkt) + 7, 8)
        sz = min(sz, operator.floordiv(self.af_length, 8))
        return s[sz:], self.m2i(pkt, s[:sz])

    def m2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        # Zero-pad (or cut) to a full-size address before converting it
        full_size = operator.floordiv(self.af_length, 8)
        x = x[:full_size].ljust(full_size, b"\x00")
        return inet_ntop(self.af_familly, x)

    def _pack_subnet(self, subnet):
        # type: (bytes) -> bytes
        """Pack ``subnet`` and drop its trailing zero bytes."""
        packed_subnet = inet_pton(self.af_familly, plain_str(subnet))
        return packed_subnet.rstrip(b"\x00")

    def _pack_any(self, pkt, x):
        # type: (Optional[Packet], Any) -> bytes
        """Pack ``x`` with this family, falling back on IPv6.

        When the fallback is taken, the ``family`` field of ``pkt`` (if
        any) is set to 2.
        """
        try:
            return self._pack_subnet(x)
        except (OSError, socket.error):
            if pkt is not None:
                pkt.family = 2
            return ClientSubnetv6("", "")._pack_subnet(x)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes
        if x is None:
            return self.af_default
        return self._pack_any(pkt, x)

    def i2len(self, pkt, x):
        # type: (Packet, Any) -> int
        if x is None:
            return 1
        return len(self._pack_any(pkt, x))
class ClientSubnetv6(ClientSubnetv4):
    """IPv6 flavour of ClientSubnetv4: same logic, 128-bit addresses."""
    af_familly = socket.AF_INET6
    af_length = 128
    af_default = b"\x20"  # 2000::
class EDNS0ClientSubnet(_EDNS0Dummy):
    """EDNS0 Client Subnet option (see RFC 7871 referenced above)."""
    name = "DNS EDNS0 Client Subnet"
    fields_desc = [
        ShortEnumField("optcode", 8, edns0types),
        # 4 = family (2) + source_plen (1) + scope_plen (1)
        FieldLenField("optlen", None, "address", fmt="H",
                      adjust=lambda pkt, length: length + 4),
        ShortField("family", 1),
        # Prefix length in bits, derived from the address byte length
        FieldLenField("source_plen", None,
                      length_of="address",
                      fmt="B",
                      adjust=lambda pkt, length: length * 8),
        ByteField("scope_plen", 0),
        # Address flavour selected by family: 1 is IPv4, 2 is IPv6
        MultipleTypeField(
            [
                (ClientSubnetv4("address", "192.168.0.0",
                                length_from=lambda pkt: pkt.source_plen),
                 lambda pkt: pkt.family == 1),
                (ClientSubnetv6("address", "2001:db8::",
                                length_from=lambda pkt: pkt.source_plen),
                 lambda pkt: pkt.family == 2),
            ],
            ClientSubnetv4("address", "192.168.0.0",
                           length_from=lambda pkt: pkt.source_plen),
        ),
    ]
class EDNS0COOKIE(_EDNS0Dummy):
    """EDNS0 COOKIE option: 8-byte client cookie, then the server cookie."""
    name = "DNS EDNS0 COOKIE"
    fields_desc = [
        ShortEnumField("optcode", 10, edns0types),
        # optlen also counts the 8 bytes of the client cookie
        FieldLenField("optlen", None, length_of="server_cookie", fmt="!H",
                      adjust=lambda pkt, length: length + 8),
        XStrFixedLenField("client_cookie", b"\x00" * 8, length=8),
        # Never negative, even when optlen is smaller than 8
        XStrLenField("server_cookie", "",
                     length_from=lambda pkt: max(0, pkt.optlen - 8)),
    ]
591# RFC 8914 - Extended DNS Errors
593# https://www.iana.org/assignments/dns-parameters/dns-parameters.xhtml#extended-dns-error-codes
# Extended DNS Error INFO-CODE -> description. The codes are consecutive,
# starting at 0, so the position of a description is its code.
extended_dns_error_codes = dict(enumerate((
    "Other",
    "Unsupported DNSKEY Algorithm",
    "Unsupported DS Digest Type",
    "Stale Answer",
    "Forged Answer",
    "DNSSEC Indeterminate",
    "DNSSEC Bogus",
    "Signature Expired",
    "Signature Not Yet Valid",
    "DNSKEY Missing",
    "RRSIGs Missing",
    "No Zone Key Bit Set",
    "NSEC Missing",
    "Cached Error",
    "Not Ready",
    "Blocked",
    "Censored",
    "Filtered",
    "Prohibited",
    "Stale NXDOMAIN Answer",
    "Not Authoritative",
    "Not Supported",
    "No Reachable Authority",
    "Network Error",
    "Invalid Data",
    "Signature Expired before Valid",
    "Too Early",
    "Unsupported NSEC3 Iterations Value",
    "Unable to conform to policy",
    "Synthesized",
)))
628# https://www.rfc-editor.org/rfc/rfc8914.html
class EDNS0ExtendedDNSError(_EDNS0Dummy):
    """EDNS0 Extended DNS Error option: an info code plus free text."""
    name = "DNS EDNS0 Extended DNS Error"
    fields_desc = [ShortEnumField("optcode", 15, edns0types),
                   # optlen also counts the 2 bytes of info_code
                   FieldLenField("optlen", None, length_of="extra_text", fmt="!H",  # noqa: E501
                                 adjust=lambda pkt, x: x + 2),
                   ShortEnumField("info_code", 0, extended_dns_error_codes),
                   # Clamp to 0 so that a bogus optlen (< 2) in a received
                   # packet cannot yield a negative length, as done for the
                   # server cookie of EDNS0COOKIE
                   StrLenField("extra_text", "",
                               length_from=lambda pkt: max(0, pkt.optlen - 2))]
# EDNS0 option code -> class used to dissect that option.
# Looked up by EDNS0TLV.dispatch_hook, which falls back on EDNS0TLV for
# codes that are not listed here.
EDNS0OPT_DISPATCHER = {
    5: EDNS0DAU,  # DNSSEC Algorithm Understood
    6: EDNS0DHU,  # DS Hash Understood
    7: EDNS0N3U,  # NSEC3 Hash Understood
    8: EDNS0ClientSubnet,  # Client Subnet
    10: EDNS0COOKIE,  # COOKIE
    15: EDNS0ExtendedDNSError,  # Extended DNS Error
}
649# RFC 4034 - Resource Records for the DNS Security Extensions
def bitmap2RRlist(bitmap):
    """
    Decode the 'Type Bit Maps' field of the NSEC Resource Record into an
    integer list.

    Returns None (after logging) when a window block is malformed.
    """
    # RFC 4034, 4.1.2. The Type Bit Maps Field

    rr_types = []
    pending = bitmap

    while pending:

        if len(pending) < 2:
            log_runtime.info("bitmap too short (%i)", len(pending))
            return

        window = orb(pending[0])  # window number
        block_len = orb(pending[1])  # length of the bitmap in bytes

        if not 0 < block_len <= 32:
            log_runtime.info("bitmap length is no valid (%i)", block_len)
            return

        base = 256 * window  # first RR type covered by this window
        block = pending[2:2 + block_len]

        # Each RR type is encoded as one bit, most significant bit first
        for byte_pos, raw_byte in enumerate(block):
            value = orb(raw_byte)
            for bit_pos in range(8):
                if value & (128 >> bit_pos):
                    rr_types.append(base + byte_pos * 8 + bit_pos)

        # Next block if any
        pending = pending[2 + block_len:]

    return rr_types
def RRlist2bitmap(lst):
    """
    Encode a list of integers representing Resource Records to a bitmap field
    used in the NSEC Resource Record.

    The absolute value of each item is used; duplicates and values above
    65535 are ignored. An empty (or fully filtered out) list gives b"".

    :param lst: iterable of RR type numbers
    :returns: the concatenated window blocks, each one made of the window
        number, the bitmap length and the bitmap itself, in ascending
        window order
    """
    # RFC 4034, 4.1.2. The Type Bit Maps Field

    # Normalise *before* de-duplicating and filtering, so that e.g. -3 and 3
    # end up as a single bit and the range check sees the real value
    rr_types = {abs(x) for x in lst}

    # window number -> 32-byte (256 bits) bitmap
    windows = {}
    for rr in rr_types:
        if rr > 65535:
            continue
        window, offset = divmod(rr, 256)
        bits = windows.setdefault(window, bytearray(32))
        # Bit 0 of a window is the most significant bit of its first byte
        bits[offset // 8] |= 0x80 >> (offset % 8)

    bitmap = b""
    for window in sorted(windows):
        # Only keep the bytes up to the last one that has a bit set. Every
        # stored window has at least one bit set, hence at least one byte
        block = bytes(windows[window]).rstrip(b"\x00")
        bitmap += struct.pack("BB", window, len(block)) + block

    return bitmap
class RRlistField(StrField):
    """Type bit maps field: accepts a list of RR types or raw bytes."""

    def h2i(self, pkt, x):
        # Lists of RR types are encoded, anything else is kept as is
        return RRlist2bitmap(x) if isinstance(x, list) else x

    def i2repr(self, pkt, x):
        bitmap = self.i2h(pkt, x)
        rr_types = bitmap2RRlist(bitmap)
        if not rr_types:
            # Nothing decoded (empty or malformed bitmap): show the raw value
            return repr(bitmap)
        return [dnstypes.get(rr, rr) for rr in rr_types]
class _DNSRRdummy(Packet):
    name = "Dummy class that implements post_build() for Resource Records"

    def post_build(self, pkt, pay):
        """Fill in rdlen when it was not set explicitly."""
        if self.rdlen is not None:
            return pkt + pay

        # Built size of rrname, which is the first field of the record
        name_len = len(
            self.fields_desc[0].i2m("", self.getfieldval("rrname"))
        )
        # rdlen sits 8 bytes after rrname (type + rclass + ttl) and counts
        # what follows its own 2 bytes
        rdlen_ofs = name_len + 8
        rdlen = struct.pack("!H", len(pkt) - name_len - 10)

        return pkt[:rdlen_ofs] + rdlen + pkt[rdlen_ofs + 2:] + pay

    def default_payload_class(self, payload):
        return conf.padding_layer
class DNSRRHINFO(_DNSRRdummy):
    """HINFO record: two length-prefixed strings, cpu and os."""
    name = "DNS HINFO Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 13, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        FieldLenField("cpulen", None, fmt="!B", length_of="cpu"),
        StrLenField("cpu", "", length_from=lambda pkt: pkt.cpulen),
        FieldLenField("oslen", None, fmt="!B", length_of="os"),
        StrLenField("os", "", length_from=lambda pkt: pkt.oslen),
    ]
class DNSRRMX(_DNSRRdummy):
    """MX record: a preference followed by the exchange name."""
    name = "DNS MX Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 15, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        ShortField("preference", 0),
        DNSStrField("exchange", ""),
    ]
class DNSRRSOA(_DNSRRdummy):
    """SOA record: two names followed by five 32-bit counters."""
    name = "DNS SOA Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 6, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        DNSStrField("mname", ""),
        DNSStrField("rname", ""),
        IntField("serial", 0),
        IntField("refresh", 0),
        IntField("retry", 0),
        IntField("expire", 0),
        IntField("minimum", 0),
    ]
class DNSRRRSIG(_DNSRRdummy):
    """RRSIG record; the signature takes whatever follows signersname."""
    name = "DNS RRSIG Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 46, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        ShortEnumField("typecovered", 1, dnstypes),
        ByteEnumField("algorithm", 5, dnssecalgotypes),
        ByteField("labels", 0),
        IntField("originalttl", 0),
        UTCTimeField("expiration", 0),
        UTCTimeField("inception", 0),
        ShortField("keytag", 0),
        DNSStrField("signersname", ""),
        StrField("signature", ""),
    ]
class DNSRRNSEC(_DNSRRdummy):
    """NSEC record: next owner name and the type bit maps."""
    name = "DNS NSEC Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 47, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        DNSStrField("nextname", ""),
        RRlistField("typebitmaps", ""),
    ]
class DNSRRDNSKEY(_DNSRRdummy):
    """DNSKEY record: flags, protocol, algorithm, then the public key."""
    name = "DNS DNSKEY Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 48, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        # Flag names: S is Secure Entry Point, Z is Zone Key
        FlagsField("flags", 256, 16, "S???????Z???????"),
        ByteField("protocol", 3),
        ByteEnumField("algorithm", 5, dnssecalgotypes),
        StrField("publickey", ""),
    ]
class DNSRRDS(_DNSRRdummy):
    """DS record: key tag, algorithm, digest type, then the digest."""
    name = "DNS DS Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 43, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        ShortField("keytag", 0),
        ByteEnumField("algorithm", 5, dnssecalgotypes),
        # NOTE(review): the default 5 has no entry in dnssecdigesttypes
        ByteEnumField("digesttype", 5, dnssecdigesttypes),
        StrField("digest", ""),
    ]
876# RFC 5074 - DNSSEC Lookaside Validation (DLV)
class DNSRRDLV(DNSRRDS):
    """DLV record: same layout as DNSRRDS, with type 32769 by default."""
    name = "DNS DLV Resource Record"

    def __init__(self, *args, **kargs):
        DNSRRDS.__init__(self, *args, **kargs)
        # Use the DLV type unless the caller passed a (non-zero) type
        explicit_type = kargs.get('type')
        if not explicit_type:
            self.type = 32769
885# RFC 5155 - DNS Security (DNSSEC) Hashed Authenticated Denial of Existence
class DNSRRNSEC3(_DNSRRdummy):
    """NSEC3 record: hash parameters, salt, next hashed owner, bit maps."""
    name = "DNS NSEC3 Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 50, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        ByteField("hashalg", 0),
        BitEnumField("flags", 0, 8, {1: "Opt-Out"}),
        ShortField("iterations", 0),
        # NOTE(review): both length fields default to 0 where most other
        # FieldLenField in this file default to None - confirm intended
        FieldLenField("saltlength", 0, fmt="!B", length_of="salt"),
        StrLenField("salt", "", length_from=lambda pkt: pkt.saltlength),
        FieldLenField("hashlength", 0, fmt="!B",
                      length_of="nexthashedownername"),
        StrLenField("nexthashedownername", "",
                    length_from=lambda pkt: pkt.hashlength),
        RRlistField("typebitmaps", ""),
    ]
class DNSRRNSEC3PARAM(_DNSRRdummy):
    """NSEC3PARAM record: hash parameters and salt."""
    name = "DNS NSEC3PARAM Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 51, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        ByteField("hashalg", 0),
        ByteField("flags", 0),
        ShortField("iterations", 0),
        FieldLenField("saltlength", 0, fmt="!B", length_of="salt"),
        StrLenField("salt", "", length_from=lambda p: p.saltlength),
    ]
921# RFC 9460 Service Binding and Parameter Specification via the DNS
922# https://www.rfc-editor.org/rfc/rfc9460.html
925# https://www.iana.org/assignments/dns-svcb/dns-svcb.xhtml
# SvcParamKey number -> name. Keys are consecutive, starting at 0, so the
# position of a name is its key.
svc_param_keys = dict(enumerate((
    "mandatory",
    "alpn",
    "no-default-alpn",
    "port",
    "ipv4hint",
    "ech",
    "ipv6hint",
    "dohpath",
    "ohttp",
)))
class SvcParam(Packet):
    """One key/length/value parameter of a SVCB or HTTPS record.

    The way ``value`` is decoded depends on ``key``.
    """
    name = "SvcParam"
    fields_desc = [
        ShortEnumField("key", 0, svc_param_keys),
        FieldLenField("len", None, length_of="value", fmt="H"),
        MultipleTypeField(
            [
                # mandatory: list of parameter keys
                (
                    FieldListField("value", [],
                                   ShortEnumField("", 0, svc_param_keys),
                                   length_from=lambda p: p.len),
                    lambda p: p.key == 0,
                ),
                # alpn, no-default-alpn: list of length-prefixed strings
                (
                    DNSTextField("value", [],
                                 length_from=lambda p: p.len),
                    lambda p: p.key in (1, 2),
                ),
                # port
                (
                    ShortField("value", 0),
                    lambda p: p.key == 3,
                ),
                # ipv4hint: list of IPv4 addresses
                (
                    FieldListField("value", [],
                                   IPField("", "0.0.0.0"),
                                   length_from=lambda p: p.len),
                    lambda p: p.key == 4,
                ),
                # ipv6hint: list of IPv6 addresses
                (
                    FieldListField("value", [],
                                   IP6Field("", "::"),
                                   length_from=lambda p: p.len),
                    lambda p: p.key == 6,
                ),
            ],
            # Any other key: raw bytes
            StrLenField("value", "", length_from=lambda p: p.len),
        ),
    ]

    def extract_padding(self, p):
        # Everything after this parameter is handed back as padding
        return "", p
class DNSRRSVCB(_DNSRRdummy):
    """SVCB record: priority, target name and a list of SvcParam."""
    name = "DNS SVCB Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 64, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        ShortField("svc_priority", 0),
        DNSStrField("target_name", ""),
        PacketListField("svc_params", [], SvcParam),
    ]
class DNSRRHTTPS(_DNSRRdummy):
    """HTTPS record: same fields as DNSRRSVCB, with type 65 by default."""
    name = "DNS HTTPS Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 65, dnstypes),
        # Every field after rrname and type is shared with DNSRRSVCB
        *DNSRRSVCB.fields_desc[2:],
    ]
994# RFC 2782 - A DNS RR for specifying the location of services (DNS SRV)
class DNSRRSRV(_DNSRRdummy):
    """SRV record: priority, weight, port and target name."""
    name = "DNS SRV Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 33, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        ShortField("priority", 0),
        ShortField("weight", 0),
        ShortField("port", 0),
        DNSStrField("target", ""),
    ]
1010# RFC 2845 - Secret Key Transaction Authentication for DNS (TSIG)
# TSIG algorithm name -> size associated with it.
# NOTE(review): presumably the MAC size in bytes - confirm against users
# of this table, none is visible here.
tsig_algo_sizes = {
    "HMAC-MD5.SIG-ALG.REG.INT": 16,
    "hmac-sha1": 20,
}
class TimeSignedField(Field[int, bytes]):
    """48-bit count of seconds since 1-Jan-70 UTC, stored on 6 bytes."""

    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="6s")

    def _convert_seconds(self, packed_seconds):
        """Unpack the 6-byte big-endian value into an integer.

        The value is stored as a 16-bit high part followed by a 32-bit low
        part (see i2m): the high part must be shifted back by 32 bits, not
        merely added to the low part.
        """
        high, low = struct.unpack("!HI", packed_seconds)
        return (high << 32) | low

    def i2m(self, pkt, seconds):
        """Convert the number of seconds since 1-Jan-70 UTC to the packed
        representation."""

        if seconds is None:
            seconds = 0

        tmp_short = (seconds >> 32) & 0xFFFF
        tmp_int = seconds & 0xFFFFFFFF

        return struct.pack("!HI", tmp_short, tmp_int)

    def m2i(self, pkt, packed_seconds):
        """Convert the packed representation to the number of seconds
        since 1-Jan-70 UTC. None is passed through."""

        if packed_seconds is None:
            return None

        return self._convert_seconds(packed_seconds)

    def i2repr(self, pkt, packed_seconds):
        """Format the value as a human readable UTC date.

        The value is handed to time.gmtime() as is.
        """
        time_struct = time.gmtime(packed_seconds)
        return time.strftime("%a %b %d %H:%M:%S %Y", time_struct)
class DNSRRTSIG(_DNSRRdummy):
    """TSIG resource record (type 250).

    ``mac_len`` and ``other_len`` are length fields: each one is computed
    from, and used to delimit, the byte string that immediately follows it
    (``mac_data`` and ``other_data`` respectively).
    """
    name = "DNS TSIG Resource Record"
    fields_desc = [
        # Common resource record header
        DNSStrField("rrname", ""),
        ShortEnumField("type", 250, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        # TSIG specific data
        DNSStrField("algo_name", "hmac-sha1"),
        TimeSignedField("time_signed", 0),
        ShortField("fudge", 0),
        FieldLenField("mac_len", 20, fmt="!H", length_of="mac_data"),
        StrLenField(
            "mac_data", "",
            length_from=lambda pkt: pkt.mac_len,
        ),
        ShortField("original_id", 0),
        ShortField("error", 0),
        FieldLenField("other_len", 0, fmt="!H", length_of="other_data"),
        StrLenField(
            "other_data", "",
            length_from=lambda pkt: pkt.other_len,
        ),
    ]
# Record type number -> dedicated Packet class used to dissect that record.
# Types that are absent from this table are handled by the generic DNSRR
# class (see the _DNSRR dispatcher function).
DNSRR_DISPATCHER = {
    # RFC 1035
    6: DNSRRSOA,
    13: DNSRRHINFO,
    15: DNSRRMX,
    # RFC 2782
    33: DNSRRSRV,
    # RFC 1671
    41: DNSRROPT,
    # RFC 4034
    43: DNSRRDS,
    46: DNSRRRSIG,
    47: DNSRRNSEC,
    48: DNSRRDNSKEY,
    # RFC 5155
    50: DNSRRNSEC3,
    51: DNSRRNSEC3PARAM,
    # RFC 9460
    64: DNSRRSVCB,
    65: DNSRRHTTPS,
    # RFC 2845
    250: DNSRRTSIG,
    # RFC 4431
    32769: DNSRRDLV,
}
class DNSRR(Packet):
    """Generic DNS resource record.

    The type of the ``rdata`` field depends on the record type:

    - type 1 (A): IPv4 address
    - type 28 (AAAA): IPv6 address
    - types 2, 3, 4, 5, 12, 39 (NS, MD, MF, CNAME, PTR, DNAME): DNS name
    - type 16 (TEXT): list of text strings
    - anything else: raw bytes of length ``rdlen``
    """
    name = "DNS Resource Record"
    show_indent = 0
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 1, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        FieldLenField("rdlen", None, length_of="rdata", fmt="H"),
        MultipleTypeField(
            [
                # A
                (
                    IPField("rdata", "0.0.0.0"),
                    lambda pkt: pkt.type == 1,
                ),
                # AAAA
                (
                    IP6Field("rdata", "::"),
                    lambda pkt: pkt.type == 28,
                ),
                # NS, MD, MF, CNAME, PTR, DNAME
                (
                    DNSStrField(
                        "rdata", "",
                        length_from=lambda pkt: pkt.rdlen,
                    ),
                    lambda pkt: pkt.type in [2, 3, 4, 5, 12, 39],
                ),
                # TEXT
                (
                    DNSTextField(
                        "rdata", [""],
                        length_from=lambda pkt: pkt.rdlen,
                    ),
                    lambda pkt: pkt.type == 16,
                ),
            ],
            # Fallback for every other record type: opaque bytes
            StrLenField(
                "rdata", "",
                length_from=lambda pkt: pkt.rdlen,
            ),
        ),
    ]

    def default_payload_class(self, payload):
        # Whatever follows a record is padding, never another layer
        return conf.padding_layer
def _DNSRR(s, **kwargs):
    """
    DNSRR dispatcher func

    Pick the Packet class matching the type of the resource record found at
    the beginning of ``s``, dissect exactly that record with it and attach
    the remaining bytes as padding.

    :param s: the bytes to dissect, starting at a resource record
    :param kwargs: passed to the constructor of the selected class
    :return: the dissected record followed by a padding layer, or None
        when ``s`` is empty
    """
    if not s:
        return None
    # Skip the owner name: what remains starts with the fixed-size part of
    # the record (type, class, ttl, rdlen = 10 bytes)
    _, after_name = dns_get_str(s, _ignore_compression=True)
    # Try to find the type of the RR using the dispatcher
    rrtype = struct.unpack("!H", after_name[:2])[0]
    rr_cls = DNSRR_DISPATCHER.get(rrtype, DNSRR)
    rdlen = struct.unpack("!H", after_name[8:10])[0]
    name_len = len(s) - len(after_name)
    rrlen = name_len + 10 + rdlen
    record = rr_cls(s[:rrlen], **kwargs) / conf.padding_layer(s[rrlen:])
    # drop rdlen because if rdata was compressed, it will break everything
    # when rebuilding
    del record.fields["rdlen"]
    return record
class DNSQR(Packet):
    """DNS question record: a name, a query type and a query class."""
    name = "DNS Question Record"
    show_indent = 0
    fields_desc = [
        DNSStrField("qname", "www.example.com"),
        ShortEnumField("qtype", 1, dnsqtypes),
        ShortEnumField("qclass", 1, dnsclasses),
    ]

    def default_payload_class(self, payload):
        # Whatever follows a question is padding, never another layer
        return conf.padding_layer
class _DNSPacketListField(PacketListField):
    """A normal PacketListField with backward-compatible hacks.

    Older code set the DNS record fields to ``None`` or to a single packet
    and read attributes directly from them (``pkt.an.rdata``). Both usages
    still work through this class but emit a DeprecationWarning.
    """

    def any2i(self, pkt, x):
        # type: (Optional[Packet], List[Any]) -> List[Any]
        """Accept ``None`` as a deprecated spelling of the empty list."""
        if x is None:
            warnings.warn(
                ("The DNS fields 'qd', 'an', 'ns' and 'ar' are now "
                 "PacketListField(s) ! "
                 "Setting a null default should be [] instead of None"),
                DeprecationWarning
            )
            x = []
        return super(_DNSPacketListField, self).any2i(pkt, x)

    def i2h(self, pkt, x):
        # type: (Optional[Packet], List[Packet]) -> Any
        """Return ``x`` wrapped in a list that forwards unknown attributes
        to its first element."""
        class _list(list):
            """
            Fake list object to provide compatibility with older DNS fields
            """
            def __getattr__(self, attr):
                # __getattr__ is only called for attributes that a list
                # does not have. It must signal a miss with AttributeError
                # so that hasattr() and getattr(obj, name, default) keep
                # working: an empty list therefore must not leak the
                # IndexError raised by self[0].
                try:
                    first = self[0]
                except IndexError:
                    raise AttributeError(attr)
                # May raise AttributeError by itself, which is what we want
                ret = getattr(first, attr)
                warnings.warn(
                    ("The DNS fields 'qd', 'an', 'ns' and 'ar' are now "
                     "PacketListField(s) ! "
                     "To access the first element, use pkt.an[0] instead of "
                     "pkt.an"),
                    DeprecationWarning
                )
                return ret
        return _list(x)
class DNS(DNSCompressedPacket):
    """DNS message.

    The 2-byte ``length`` prefix only exists when the layer below is TCP.
    The four section counters are computed from the ``qd``, ``an``, ``ns``
    and ``ar`` lists when left to None.
    """
    name = "DNS"
    fields_desc = [
        # Length prefix, only over TCP
        ConditionalField(
            ShortField("length", None),
            lambda p: isinstance(p.underlayer, TCP),
        ),
        ShortField("id", 0),
        # Flags
        BitField("qr", 0, 1),
        BitEnumField(
            "opcode", 0, 4,
            {0: "QUERY", 1: "IQUERY", 2: "STATUS"},
        ),
        BitField("aa", 0, 1),
        BitField("tc", 0, 1),
        BitField("rd", 1, 1),
        BitField("ra", 0, 1),
        BitField("z", 0, 1),
        # AD and CD bits are defined in RFC 2535
        BitField("ad", 0, 1),  # Authentic Data
        BitField("cd", 0, 1),  # Checking Disabled
        BitEnumField(
            "rcode", 0, 4,
            {
                0: "ok",
                1: "format-error",
                2: "server-failure",
                3: "name-error",
                4: "not-implemented",
                5: "refused",
            },
        ),
        # Section counters
        FieldLenField("qdcount", None, count_of="qd"),
        FieldLenField("ancount", None, count_of="an"),
        FieldLenField("nscount", None, count_of="ns"),
        FieldLenField("arcount", None, count_of="ar"),
        # Sections
        _DNSPacketListField(
            "qd", [DNSQR()], DNSQR,
            count_from=lambda pkt: pkt.qdcount,
        ),
        _DNSPacketListField(
            "an", [], _DNSRR,
            count_from=lambda pkt: pkt.ancount,
        ),
        _DNSPacketListField(
            "ns", [], _DNSRR,
            count_from=lambda pkt: pkt.nscount,
        ),
        _DNSPacketListField(
            "ar", [], _DNSRR,
            count_from=lambda pkt: pkt.arcount,
        ),
    ]

    def get_full(self):
        """Return the original bytes of the message, without the TCP
        length prefix if there is one."""
        # Required for DNSCompressedPacket
        over_tcp = isinstance(self.underlayer, TCP)
        return self.original[2:] if over_tcp else self.original

    def answers(self, other):
        """Tell whether this message is the response to the query
        ``other``: same id, this one is a response, the other a query."""
        if not isinstance(other, DNS):
            return False
        return (
            self.id == other.id and
            self.qr == 1 and
            other.qr == 0
        )

    def mysummary(self):
        """One-line summary: "DNS" (or "mDNS" over UDP with destination
        port 5353), then "Qry"/"Ans" and a short detail."""
        detail = ""
        if self.qr:
            kind = "Ans"
            if self.an and isinstance(self.an[0], DNSRR):
                detail = ' %s' % self.an[0].rdata
            elif self.rcode != 0:
                detail = self.sprintf(' %rcode%')
        else:
            kind = "Qry"
            if self.qd and isinstance(self.qd[0], DNSQR):
                detail = ' %s' % self.qd[0].qname
        is_mdns = (
            isinstance(self.underlayer, UDP) and
            self.underlayer.dport == 5353
        )
        prefix = "m" if is_mdns else ""
        return "%sDNS %s%s" % (prefix, kind, detail)

    def post_build(self, pkt, pay):
        """Fill in the TCP length prefix when it was left to None."""
        needs_length = (
            isinstance(self.underlayer, TCP) and
            self.length is None
        )
        if needs_length:
            # The prefix does not count its own 2 bytes
            pkt = struct.pack("!H", len(pkt) - 2) + pkt[2:]
        return pkt + pay

    def compress(self):
        """Return the compressed DNS packet (using `dns_compress()`)"""
        return dns_compress(self)

    def pre_dissect(self, s):
        """
        Check that a valid DNS over TCP message can be decoded

        :raises Scapy_Exception: over TCP, when ``s`` is shorter than 2
            bytes or when the announced length is rejected
        """
        if not isinstance(self.underlayer, TCP):
            return s

        # Compute the length of the DNS packet
        if len(s) < 2:
            message = "Malformed DNS message: too small!"
            log_runtime.info(message)
            raise Scapy_Exception(message)
        dns_len = struct.unpack("!H", s[:2])[0]

        # Check if the length is valid
        if dns_len < 14 or len(s) < dns_len:
            message = "Malformed DNS message: invalid length!"
            log_runtime.info(message)
            raise Scapy_Exception(message)

        return s
# DNS over UDP: mDNS port first, then the regular DNS port. Each port is
# bound as destination and then as source. The order of the calls is kept
# on purpose.
for _dns_udp_port in (5353, 53):
    bind_layers(UDP, DNS, dport=_dns_udp_port)
    bind_layers(UDP, DNS, sport=_dns_udp_port)
del _dns_udp_port

# Destination addresses bound to UDP packets sent to port 5353
DestIPField.bind_addr(UDP, "224.0.0.251", dport=5353)
if conf.ipv6_enabled:
    from scapy.layers.inet6 import DestIP6Field
    DestIP6Field.bind_addr(UDP, "ff02::fb", dport=5353)

# DNS over TCP
bind_layers(TCP, DNS, dport=53)
bind_layers(TCP, DNS, sport=53)

# Nameserver config
conf.nameservers = read_nameservers()
# Cache used by dns_resolve(), created with a value of 300
_dns_cache = conf.netcache.new_cache("dns_cache", 300)
@conf.commands.register
def dns_resolve(qname, qtype="A", raw=False, verbose=1, timeout=3, **kwargs):
    """
    Perform a simple DNS resolution using conf.nameservers with caching

    The nameservers are tried in order until one of them returns a response
    whose rcode is not "server-failure". Non-empty results are cached.

    :param qname: the name to query
    :param qtype: the type to query (default A)
    :param raw: return the whole DNS packet (default False)
    :param verbose: show verbose errors
    :param timeout: seconds until timeout (per server)
    :param kwargs: extra arguments passed to the socket's sr1()
    :return: the response packet if ``raw`` is set, otherwise the list of
        records of the answer, authority and additional sections whose type
        is ``qtype`` (possibly empty)
    :raise TimeoutError: if no DNS servers were reached in time.
    """
    # Unify types
    qtype = DNSQR.qtype.any2i_one(None, qtype)
    qname = DNSQR.qname.any2i(None, qname)
    # Check cache
    # The record type is a 16-bit value: packing it as a single byte would
    # raise struct.error for any type above 255.
    cache_ident = b";".join(
        [qname, struct.pack("!H", qtype)] +
        ([b"raw"] if raw else [])
    )
    result = _dns_cache.get(cache_ident)
    if result:
        return result

    kwargs.setdefault("timeout", timeout)
    kwargs.setdefault("verbose", 0)
    res = None
    for nameserver in conf.nameservers:
        # Try all nameservers
        # 'sock' must be defined before the try block: if the creation of
        # the socket itself fails, the finally clause would otherwise hit
        # an unbound name (or close the socket of a previous iteration).
        sock = None
        try:
            # Spawn a UDP socket, connect to the nameserver on port 53
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.settimeout(kwargs["timeout"])
            sock.connect((nameserver, 53))
            # Connected. Wrap it with DNS
            sock = StreamSocket(sock, DNS)
            # I/O
            res = sock.sr1(
                DNS(qd=[DNSQR(qname=qname, qtype=qtype)], id=RandShort()),
                **kwargs
            )
        except IOError as ex:
            if verbose:
                log_runtime.warning(str(ex))
            continue
        finally:
            if sock is not None:
                sock.close()
        if res:
            # We have a response ! Check for failure
            if res[DNS].rcode == 2:  # server failure
                res = None
                if verbose:
                    log_runtime.info(
                        "DNS: %s answered with failure for %s" % (
                            nameserver,
                            qname,
                        )
                    )
            else:
                break
    if res is None:
        raise TimeoutError
    if raw:
        # Raw
        result = res
    else:
        # Find answers
        result = [
            x
            for x in itertools.chain(res.an, res.ns, res.ar)
            if x.type == qtype
        ]
    if result:
        # Cache it
        _dns_cache[cache_ident] = result
    return result
@conf.commands.register
def dyndns_add(nameserver, name, rdata, type="A", ttl=10):
    """Send a DNS add message to a nameserver for "name" to have a new "rdata"
    dyndns_add(nameserver, name, rdata, type="A", ttl=10) -> result code (0=ok)

    example: dyndns_add("ns1.toto.com", "dyn.toto.com", "127.0.0.1")
    RFC2136

    :param nameserver: the server the update is sent to
    :param name: the name of the record to add. The zone is everything
        after its first dot.
    :param rdata: the data of the new record
    :param type: the type of the new record (default A)
    :param ttl: the TTL of the new record
    :return: the rcode of the response, or -1 when no DNS response was
        received within 5 seconds
    """
    zone = name[name.find(".") + 1:]
    # The record type requested by the caller must be honoured: it used to
    # be hard-coded to "A", which silently ignored the 'type' argument.
    r = sr1(IP(dst=nameserver) / UDP() / DNS(opcode=5,
                                             qd=[DNSQR(qname=zone, qtype="SOA")],  # noqa: E501
                                             ns=[DNSRR(rrname=name, type=type,
                                                       ttl=ttl, rdata=rdata)]),
            verbose=0, timeout=5)
    if r and r.haslayer(DNS):
        return r.getlayer(DNS).rcode
    else:
        return -1
@conf.commands.register
def dyndns_del(nameserver, name, type="ALL", ttl=10):
    """Send a DNS delete message to a nameserver for "name"
    dyndns_del(nameserver, name, type="ALL", ttl=10) -> result code (0=ok)

    example: dyndns_del("ns1.toto.com", "dyn.toto.com")
    RFC2136

    :param nameserver: the server the update is sent to
    :param name: the name of the record to delete. The zone is everything
        after its first dot.
    :param type: the type of the records to delete (default ALL)
    :param ttl: accepted but not used: the record is always sent with a
        TTL of 0
    :return: the rcode of the response, or -1 when no DNS response was
        received within 5 seconds
    """
    zone = name[name.find(".") + 1:]
    question = DNSQR(qname=zone, qtype="SOA")
    removal = DNSRR(rrname=name, type=type,
                    rclass="ANY", ttl=0, rdata="")
    update = DNS(opcode=5, qd=[question], ns=[removal])
    r = sr1(IP(dst=nameserver) / UDP() / update, verbose=0, timeout=5)
    if not r or not r.haslayer(DNS):
        return -1
    return r.getlayer(DNS).rcode
# NOTE: no class docstring on purpose. The help of the generated command may
# be taken from the class docstring when there is one, which would hide the
# documentation of parse_options() below -- TODO confirm in AnsweringMachine.
class DNS_am(AnsweringMachine):
    function_name = "dnsd"
    filter = "udp port 53"
    cls = DNS  # We also use this automaton for llmnrd / mdnsd

    def parse_options(self, joker=None,
                      match=None,
                      srvmatch=None,
                      joker6=False,
                      send_error=False,
                      relay=False,
                      from_ip=None,
                      from_ip6=None,
                      src_ip=None,
                      src_ip6=None,
                      ttl=10,
                      jokerarpa=None):
        """
        :param joker: default IPv4 for unresolved domains. (Default: None)
                      Set to False to disable, None to mirror the interface's IP.
        :param joker6: default IPv6 for unresolved domains (Default: False)
                       set to False to disable, None to mirror the interface's IPv6.
        :param jokerarpa: answer for .in-addr.arpa PTR requests. (Default: None)
        :param relay: relay unresolved domains to conf.nameservers (Default: False).
        :param send_error: send an error message when this server can't answer
                           (Default: False)
        :param match: a dictionary of {name: val} where name is a string representing
                      a domain name (A, AAAA) and val is a tuple of 2 elements, each
                      representing an IP or a list of IPs. If val is a single element,
                      (A, None) is assumed.
        :param srvmatch: a dictionary of {name: (port, target)} used for SRV
        :param from_ip: a source IP to filter. Can contain a netmask
        :param from_ip6: a source IPv6 to filter. Can contain a netmask
        :param ttl: the DNS time to live (in seconds)
        :param src_ip: override the source IP
        :param src_ip6: override the source IPv6

        Example::

            $ sudo iptables -I OUTPUT -p icmp --icmp-type 3/3 -j DROP
            >>> dnsd(match={"google.com": "1.1.1.1"}, joker="192.168.0.2", iface="eth0")
            >>> dnsd(srvmatch={
            ...     "_ldap._tcp.dc._msdcs.DOMAIN.LOCAL.": (389, "srv1.domain.local")
            ... })
        """
        def normv(v):
            # Values are always stored as a pair. A lone string is taken as
            # the first element of the pair.
            if isinstance(v, (tuple, list)) and len(v) == 2:
                return v
            elif isinstance(v, str):
                return (v, None)
            else:
                raise ValueError("Bad match value: '%s'" % repr(v))

        def normk(k):
            # Keys are stored as lowercase bytes with a trailing dot, so
            # that they compare equal to the lowercased qname of a query
            k = bytes_encode(k).lower()
            if not k.endswith(b"."):
                k += b"."
            return k
        if match is None:
            self.match = {}
        else:
            self.match = {normk(k): normv(v) for k, v in match.items()}
        if srvmatch is None:
            self.srvmatch = {}
        else:
            self.srvmatch = {normk(k): normv(v) for k, v in srvmatch.items()}
        self.joker = joker
        self.joker6 = joker6
        self.jokerarpa = jokerarpa
        self.send_error = send_error
        self.relay = relay
        # String filters are turned into Net objects, anything else is
        # stored as provided
        if isinstance(from_ip, str):
            self.from_ip = Net(from_ip)
        else:
            self.from_ip = from_ip
        if isinstance(from_ip6, str):
            self.from_ip6 = Net(from_ip6)
        else:
            self.from_ip6 = from_ip6
        self.src_ip = src_ip
        self.src_ip6 = src_ip6
        self.ttl = ttl

    def is_request(self, req):
        """Tell whether ``req`` is a query (qr == 0) of the handled
        protocol whose source address passes the from_ip / from_ip6 filter
        (an unset filter accepts everything)."""
        from scapy.layers.inet6 import IPv6
        return (
            req.haslayer(self.cls) and
            req.getlayer(self.cls).qr == 0 and (
                (
                    not self.from_ip6 or req[IPv6].src in self.from_ip6
                )
                if IPv6 in req else
                (
                    not self.from_ip or req[IP].src in self.from_ip
                )
            )
        )

    def make_reply(self, req):
        """Build the response to the query ``req``.

        :return: the response packet, or None when nothing must be sent
            (missing layers, no usable question, or a question that could
            not be answered while send_error is disabled)
        """
        mDNS = isinstance(self, mDNS_am)
        llmnr = self.cls != DNS
        # Build reply from the request
        resp = req.copy()
        if Ether in req:
            if mDNS:
                resp[Ether].src, resp[Ether].dst = None, None
            elif llmnr:
                resp[Ether].src, resp[Ether].dst = None, req[Ether].src
            else:
                # Never answer from the broadcast address. This is an
                # equality test: the substring test ("in") used previously
                # was not the intended comparison.
                resp[Ether].src, resp[Ether].dst = (
                    None if req[Ether].dst == "ff:ff:ff:ff:ff:ff" else req[Ether].dst,
                    req[Ether].src,
                )
        from scapy.layers.inet6 import IPv6
        # Drop everything from the IP layer upwards and rebuild it
        if IPv6 in req:
            resp[IPv6].underlayer.remove_payload()
            if mDNS:
                resp /= IPv6(dst="ff02::fb", src=self.src_ip6)
            elif llmnr:
                resp /= IPv6(dst=req[IPv6].src, src=self.src_ip6)
            else:
                resp /= IPv6(dst=req[IPv6].src, src=self.src_ip6 or req[IPv6].dst)
        elif IP in req:
            resp[IP].underlayer.remove_payload()
            if mDNS:
                resp /= IP(dst="224.0.0.251", src=self.src_ip)
            elif llmnr:
                resp /= IP(dst=req[IP].src, src=self.src_ip)
            else:
                resp /= IP(dst=req[IP].src, src=self.src_ip or req[IP].dst)
        else:
            warning("No IP or IPv6 layer in %s", req.command())
            return
        try:
            resp /= UDP(sport=req[UDP].dport, dport=req[UDP].sport)
        except IndexError:
            warning("No UDP layer in %s", req.command(), exc_info=True)
            return
        # Now process each query and store its answer in 'ans'
        ans = []
        try:
            req = req[self.cls]
        except IndexError:
            warning(
                "No %s layer in %s",
                self.cls.__name__,
                req.command(),
                exc_info=True,
            )
            return
        try:
            queries = req.qd
        except AttributeError:
            warning("No qd attribute in %s", req.command(), exc_info=True)
            return
        for rq in queries:
            # For each query
            if isinstance(rq, Raw):
                warning("Cannot parse qd element %s", rq.command(), exc_info=True)
                continue
            if rq.qtype in [1, 28]:
                # A or AAAA
                if rq.qtype == 28:
                    # AAAA
                    try:
                        rdata = self.match[rq.qname.lower()][1]
                    except KeyError:
                        if self.relay or self.joker6 is False:
                            rdata = None
                        else:
                            rdata = self.joker6 or get_if_addr6(
                                self.optsniff.get("iface", conf.iface)
                            )
                elif rq.qtype == 1:
                    # A
                    try:
                        rdata = self.match[rq.qname.lower()][0]
                    except KeyError:
                        if self.relay or self.joker is False:
                            rdata = None
                        else:
                            rdata = self.joker or get_if_addr(
                                self.optsniff.get("iface", conf.iface)
                            )
                if rdata is not None:
                    # Common A and AAAA
                    if not isinstance(rdata, list):
                        rdata = [rdata]
                    ans.extend([
                        DNSRR(rrname=rq.qname, ttl=self.ttl, rdata=x, type=rq.qtype)
                        for x in rdata
                    ])
                    continue  # next
            elif rq.qtype == 33:
                # SRV
                try:
                    port, target = self.srvmatch[rq.qname.lower()]
                    ans.append(DNSRRSRV(
                        rrname=rq.qname,
                        port=port,
                        target=target,
                        weight=100,
                        ttl=self.ttl
                    ))
                    continue  # next
                except KeyError:
                    # No result
                    pass
            elif rq.qtype == 12:
                # PTR
                if rq.qname[-14:] == b".in-addr.arpa." and self.jokerarpa:
                    ans.append(DNSRR(
                        rrname=rq.qname,
                        type=rq.qtype,
                        ttl=self.ttl,
                        rdata=self.jokerarpa,
                    ))
                    continue
            # If it arrives here, there is currently no answer
            if self.relay:
                # Relay mode ?
                try:
                    _rslv = dns_resolve(rq.qname, qtype=rq.qtype)
                    if _rslv:
                        ans.extend(_rslv)
                        continue  # next
                except TimeoutError:
                    pass
            # Error: stop at the first question that cannot be answered
            break
        else:
            # The loop was not interrupted: no question failed
            if not ans:
                # No rq was actually answered, as none was valid. Discard.
                return
            # All rq were answered
            if mDNS:
                # in mDNS mode, don't repeat the question
                resp /= self.cls(id=req.id, qr=1, qd=[], an=ans)
            else:
                resp /= self.cls(id=req.id, qr=1, qd=req.qd, an=ans)
            return resp
        # An error happened
        if self.send_error:
            resp /= self.cls(id=req.id, qr=1, qd=req.qd, rcode=3)
            return resp
1671class mDNS_am(DNS_am):
1672 """
1673 mDNS answering machine.
1675 This has the same arguments as DNS_am. See help(DNS_am)
1677 Example::
1679 >>> mdnsd(joker="192.168.0.2", iface="eth0")
1680 >>> mdnsd(match={"TEST.local": "192.168.0.2"})
1681 """
1682 function_name = "mdnsd"
1683 filter = "udp port 5353"