Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/dns.py: 46%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

819 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7DNS: Domain Name System 

8 

9This implements: 

10- RFC1035: Domain Names 

11- RFC6762: Multicast DNS 

12- RFC6763: DNS-Based Service Discovery 

13""" 

14 

15import abc 

16import collections 

17import operator 

18import itertools 

19import socket 

20import struct 

21import time 

22import warnings 

23 

24from scapy.arch import ( 

25 get_if_addr, 

26 get_if_addr6, 

27 read_nameservers, 

28) 

29from scapy.ansmachine import AnsweringMachine 

30from scapy.base_classes import Net, ScopedIP 

31from scapy.config import conf 

32from scapy.compat import raw, chb, bytes_encode, plain_str 

33from scapy.error import log_runtime, warning, Scapy_Exception 

34from scapy.packet import Packet, bind_layers, Raw 

35from scapy.fields import ( 

36 BitEnumField, 

37 BitField, 

38 ByteEnumField, 

39 ByteField, 

40 ConditionalField, 

41 Field, 

42 FieldLenField, 

43 FieldListField, 

44 FlagsField, 

45 I, 

46 IP6Field, 

47 IntField, 

48 MACField, 

49 MultipleTypeField, 

50 PacketListField, 

51 ShortEnumField, 

52 ShortField, 

53 StrField, 

54 StrLenField, 

55 UTCTimeField, 

56 XStrFixedLenField, 

57 XStrLenField, 

58) 

59from scapy.interfaces import resolve_iface 

60from scapy.sendrecv import sr1, sr 

61from scapy.supersocket import StreamSocket 

62from scapy.plist import SndRcvList, _PacketList, QueryAnswer 

63from scapy.pton_ntop import inet_ntop, inet_pton 

64from scapy.utils import pretty_list 

65from scapy.volatile import RandShort 

66 

67from scapy.layers.l2 import Ether 

68from scapy.layers.inet import IP, DestIPField, IPField, UDP, TCP 

69from scapy.layers.inet6 import IPv6 

70 

71from typing import ( 

72 Any, 

73 List, 

74 Optional, 

75 Tuple, 

76 Type, 

77 Union, 

78) 

79 

80 

81# https://www.iana.org/assignments/dns-parameters/dns-parameters.xhtml#dns-parameters-4 

82dnstypes = { 

83 0: "RESERVED", 

84 1: "A", 2: "NS", 3: "MD", 4: "MF", 5: "CNAME", 6: "SOA", 7: "MB", 8: "MG", 

85 9: "MR", 10: "NULL", 11: "WKS", 12: "PTR", 13: "HINFO", 14: "MINFO", 

86 15: "MX", 16: "TXT", 17: "RP", 18: "AFSDB", 19: "X25", 20: "ISDN", 

87 21: "RT", 22: "NSAP", 23: "NSAP-PTR", 24: "SIG", 25: "KEY", 26: "PX", 

88 27: "GPOS", 28: "AAAA", 29: "LOC", 30: "NXT", 31: "EID", 32: "NIMLOC", 

89 33: "SRV", 34: "ATMA", 35: "NAPTR", 36: "KX", 37: "CERT", 38: "A6", 

90 39: "DNAME", 40: "SINK", 41: "OPT", 42: "APL", 43: "DS", 44: "SSHFP", 

91 45: "IPSECKEY", 46: "RRSIG", 47: "NSEC", 48: "DNSKEY", 49: "DHCID", 

92 50: "NSEC3", 51: "NSEC3PARAM", 52: "TLSA", 53: "SMIMEA", 55: "HIP", 

93 56: "NINFO", 57: "RKEY", 58: "TALINK", 59: "CDS", 60: "CDNSKEY", 

94 61: "OPENPGPKEY", 62: "CSYNC", 63: "ZONEMD", 64: "SVCB", 65: "HTTPS", 

95 99: "SPF", 100: "UINFO", 101: "UID", 102: "GID", 103: "UNSPEC", 104: "NID", 

96 105: "L32", 106: "L64", 107: "LP", 108: "EUI48", 109: "EUI64", 249: "TKEY", 

97 250: "TSIG", 256: "URI", 257: "CAA", 258: "AVC", 259: "DOA", 

98 260: "AMTRELAY", 32768: "TA", 32769: "DLV", 65535: "RESERVED" 

99} 

100 

101 

102dnsqtypes = {251: "IXFR", 252: "AXFR", 253: "MAILB", 254: "MAILA", 255: "ALL"} 

103dnsqtypes.update(dnstypes) 

104dnsclasses = {1: 'IN', 2: 'CS', 3: 'CH', 4: 'HS', 255: 'ANY'} 

105 

106 

107# 12/2023 from https://www.iana.org/assignments/dns-sec-alg-numbers/dns-sec-alg-numbers.xhtml # noqa: E501 

108dnssecalgotypes = {0: "Reserved", 1: "RSA/MD5", 2: "Diffie-Hellman", 3: "DSA/SHA-1", # noqa: E501 

109 4: "Reserved", 5: "RSA/SHA-1", 6: "DSA-NSEC3-SHA1", 

110 7: "RSASHA1-NSEC3-SHA1", 8: "RSA/SHA-256", 9: "Reserved", 

111 10: "RSA/SHA-512", 11: "Reserved", 12: "GOST R 34.10-2001", 

112 13: "ECDSA Curve P-256 with SHA-256", 14: "ECDSA Curve P-384 with SHA-384", # noqa: E501 

113 15: "Ed25519", 16: "Ed448", 

114 252: "Reserved for Indirect Keys", 253: "Private algorithms - domain name", # noqa: E501 

115 254: "Private algorithms - OID", 255: "Reserved"} 

116 

117# 12/2023 from https://www.iana.org/assignments/ds-rr-types/ds-rr-types.xhtml 

118dnssecdigesttypes = {0: "Reserved", 1: "SHA-1", 2: "SHA-256", 3: "GOST R 34.11-94", 4: "SHA-384"} # noqa: E501 

119 

120# 12/2023 from https://www.iana.org/assignments/dnssec-nsec3-parameters/dnssec-nsec3-parameters.xhtml # noqa: E501 

121dnssecnsec3algotypes = {0: "Reserved", 1: "SHA-1"} 

122 

123 

124def dns_get_str(s, full=None, _ignore_compression=False): 

125 """This function decompresses a string s, starting 

126 from the given pointer. 

127 

128 :param s: the string to decompress 

129 :param full: (optional) the full packet (used for decompression) 

130 

131 :returns: (decoded_string, end_index, left_string) 

132 """ 

133 # _ignore_compression is for internal use only 

134 max_length = len(s) 

135 # The result = the extracted name 

136 name = b"" 

137 # Will contain the index after the pointer, to be returned 

138 after_pointer = None 

139 processed_pointers = [] # Used to check for decompression loops 

140 bytes_left = None 

141 _fullpacket = False # s = full packet 

142 pointer = 0 

143 while True: 

144 if abs(pointer) >= max_length: 

145 log_runtime.info( 

146 "DNS RR prematured end (ofs=%i, len=%i)", pointer, len(s) 

147 ) 

148 break 

149 cur = s[pointer] # get pointer value 

150 pointer += 1 # make pointer go forward 

151 if cur & 0xc0: # Label pointer 

152 if after_pointer is None: 

153 # after_pointer points to where the remaining bytes start, 

154 # as pointer will follow the jump token 

155 after_pointer = pointer + 1 

156 if _ignore_compression: 

157 # skip 

158 pointer += 1 

159 continue 

160 if pointer >= max_length: 

161 log_runtime.info( 

162 "DNS incomplete jump token at (ofs=%i)", pointer 

163 ) 

164 break 

165 if not full: 

166 raise Scapy_Exception("DNS message can't be compressed " + 

167 "at this point!") 

168 # Follow the pointer 

169 pointer = ((cur & ~0xc0) << 8) + s[pointer] 

170 if pointer in processed_pointers: 

171 warning("DNS decompression loop detected") 

172 break 

173 if len(processed_pointers) >= 20: 

174 warning("More than 20 jumps in a single DNS decompression ! " 

175 "Dropping (evil packet)") 

176 break 

177 if not _fullpacket: 

178 # We switch our s buffer to full, so we need to remember 

179 # the previous context 

180 bytes_left = s[after_pointer:] 

181 s = full 

182 max_length = len(s) 

183 _fullpacket = True 

184 processed_pointers.append(pointer) 

185 continue 

186 elif cur > 0: # Label 

187 # cur = length of the string 

188 name += s[pointer:pointer + cur] + b"." 

189 pointer += cur 

190 else: # End 

191 break 

192 if after_pointer is not None: 

193 # Return the real end index (not the one we followed) 

194 pointer = after_pointer 

195 if bytes_left is None: 

196 bytes_left = s[pointer:] 

197 # name, remaining 

198 return name or b".", bytes_left 

199 

200 

201def _is_ptr(x): 

202 """ 

203 Heuristic to guess if bytes are an encoded DNS pointer. 

204 """ 

205 return ( 

206 (x and x[-1] == 0) or 

207 (len(x) >= 2 and (x[-2] & 0xc0) == 0xc0) 

208 ) 

209 

210 

211def dns_encode(x, check_built=False): 

212 """Encodes a bytes string into the DNS format 

213 

214 :param x: the string 

215 :param check_built: detect already-built strings and ignore them 

216 :returns: the encoded bytes string 

217 """ 

218 if not x or x == b".": 

219 return b"\x00" 

220 

221 if check_built and _is_ptr(x): 

222 # The value has already been processed. Do not process it again 

223 return x 

224 

225 # Truncate chunks that cannot be encoded (more than 63 bytes..) 

226 x = b"".join(chb(len(y)) + y for y in (k[:63] for k in x.split(b"."))) 

227 if x[-1:] != b"\x00": 

228 x += b"\x00" 

229 return x 

230 

231 

232def DNSgetstr(*args, **kwargs): 

233 """Legacy function. Deprecated""" 

234 warnings.warn( 

235 "DNSgetstr is deprecated. Use dns_get_str instead.", 

236 DeprecationWarning 

237 ) 

238 return dns_get_str(*args, **kwargs)[:-1] 

239 

240 

241def dns_compress(pkt): 

242 """This function compresses a DNS packet according to compression rules. 

243 """ 

244 if DNS not in pkt: 

245 raise Scapy_Exception("Can only compress DNS layers") 

246 pkt = pkt.copy() 

247 dns_pkt = pkt.getlayer(DNS) 

248 dns_pkt.clear_cache() 

249 build_pkt = raw(dns_pkt) 

250 

251 def field_gen(dns_pkt): 

252 """Iterates through all DNS strings that can be compressed""" 

253 for lay in [dns_pkt.qd, dns_pkt.an, dns_pkt.ns, dns_pkt.ar]: 

254 if not lay: 

255 continue 

256 for current in lay: 

257 for field in current.fields_desc: 

258 if isinstance(field, DNSStrField) or \ 

259 (isinstance(field, MultipleTypeField) and 

260 current.type in [2, 3, 4, 5, 12, 15, 39, 47]): 

261 # Get the associated data and store it accordingly # noqa: E501 

262 dat = current.getfieldval(field.name) 

263 yield current, field.name, dat 

264 

265 def possible_shortens(dat): 

266 """Iterates through all possible compression parts in a DNS string""" 

267 if dat == b".": # we'd lose by compressing it 

268 return 

269 yield dat 

270 for x in range(1, dat.count(b".")): 

271 yield dat.split(b".", x)[x] 

272 data = {} 

273 for current, name, dat in field_gen(dns_pkt): 

274 for part in possible_shortens(dat): 

275 # Encode the data 

276 encoded = dns_encode(part, check_built=True) 

277 if part not in data: 

278 # We have no occurrence of such data, let's store it as a 

279 # possible pointer for future strings. 

280 # We get the index of the encoded data 

281 index = build_pkt.index(encoded) 

282 # The following is used to build correctly the pointer 

283 fb_index = ((index >> 8) | 0xc0) 

284 sb_index = index - (256 * (fb_index - 0xc0)) 

285 pointer = chb(fb_index) + chb(sb_index) 

286 data[part] = [(current, name, pointer, index + 1)] 

287 else: 

288 # This string already exists, let's mark the current field 

289 # with it, so that it gets compressed 

290 data[part].append((current, name)) 

291 _in = data[part][0][3] 

292 build_pkt = build_pkt[:_in] + build_pkt[_in:].replace( 

293 encoded, 

294 b"\0\0", 

295 1 

296 ) 

297 break 

298 # Apply compression rules 

299 for ck in data: 

300 # compression_key is a DNS string 

301 replacements = data[ck] 

302 # replacements is the list of all tuples (layer, field name) 

303 # where this string was found 

304 replace_pointer = replacements.pop(0)[2] 

305 # replace_pointer is the packed pointer that should replace 

306 # those strings. Note that pop remove it from the list 

307 for rep in replacements: 

308 # setfieldval edits the value of the field in the layer 

309 val = rep[0].getfieldval(rep[1]) 

310 assert val.endswith(ck) 

311 kept_string = dns_encode(val[:-len(ck)], check_built=True)[:-1] 

312 new_val = kept_string + replace_pointer 

313 rep[0].setfieldval(rep[1], new_val) 

314 try: 

315 del rep[0].rdlen 

316 except AttributeError: 

317 pass 

318 # End of the compression algorithm 

319 # Destroy the previous DNS layer if needed 

320 if not isinstance(pkt, DNS) and pkt.getlayer(DNS).underlayer: 

321 pkt.getlayer(DNS).underlayer.remove_payload() 

322 return pkt / dns_pkt 

323 return dns_pkt 

324 

325 

326class DNSCompressedPacket(Packet): 

327 """ 

328 Class to mark that a packet contains DNSStrField and supports compression 

329 """ 

330 @abc.abstractmethod 

331 def get_full(self): 

332 pass 

333 

334 

335class DNSStrField(StrLenField): 

336 """ 

337 Special StrField that handles DNS encoding/decoding. 

338 It will also handle DNS decompression. 

339 (may be StrLenField if a length_from is passed), 

340 """ 

341 def any2i(self, pkt, x): 

342 if x and isinstance(x, list): 

343 return [self.h2i(pkt, y) for y in x] 

344 return super(DNSStrField, self).any2i(pkt, x) 

345 

346 def h2i(self, pkt, x): 

347 # Setting a DNSStrField manually (h2i) means any current compression will break 

348 if ( 

349 pkt and 

350 isinstance(pkt.parent, DNSCompressedPacket) and 

351 pkt.parent.raw_packet_cache 

352 ): 

353 pkt.parent.clear_cache() 

354 if not x: 

355 return b"." 

356 x = bytes_encode(x) 

357 if x[-1:] != b"." and not _is_ptr(x): 

358 return x + b"." 

359 return x 

360 

361 def i2m(self, pkt, x): 

362 return dns_encode(x, check_built=True) 

363 

364 def i2len(self, pkt, x): 

365 return len(self.i2m(pkt, x)) 

366 

367 def get_full(self, pkt): 

368 while pkt and not isinstance(pkt, DNSCompressedPacket): 

369 pkt = pkt.parent or pkt.underlayer 

370 if not pkt: 

371 return None 

372 return pkt.get_full() 

373 

374 def getfield(self, pkt, s): 

375 remain = b"" 

376 if self.length_from: 

377 remain, s = super(DNSStrField, self).getfield(pkt, s) 

378 # Decode the compressed DNS message 

379 decoded, left = dns_get_str(s, full=self.get_full(pkt)) 

380 # returns (remaining, decoded) 

381 return left + remain, decoded 

382 

383 

384class DNSTextField(StrLenField): 

385 """ 

386 Special StrLenField that handles DNS TEXT data (16) 

387 """ 

388 

389 islist = 1 

390 

391 def i2h(self, pkt, x): 

392 if not x: 

393 return [] 

394 return x 

395 

396 def m2i(self, pkt, s): 

397 ret_s = list() 

398 tmp_s = s 

399 # RDATA contains a list of strings, each are prepended with 

400 # a byte containing the size of the following string. 

401 while tmp_s: 

402 tmp_len = tmp_s[0] + 1 

403 if tmp_len > len(tmp_s): 

404 log_runtime.info( 

405 "DNS RR TXT prematured end of character-string " 

406 "(size=%i, remaining bytes=%i)", tmp_len, len(tmp_s) 

407 ) 

408 ret_s.append(tmp_s[1:tmp_len]) 

409 tmp_s = tmp_s[tmp_len:] 

410 return ret_s 

411 

412 def any2i(self, pkt, x): 

413 if isinstance(x, (str, bytes)): 

414 return [x] 

415 return x 

416 

417 def i2len(self, pkt, x): 

418 return len(self.i2m(pkt, x)) 

419 

420 def i2m(self, pkt, s): 

421 ret_s = b"" 

422 for text in s: 

423 if not text: 

424 ret_s += b"\x00" 

425 continue 

426 text = bytes_encode(text) 

427 # The initial string must be split into a list of strings 

428 # prepended with theirs sizes. 

429 while len(text) >= 255: 

430 ret_s += b"\xff" + text[:255] 

431 text = text[255:] 

432 # The remaining string is less than 255 bytes long 

433 if len(text): 

434 ret_s += struct.pack("!B", len(text)) + text 

435 return ret_s 

436 

437 

438# RFC 2671 - Extension Mechanisms for DNS (EDNS0) 

439 

440edns0types = {0: "Reserved", 1: "LLQ", 2: "UL", 3: "NSID", 4: "Owner", 

441 5: "DAU", 6: "DHU", 7: "N3U", 8: "edns-client-subnet", 10: "COOKIE", 

442 15: "Extended DNS Error"} 

443 

444 

445class _EDNS0Dummy(Packet): 

446 name = "Dummy class that implements extract_padding()" 

447 

448 def extract_padding(self, p): 

449 # type: (bytes) -> Tuple[bytes, Optional[bytes]] 

450 return "", p 

451 

452 

453class EDNS0TLV(_EDNS0Dummy): 

454 name = "DNS EDNS0 TLV" 

455 fields_desc = [ShortEnumField("optcode", 0, edns0types), 

456 FieldLenField("optlen", None, "optdata", fmt="H"), 

457 StrLenField("optdata", "", 

458 length_from=lambda pkt: pkt.optlen)] 

459 

460 @classmethod 

461 def dispatch_hook(cls, _pkt=None, *args, **kargs): 

462 # type: (Optional[bytes], *Any, **Any) -> Type[Packet] 

463 if _pkt is None: 

464 return EDNS0TLV 

465 if len(_pkt) < 2: 

466 return Raw 

467 edns0type = struct.unpack("!H", _pkt[:2])[0] 

468 return EDNS0OPT_DISPATCHER.get(edns0type, EDNS0TLV) 

469 

470 

471class DNSRROPT(Packet): 

472 name = "DNS OPT Resource Record" 

473 fields_desc = [DNSStrField("rrname", ""), 

474 ShortEnumField("type", 41, dnstypes), 

475 ShortEnumField("rclass", 4096, dnsclasses), 

476 ByteField("extrcode", 0), 

477 ByteField("version", 0), 

478 # version 0 means EDNS0 

479 BitEnumField("z", 32768, 16, {32768: "D0"}), 

480 # D0 means DNSSEC OK from RFC 3225 

481 FieldLenField("rdlen", None, length_of="rdata", fmt="H"), 

482 PacketListField("rdata", [], EDNS0TLV, 

483 length_from=lambda pkt: pkt.rdlen)] 

484 

485 

486# draft-cheshire-edns0-owner-option-01 - EDNS0 OWNER Option 

487 

488class EDNS0OWN(_EDNS0Dummy): 

489 name = "EDNS0 Owner (OWN)" 

490 fields_desc = [ShortEnumField("optcode", 4, edns0types), 

491 FieldLenField("optlen", None, count_of="primary_mac", fmt="H"), 

492 ByteField("v", 0), 

493 ByteField("s", 0), 

494 MACField("primary_mac", "00:00:00:00:00:00"), 

495 ConditionalField( 

496 MACField("wakeup_mac", "00:00:00:00:00:00"), 

497 lambda pkt: (pkt.optlen or 0) >= 18), 

498 ConditionalField( 

499 StrLenField("password", "", 

500 length_from=lambda pkt: pkt.optlen - 18), 

501 lambda pkt: (pkt.optlen or 0) >= 22)] 

502 

503 def post_build(self, pkt, pay): 

504 pkt += pay 

505 if self.optlen is None: 

506 pkt = pkt[:2] + struct.pack("!H", len(pkt) - 4) + pkt[4:] 

507 return pkt 

508 

509 

510# RFC 6975 - Signaling Cryptographic Algorithm Understanding in 

511# DNS Security Extensions (DNSSEC) 

512 

513class EDNS0DAU(_EDNS0Dummy): 

514 name = "DNSSEC Algorithm Understood (DAU)" 

515 fields_desc = [ShortEnumField("optcode", 5, edns0types), 

516 FieldLenField("optlen", None, count_of="alg_code", fmt="H"), 

517 FieldListField("alg_code", None, 

518 ByteEnumField("", 0, dnssecalgotypes), 

519 count_from=lambda pkt:pkt.optlen)] 

520 

521 

522class EDNS0DHU(_EDNS0Dummy): 

523 name = "DS Hash Understood (DHU)" 

524 fields_desc = [ShortEnumField("optcode", 6, edns0types), 

525 FieldLenField("optlen", None, count_of="alg_code", fmt="H"), 

526 FieldListField("alg_code", None, 

527 ByteEnumField("", 0, dnssecdigesttypes), 

528 count_from=lambda pkt:pkt.optlen)] 

529 

530 

531class EDNS0N3U(_EDNS0Dummy): 

532 name = "NSEC3 Hash Understood (N3U)" 

533 fields_desc = [ShortEnumField("optcode", 7, edns0types), 

534 FieldLenField("optlen", None, count_of="alg_code", fmt="H"), 

535 FieldListField("alg_code", None, 

536 ByteEnumField("", 0, dnssecnsec3algotypes), 

537 count_from=lambda pkt:pkt.optlen)] 

538 

539 

540# RFC 7871 - Client Subnet in DNS Queries 

541 

542class ClientSubnetv4(StrLenField): 

543 af_familly = socket.AF_INET 

544 af_length = 32 

545 af_default = b"\xc0" # 192.0.0.0 

546 

547 def getfield(self, pkt, s): 

548 # type: (Packet, bytes) -> Tuple[bytes, I] 

549 sz = operator.floordiv(self.length_from(pkt), 8) 

550 sz = min(sz, operator.floordiv(self.af_length, 8)) 

551 return s[sz:], self.m2i(pkt, s[:sz]) 

552 

553 def m2i(self, pkt, x): 

554 # type: (Optional[Packet], bytes) -> str 

555 padding = self.af_length - self.length_from(pkt) 

556 if padding: 

557 x += b"\x00" * operator.floordiv(padding, 8) 

558 x = x[: operator.floordiv(self.af_length, 8)] 

559 return inet_ntop(self.af_familly, x) 

560 

561 def _pack_subnet(self, subnet): 

562 # type: (bytes) -> bytes 

563 packed_subnet = inet_pton(self.af_familly, plain_str(subnet)) 

564 for i in list(range(operator.floordiv(self.af_length, 8)))[::-1]: 

565 if packed_subnet[i] != 0: 

566 i += 1 

567 break 

568 return packed_subnet[:i] 

569 

570 def i2m(self, pkt, x): 

571 # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes 

572 if x is None: 

573 return self.af_default 

574 try: 

575 return self._pack_subnet(x) 

576 except (OSError, socket.error): 

577 pkt.family = 2 

578 return ClientSubnetv6("", "")._pack_subnet(x) 

579 

580 def i2len(self, pkt, x): 

581 # type: (Packet, Any) -> int 

582 if x is None: 

583 return 1 

584 try: 

585 return len(self._pack_subnet(x)) 

586 except (OSError, socket.error): 

587 pkt.family = 2 

588 return len(ClientSubnetv6("", "")._pack_subnet(x)) 

589 

590 

591class ClientSubnetv6(ClientSubnetv4): 

592 af_familly = socket.AF_INET6 

593 af_length = 128 

594 af_default = b"\x20" # 2000:: 

595 

596 

597class EDNS0ClientSubnet(_EDNS0Dummy): 

598 name = "DNS EDNS0 Client Subnet" 

599 fields_desc = [ShortEnumField("optcode", 8, edns0types), 

600 FieldLenField("optlen", None, "address", fmt="H", 

601 adjust=lambda pkt, x: x + 4), 

602 ShortField("family", 1), 

603 FieldLenField("source_plen", None, 

604 length_of="address", 

605 fmt="B", 

606 adjust=lambda pkt, x: x * 8), 

607 ByteField("scope_plen", 0), 

608 MultipleTypeField( 

609 [(ClientSubnetv4("address", "192.168.0.0", 

610 length_from=lambda p: p.source_plen), 

611 lambda pkt: pkt.family == 1), 

612 (ClientSubnetv6("address", "2001:db8::", 

613 length_from=lambda p: p.source_plen), 

614 lambda pkt: pkt.family == 2)], 

615 ClientSubnetv4("address", "192.168.0.0", 

616 length_from=lambda p: p.source_plen))] 

617 

618 

619class EDNS0COOKIE(_EDNS0Dummy): 

620 name = "DNS EDNS0 COOKIE" 

621 fields_desc = [ShortEnumField("optcode", 10, edns0types), 

622 FieldLenField("optlen", None, length_of="server_cookie", fmt="!H", 

623 adjust=lambda pkt, x: x + 8), 

624 XStrFixedLenField("client_cookie", b"\x00" * 8, length=8), 

625 XStrLenField("server_cookie", "", 

626 length_from=lambda pkt: max(0, pkt.optlen - 8))] 

627 

628 

629# RFC 8914 - Extended DNS Errors 

630 

631# https://www.iana.org/assignments/dns-parameters/dns-parameters.xhtml#extended-dns-error-codes 

632extended_dns_error_codes = { 

633 0: "Other", 

634 1: "Unsupported DNSKEY Algorithm", 

635 2: "Unsupported DS Digest Type", 

636 3: "Stale Answer", 

637 4: "Forged Answer", 

638 5: "DNSSEC Indeterminate", 

639 6: "DNSSEC Bogus", 

640 7: "Signature Expired", 

641 8: "Signature Not Yet Valid", 

642 9: "DNSKEY Missing", 

643 10: "RRSIGs Missing", 

644 11: "No Zone Key Bit Set", 

645 12: "NSEC Missing", 

646 13: "Cached Error", 

647 14: "Not Ready", 

648 15: "Blocked", 

649 16: "Censored", 

650 17: "Filtered", 

651 18: "Prohibited", 

652 19: "Stale NXDOMAIN Answer", 

653 20: "Not Authoritative", 

654 21: "Not Supported", 

655 22: "No Reachable Authority", 

656 23: "Network Error", 

657 24: "Invalid Data", 

658 25: "Signature Expired before Valid", 

659 26: "Too Early", 

660 27: "Unsupported NSEC3 Iterations Value", 

661 28: "Unable to conform to policy", 

662 29: "Synthesized", 

663} 

664 

665 

666# https://www.rfc-editor.org/rfc/rfc8914.html 

667class EDNS0ExtendedDNSError(_EDNS0Dummy): 

668 name = "DNS EDNS0 Extended DNS Error" 

669 fields_desc = [ShortEnumField("optcode", 15, edns0types), 

670 FieldLenField("optlen", None, length_of="extra_text", fmt="!H", 

671 adjust=lambda pkt, x: x + 2), 

672 ShortEnumField("info_code", 0, extended_dns_error_codes), 

673 StrLenField("extra_text", "", 

674 length_from=lambda pkt: pkt.optlen - 2)] 

675 

676 

677EDNS0OPT_DISPATCHER = { 

678 4: EDNS0OWN, 

679 5: EDNS0DAU, 

680 6: EDNS0DHU, 

681 7: EDNS0N3U, 

682 8: EDNS0ClientSubnet, 

683 10: EDNS0COOKIE, 

684 15: EDNS0ExtendedDNSError, 

685} 

686 

687 

688# RFC 4034 - Resource Records for the DNS Security Extensions 

689 

690def bitmap2RRlist(bitmap): 

691 """ 

692 Decode the 'Type Bit Maps' field of the NSEC Resource Record into an 

693 integer list. 

694 """ 

695 # RFC 4034, 4.1.2. The Type Bit Maps Field 

696 

697 RRlist = [] 

698 

699 while bitmap: 

700 

701 if len(bitmap) < 2: 

702 log_runtime.info("bitmap too short (%i)", len(bitmap)) 

703 return 

704 

705 window_block = bitmap[0] # window number 

706 offset = 256 * window_block # offset of the Resource Record 

707 bitmap_len = bitmap[1] # length of the bitmap in bytes 

708 

709 if bitmap_len <= 0 or bitmap_len > 32: 

710 log_runtime.info("bitmap length is no valid (%i)", bitmap_len) 

711 return 

712 

713 tmp_bitmap = bitmap[2:2 + bitmap_len] 

714 

715 # Let's compare each bit of tmp_bitmap and compute the real RR value 

716 for b in range(len(tmp_bitmap)): 

717 v = 128 

718 for i in range(8): 

719 if tmp_bitmap[b] & v: 

720 # each of the RR is encoded as a bit 

721 RRlist += [offset + b * 8 + i] 

722 v = v >> 1 

723 

724 # Next block if any 

725 bitmap = bitmap[2 + bitmap_len:] 

726 

727 return RRlist 

728 

729 

730def RRlist2bitmap(lst): 

731 """ 

732 Encode a list of integers representing Resource Records to a bitmap field 

733 used in the NSEC Resource Record. 

734 """ 

735 # RFC 4034, 4.1.2. The Type Bit Maps Field 

736 

737 import math 

738 

739 bitmap = b"" 

740 lst = [abs(x) for x in sorted(set(lst)) if x <= 65535] 

741 

742 # number of window blocks 

743 max_window_blocks = int(math.ceil(lst[-1] / 256.)) 

744 min_window_blocks = int(math.floor(lst[0] / 256.)) 

745 if min_window_blocks == max_window_blocks: 

746 max_window_blocks += 1 

747 

748 for wb in range(min_window_blocks, max_window_blocks + 1): 

749 # First, filter out RR not encoded in the current window block 

750 # i.e. keep everything between 256*wb <= 256*(wb+1) 

751 rrlist = sorted(x for x in lst if 256 * wb <= x < 256 * (wb + 1)) 

752 if not rrlist: 

753 continue 

754 

755 # Compute the number of bytes used to store the bitmap 

756 if rrlist[-1] == 0: # only one element in the list 

757 bytes_count = 1 

758 else: 

759 max = rrlist[-1] - 256 * wb 

760 bytes_count = int(math.ceil(max // 8)) + 1 # use at least 1 byte 

761 if bytes_count > 32: # Don't encode more than 256 bits / values 

762 bytes_count = 32 

763 

764 bitmap += struct.pack("BB", wb, bytes_count) 

765 

766 # Generate the bitmap 

767 # The idea is to remove out of range Resource Records with these steps 

768 # 1. rescale to fit into 8 bits 

769 # 2. x gives the bit position ; compute the corresponding value 

770 # 3. sum everything 

771 bitmap += b"".join( 

772 struct.pack( 

773 b"B", 

774 sum(2 ** (7 - (x - 256 * wb) + (tmp * 8)) for x in rrlist 

775 if 256 * wb + 8 * tmp <= x < 256 * wb + 8 * tmp + 8), 

776 ) for tmp in range(bytes_count) 

777 ) 

778 

779 return bitmap 

780 

781 

782class RRlistField(StrField): 

783 islist = 1 

784 

785 def h2i(self, pkt, x): 

786 if x and isinstance(x, list): 

787 return RRlist2bitmap(x) 

788 return x 

789 

790 def i2repr(self, pkt, x): 

791 if not x: 

792 return "[]" 

793 x = self.i2h(pkt, x) 

794 rrlist = bitmap2RRlist(x) 

795 return [dnstypes.get(rr, rr) for rr in rrlist] if rrlist else repr(x) 

796 

797 

798class _DNSRRdummy(Packet): 

799 name = "Dummy class that implements post_build() for Resource Records" 

800 

801 def post_build(self, pkt, pay): 

802 if self.rdlen is not None: 

803 return pkt + pay 

804 

805 lrrname = len(self.fields_desc[0].i2m("", self.getfieldval("rrname"))) 

806 tmp_len = len(pkt) - lrrname - 10 

807 tmp_pkt = pkt[:lrrname + 8] 

808 pkt = struct.pack("!H", tmp_len) + pkt[lrrname + 8 + 2:] 

809 

810 return tmp_pkt + pkt + pay 

811 

812 def default_payload_class(self, payload): 

813 return conf.padding_layer 

814 

815 

816class DNSRRHINFO(_DNSRRdummy): 

817 name = "DNS HINFO Resource Record" 

818 fields_desc = [DNSStrField("rrname", ""), 

819 ShortEnumField("type", 13, dnstypes), 

820 BitField("cacheflush", 0, 1), # mDNS RFC 6762 

821 BitEnumField("rclass", 1, 15, dnsclasses), 

822 IntField("ttl", 0), 

823 ShortField("rdlen", None), 

824 FieldLenField("cpulen", None, fmt="!B", length_of="cpu"), 

825 StrLenField("cpu", "", length_from=lambda x: x.cpulen), 

826 FieldLenField("oslen", None, fmt="!B", length_of="os"), 

827 StrLenField("os", "", length_from=lambda x: x.oslen)] 

828 

829 

830class DNSRRMX(_DNSRRdummy): 

831 name = "DNS MX Resource Record" 

832 fields_desc = [DNSStrField("rrname", ""), 

833 ShortEnumField("type", 15, dnstypes), 

834 BitField("cacheflush", 0, 1), # mDNS RFC 6762 

835 BitEnumField("rclass", 1, 15, dnsclasses), 

836 IntField("ttl", 0), 

837 ShortField("rdlen", None), 

838 ShortField("preference", 0), 

839 DNSStrField("exchange", ""), 

840 ] 

841 

842 

843class DNSRRSOA(_DNSRRdummy): 

844 name = "DNS SOA Resource Record" 

845 fields_desc = [DNSStrField("rrname", ""), 

846 ShortEnumField("type", 6, dnstypes), 

847 ShortEnumField("rclass", 1, dnsclasses), 

848 IntField("ttl", 0), 

849 ShortField("rdlen", None), 

850 DNSStrField("mname", ""), 

851 DNSStrField("rname", ""), 

852 IntField("serial", 0), 

853 IntField("refresh", 0), 

854 IntField("retry", 0), 

855 IntField("expire", 0), 

856 IntField("minimum", 0) 

857 ] 

858 

859 

860class DNSRRRSIG(_DNSRRdummy): 

861 name = "DNS RRSIG Resource Record" 

862 fields_desc = [DNSStrField("rrname", ""), 

863 ShortEnumField("type", 46, dnstypes), 

864 BitField("cacheflush", 0, 1), # mDNS RFC 6762 

865 BitEnumField("rclass", 1, 15, dnsclasses), 

866 IntField("ttl", 0), 

867 ShortField("rdlen", None), 

868 ShortEnumField("typecovered", 1, dnstypes), 

869 ByteEnumField("algorithm", 5, dnssecalgotypes), 

870 ByteField("labels", 0), 

871 IntField("originalttl", 0), 

872 UTCTimeField("expiration", 0), 

873 UTCTimeField("inception", 0), 

874 ShortField("keytag", 0), 

875 DNSStrField("signersname", ""), 

876 StrField("signature", "") 

877 ] 

878 

879 

880class DNSRRNSEC(_DNSRRdummy): 

881 name = "DNS NSEC Resource Record" 

882 fields_desc = [DNSStrField("rrname", ""), 

883 ShortEnumField("type", 47, dnstypes), 

884 BitField("cacheflush", 0, 1), # mDNS RFC 6762 

885 BitEnumField("rclass", 1, 15, dnsclasses), 

886 IntField("ttl", 0), 

887 ShortField("rdlen", None), 

888 DNSStrField("nextname", ""), 

889 RRlistField("typebitmaps", []) 

890 ] 

891 

892 

893class DNSRRDNSKEY(_DNSRRdummy): 

894 name = "DNS DNSKEY Resource Record" 

895 fields_desc = [DNSStrField("rrname", ""), 

896 ShortEnumField("type", 48, dnstypes), 

897 BitField("cacheflush", 0, 1), # mDNS RFC 6762 

898 BitEnumField("rclass", 1, 15, dnsclasses), 

899 IntField("ttl", 0), 

900 ShortField("rdlen", None), 

901 FlagsField("flags", 256, 16, "S???????Z???????"), 

902 # S: Secure Entry Point 

903 # Z: Zone Key 

904 ByteField("protocol", 3), 

905 ByteEnumField("algorithm", 5, dnssecalgotypes), 

906 StrField("publickey", "") 

907 ] 

908 

909 

910class DNSRRDS(_DNSRRdummy): 

911 name = "DNS DS Resource Record" 

912 fields_desc = [DNSStrField("rrname", ""), 

913 ShortEnumField("type", 43, dnstypes), 

914 BitField("cacheflush", 0, 1), # mDNS RFC 6762 

915 BitEnumField("rclass", 1, 15, dnsclasses), 

916 IntField("ttl", 0), 

917 ShortField("rdlen", None), 

918 ShortField("keytag", 0), 

919 ByteEnumField("algorithm", 5, dnssecalgotypes), 

920 ByteEnumField("digesttype", 5, dnssecdigesttypes), 

921 StrField("digest", "") 

922 ] 

923 

924 

925# RFC 5074 - DNSSEC Lookaside Validation (DLV) 

926class DNSRRDLV(DNSRRDS): 

927 name = "DNS DLV Resource Record" 

928 

929 def __init__(self, *args, **kargs): 

930 DNSRRDS.__init__(self, *args, **kargs) 

931 if not kargs.get('type', 0): 

932 self.type = 32769 

933 

934# RFC 5155 - DNS Security (DNSSEC) Hashed Authenticated Denial of Existence 

935 

936 

937class DNSRRNSEC3(_DNSRRdummy): 

938 name = "DNS NSEC3 Resource Record" 

939 fields_desc = [DNSStrField("rrname", ""), 

940 ShortEnumField("type", 50, dnstypes), 

941 BitField("cacheflush", 0, 1), # mDNS RFC 6762 

942 BitEnumField("rclass", 1, 15, dnsclasses), 

943 IntField("ttl", 0), 

944 ShortField("rdlen", None), 

945 ByteField("hashalg", 0), 

946 BitEnumField("flags", 0, 8, {1: "Opt-Out"}), 

947 ShortField("iterations", 0), 

948 FieldLenField("saltlength", 0, fmt="!B", length_of="salt"), 

949 StrLenField("salt", "", length_from=lambda x: x.saltlength), 

950 FieldLenField("hashlength", 0, fmt="!B", length_of="nexthashedownername"), # noqa: E501 

951 StrLenField("nexthashedownername", "", length_from=lambda x: x.hashlength), # noqa: E501 

952 RRlistField("typebitmaps", []) 

953 ] 

954 

955 

956class DNSRRNSEC3PARAM(_DNSRRdummy): 

957 name = "DNS NSEC3PARAM Resource Record" 

958 fields_desc = [DNSStrField("rrname", ""), 

959 ShortEnumField("type", 51, dnstypes), 

960 BitField("cacheflush", 0, 1), # mDNS RFC 6762 

961 BitEnumField("rclass", 1, 15, dnsclasses), 

962 IntField("ttl", 0), 

963 ShortField("rdlen", None), 

964 ByteField("hashalg", 0), 

965 ByteField("flags", 0), 

966 ShortField("iterations", 0), 

967 FieldLenField("saltlength", 0, fmt="!B", length_of="salt"), 

968 StrLenField("salt", "", length_from=lambda pkt: pkt.saltlength) # noqa: E501 

969 ] 

970 

971 

972# RFC 9460 Service Binding and Parameter Specification via the DNS 

973# https://www.rfc-editor.org/rfc/rfc9460.html 

974 

975 

976# https://www.iana.org/assignments/dns-svcb/dns-svcb.xhtml 

977svc_param_keys = { 

978 0: "mandatory", 

979 1: "alpn", 

980 2: "no-default-alpn", 

981 3: "port", 

982 4: "ipv4hint", 

983 5: "ech", 

984 6: "ipv6hint", 

985 7: "dohpath", 

986 8: "ohttp", 

987} 

988 

989 

990class SvcParam(Packet): 

991 name = "SvcParam" 

992 fields_desc = [ShortEnumField("key", 0, svc_param_keys), 

993 FieldLenField("len", None, length_of="value", fmt="H"), 

994 MultipleTypeField( 

995 [ 

996 # mandatory 

997 (FieldListField("value", [], 

998 ShortEnumField("", 0, svc_param_keys), 

999 length_from=lambda pkt: pkt.len), 

1000 lambda pkt: pkt.key == 0), 

1001 # alpn, no-default-alpn 

1002 (DNSTextField("value", [], 

1003 length_from=lambda pkt: pkt.len), 

1004 lambda pkt: pkt.key in (1, 2)), 

1005 # port 

1006 (ShortField("value", 0), 

1007 lambda pkt: pkt.key == 3), 

1008 # ipv4hint 

1009 (FieldListField("value", [], 

1010 IPField("", "0.0.0.0"), 

1011 length_from=lambda pkt: pkt.len), 

1012 lambda pkt: pkt.key == 4), 

1013 # ipv6hint 

1014 (FieldListField("value", [], 

1015 IP6Field("", "::"), 

1016 length_from=lambda pkt: pkt.len), 

1017 lambda pkt: pkt.key == 6), 

1018 ], 

1019 StrLenField("value", "", 

1020 length_from=lambda pkt:pkt.len))] 

1021 

1022 def extract_padding(self, p): 

1023 return "", p 

1024 

1025 

1026class DNSRRSVCB(_DNSRRdummy): 

1027 name = "DNS SVCB Resource Record" 

1028 fields_desc = [DNSStrField("rrname", ""), 

1029 ShortEnumField("type", 64, dnstypes), 

1030 BitField("cacheflush", 0, 1), # mDNS RFC 6762 

1031 BitEnumField("rclass", 1, 15, dnsclasses), 

1032 IntField("ttl", 0), 

1033 ShortField("rdlen", None), 

1034 ShortField("svc_priority", 0), 

1035 DNSStrField("target_name", ""), 

1036 PacketListField("svc_params", [], SvcParam)] 

1037 

1038 

1039class DNSRRHTTPS(_DNSRRdummy): 

1040 name = "DNS HTTPS Resource Record" 

1041 fields_desc = [DNSStrField("rrname", ""), 

1042 ShortEnumField("type", 65, dnstypes) 

1043 ] + DNSRRSVCB.fields_desc[2:] 

1044 

1045 

1046# RFC 2782 - A DNS RR for specifying the location of services (DNS SRV) 

1047 

1048 

1049class DNSRRSRV(_DNSRRdummy): 

1050 name = "DNS SRV Resource Record" 

1051 fields_desc = [DNSStrField("rrname", ""), 

1052 ShortEnumField("type", 33, dnstypes), 

1053 BitField("cacheflush", 0, 1), # mDNS RFC 6762 

1054 BitEnumField("rclass", 1, 15, dnsclasses), 

1055 IntField("ttl", 0), 

1056 ShortField("rdlen", None), 

1057 ShortField("priority", 0), 

1058 ShortField("weight", 0), 

1059 ShortField("port", 0), 

1060 DNSStrField("target", ""), ] 

1061 

1062 

1063# RFC 2845 - Secret Key Transaction Authentication for DNS (TSIG) 

1064tsig_algo_sizes = {"HMAC-MD5.SIG-ALG.REG.INT": 16, 

1065 "hmac-sha1": 20} 

1066 

1067 

1068class TimeSignedField(Field[int, bytes]): 

1069 def __init__(self, name, default): 

1070 Field.__init__(self, name, default, fmt="6s") 

1071 

1072 def _convert_seconds(self, packed_seconds): 

1073 """Unpack the internal representation.""" 

1074 seconds = struct.unpack("!H", packed_seconds[:2])[0] 

1075 seconds += struct.unpack("!I", packed_seconds[2:])[0] 

1076 return seconds 

1077 

1078 def i2m(self, pkt, seconds): 

1079 """Convert the number of seconds since 1-Jan-70 UTC to the packed 

1080 representation.""" 

1081 

1082 if seconds is None: 

1083 seconds = 0 

1084 

1085 tmp_short = (seconds >> 32) & 0xFFFF 

1086 tmp_int = seconds & 0xFFFFFFFF 

1087 

1088 return struct.pack("!HI", tmp_short, tmp_int) 

1089 

1090 def m2i(self, pkt, packed_seconds): 

1091 """Convert the internal representation to the number of seconds 

1092 since 1-Jan-70 UTC.""" 

1093 

1094 if packed_seconds is None: 

1095 return None 

1096 

1097 return self._convert_seconds(packed_seconds) 

1098 

1099 def i2repr(self, pkt, packed_seconds): 

1100 """Convert the internal representation to a nice one using the RFC 

1101 format.""" 

1102 time_struct = time.gmtime(packed_seconds) 

1103 return time.strftime("%a %b %d %H:%M:%S %Y", time_struct) 

1104 

1105 

1106class DNSRRTSIG(_DNSRRdummy): 

1107 name = "DNS TSIG Resource Record" 

1108 fields_desc = [DNSStrField("rrname", ""), 

1109 ShortEnumField("type", 250, dnstypes), 

1110 ShortEnumField("rclass", 1, dnsclasses), 

1111 IntField("ttl", 0), 

1112 ShortField("rdlen", None), 

1113 DNSStrField("algo_name", "hmac-sha1"), 

1114 TimeSignedField("time_signed", 0), 

1115 ShortField("fudge", 0), 

1116 FieldLenField("mac_len", 20, fmt="!H", length_of="mac_data"), # noqa: E501 

1117 StrLenField("mac_data", "", length_from=lambda pkt: pkt.mac_len), # noqa: E501 

1118 ShortField("original_id", 0), 

1119 ShortField("error", 0), 

1120 FieldLenField("other_len", 0, fmt="!H", length_of="other_data"), # noqa: E501 

1121 StrLenField("other_data", "", length_from=lambda pkt: pkt.other_len) # noqa: E501 

1122 ] 

1123 

1124 

1125class DNSRRNAPTR(_DNSRRdummy): 

1126 name = "DNS NAPTR Resource Record" 

1127 fields_desc = [DNSStrField("rrname", ""), 

1128 ShortEnumField("type", 35, dnstypes), 

1129 BitField("cacheflush", 0, 1), # mDNS RFC 6762 

1130 BitEnumField("rclass", 1, 15, dnsclasses), 

1131 IntField("ttl", 0), 

1132 ShortField("rdlen", None), 

1133 ShortField("order", 0), 

1134 ShortField("preference", 0), 

1135 FieldLenField("flags_len", None, fmt="!B", length_of="flags"), 

1136 StrLenField("flags", "", length_from=lambda pkt: pkt.flags_len), 

1137 FieldLenField("services_len", None, fmt="!B", length_of="services"), 

1138 StrLenField("services", "", 

1139 length_from=lambda pkt: pkt.services_len), 

1140 FieldLenField("regexp_len", None, fmt="!B", length_of="regexp"), 

1141 StrLenField("regexp", "", length_from=lambda pkt: pkt.regexp_len), 

1142 DNSStrField("replacement", ""), 

1143 ] 

1144 

1145 

1146DNSRR_DISPATCHER = { 

1147 6: DNSRRSOA, # RFC 1035 

1148 13: DNSRRHINFO, # RFC 1035 

1149 15: DNSRRMX, # RFC 1035 

1150 33: DNSRRSRV, # RFC 2782 

1151 35: DNSRRNAPTR, # RFC 2915 

1152 41: DNSRROPT, # RFC 1671 

1153 43: DNSRRDS, # RFC 4034 

1154 46: DNSRRRSIG, # RFC 4034 

1155 47: DNSRRNSEC, # RFC 4034 

1156 48: DNSRRDNSKEY, # RFC 4034 

1157 50: DNSRRNSEC3, # RFC 5155 

1158 51: DNSRRNSEC3PARAM, # RFC 5155 

1159 64: DNSRRSVCB, # RFC 9460 

1160 65: DNSRRHTTPS, # RFC 9460 

1161 250: DNSRRTSIG, # RFC 2845 

1162 32769: DNSRRDLV, # RFC 4431 

1163} 

1164 

1165 

1166class DNSRR(Packet): 

1167 name = "DNS Resource Record" 

1168 show_indent = 0 

1169 fields_desc = [DNSStrField("rrname", ""), 

1170 ShortEnumField("type", 1, dnstypes), 

1171 BitField("cacheflush", 0, 1), # mDNS RFC 6762 

1172 BitEnumField("rclass", 1, 15, dnsclasses), 

1173 IntField("ttl", 0), 

1174 FieldLenField("rdlen", None, length_of="rdata", fmt="H"), 

1175 MultipleTypeField( 

1176 [ 

1177 # A 

1178 (IPField("rdata", "0.0.0.0"), 

1179 lambda pkt: pkt.type == 1), 

1180 # AAAA 

1181 (IP6Field("rdata", "::"), 

1182 lambda pkt: pkt.type == 28), 

1183 # NS, MD, MF, CNAME, PTR, DNAME 

1184 (DNSStrField("rdata", "", 

1185 length_from=lambda pkt: pkt.rdlen), 

1186 lambda pkt: pkt.type in [2, 3, 4, 5, 12, 39]), 

1187 # TEXT 

1188 (DNSTextField("rdata", [""], 

1189 length_from=lambda pkt: pkt.rdlen), 

1190 lambda pkt: pkt.type == 16), 

1191 ], 

1192 StrLenField("rdata", "", 

1193 length_from=lambda pkt:pkt.rdlen) 

1194 )] 

1195 

1196 def default_payload_class(self, payload): 

1197 return conf.padding_layer 

1198 

1199 

1200def _DNSRR(s, **kwargs): 

1201 """ 

1202 DNSRR dispatcher func 

1203 """ 

1204 if s: 

1205 # Try to find the type of the RR using the dispatcher 

1206 _, remain = dns_get_str(s, _ignore_compression=True) 

1207 cls = DNSRR_DISPATCHER.get( 

1208 struct.unpack("!H", remain[:2])[0], 

1209 DNSRR, 

1210 ) 

1211 rrlen = ( 

1212 len(s) - len(remain) + # rrname len 

1213 10 + 

1214 struct.unpack("!H", remain[8:10])[0] 

1215 ) 

1216 pkt = cls(s[:rrlen], **kwargs) / conf.padding_layer(s[rrlen:]) 

1217 # drop rdlen because if rdata was compressed, it will break everything 

1218 # when rebuilding 

1219 del pkt.fields["rdlen"] 

1220 return pkt 

1221 return None 

1222 

1223 

1224class DNSQR(Packet): 

1225 name = "DNS Question Record" 

1226 show_indent = 0 

1227 fields_desc = [DNSStrField("qname", "www.example.com"), 

1228 ShortEnumField("qtype", 1, dnsqtypes), 

1229 BitField("unicastresponse", 0, 1), # mDNS RFC 6762 

1230 BitEnumField("qclass", 1, 15, dnsclasses)] 

1231 

1232 def default_payload_class(self, payload): 

1233 return conf.padding_layer 

1234 

1235 

1236class _DNSPacketListField(PacketListField): 

1237 # A normal PacketListField with backward-compatible hacks 

1238 def any2i(self, pkt, x): 

1239 # type: (Optional[Packet], List[Any]) -> List[Any] 

1240 if x is None: 

1241 warnings.warn( 

1242 ("The DNS fields 'qd', 'an', 'ns' and 'ar' are now " 

1243 "PacketListField(s) ! " 

1244 "Setting a null default should be [] instead of None"), 

1245 DeprecationWarning 

1246 ) 

1247 x = [] 

1248 return super(_DNSPacketListField, self).any2i(pkt, x) 

1249 

1250 def i2h(self, pkt, x): 

1251 # type: (Optional[Packet], List[Packet]) -> Any 

1252 class _list(list): 

1253 """ 

1254 Fake list object to provide compatibility with older DNS fields 

1255 """ 

1256 def __getattr__(self, attr): 

1257 try: 

1258 ret = getattr(self[0], attr) 

1259 warnings.warn( 

1260 ("The DNS fields 'qd', 'an', 'ns' and 'ar' are now " 

1261 "PacketListField(s) ! " 

1262 "To access the first element, use pkt.an[0] instead of " 

1263 "pkt.an"), 

1264 DeprecationWarning 

1265 ) 

1266 return ret 

1267 except AttributeError: 

1268 raise 

1269 return _list(x) 

1270 

1271 

1272class DNS(DNSCompressedPacket): 

1273 name = "DNS" 

1274 FORCE_TCP = False 

1275 fields_desc = [ 

1276 ConditionalField(ShortField("length", None), 

1277 lambda p: p.FORCE_TCP or isinstance(p.underlayer, TCP)), 

1278 ShortField("id", 0), 

1279 BitField("qr", 0, 1), 

1280 BitEnumField("opcode", 0, 4, {0: "QUERY", 1: "IQUERY", 2: "STATUS"}), 

1281 BitField("aa", 0, 1), 

1282 BitField("tc", 0, 1), 

1283 BitField("rd", 1, 1), 

1284 BitField("ra", 0, 1), 

1285 BitField("z", 0, 1), 

1286 # AD and CD bits are defined in RFC 2535 

1287 BitField("ad", 0, 1), # Authentic Data 

1288 BitField("cd", 0, 1), # Checking Disabled 

1289 BitEnumField("rcode", 0, 4, {0: "ok", 1: "format-error", 

1290 2: "server-failure", 3: "name-error", 

1291 4: "not-implemented", 5: "refused"}), 

1292 FieldLenField("qdcount", None, count_of="qd"), 

1293 FieldLenField("ancount", None, count_of="an"), 

1294 FieldLenField("nscount", None, count_of="ns"), 

1295 FieldLenField("arcount", None, count_of="ar"), 

1296 _DNSPacketListField("qd", [DNSQR()], DNSQR, count_from=lambda pkt: pkt.qdcount), 

1297 _DNSPacketListField("an", [], _DNSRR, count_from=lambda pkt: pkt.ancount), 

1298 _DNSPacketListField("ns", [], _DNSRR, count_from=lambda pkt: pkt.nscount), 

1299 _DNSPacketListField("ar", [], _DNSRR, count_from=lambda pkt: pkt.arcount), 

1300 ] 

1301 

1302 def get_full(self): 

1303 # Required for DNSCompressedPacket 

1304 if isinstance(self.underlayer, TCP) or self.FORCE_TCP: 

1305 return self.original[2:] 

1306 else: 

1307 return self.original 

1308 

1309 def answers(self, other): 

1310 return (isinstance(other, DNS) and 

1311 self.id == other.id and 

1312 self.qr == 1 and 

1313 other.qr == 0) 

1314 

1315 def mysummary(self): 

1316 name = "" 

1317 if self.qr: 

1318 type = "Ans" 

1319 if self.an and isinstance(self.an[0], DNSRR): 

1320 name = ' %s' % self.an[0].rdata 

1321 elif self.rcode != 0: 

1322 name = self.sprintf(' %rcode%') 

1323 else: 

1324 type = "Qry" 

1325 if self.qd and isinstance(self.qd[0], DNSQR): 

1326 name = ' %s' % self.qd[0].qname 

1327 return "%sDNS %s%s" % ( 

1328 "m" 

1329 if isinstance(self.underlayer, UDP) and self.underlayer.dport == 5353 

1330 else "", 

1331 type, 

1332 name, 

1333 ) 

1334 

1335 def post_build(self, pkt, pay): 

1336 if ( 

1337 (isinstance(self.underlayer, TCP) or self.FORCE_TCP) and 

1338 self.length is None 

1339 ): 

1340 pkt = struct.pack("!H", len(pkt) - 2) + pkt[2:] 

1341 return pkt + pay 

1342 

1343 def compress(self): 

1344 """Return the compressed DNS packet (using `dns_compress()`)""" 

1345 return dns_compress(self) 

1346 

1347 def pre_dissect(self, s): 

1348 """ 

1349 Check that a valid DNS over TCP message can be decoded 

1350 """ 

1351 if isinstance(self.underlayer, TCP): 

1352 

1353 # Compute the length of the DNS packet 

1354 if len(s) >= 2: 

1355 dns_len = struct.unpack("!H", s[:2])[0] 

1356 else: 

1357 message = "Malformed DNS message: too small!" 

1358 log_runtime.info(message) 

1359 raise Scapy_Exception(message) 

1360 

1361 # Check if the length is valid 

1362 if dns_len < 14 or len(s) < dns_len: 

1363 message = "Malformed DNS message: invalid length!" 

1364 log_runtime.info(message) 

1365 raise Scapy_Exception(message) 

1366 

1367 return s 

1368 

1369 

1370class DNSTCP(DNS): 

1371 """ 

1372 A DNS packet that is always under TCP 

1373 """ 

1374 FORCE_TCP = True 

1375 match_subclass = True 

1376 

1377 

1378bind_layers(UDP, DNS, dport=5353) 

1379bind_layers(UDP, DNS, sport=5353) 

1380bind_layers(UDP, DNS, dport=53) 

1381bind_layers(UDP, DNS, sport=53) 

1382DestIPField.bind_addr(UDP, "224.0.0.251", dport=5353) 

1383if conf.ipv6_enabled: 

1384 from scapy.layers.inet6 import DestIP6Field 

1385 DestIP6Field.bind_addr(UDP, "ff02::fb", dport=5353) 

1386bind_layers(TCP, DNS, dport=53) 

1387bind_layers(TCP, DNS, sport=53) 

1388 

1389# Nameserver config 

1390conf.nameservers = read_nameservers() 

1391_dns_cache = conf.netcache.new_cache("dns_cache", 300) 

1392 

1393 

1394@conf.commands.register 

1395def dns_resolve(qname, qtype="A", raw=False, tcp=False, verbose=1, timeout=3, **kwargs): 

1396 """ 

1397 Perform a simple DNS resolution using conf.nameservers with caching 

1398 

1399 :param qname: the name to query 

1400 :param qtype: the type to query (default A) 

1401 :param raw: return the whole DNS packet (default False) 

1402 :param tcp: whether to use directly TCP instead of UDP. If truncated is received, 

1403 UDP automatically retries in TCP. (default: False) 

1404 :param verbose: show verbose errors 

1405 :param timeout: seconds until timeout (per server) 

1406 :raise TimeoutError: if no DNS servers were reached in time. 

1407 """ 

1408 # Unify types 

1409 qtype = DNSQR.qtype.any2i_one(None, qtype) 

1410 qname = DNSQR.qname.any2i(None, qname) 

1411 # Check cache 

1412 cache_ident = b";".join( 

1413 [qname, struct.pack("!B", qtype)] + 

1414 ([b"raw"] if raw else []) 

1415 ) 

1416 result = _dns_cache.get(cache_ident) 

1417 if result: 

1418 return result 

1419 

1420 kwargs.setdefault("timeout", timeout) 

1421 kwargs.setdefault("verbose", 0) # hide sr1() output 

1422 res = None 

1423 for nameserver in conf.nameservers: 

1424 # Try all nameservers 

1425 try: 

1426 # Spawn a socket, connect to the nameserver on port 53 

1427 if tcp: 

1428 cls = DNSTCP 

1429 sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) 

1430 else: 

1431 cls = DNS 

1432 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 

1433 sock.settimeout(kwargs["timeout"]) 

1434 sock.connect((nameserver, 53)) 

1435 # Connected. Wrap it with DNS 

1436 sock = StreamSocket(sock, cls) 

1437 # I/O 

1438 res = sock.sr1( 

1439 cls(qd=[DNSQR(qname=qname, qtype=qtype)], id=RandShort()), 

1440 **kwargs, 

1441 ) 

1442 except IOError as ex: 

1443 if verbose: 

1444 log_runtime.warning(str(ex)) 

1445 continue 

1446 finally: 

1447 sock.close() 

1448 if res: 

1449 # We have a response ! Check for failure 

1450 if res[DNS].tc == 1: # truncated ! 

1451 if not tcp: 

1452 # Retry using TCP 

1453 return dns_resolve( 

1454 qname=qname, 

1455 qtype=qtype, 

1456 raw=raw, 

1457 tcp=True, 

1458 **kwargs, 

1459 ) 

1460 elif verbose: 

1461 log_runtime.info("DNS answer is truncated !") 

1462 

1463 if res[DNS].rcode == 2: # server failure 

1464 res = None 

1465 if verbose: 

1466 log_runtime.info( 

1467 "DNS: %s answered with failure for %s" % ( 

1468 nameserver, 

1469 qname, 

1470 ) 

1471 ) 

1472 else: 

1473 break 

1474 if res is not None: 

1475 if raw: 

1476 # Raw 

1477 result = res 

1478 else: 

1479 # Find answers 

1480 result = [ 

1481 x 

1482 for x in itertools.chain(res.an, res.ns, res.ar) 

1483 if x.type == qtype 

1484 ] 

1485 if result: 

1486 # Cache it 

1487 _dns_cache[cache_ident] = result 

1488 return result 

1489 else: 

1490 raise TimeoutError 

1491 

1492 

1493@conf.commands.register 

1494def dyndns_add(nameserver, name, rdata, type="A", ttl=10): 

1495 """Send a DNS add message to a nameserver for "name" to have a new "rdata" 

1496dyndns_add(nameserver, name, rdata, type="A", ttl=10) -> result code (0=ok) 

1497 

1498example: dyndns_add("ns1.toto.com", "dyn.toto.com", "127.0.0.1") 

1499RFC2136 

1500""" 

1501 zone = name[name.find(".") + 1:] 

1502 r = sr1(IP(dst=nameserver) / UDP() / DNS(opcode=5, 

1503 qd=[DNSQR(qname=zone, qtype="SOA")], # noqa: E501 

1504 ns=[DNSRR(rrname=name, type="A", 

1505 ttl=ttl, rdata=rdata)]), 

1506 verbose=0, timeout=5) 

1507 if r and r.haslayer(DNS): 

1508 return r.getlayer(DNS).rcode 

1509 else: 

1510 return -1 

1511 

1512 

1513@conf.commands.register 

1514def dyndns_del(nameserver, name, type="ALL", ttl=10): 

1515 """Send a DNS delete message to a nameserver for "name" 

1516dyndns_del(nameserver, name, type="ANY", ttl=10) -> result code (0=ok) 

1517 

1518example: dyndns_del("ns1.toto.com", "dyn.toto.com") 

1519RFC2136 

1520""" 

1521 zone = name[name.find(".") + 1:] 

1522 r = sr1(IP(dst=nameserver) / UDP() / DNS(opcode=5, 

1523 qd=[DNSQR(qname=zone, qtype="SOA")], # noqa: E501 

1524 ns=[DNSRR(rrname=name, type=type, 

1525 rclass="ANY", ttl=0, rdata="")]), # noqa: E501 

1526 verbose=0, timeout=5) 

1527 if r and r.haslayer(DNS): 

1528 return r.getlayer(DNS).rcode 

1529 else: 

1530 return -1 

1531 

1532 

1533class DNS_am(AnsweringMachine): 

1534 function_name = "dnsd" 

1535 filter = "udp port 53" 

1536 cls = DNS # We also use this automaton for llmnrd / mdnsd 

1537 

1538 def parse_options(self, joker=None, 

1539 match=None, 

1540 srvmatch=None, 

1541 joker6=False, 

1542 send_error=False, 

1543 relay=False, 

1544 from_ip=True, 

1545 from_ip6=False, 

1546 src_ip=None, 

1547 src_ip6=None, 

1548 ttl=10, 

1549 jokerarpa=False): 

1550 """ 

1551 Simple DNS answering machine. 

1552 

1553 :param joker: default IPv4 for unresolved domains. 

1554 Set to False to disable, None to mirror the interface's IP. 

1555 Defaults to None, unless 'match' is used, then it defaults to 

1556 False. 

1557 :param joker6: default IPv6 for unresolved domains. 

1558 Set to False to disable, None to mirror the interface's IPv6. 

1559 Defaults to False. 

1560 :param match: queries to match. 

1561 This can be a dictionary of {name: val} where name is a string 

1562 representing a domain name (A, AAAA) and val is a tuple of 2 

1563 elements, each representing an IP or a list of IPs. If val is 

1564 a single element, (A, None) is assumed. 

1565 This can also be a list or names, in which case joker(6) are 

1566 used as a response. 

1567 :param jokerarpa: answer for .in-addr.arpa PTR requests. (Default: False) 

1568 :param relay: relay unresolved domains to conf.nameservers (Default: False). 

1569 :param send_error: send an error message when this server can't answer 

1570 (Default: False) 

1571 :param srvmatch: a dictionary of {name: (port, target)} used for SRV 

1572 :param from_ip: an source IP to filter. Can contain a netmask. True for all, 

1573 False for none. Default True 

1574 :param from_ip6: an source IPv6 to filter. Can contain a netmask. True for all, 

1575 False for none. Default False 

1576 :param ttl: the DNS time to live (in seconds) 

1577 :param src_ip: override the source IP 

1578 :param src_ip6: 

1579 

1580 Examples: 

1581 

1582 - Answer all 'A' and 'AAAA' requests:: 

1583 

1584 $ sudo iptables -I OUTPUT -p icmp --icmp-type 3/3 -j DROP 

1585 >>> dnsd(joker="192.168.0.2", joker6="fe80::260:8ff:fe52:f9d8", 

1586 ... iface="eth0") 

1587 

1588 - Answer only 'A' query for google.com with 192.168.0.2:: 

1589 

1590 >>> dnsd(match={"google.com": "192.168.0.2"}, iface="eth0") 

1591 

1592 - Answer DNS for a Windows domain controller ('SRV', 'A' and 'AAAA'):: 

1593 

1594 >>> dnsd( 

1595 ... srvmatch={ 

1596 ... "_ldap._tcp.dc._msdcs.DOMAIN.LOCAL.": (389, 

1597 ... "srv1.domain.local"), 

1598 ... }, 

1599 ... match={"src1.domain.local": ("192.168.0.102", 

1600 ... "fe80::260:8ff:fe52:f9d8")}, 

1601 ... ) 

1602 

1603 - Relay all queries to another DNS server, except some:: 

1604 

1605 >>> conf.nameservers = ["1.1.1.1"] # server to relay to 

1606 >>> dnsd( 

1607 ... match={"test.com": "1.1.1.1"}, 

1608 ... relay=True, 

1609 ... ) 

1610 """ 

1611 from scapy.layers.inet6 import Net6 

1612 

1613 self.mDNS = isinstance(self, mDNS_am) 

1614 self.llmnr = self.cls != DNS 

1615 

1616 # Add some checks (to help) 

1617 if not isinstance(joker, (str, bool)) and joker is not None: 

1618 raise ValueError("Bad 'joker': should be an IPv4 (str) or False !") 

1619 if not isinstance(joker6, (str, bool)) and joker6 is not None: 

1620 raise ValueError("Bad 'joker6': should be an IPv6 (str) or False !") 

1621 if not isinstance(jokerarpa, (str, bool)): 

1622 raise ValueError("Bad 'jokerarpa': should be a hostname or False !") 

1623 if not isinstance(from_ip, (str, Net, bool)): 

1624 raise ValueError("Bad 'from_ip': should be an IPv4 (str), Net or False !") 

1625 if not isinstance(from_ip6, (str, Net6, bool)): 

1626 raise ValueError("Bad 'from_ip6': should be an IPv6 (str), Net or False !") 

1627 if self.mDNS and src_ip: 

1628 raise ValueError("Cannot use 'src_ip' in mDNS !") 

1629 if self.mDNS and src_ip6: 

1630 raise ValueError("Cannot use 'src_ip6' in mDNS !") 

1631 

1632 if joker is None and match is not None: 

1633 joker = False 

1634 self.joker = joker 

1635 self.joker6 = joker6 

1636 self.jokerarpa = jokerarpa 

1637 

1638 def normv(v): 

1639 if isinstance(v, (tuple, list)) and len(v) == 2: 

1640 return tuple(v) 

1641 elif isinstance(v, str): 

1642 return (v, joker6) 

1643 else: 

1644 raise ValueError("Bad match value: '%s'" % repr(v)) 

1645 

1646 def normk(k): 

1647 k = bytes_encode(k).lower() 

1648 if not k.endswith(b"."): 

1649 k += b"." 

1650 return k 

1651 

1652 self.match = collections.defaultdict(lambda: (joker, joker6)) 

1653 if match: 

1654 if isinstance(match, (list, set)): 

1655 self.match.update({normk(k): (None, None) for k in match}) 

1656 else: 

1657 self.match.update({normk(k): normv(v) for k, v in match.items()}) 

1658 if srvmatch is None: 

1659 self.srvmatch = {} 

1660 else: 

1661 self.srvmatch = {normk(k): normv(v) for k, v in srvmatch.items()} 

1662 

1663 self.send_error = send_error 

1664 self.relay = relay 

1665 if isinstance(from_ip, str): 

1666 self.from_ip = Net(from_ip) 

1667 else: 

1668 self.from_ip = from_ip 

1669 if isinstance(from_ip6, str): 

1670 self.from_ip6 = Net6(from_ip6) 

1671 else: 

1672 self.from_ip6 = from_ip6 

1673 self.src_ip = src_ip 

1674 self.src_ip6 = src_ip6 

1675 self.ttl = ttl 

1676 

1677 def is_request(self, req): 

1678 from scapy.layers.inet6 import IPv6 

1679 return ( 

1680 req.haslayer(self.cls) and 

1681 req.getlayer(self.cls).qr == 0 and ( 

1682 ( 

1683 self.from_ip6 is True or 

1684 (self.from_ip6 and req[IPv6].src in self.from_ip6) 

1685 ) 

1686 if IPv6 in req else 

1687 ( 

1688 self.from_ip is True or 

1689 (self.from_ip and req[IP].src in self.from_ip) 

1690 ) 

1691 ) 

1692 ) 

1693 

1694 def make_reply(self, req): 

1695 # Build reply from the request 

1696 resp = req.copy() 

1697 if Ether in req: 

1698 if self.mDNS: 

1699 resp[Ether].src, resp[Ether].dst = None, None 

1700 elif self.llmnr: 

1701 resp[Ether].src, resp[Ether].dst = None, req[Ether].src 

1702 else: 

1703 resp[Ether].src, resp[Ether].dst = ( 

1704 None if req[Ether].dst == "ff:ff:ff:ff:ff:ff" else req[Ether].dst, 

1705 req[Ether].src, 

1706 ) 

1707 from scapy.layers.inet6 import IPv6 

1708 if IPv6 in req: 

1709 resp[IPv6].underlayer.remove_payload() 

1710 if self.mDNS: 

1711 # "All Multicast DNS responses (including responses sent via unicast) 

1712 # SHOULD be sent with IP TTL set to 255." 

1713 resp /= IPv6(dst="ff02::fb", src=self.src_ip6, 

1714 fl=req[IPv6].fl, hlim=255) 

1715 elif self.llmnr: 

1716 resp /= IPv6(dst=req[IPv6].src, src=self.src_ip6, 

1717 fl=req[IPv6].fl, hlim=req[IPv6].hlim) 

1718 else: 

1719 resp /= IPv6(dst=req[IPv6].src, src=self.src_ip6 or req[IPv6].dst, 

1720 fl=req[IPv6].fl, hlim=req[IPv6].hlim) 

1721 elif IP in req: 

1722 resp[IP].underlayer.remove_payload() 

1723 if self.mDNS: 

1724 # "All Multicast DNS responses (including responses sent via unicast) 

1725 # SHOULD be sent with IP TTL set to 255." 

1726 resp /= IP(dst="224.0.0.251", src=self.src_ip, 

1727 id=req[IP].id, ttl=255) 

1728 elif self.llmnr: 

1729 resp /= IP(dst=req[IP].src, src=self.src_ip, 

1730 id=req[IP].id, ttl=req[IP].ttl) 

1731 else: 

1732 resp /= IP(dst=req[IP].src, src=self.src_ip or req[IP].dst, 

1733 id=req[IP].id, ttl=req[IP].ttl) 

1734 else: 

1735 warning("No IP or IPv6 layer in %s", req.command()) 

1736 return 

1737 try: 

1738 resp /= UDP(sport=req[UDP].dport, dport=req[UDP].sport) 

1739 except IndexError: 

1740 warning("No UDP layer in %s", req.command(), exc_info=True) 

1741 return 

1742 try: 

1743 req = req[self.cls] 

1744 except IndexError: 

1745 warning( 

1746 "No %s layer in %s", 

1747 self.cls.__name__, 

1748 req.command(), 

1749 exc_info=True, 

1750 ) 

1751 return 

1752 try: 

1753 queries = req.qd 

1754 except AttributeError: 

1755 warning("No qd attribute in %s", req.command(), exc_info=True) 

1756 return 

1757 # Special case: alias 'ALL' query as 'A' + 'AAAA' 

1758 try: 

1759 allquery = next( 

1760 (x for x in queries if getattr(x, "qtype", None) == 255) 

1761 ) 

1762 queries.remove(allquery) 

1763 queries.extend([ 

1764 DNSQR( 

1765 qtype=x, 

1766 qname=allquery.qname, 

1767 unicastresponse=allquery.unicastresponse, 

1768 qclass=allquery.qclass, 

1769 ) 

1770 for x in [1, 28] 

1771 ]) 

1772 except StopIteration: 

1773 pass 

1774 # Process each query 

1775 ans = [] 

1776 ars = [] 

1777 for rq in queries: 

1778 if isinstance(rq, Raw): 

1779 warning("Cannot parse qd element %s", rq.command(), exc_info=True) 

1780 continue 

1781 rqname = rq.qname.lower() 

1782 if rq.qtype in [1, 28]: 

1783 # A or AAAA 

1784 if rq.qtype == 28: 

1785 # AAAA 

1786 rdata = self.match[rqname][1] 

1787 if rdata is None and not self.relay: 

1788 # 'None' resolves to the default IPv6 

1789 iface = resolve_iface(self.optsniff.get("iface", conf.iface)) 

1790 if self.mDNS: 

1791 # All IPs, as per mDNS. 

1792 rdata = iface.ips[6] 

1793 else: 

1794 rdata = get_if_addr6( 

1795 iface 

1796 ) 

1797 if self.mDNS and rdata and IPv6 in resp: 

1798 # For mDNS, we must replace the IPv6 src 

1799 resp[IPv6].src = rdata 

1800 elif rq.qtype == 1: 

1801 # A 

1802 rdata = self.match[rqname][0] 

1803 if rdata is None and not self.relay: 

1804 # 'None' resolves to the default IPv4 

1805 iface = resolve_iface(self.optsniff.get("iface", conf.iface)) 

1806 if self.mDNS: 

1807 # All IPs, as per mDNS. 

1808 rdata = iface.ips[4] 

1809 else: 

1810 rdata = get_if_addr( 

1811 iface 

1812 ) 

1813 if self.mDNS and rdata and IP in resp: 

1814 # For mDNS, we must replace the IP src 

1815 resp[IP].src = rdata 

1816 if rdata: 

1817 # Common A and AAAA 

1818 if not isinstance(rdata, list): 

1819 rdata = [rdata] 

1820 ans.extend([ 

1821 DNSRR( 

1822 rrname=rq.qname, 

1823 ttl=self.ttl, 

1824 rdata=x, 

1825 type=rq.qtype, 

1826 cacheflush=self.mDNS and rq.qtype == rq.qtype, 

1827 ) 

1828 for x in rdata 

1829 ]) 

1830 continue # next 

1831 elif rq.qtype == 33: 

1832 # SRV 

1833 try: 

1834 port, target = self.srvmatch[rqname] 

1835 ans.append(DNSRRSRV( 

1836 rrname=rq.qname, 

1837 port=port, 

1838 target=target, 

1839 weight=100, 

1840 ttl=self.ttl 

1841 )) 

1842 continue # next 

1843 except KeyError: 

1844 # No result 

1845 pass 

1846 elif rq.qtype == 12: 

1847 # PTR 

1848 if rq.qname[-14:] == b".in-addr.arpa." and self.jokerarpa: 

1849 ans.append(DNSRR( 

1850 rrname=rq.qname, 

1851 type=rq.qtype, 

1852 ttl=self.ttl, 

1853 rdata=self.jokerarpa, 

1854 )) 

1855 continue 

1856 # It it arrives here, there is currently no answer 

1857 if self.relay: 

1858 # Relay mode ? 

1859 try: 

1860 _rslv = dns_resolve(rq.qname, qtype=rq.qtype, raw=True) 

1861 if _rslv: 

1862 ans.extend(_rslv.an) 

1863 ars.extend(_rslv.ar) 

1864 continue # next 

1865 except TimeoutError: 

1866 pass 

1867 # Still no answer. 

1868 if self.mDNS: 

1869 # "Any time a responder receives a query for a name for which it 

1870 # has verified exclusive ownership, for a type for which that name 

1871 # has no records, the responder MUST respond asserting the 

1872 # nonexistence of that record using a DNS NSEC record [RFC4034]." 

1873 ans.append(DNSRRNSEC( 

1874 # RFC6762 sect 6.1 - Negative Response 

1875 ttl=self.ttl, 

1876 rrname=rq.qname, 

1877 nextname=rq.qname, 

1878 typebitmaps=RRlist2bitmap([rq.qtype]), 

1879 )) 

1880 if self.mDNS and all(x.type == 47 for x in ans): 

1881 # If mDNS answers with only NSEC, discard. 

1882 return 

1883 if not ans: 

1884 # No answer is available. 

1885 if self.send_error: 

1886 resp /= self.cls(id=req.id, qr=1, qd=req.qd, rcode=3) 

1887 return resp 

1888 log_runtime.info("No answer could be provided to: %s" % req.summary()) 

1889 return 

1890 # Handle Additional Records 

1891 if self.mDNS: 

1892 # Windows specific extension 

1893 ars.append(DNSRROPT( 

1894 z=0x1194, 

1895 rdata=[ 

1896 EDNS0OWN( 

1897 primary_mac=resp[Ether].src, 

1898 ), 

1899 ], 

1900 )) 

1901 # All rq were answered 

1902 if self.mDNS: 

1903 # in mDNS mode, don't repeat the question, set aa=1, rd=0 

1904 dns = self.cls(id=req.id, aa=1, rd=0, qr=1, qd=[], ar=ars, an=ans) 

1905 else: 

1906 dns = self.cls(id=req.id, qr=1, qd=req.qd, ar=ars, an=ans) 

1907 # Compress DNS and mDNS 

1908 if not self.llmnr: 

1909 resp /= dns_compress(dns) 

1910 else: 

1911 resp /= dns 

1912 return resp 

1913 

1914 

1915class mDNS_am(DNS_am): 

1916 """ 

1917 mDNS answering machine. 

1918 

1919 This has the same arguments as DNS_am. See help(DNS_am) 

1920 

1921 Example:: 

1922 

1923 - Answer for 'TEST.local' with local IPv4:: 

1924 

1925 >>> mdnsd(match=["TEST.local"]) 

1926 

1927 - Answer all requests with other IP:: 

1928 

1929 >>> mdnsd(joker="192.168.0.2", joker6="fe80::260:8ff:fe52:f9d8", 

1930 ... iface="eth0") 

1931 

1932 - Answer for multiple different mDNS names:: 

1933 

1934 >>> mdnsd(match={"TEST.local": "192.168.0.100", 

1935 ... "BOB.local": "192.168.0.101"}) 

1936 

1937 - Answer with both A and AAAA records:: 

1938 

1939 >>> mdnsd(match={"TEST.local": ("192.168.0.100", 

1940 ... "fe80::260:8ff:fe52:f9d8")}) 

1941 """ 

1942 function_name = "mdnsd" 

1943 filter = "udp port 5353" 

1944 

1945 

1946# DNS-SD (RFC 6763) 

1947 

1948 

1949class DNSSDResult(SndRcvList): 

1950 def __init__(self, 

1951 res=None, # type: Optional[Union[_PacketList[QueryAnswer], List[QueryAnswer]]] # noqa: E501 

1952 name="DNS-SD", # type: str 

1953 stats=None # type: Optional[List[Type[Packet]]] 

1954 ): 

1955 SndRcvList.__init__(self, res, name, stats) 

1956 

1957 def show(self, types=['PTR', 'SRV'], alltypes=False): 

1958 # type: (List[str], bool) -> None 

1959 """ 

1960 Print the list of discovered services. 

1961 

1962 :param types: types to show. Default ['PTR', 'SRV'] 

1963 :param alltypes: show all types. Default False 

1964 """ 

1965 if alltypes: 

1966 types = None 

1967 data = list() # type: List[Tuple[str | List[str], ...]] 

1968 

1969 resolve_mac = ( 

1970 self.res and isinstance(self.res[0][1].underlayer, Ether) and 

1971 conf.manufdb 

1972 ) 

1973 

1974 header = ("IP", "Service") 

1975 if resolve_mac: 

1976 header = ("Mac",) + header 

1977 

1978 for _, r in self.res: 

1979 attrs = [] 

1980 for attr in itertools.chain(r[DNS].an, r[DNS].ar): 

1981 if types and dnstypes.get(attr.type) not in types: 

1982 continue 

1983 if isinstance(attr, DNSRRNSEC): 

1984 attrs.append(attr.sprintf("%type%=%nextname%")) 

1985 elif isinstance(attr, DNSRRSRV): 

1986 attrs.append(attr.sprintf("%type%=(%target%,%port%)")) 

1987 else: 

1988 attrs.append(attr.sprintf("%type%=%rdata%")) 

1989 ans = (r.src, attrs) 

1990 if resolve_mac: 

1991 mac = conf.manufdb._resolve_MAC(r.underlayer.src) 

1992 data.append((mac,) + ans) 

1993 else: 

1994 data.append(ans) 

1995 

1996 print( 

1997 pretty_list( 

1998 data, 

1999 [header], 

2000 ) 

2001 ) 

2002 

2003 

2004@conf.commands.register 

2005def dnssd(service="_services._dns-sd._udp.local", 

2006 af=socket.AF_INET, 

2007 qtype="PTR", 

2008 iface=None, 

2009 verbose=2, 

2010 timeout=3): 

2011 """ 

2012 Performs a DNS-SD (RFC6763) request 

2013 

2014 :param service: the service name to query (e.g. _spotify-connect._tcp.local) 

2015 :param af: the transport to use. socket.AF_INET or socket.AF_INET6 

2016 :param qtype: the type to use in the mDNS. Either TXT, PTR or SRV. 

2017 :param iface: the interface to do this discovery on. 

2018 """ 

2019 if af == socket.AF_INET: 

2020 pkt = IP(dst=ScopedIP("224.0.0.251", iface), ttl=255) 

2021 elif af == socket.AF_INET6: 

2022 pkt = IPv6(dst=ScopedIP("ff02::fb", iface)) 

2023 else: 

2024 return 

2025 pkt /= UDP(sport=5353, dport=5353) 

2026 pkt /= DNS(rd=0, qd=[DNSQR(qname=service, qtype=qtype)]) 

2027 ans, _ = sr(pkt, multi=True, timeout=timeout, verbose=verbose) 

2028 return DNSSDResult(ans.res)