Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/layers/dns.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

706 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7DNS: Domain Name System. 

8""" 

9 

10import abc 

11import operator 

12import itertools 

13import socket 

14import struct 

15import time 

16import warnings 

17 

18from scapy.arch import ( 

19 get_if_addr, 

20 get_if_addr6, 

21 read_nameservers, 

22) 

23from scapy.ansmachine import AnsweringMachine 

24from scapy.base_classes import Net 

25from scapy.config import conf 

26from scapy.compat import orb, raw, chb, bytes_encode, plain_str 

27from scapy.error import log_runtime, warning, Scapy_Exception 

28from scapy.packet import Packet, bind_layers, Raw 

29from scapy.fields import ( 

30 BitEnumField, 

31 BitField, 

32 ByteEnumField, 

33 ByteField, 

34 ConditionalField, 

35 Field, 

36 FieldLenField, 

37 FieldListField, 

38 FlagsField, 

39 I, 

40 IP6Field, 

41 IntField, 

42 MultipleTypeField, 

43 PacketListField, 

44 ShortEnumField, 

45 ShortField, 

46 StrField, 

47 StrLenField, 

48 UTCTimeField, 

49 XStrFixedLenField, 

50 XStrLenField, 

51) 

52from scapy.sendrecv import sr1 

53from scapy.supersocket import StreamSocket 

54from scapy.pton_ntop import inet_ntop, inet_pton 

55from scapy.volatile import RandShort 

56 

57from scapy.layers.l2 import Ether 

58from scapy.layers.inet import IP, DestIPField, IPField, UDP, TCP 

59 

60from typing import ( 

61 Any, 

62 List, 

63 Optional, 

64 Tuple, 

65 Type, 

66 Union, 

67) 

68 

69 

# https://www.iana.org/assignments/dns-parameters/dns-parameters.xhtml#dns-parameters-4
# Resource record TYPE value -> mnemonic.
dnstypes = {
    0: "ANY", 1: "A", 2: "NS", 3: "MD", 4: "MF", 5: "CNAME",
    6: "SOA", 7: "MB", 8: "MG", 9: "MR", 10: "NULL", 11: "WKS",
    12: "PTR", 13: "HINFO", 14: "MINFO", 15: "MX", 16: "TXT", 17: "RP",
    18: "AFSDB", 19: "X25", 20: "ISDN", 21: "RT", 22: "NSAP",
    23: "NSAP-PTR", 24: "SIG", 25: "KEY", 26: "PX", 27: "GPOS",
    28: "AAAA", 29: "LOC", 30: "NXT", 31: "EID", 32: "NIMLOC",
    33: "SRV", 34: "ATMA", 35: "NAPTR", 36: "KX", 37: "CERT", 38: "A6",
    39: "DNAME", 40: "SINK", 41: "OPT", 42: "APL", 43: "DS",
    44: "SSHFP", 45: "IPSECKEY", 46: "RRSIG", 47: "NSEC", 48: "DNSKEY",
    49: "DHCID", 50: "NSEC3", 51: "NSEC3PARAM", 52: "TLSA", 53: "SMIMEA",
    55: "HIP", 56: "NINFO", 57: "RKEY", 58: "TALINK", 59: "CDS",
    60: "CDNSKEY", 61: "OPENPGPKEY", 62: "CSYNC", 63: "ZONEMD",
    64: "SVCB", 65: "HTTPS",
    99: "SPF", 100: "UINFO", 101: "UID", 102: "GID", 103: "UNSPEC",
    104: "NID", 105: "L32", 106: "L64", 107: "LP", 108: "EUI48",
    109: "EUI64",
    249: "TKEY", 250: "TSIG",
    256: "URI", 257: "CAA", 258: "AVC", 259: "DOA", 260: "AMTRELAY",
    32768: "TA", 32769: "DLV", 65535: "RESERVED",
}


# Query types: the query-only codes first, then every regular RR type.
dnsqtypes = {
    251: "IXFR", 252: "AXFR", 253: "MAILB", 254: "MAILA", 255: "ALL",
    **dnstypes,
}
# CLASS value -> mnemonic.
dnsclasses = {1: "IN", 2: "CS", 3: "CH", 4: "HS", 255: "ANY"}

94 

95 

# 12/2023 from https://www.iana.org/assignments/dns-sec-alg-numbers/dns-sec-alg-numbers.xhtml  # noqa: E501
# DNSSEC algorithm number -> description.
dnssecalgotypes = {
    0: "Reserved",
    1: "RSA/MD5",
    2: "Diffie-Hellman",
    3: "DSA/SHA-1",
    4: "Reserved",
    5: "RSA/SHA-1",
    6: "DSA-NSEC3-SHA1",
    7: "RSASHA1-NSEC3-SHA1",
    8: "RSA/SHA-256",
    9: "Reserved",
    10: "RSA/SHA-512",
    11: "Reserved",
    12: "GOST R 34.10-2001",
    13: "ECDSA Curve P-256 with SHA-256",
    14: "ECDSA Curve P-384 with SHA-384",
    15: "Ed25519",
    16: "Ed448",
    252: "Reserved for Indirect Keys",
    253: "Private algorithms - domain name",
    254: "Private algorithms - OID",
    255: "Reserved",
}

# 12/2023 from https://www.iana.org/assignments/ds-rr-types/ds-rr-types.xhtml
# DS digest type -> description.
dnssecdigesttypes = {
    0: "Reserved",
    1: "SHA-1",
    2: "SHA-256",
    3: "GOST R 34.11-94",
    4: "SHA-384",
}

# 12/2023 from https://www.iana.org/assignments/dnssec-nsec3-parameters/dnssec-nsec3-parameters.xhtml  # noqa: E501
# NSEC3 hash algorithm -> description.
dnssecnsec3algotypes = {
    0: "Reserved",
    1: "SHA-1",
}

111 

112 

def dns_get_str(s, full=None, _ignore_compression=False):
    """Decode the DNS name found at the beginning of s, following
    compression pointers when needed.

    :param s: the buffer that starts with the encoded name
    :param full: (optional) the full packet (used for decompression)

    :returns: (decoded_name, remaining_bytes) where remaining_bytes is
        what follows the name in the original s buffer
    """
    # _ignore_compression is for internal use only
    buf = s
    buf_len = len(buf)
    # Decoded labels, each one already terminated by a dot
    labels = []
    # Offset in the original buffer right after the first jump token
    resume_at = None
    visited = []  # Jump targets already followed (loop detection)
    remaining = None
    in_full = False  # True once buf has been switched to the full packet
    pos = 0
    while True:
        if pos >= buf_len:
            log_runtime.info(
                "DNS RR prematured end (ofs=%i, len=%i)", pos, len(buf)
            )
            break
        octet = buf[pos]
        pos += 1
        if octet == 0:
            # Root label: end of the name
            break
        if not octet & 0xc0:
            # Plain label: octet is its length
            labels.append(buf[pos:pos + octet] + b".")
            pos += octet
            continue
        # Any of the two high bits set: treated as a jump token (2 bytes)
        if resume_at is None:
            # Only the first jump defines where the remaining bytes start
            resume_at = pos + 1
        if _ignore_compression:
            # Skip the second byte of the token without following it
            pos += 1
            continue
        if pos >= buf_len:
            log_runtime.info(
                "DNS incomplete jump token at (ofs=%i)", pos
            )
            break
        if not full:
            raise Scapy_Exception(
                "DNS message can't be compressed at this point!"
            )
        # Follow the pointer
        pos = ((octet & ~0xc0) << 8) + buf[pos]
        if pos in visited:
            warning("DNS decompression loop detected")
            break
        if len(visited) >= 20:
            warning("More than 20 jumps in a single DNS decompression ! "
                    "Dropping (evil packet)")
            break
        if not in_full:
            # From now on we read from the full packet, so the leftover
            # of the original buffer must be captured right away
            remaining = buf[resume_at:]
            buf = full
            buf_len = len(buf)
            in_full = True
        visited.append(pos)
    if resume_at is not None:
        # Report the end relative to the original buffer, not the jump
        pos = resume_at
    if remaining is None:
        remaining = buf[pos:]
    return b"".join(labels) or b".", remaining

188 

189 

190def _is_ptr(x): 

191 return b"." not in x and ( 

192 (x and orb(x[-1]) == 0) or 

193 (len(x) >= 2 and (orb(x[-2]) & 0xc0) == 0xc0) 

194 ) 

195 

196 

def dns_encode(x, check_built=False):
    """Encodes a dotted bytes string into the DNS wire format

    :param x: the string
    :param check_built: detect already-built strings and return them as is
    :returns: the encoded bytes string
    """
    if not x or x == b".":
        return b"\x00"

    if check_built and _is_ptr(x):
        # Already in wire format: leave untouched
        return x

    # A label cannot exceed 63 bytes: longer chunks are truncated
    labels = [label[:63] for label in x.split(b".")]
    encoded = b"".join(chb(len(label)) + label for label in labels)
    # A trailing dot already produced the terminating null byte
    if encoded.endswith(b"\x00"):
        return encoded
    return encoded + b"\x00"

216 

217 

def DNSgetstr(*args, **kwargs):
    """Legacy function. Deprecated

    Emits a DeprecationWarning, forwards every argument to dns_get_str
    and returns its result without the last element.
    """
    warnings.warn(
        "DNSgetstr is deprecated. Use dns_get_str instead.",
        DeprecationWarning
    )
    result = dns_get_str(*args, **kwargs)
    return result[:-1]

225 

226 

def dns_compress(pkt):
    """Return a copy of pkt whose DNS layer has its names compressed.

    Names (and name suffixes) that already appear earlier in the built DNS
    message are replaced by a 2-byte pointer to that first occurrence.

    :param pkt: a packet containing a DNS layer
    :returns: the compressed copy (pkt itself is not modified)
    :raises Scapy_Exception: when pkt has no DNS layer
    """
    if DNS not in pkt:
        raise Scapy_Exception("Can only compress DNS layers")
    pkt = pkt.copy()
    dns_pkt = pkt.getlayer(DNS)
    dns_pkt.clear_cache()
    build_pkt = raw(dns_pkt)

    def field_gen(dns_pkt):
        """Iterates through all DNS strings that can be compressed"""
        for section in (dns_pkt.qd, dns_pkt.an, dns_pkt.ns, dns_pkt.ar):
            if not section:
                continue
            for record in section:
                for fld in record.fields_desc:
                    # Either a name field, or the rdata of a record whose
                    # type is NS, MD, MF, CNAME, PTR, MX or DNAME
                    compressible = isinstance(fld, DNSStrField) or (
                        isinstance(fld, MultipleTypeField) and
                        record.type in [2, 3, 4, 5, 12, 15, 39]
                    )
                    if compressible:
                        yield record, fld.name, record.getfieldval(fld.name)

    def possible_shortens(dat):
        """Iterates through all possible compression parts in a DNS string"""
        if dat == b".":  # we'd lose by compressing it
            return
        yield dat
        for skipped in range(1, dat.count(b".")):
            # Suffix obtained after dropping the first `skipped` labels
            yield dat.split(b".", skipped)[skipped]

    # name -> [(layer, field, pointer, index + 1), (layer, field), ...]
    # The first entry describes the occurrence every other one points to.
    data = {}
    for record, field_name, dat in field_gen(dns_pkt):
        for part in possible_shortens(dat):
            encoded = dns_encode(part, check_built=True)
            if part in data:
                # Already seen: mark this field for compression and blank
                # the next occurrence in the working copy of the message
                data[part].append((record, field_name))
                first_end = data[part][0][3]
                build_pkt = build_pkt[:first_end] + \
                    build_pkt[first_end:].replace(encoded, b"\0\0", 1)
                break
            # First occurrence: remember where it is so that later names
            # can point to it
            index = build_pkt.index(encoded)
            # The following is used to build correctly the pointer
            fb_index = ((index >> 8) | 0xc0)
            sb_index = index - (256 * (fb_index - 0xc0))
            pointer = chb(fb_index) + chb(sb_index)
            data[part] = [(record, field_name, pointer, index + 1)]

    # Apply compression rules
    for ck, replacements in data.items():
        # The first entry holds the packed pointer; the others are the
        # (layer, field name) pairs that must be rewritten
        replace_pointer = replacements.pop(0)[2]
        for layer, field_name in replacements:
            val = layer.getfieldval(field_name)
            assert val.endswith(ck)
            # Keep the labels in front of the shared suffix, drop their
            # terminating null byte and append the pointer
            kept_string = dns_encode(val[:-len(ck)], check_built=True)[:-1]
            layer.setfieldval(field_name, kept_string + replace_pointer)
            try:
                # Force rdlen to be computed again at build time
                del layer.rdlen
            except AttributeError:
                pass

    # Destroy the previous DNS layer if needed
    if not isinstance(pkt, DNS) and pkt.getlayer(DNS).underlayer:
        pkt.getlayer(DNS).underlayer.remove_payload()
        return pkt / dns_pkt
    return dns_pkt

310 

311 

class DNSCompressedPacket(Packet):
    """
    Marker base class: a packet that contains DNSStrField and supports
    compression.
    """

    @abc.abstractmethod
    def get_full(self):
        """Hook to be implemented by subclasses.

        DNSStrField passes the returned value to dns_get_str as the
        buffer used to follow compression pointers.
        """

319 

320 

class DNSStrField(StrLenField):
    """
    StrField specialised for DNS names: handles the wire encoding, the
    decoding and the decompression.
    (behaves as a StrLenField when a length_from is passed)
    """

    def any2i(self, pkt, x):
        """Convert each element of a non-empty list, else defer to the
        parent class."""
        if x and isinstance(x, list):
            return [self.h2i(pkt, item) for item in x]
        return super(DNSStrField, self).any2i(pkt, x)

    def h2i(self, pkt, x):
        """Normalise a name to bytes ending with a dot, unless it is
        already in built form."""
        # Setting a DNSStrField manually (h2i) means any current
        # compression will break: drop the cached raw bytes of the parent
        if pkt:
            parent = pkt.parent
            if isinstance(parent, DNSCompressedPacket) and \
                    parent.raw_packet_cache:
                parent.clear_cache()
        if not x:
            return b"."
        value = bytes_encode(x)
        if value[-1:] == b"." or _is_ptr(value):
            return value
        return value + b"."

    def i2m(self, pkt, x):
        """Encode the name to its wire format."""
        return dns_encode(x, check_built=True)

    def i2len(self, pkt, x):
        """Length of the encoded name."""
        return len(self.i2m(pkt, x))

    def get_full(self, pkt):
        """Walk up the parents / underlayers until a DNSCompressedPacket is
        found and return its get_full(), or None when there is none."""
        holder = pkt
        while holder and not isinstance(holder, DNSCompressedPacket):
            holder = holder.parent or holder.underlayer
        if not holder:
            return None
        return holder.get_full()

    def getfield(self, pkt, s):
        """Dissect a name from s and return (remaining, decoded)."""
        trailing = b""
        if self.length_from:
            # Restrict the decoding to the bytes selected by length_from
            trailing, s = super(DNSStrField, self).getfield(pkt, s)
        # Decode the compressed DNS message
        decoded, left = dns_get_str(s, full=self.get_full(pkt))
        return left + trailing, decoded

368 

369 

class DNSTextField(StrLenField):
    """
    StrLenField specialised for DNS TEXT data (16): a list of
    length-prefixed character-strings.
    """

    islist = 1

    def i2h(self, pkt, x):
        """Return x, or an empty list when x is empty."""
        return x if x else []

    def m2i(self, pkt, s):
        """Split the RDATA into its character-strings."""
        strings = []
        rest = s
        # Each string is prepended with one byte holding its size
        while rest:
            chunk_len = orb(rest[0]) + 1
            if chunk_len > len(rest):
                log_runtime.info(
                    "DNS RR TXT prematured end of character-string "
                    "(size=%i, remaining bytes=%i)", chunk_len, len(rest)
                )
            strings.append(rest[1:chunk_len])
            rest = rest[chunk_len:]
        return strings

    def any2i(self, pkt, x):
        """Wrap a single str / bytes value into a list."""
        if isinstance(x, (str, bytes)):
            return [x]
        return x

    def i2len(self, pkt, x):
        """Length of the encoded value."""
        return len(self.i2m(pkt, x))

    def i2m(self, pkt, s):
        """Encode every text as one or more length-prefixed strings."""
        parts = []
        for text in s:
            if not text:
                # Empty string: a lone zero length byte
                parts.append(b"\x00")
                continue
            text = bytes_encode(text)
            # A character-string holds at most 255 bytes: longer texts
            # are emitted as several consecutive strings
            while len(text) >= 255:
                parts.append(b"\xff" + text[:255])
                text = text[255:]
            if text:
                parts.append(struct.pack("!B", len(text)) + text)
        return b"".join(parts)

422 

423 

424# RFC 2671 - Extension Mechanisms for DNS (EDNS0) 

425 

# EDNS0 option code -> name.
edns0types = {
    0: "Reserved",
    1: "LLQ",
    2: "UL",
    3: "NSID",
    4: "Reserved",
    5: "DAU",
    6: "DHU",
    7: "N3U",
    8: "edns-client-subnet",
    10: "COOKIE",
    15: "Extended DNS Error",
}

429 

430 

class _EDNS0Dummy(Packet):
    """Base class of the EDNS0 options: everything that follows the
    option fields is reported as padding instead of payload."""
    name = "Dummy class that implements extract_padding()"

    def extract_padding(self, p):
        # type: (bytes) -> Tuple[bytes, Optional[bytes]]
        """Return (payload, padding): no payload, all of p as padding.

        The empty payload is a bytes object, as declared by the type
        comment (it used to be an empty str).
        """
        return b"", p

437 

438 

class EDNS0TLV(_EDNS0Dummy):
    """Generic EDNS0 option (code, length, raw data). Also the entry point
    that dispatches to the specialised option classes."""
    name = "DNS EDNS0 TLV"
    fields_desc = [
        ShortEnumField("optcode", 0, edns0types),
        FieldLenField("optlen", None, "optdata", fmt="H"),
        StrLenField("optdata", "", length_from=lambda pkt: pkt.optlen),
    ]

    @classmethod
    def dispatch_hook(cls, _pkt=None, *args, **kargs):
        # type: (Optional[bytes], *Any, **Any) -> Type[Packet]
        """Select the class used to dissect _pkt from its option code."""
        if _pkt is None:
            return EDNS0TLV
        if len(_pkt) < 2:
            # Not even a complete option code
            return Raw
        (code,) = struct.unpack("!H", _pkt[:2])
        return EDNS0OPT_DISPATCHER.get(code, EDNS0TLV)

455 

456 

class DNSRROPT(Packet):
    """OPT pseudo resource record; its rdata is a list of EDNS0 options."""
    name = "DNS OPT Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 41, dnstypes),
        ShortField("rclass", 4096),
        ByteField("extrcode", 0),
        # version 0 means EDNS0
        ByteField("version", 0),
        # D0 means DNSSEC OK from RFC 3225
        BitEnumField("z", 32768, 16, {32768: "D0"}),
        FieldLenField("rdlen", None, length_of="rdata", fmt="H"),
        PacketListField(
            "rdata", [], EDNS0TLV,
            length_from=lambda pkt: pkt.rdlen,
        ),
    ]

470 

471 

472# RFC 6975 - Signaling Cryptographic Algorithm Understanding in 

473# DNS Security Extensions (DNSSEC) 

474 

class EDNS0DAU(_EDNS0Dummy):
    """EDNS0 option 5: list of one-byte DNSSEC algorithm codes."""
    name = "DNSSEC Algorithm Understood (DAU)"
    fields_desc = [
        ShortEnumField("optcode", 5, edns0types),
        # One byte per entry: the option length is the number of entries
        FieldLenField("optlen", None, count_of="alg_code", fmt="H"),
        FieldListField(
            "alg_code", None,
            ByteEnumField("", 0, dnssecalgotypes),
            count_from=lambda pkt: pkt.optlen,
        ),
    ]

482 

483 

class EDNS0DHU(_EDNS0Dummy):
    """EDNS0 option 6: list of one-byte DS digest type codes."""
    name = "DS Hash Understood (DHU)"
    fields_desc = [
        ShortEnumField("optcode", 6, edns0types),
        # One byte per entry: the option length is the number of entries
        FieldLenField("optlen", None, count_of="alg_code", fmt="H"),
        FieldListField(
            "alg_code", None,
            ByteEnumField("", 0, dnssecdigesttypes),
            count_from=lambda pkt: pkt.optlen,
        ),
    ]

491 

492 

class EDNS0N3U(_EDNS0Dummy):
    """EDNS0 option 7: list of one-byte NSEC3 hash algorithm codes."""
    name = "NSEC3 Hash Understood (N3U)"
    fields_desc = [
        ShortEnumField("optcode", 7, edns0types),
        # One byte per entry: the option length is the number of entries
        FieldLenField("optlen", None, count_of="alg_code", fmt="H"),
        FieldListField(
            "alg_code", None,
            ByteEnumField("", 0, dnssecnsec3algotypes),
            count_from=lambda pkt: pkt.optlen,
        ),
    ]

500 

501 

502# RFC 7871 - Client Subnet in DNS Queries 

503 

class ClientSubnetv4(StrLenField):
    """Address field of the EDNS0 Client Subnet option (IPv4 flavour).

    On the wire only the leading bytes of the address are present; the
    number of significant bits is given by length_from.
    """
    af_familly = socket.AF_INET
    af_length = 32
    af_default = b"\xc0"  # 192.0.0.0

    def getfield(self, pkt, s):
        # type: (Packet, bytes) -> Tuple[bytes, I]
        """Consume the address bytes and return (remaining, address)."""
        addr_bytes = self.af_length // 8
        # The address is truncated to the prefix length and padded with
        # zero bits up to the end of its last byte (RFC 7871), so the
        # byte count is the prefix length rounded UP. For prefix lengths
        # that are multiples of 8 this matches the former floor division.
        sz = min((self.length_from(pkt) + 7) // 8, addr_bytes)
        return s[sz:], self.m2i(pkt, s[:sz])

    def m2i(self, pkt, x):
        # type: (Optional[Packet], bytes) -> str
        """Convert the truncated address bytes to its printable form."""
        addr_bytes = self.af_length // 8
        # Always hand a full-size buffer to inet_ntop, whatever the number
        # of bytes actually present
        x = x[:addr_bytes].ljust(addr_bytes, b"\x00")
        return inet_ntop(self.af_familly, x)

    def _pack_subnet(self, subnet):
        # type: (bytes) -> bytes
        """Pack the address and drop its trailing null bytes."""
        packed_subnet = inet_pton(self.af_familly, plain_str(subnet))
        return packed_subnet.rstrip(b"\x00")

    def _pack_with_fallback(self, pkt, x):
        # type: (Optional[Packet], Any) -> bytes
        """Pack x with this family, or as IPv6 when that fails.

        On fallback, pkt.family is set to 2 (when a packet is available)
        so that the built option advertises the IPv6 family.
        """
        try:
            return self._pack_subnet(x)
        except (OSError, socket.error):
            if pkt is not None:
                pkt.family = 2
            return ClientSubnetv6("", "")._pack_subnet(x)

    def i2m(self, pkt, x):
        # type: (Optional[Packet], Optional[Union[str, Net]]) -> bytes
        """Build the wire value (af_default when x is None)."""
        if x is None:
            return self.af_default
        return self._pack_with_fallback(pkt, x)

    def i2len(self, pkt, x):
        # type: (Packet, Any) -> int
        """Number of bytes i2m() produces for x."""
        if x is None:
            return len(self.af_default)
        return len(self._pack_with_fallback(pkt, x))

551 

552 

class ClientSubnetv6(ClientSubnetv4):
    """IPv6 flavour of ClientSubnetv4: only the address family
    parameters differ."""
    af_familly = socket.AF_INET6
    af_length = 128
    af_default = b"\x20"  # 2000::

557 

558 

class EDNS0ClientSubnet(_EDNS0Dummy):
    """EDNS0 option 8: client subnet, IPv4 (family 1) or IPv6 (family 2)."""
    name = "DNS EDNS0 Client Subnet"
    fields_desc = [
        ShortEnumField("optcode", 8, edns0types),
        # 4 = family (2) + source_plen (1) + scope_plen (1)
        FieldLenField(
            "optlen", None, "address", fmt="H",
            adjust=lambda pkt, x: x + 4,
        ),
        ShortField("family", 1),
        # Expressed in bits, hence the byte length times 8
        FieldLenField(
            "source_plen", None,
            length_of="address",
            fmt="B",
            adjust=lambda pkt, x: x * 8,
        ),
        ByteField("scope_plen", 0),
        MultipleTypeField(
            [
                (
                    ClientSubnetv4(
                        "address", "192.168.0.0",
                        length_from=lambda p: p.source_plen,
                    ),
                    lambda pkt: pkt.family == 1,
                ),
                (
                    ClientSubnetv6(
                        "address", "2001:db8::",
                        length_from=lambda p: p.source_plen,
                    ),
                    lambda pkt: pkt.family == 2,
                ),
            ],
            # Used when family is neither 1 nor 2
            ClientSubnetv4(
                "address", "192.168.0.0",
                length_from=lambda p: p.source_plen,
            ),
        ),
    ]

579 

580 

class EDNS0COOKIE(_EDNS0Dummy):
    """EDNS0 option 10: 8-byte client cookie followed by a variable
    length server cookie."""
    name = "DNS EDNS0 COOKIE"
    fields_desc = [
        ShortEnumField("optcode", 10, edns0types),
        # The option length also covers the 8 bytes of the client cookie
        FieldLenField(
            "optlen", None, length_of="server_cookie", fmt="!H",
            adjust=lambda pkt, x: x + 8,
        ),
        XStrFixedLenField("client_cookie", b"\x00" * 8, length=8),
        # Clamped so that an optlen below 8 never gives a negative length
        XStrLenField(
            "server_cookie", "",
            length_from=lambda pkt: max(0, pkt.optlen - 8),
        ),
    ]

589 

590 

591# RFC 8914 - Extended DNS Errors 

592 

593# https://www.iana.org/assignments/dns-parameters/dns-parameters.xhtml#extended-dns-error-codes 

# INFO-CODE -> description. The codes are consecutive and start at 0, so
# the position of each name in the tuple is its code.
extended_dns_error_codes = dict(enumerate((
    "Other",
    "Unsupported DNSKEY Algorithm",
    "Unsupported DS Digest Type",
    "Stale Answer",
    "Forged Answer",
    "DNSSEC Indeterminate",
    "DNSSEC Bogus",
    "Signature Expired",
    "Signature Not Yet Valid",
    "DNSKEY Missing",
    "RRSIGs Missing",
    "No Zone Key Bit Set",
    "NSEC Missing",
    "Cached Error",
    "Not Ready",
    "Blocked",
    "Censored",
    "Filtered",
    "Prohibited",
    "Stale NXDOMAIN Answer",
    "Not Authoritative",
    "Not Supported",
    "No Reachable Authority",
    "Network Error",
    "Invalid Data",
    "Signature Expired before Valid",
    "Too Early",
    "Unsupported NSEC3 Iterations Value",
    "Unable to conform to policy",
    "Synthesized",
)))

626 

627 

628# https://www.rfc-editor.org/rfc/rfc8914.html 

class EDNS0ExtendedDNSError(_EDNS0Dummy):
    """EDNS0 option 15: 2-byte info code followed by an optional text."""
    name = "DNS EDNS0 Extended DNS Error"
    fields_desc = [
        ShortEnumField("optcode", 15, edns0types),
        # The option length also covers the 2 bytes of info_code
        FieldLenField(
            "optlen", None, length_of="extra_text", fmt="!H",
            adjust=lambda pkt, x: x + 2,
        ),
        ShortEnumField("info_code", 0, extended_dns_error_codes),
        # Clamp to 0: a malformed option with optlen < 2 would otherwise
        # give a negative length and slice the buffer from its end
        StrLenField(
            "extra_text", "",
            length_from=lambda pkt: max(0, pkt.optlen - 2),
        ),
    ]

637 

638 

# EDNS0 option code -> class used to dissect it. EDNS0TLV.dispatch_hook
# falls back to EDNS0TLV for any code that is not listed here.
EDNS0OPT_DISPATCHER = dict([
    (5, EDNS0DAU),
    (6, EDNS0DHU),
    (7, EDNS0N3U),
    (8, EDNS0ClientSubnet),
    (10, EDNS0COOKIE),
    (15, EDNS0ExtendedDNSError),
])

647 

648 

649# RFC 4034 - Resource Records for the DNS Security Extensions 

650 

def bitmap2RRlist(bitmap):
    """
    Decode the 'Type Bit Maps' field of the NSEC Resource Record into an
    integer list.

    :param bitmap: the raw field, a sequence of
        (window number, bitmap length, bitmap) blocks
    :returns: the list of RR types, or None when a block is malformed
    """
    # RFC 4034, 4.1.2. The Type Bit Maps Field

    rr_types = []
    rest = bitmap

    while rest:
        if len(rest) < 2:
            log_runtime.info("bitmap too short (%i)", len(rest))
            return None

        window = orb(rest[0])  # window number
        nbytes = orb(rest[1])  # length of the bitmap in bytes

        if not 0 < nbytes <= 32:
            log_runtime.info("bitmap length is no valid (%i)", nbytes)
            return None

        # RR type of the first bit of this window
        base = 256 * window

        for byte_idx, octet in enumerate(rest[2:2 + nbytes]):
            value = orb(octet)
            # Most significant bit first: bit 0 is mask 128
            for bit in range(8):
                if value & (128 >> bit):
                    rr_types.append(base + byte_idx * 8 + bit)

        # Next block if any
        rest = rest[2 + nbytes:]

    return rr_types

689 

690 

def RRlist2bitmap(lst):
    """
    Encode a list of integers representing Resource Records to a bitmap field
    used in the NSEC Resource Record.

    Values are taken by absolute value; duplicates and values above 65535
    are ignored. An empty list gives an empty bitmap.

    :param lst: iterable of RR type numbers
    :returns: the encoded bytes: one (window number, bitmap length, bitmap)
        block per 256-value window that holds at least one type
    """
    # RFC 4034, 4.1.2. The Type Bit Maps Field

    # Normalise BEFORE sorting and deduplicating, so that negative inputs
    # cannot break the ordering or set the same bit twice
    rr_types = sorted({abs(x) for x in lst if abs(x) <= 65535})

    # window number -> ascending bit positions (0-255) inside that window
    windows = {}
    for rr in rr_types:
        windows.setdefault(rr >> 8, []).append(rr & 0xff)

    blocks = []
    for window in sorted(windows):
        positions = windows[window]
        # Only the bytes up to the highest set bit are emitted (1 to 32)
        bits = bytearray(positions[-1] // 8 + 1)
        for position in positions:
            # Most significant bit first: position 0 is mask 0x80
            bits[position // 8] |= 0x80 >> (position % 8)
        blocks.append(struct.pack("BB", window, len(bits)) + bytes(bits))

    return b"".join(blocks)

741 

742 

class RRlistField(StrField):
    """StrField holding a 'Type Bit Maps' value; can be set from a list of
    RR type numbers."""

    def h2i(self, pkt, x):
        """Encode a list of RR types to a bitmap; anything else is kept."""
        return RRlist2bitmap(x) if isinstance(x, list) else x

    def i2repr(self, pkt, x):
        """List the RR types (by name when known), or fall back to the
        repr of the raw value when nothing could be decoded."""
        value = self.i2h(pkt, x)
        decoded = bitmap2RRlist(value)
        if not decoded:
            return repr(value)
        return [dnstypes.get(rr, rr) for rr in decoded]

753 

754 

class _DNSRRdummy(Packet):
    """Base class of the resource records: computes rdlen at build time
    when it was left to None."""
    name = "Dummy class that implements post_build() for Resource Records"

    def post_build(self, pkt, pay):
        """Fill the rdlen field of the built record when it is unset."""
        if self.rdlen is not None:
            return pkt + pay

        name_len = len(
            self.fields_desc[0].i2m("", self.getfieldval("rrname"))
        )
        # rdlen follows rrname, type (2), rclass (2) and ttl (4)
        rdlen_ofs = name_len + 8
        # What remains after these fields and the 2 bytes of rdlen itself
        rdata_len = len(pkt) - name_len - 10

        header = pkt[:rdlen_ofs]
        rdata = pkt[rdlen_ofs + 2:]
        return header + struct.pack("!H", rdata_len) + rdata + pay

    def default_payload_class(self, payload):
        """Whatever follows a record is dissected as padding."""
        return conf.padding_layer

771 

772 

class DNSRRHINFO(_DNSRRdummy):
    """HINFO record (type 13): two length-prefixed strings, cpu and os."""
    name = "DNS HINFO Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 13, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        FieldLenField("cpulen", None, fmt="!B", length_of="cpu"),
        StrLenField("cpu", "", length_from=lambda pkt: pkt.cpulen),
        FieldLenField("oslen", None, fmt="!B", length_of="os"),
        StrLenField("os", "", length_from=lambda pkt: pkt.oslen),
    ]

784 

785 

class DNSRRMX(_DNSRRdummy):
    """MX record (type 15): a preference and an exchange name."""
    name = "DNS MX Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 15, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        ShortField("preference", 0),
        DNSStrField("exchange", ""),
    ]

796 

797 

class DNSRRSOA(_DNSRRdummy):
    """SOA record (type 6): two names followed by five 32-bit values."""
    name = "DNS SOA Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 6, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        DNSStrField("mname", ""),
        DNSStrField("rname", ""),
        IntField("serial", 0),
        IntField("refresh", 0),
        IntField("retry", 0),
        IntField("expire", 0),
        IntField("minimum", 0),
    ]

813 

814 

class DNSRRRSIG(_DNSRRdummy):
    """RRSIG record (type 46). The signature takes all the bytes left
    after the signer's name."""
    name = "DNS RRSIG Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 46, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        ShortEnumField("typecovered", 1, dnstypes),
        ByteEnumField("algorithm", 5, dnssecalgotypes),
        ByteField("labels", 0),
        IntField("originalttl", 0),
        UTCTimeField("expiration", 0),
        UTCTimeField("inception", 0),
        ShortField("keytag", 0),
        DNSStrField("signersname", ""),
        StrField("signature", ""),
    ]

832 

833 

class DNSRRNSEC(_DNSRRdummy):
    """NSEC record (type 47): next name and type bit maps."""
    name = "DNS NSEC Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 47, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        DNSStrField("nextname", ""),
        RRlistField("typebitmaps", ""),
    ]

844 

845 

class DNSRRDNSKEY(_DNSRRdummy):
    """DNSKEY record (type 48). The public key takes all the bytes left
    after the algorithm."""
    name = "DNS DNSKEY Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 48, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        # S: Secure Entry Point
        # Z: Zone Key
        FlagsField("flags", 256, 16, "S???????Z???????"),
        ByteField("protocol", 3),
        ByteEnumField("algorithm", 5, dnssecalgotypes),
        StrField("publickey", ""),
    ]

860 

861 

class DNSRRDS(_DNSRRdummy):
    """DS record (type 43). The digest takes all the bytes left after the
    digest type."""
    name = "DNS DS Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 43, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        ShortField("keytag", 0),
        ByteEnumField("algorithm", 5, dnssecalgotypes),
        ByteEnumField("digesttype", 5, dnssecdigesttypes),
        StrField("digest", ""),
    ]

874 

875 

876# RFC 5074 - DNSSEC Lookaside Validation (DLV) 

class DNSRRDLV(DNSRRDS):
    """DLV record: same layout as DS, with type 32769 unless a non-zero
    type is passed as a keyword argument."""
    name = "DNS DLV Resource Record"

    def __init__(self, *args, **kargs):
        DNSRRDS.__init__(self, *args, **kargs)
        if kargs.get("type"):
            # A non-zero type was given explicitly: keep it
            return
        self.type = 32769

884 

885# RFC 5155 - DNS Security (DNSSEC) Hashed Authenticated Denial of Existence 

886 

887 

class DNSRRNSEC3(_DNSRRdummy):
    """NSEC3 record (type 50): hash parameters, length-prefixed salt and
    next hashed owner name, then the type bit maps."""
    name = "DNS NSEC3 Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 50, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        ByteField("hashalg", 0),
        BitEnumField("flags", 0, 8, {1: "Opt-Out"}),
        ShortField("iterations", 0),
        FieldLenField("saltlength", 0, fmt="!B", length_of="salt"),
        StrLenField("salt", "", length_from=lambda pkt: pkt.saltlength),
        FieldLenField(
            "hashlength", 0, fmt="!B",
            length_of="nexthashedownername",
        ),
        StrLenField(
            "nexthashedownername", "",
            length_from=lambda pkt: pkt.hashlength,
        ),
        RRlistField("typebitmaps", ""),
    ]

904 

905 

class DNSRRNSEC3PARAM(_DNSRRdummy):
    """NSEC3PARAM record (type 51): hash parameters and a length-prefixed
    salt."""
    name = "DNS NSEC3PARAM Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 51, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        ByteField("hashalg", 0),
        ByteField("flags", 0),
        ShortField("iterations", 0),
        FieldLenField("saltlength", 0, fmt="!B", length_of="salt"),
        StrLenField("salt", "", length_from=lambda p: p.saltlength),
    ]

919 

920 

921# RFC 9460 Service Binding and Parameter Specification via the DNS 

922# https://www.rfc-editor.org/rfc/rfc9460.html 

923 

924 

925# https://www.iana.org/assignments/dns-svcb/dns-svcb.xhtml 

# SvcParamKey number -> name. The keys are consecutive and start at 0, so
# the position of each name in the tuple is its key.
svc_param_keys = dict(enumerate((
    "mandatory",
    "alpn",
    "no-default-alpn",
    "port",
    "ipv4hint",
    "ech",
    "ipv6hint",
    "dohpath",
    "ohttp",
)))

937 

938 

class SvcParam(Packet):
    """One (key, length, value) parameter of a SVCB / HTTPS record. The
    type of value depends on key."""
    name = "SvcParam"
    fields_desc = [
        ShortEnumField("key", 0, svc_param_keys),
        FieldLenField("len", None, length_of="value", fmt="H"),
        MultipleTypeField(
            [
                # mandatory
                (
                    FieldListField(
                        "value", [],
                        ShortEnumField("", 0, svc_param_keys),
                        length_from=lambda pkt: pkt.len,
                    ),
                    lambda pkt: pkt.key == 0,
                ),
                # alpn, no-default-alpn
                (
                    DNSTextField(
                        "value", [],
                        length_from=lambda pkt: pkt.len,
                    ),
                    lambda pkt: pkt.key in (1, 2),
                ),
                # port
                (
                    ShortField("value", 0),
                    lambda pkt: pkt.key == 3,
                ),
                # ipv4hint
                (
                    FieldListField(
                        "value", [],
                        IPField("", "0.0.0.0"),
                        length_from=lambda pkt: pkt.len,
                    ),
                    lambda pkt: pkt.key == 4,
                ),
                # ipv6hint
                (
                    FieldListField(
                        "value", [],
                        IP6Field("", "::"),
                        length_from=lambda pkt: pkt.len,
                    ),
                    lambda pkt: pkt.key == 6,
                ),
            ],
            # Any other key: raw bytes
            StrLenField("value", "", length_from=lambda pkt: pkt.len),
        ),
    ]

    def extract_padding(self, p):
        """No payload: everything after the parameter is padding."""
        return "", p

973 

974 

class DNSRRSVCB(_DNSRRdummy):
    # SVCB resource record (type 64): common RR header, then a priority, a
    # target name and a list of SvcParam entries.
    name = "DNS SVCB Resource Record"
    fields_desc = [
        # Common resource record header
        DNSStrField("rrname", ""),
        ShortEnumField("type", 64, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        # SVCB specific part
        ShortField("svc_priority", 0),
        DNSStrField("target_name", ""),
        PacketListField("svc_params", [], SvcParam),
    ]

985 

986 

class DNSRRHTTPS(_DNSRRdummy):
    # HTTPS resource record (type 65). Only the first two fields are declared
    # here; everything from "rclass" onwards is reused from DNSRRSVCB.
    name = "DNS HTTPS Resource Record"
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 65, dnstypes),
    ] + DNSRRSVCB.fields_desc[2:]

992 

993 

994# RFC 2782 - A DNS RR for specifying the location of services (DNS SRV) 

995 

996 

class DNSRRSRV(_DNSRRdummy):
    # SRV resource record (type 33): common RR header followed by priority,
    # weight, port and the target name.
    name = "DNS SRV Resource Record"
    fields_desc = [
        # Common resource record header
        DNSStrField("rrname", ""),
        ShortEnumField("type", 33, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        # SRV specific part
        ShortField("priority", 0),
        ShortField("weight", 0),
        ShortField("port", 0),
        DNSStrField("target", ""),
    ]

1008 

1009 

1010# RFC 2845 - Secret Key Transaction Authentication for DNS (TSIG) 

# TSIG algorithm name -> size
# NOTE(review): presumably the MAC length in bytes for each algorithm - confirm
tsig_algo_sizes = {
    "HMAC-MD5.SIG-ALG.REG.INT": 16,
    "hmac-sha1": 20,
}

1013 

1014 

class TimeSignedField(Field[int, bytes]):
    """
    48-bit timestamp field, stored on the wire as 6 bytes.

    The machine representation is a big-endian 16-bit high word followed by
    a big-endian 32-bit low word; the internal representation is the number
    of seconds as a single integer.
    """

    def __init__(self, name, default):
        Field.__init__(self, name, default, fmt="6s")

    def _convert_seconds(self, packed_seconds):
        """Unpack the 6-byte machine representation into an integer.

        :param packed_seconds: exactly 6 bytes, as produced by i2m()
        :raise struct.error: if the buffer is not 6 bytes long
        """
        high, low = struct.unpack("!HI", packed_seconds)
        # The first two bytes are the most significant 16 bits of the 48-bit
        # value (i2m() builds them with ``seconds >> 32``), so they have to be
        # shifted back into place rather than simply added to the low word.
        return (high << 32) | low

    def i2m(self, pkt, seconds):
        """Convert the number of seconds since 1-Jan-70 UTC to the packed
        representation.

        ``None`` is packed as 0. Bits above the 48th are discarded.
        """

        if seconds is None:
            seconds = 0

        tmp_short = (seconds >> 32) & 0xFFFF
        tmp_int = seconds & 0xFFFFFFFF

        return struct.pack("!HI", tmp_short, tmp_int)

    def m2i(self, pkt, packed_seconds):
        """Convert the packed representation to the number of seconds
        since 1-Jan-70 UTC. ``None`` is passed through unchanged."""

        if packed_seconds is None:
            return None

        return self._convert_seconds(packed_seconds)

    def i2repr(self, pkt, packed_seconds):
        """Convert the internal representation to a nice one using the RFC
        format.

        Despite its name, ``packed_seconds`` is handed to time.gmtime() and
        is therefore expected to be a number of seconds, not packed bytes.
        """
        time_struct = time.gmtime(packed_seconds)
        return time.strftime("%a %b %d %H:%M:%S %Y", time_struct)

1051 

1052 

class DNSRRTSIG(_DNSRRdummy):
    # TSIG resource record (type 250): common RR header followed by the
    # algorithm name, signing time, fudge, a length-prefixed MAC, the original
    # message id, an error code and length-prefixed "other" data.
    name = "DNS TSIG Resource Record"
    fields_desc = [
        # Common resource record header
        DNSStrField("rrname", ""),
        ShortEnumField("type", 250, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        ShortField("rdlen", None),
        # TSIG specific part
        DNSStrField("algo_name", "hmac-sha1"),
        TimeSignedField("time_signed", 0),
        ShortField("fudge", 0),
        # "mac_len" bytes of MAC follow (default 20)
        FieldLenField("mac_len", 20, fmt="!H", length_of="mac_data"),
        StrLenField("mac_data", "", length_from=lambda rr: rr.mac_len),
        ShortField("original_id", 0),
        ShortField("error", 0),
        # "other_len" bytes of extra data follow
        FieldLenField("other_len", 0, fmt="!H", length_of="other_data"),
        StrLenField("other_data", "", length_from=lambda rr: rr.other_len),
    ]

1070 

1071 

# RR type number -> dedicated packet class. _DNSRR() looks the type up here
# and falls back to the generic DNSRR class for anything not listed.
DNSRR_DISPATCHER = {
    # RFC 1035
    6: DNSRRSOA,
    13: DNSRRHINFO,
    15: DNSRRMX,
    # RFC 2782
    33: DNSRRSRV,
    # RFC 1671
    41: DNSRROPT,
    # RFC 4034
    43: DNSRRDS,
    46: DNSRRRSIG,
    47: DNSRRNSEC,
    48: DNSRRDNSKEY,
    # RFC 5155
    50: DNSRRNSEC3,
    51: DNSRRNSEC3PARAM,
    # RFC 9460
    64: DNSRRSVCB,
    65: DNSRRHTTPS,
    # RFC 2845
    250: DNSRRTSIG,
    # RFC 4431
    32769: DNSRRDLV,
}

1089 

1090 

class DNSRR(Packet):
    # Generic resource record. The field used for "rdata" is picked from the
    # record type; types without a dedicated entry are kept as raw bytes of
    # length "rdlen".
    name = "DNS Resource Record"
    show_indent = 0
    fields_desc = [
        DNSStrField("rrname", ""),
        ShortEnumField("type", 1, dnstypes),
        ShortEnumField("rclass", 1, dnsclasses),
        IntField("ttl", 0),
        FieldLenField("rdlen", None, length_of="rdata", fmt="H"),
        MultipleTypeField(
            [
                # type 1: A
                (
                    IPField("rdata", "0.0.0.0"),
                    lambda rr: rr.type == 1,
                ),
                # type 28: AAAA
                (
                    IP6Field("rdata", "::"),
                    lambda rr: rr.type == 28,
                ),
                # NS, MD, MF, CNAME, PTR, DNAME: a domain name
                (
                    DNSStrField("rdata", "",
                                length_from=lambda rr: rr.rdlen),
                    lambda rr: rr.type in [2, 3, 4, 5, 12, 39],
                ),
                # type 16: TEXT
                (
                    DNSTextField("rdata", [""],
                                 length_from=lambda rr: rr.rdlen),
                    lambda rr: rr.type == 16,
                ),
            ],
            # Anything else: raw bytes
            StrLenField("rdata", "", length_from=lambda rr: rr.rdlen),
        ),
    ]

    def default_payload_class(self, payload):
        # Whatever follows a record is handled by the configured padding layer
        return conf.padding_layer

1122 

1123 

1124def _DNSRR(s, **kwargs): 

1125 """ 

1126 DNSRR dispatcher func 

1127 """ 

1128 if s: 

1129 # Try to find the type of the RR using the dispatcher 

1130 _, remain = dns_get_str(s, _ignore_compression=True) 

1131 cls = DNSRR_DISPATCHER.get( 

1132 struct.unpack("!H", remain[:2])[0], 

1133 DNSRR, 

1134 ) 

1135 rrlen = ( 

1136 len(s) - len(remain) + # rrname len 

1137 10 + 

1138 struct.unpack("!H", remain[8:10])[0] 

1139 ) 

1140 pkt = cls(s[:rrlen], **kwargs) / conf.padding_layer(s[rrlen:]) 

1141 # drop rdlen because if rdata was compressed, it will break everything 

1142 # when rebuilding 

1143 del pkt.fields["rdlen"] 

1144 return pkt 

1145 return None 

1146 

1147 

class DNSQR(Packet):
    # Question record: the queried name followed by its type and class.
    name = "DNS Question Record"
    show_indent = 0
    fields_desc = [
        DNSStrField("qname", "www.example.com"),
        ShortEnumField("qtype", 1, dnsqtypes),
        ShortEnumField("qclass", 1, dnsclasses),
    ]

    def default_payload_class(self, payload):
        # Whatever follows a question is handled by the configured padding
        # layer
        return conf.padding_layer

1157 

1158 

class _DNSPacketListField(PacketListField):
    # A normal PacketListField with backward-compatible hacks
    def any2i(self, pkt, x):
        # type: (Optional[Packet], List[Any]) -> List[Any]
        """Accept the legacy ``None`` value as an empty list, with a
        DeprecationWarning."""
        if x is None:
            warnings.warn(
                ("The DNS fields 'qd', 'an', 'ns' and 'ar' are now "
                 "PacketListField(s) ! "
                 "Setting a null default should be [] instead of None"),
                DeprecationWarning
            )
            x = []
        return super(_DNSPacketListField, self).any2i(pkt, x)

    def i2h(self, pkt, x):
        # type: (Optional[Packet], List[Packet]) -> Any
        """Return ``x`` wrapped in a list subclass that forwards unknown
        attributes to its first element, with a DeprecationWarning."""
        class _list(list):
            """
            Fake list object to provide compatibility with older DNS fields
            """
            def __getattr__(self, attr):
                # __getattr__ is only reached for names that are not regular
                # list attributes. It must fail with AttributeError, otherwise
                # hasattr(), getattr(obj, name, default) and the lookups done
                # by copy.deepcopy() would see an IndexError on an empty list.
                if not self:
                    raise AttributeError(
                        "%r object has no attribute %r" % (
                            type(self).__name__, attr
                        )
                    )
                # An AttributeError raised by the first element simply
                # propagates: no warning is emitted in that case.
                ret = getattr(self[0], attr)
                warnings.warn(
                    ("The DNS fields 'qd', 'an', 'ns' and 'ar' are now "
                     "PacketListField(s) ! "
                     "To access the first element, use pkt.an[0] instead of "
                     "pkt.an"),
                    DeprecationWarning,
                    # Attribute the warning to the code doing the access
                    stacklevel=2,
                )
                return ret
        return _list(x)

1193 

1194 

class DNS(DNSCompressedPacket):
    # DNS message: a 12-byte header (id, flags, four counters) followed by
    # the question, answer, authority and additional sections. When carried
    # over TCP the message is preceded by a 2-byte "length" field.
    name = "DNS"
    fields_desc = [
        # Only present when the layer below is TCP
        ConditionalField(ShortField("length", None),
                         lambda p: isinstance(p.underlayer, TCP)),
        ShortField("id", 0),
        BitField("qr", 0, 1),
        BitEnumField("opcode", 0, 4, {0: "QUERY", 1: "IQUERY", 2: "STATUS"}),
        BitField("aa", 0, 1),
        BitField("tc", 0, 1),
        BitField("rd", 1, 1),
        BitField("ra", 0, 1),
        BitField("z", 0, 1),
        # AD and CD bits are defined in RFC 2535
        BitField("ad", 0, 1),  # Authentic Data
        BitField("cd", 0, 1),  # Checking Disabled
        BitEnumField("rcode", 0, 4, {0: "ok", 1: "format-error",
                                     2: "server-failure", 3: "name-error",
                                     4: "not-implemented", 5: "refused"}),
        # Each counter gives the number of entries of the matching section
        FieldLenField("qdcount", None, count_of="qd"),
        FieldLenField("ancount", None, count_of="an"),
        FieldLenField("nscount", None, count_of="ns"),
        FieldLenField("arcount", None, count_of="ar"),
        _DNSPacketListField("qd", [DNSQR()], DNSQR, count_from=lambda pkt: pkt.qdcount),  # noqa: E501
        _DNSPacketListField("an", [], _DNSRR, count_from=lambda pkt: pkt.ancount),  # noqa: E501
        _DNSPacketListField("ns", [], _DNSRR, count_from=lambda pkt: pkt.nscount),  # noqa: E501
        _DNSPacketListField("ar", [], _DNSRR, count_from=lambda pkt: pkt.arcount),  # noqa: E501
    ]

    def get_full(self):
        """Return the original bytes of the message, without the 2-byte
        length prefix when the layer below is TCP."""
        # Required for DNSCompressedPacket
        if isinstance(self.underlayer, TCP):
            return self.original[2:]
        else:
            return self.original

    def answers(self, other):
        """Tell whether this message is a response (qr == 1) to the query
        ``other`` (qr == 0) carrying the same id."""
        return (isinstance(other, DNS) and
                self.id == other.id and
                self.qr == 1 and
                other.qr == 0)

    def mysummary(self):
        """Build the one-line summary: "DNS" (or "mDNS" over UDP with
        destination port 5353), "Qry" or "Ans", and the first name/rdata."""
        name = ""
        if self.qr:
            kind = "Ans"
            if self.an and isinstance(self.an[0], DNSRR):
                name = ' %s' % self.an[0].rdata
            elif self.rcode != 0:
                name = self.sprintf(' %rcode%')
        else:
            kind = "Qry"
            if self.qd and isinstance(self.qd[0], DNSQR):
                name = ' %s' % self.qd[0].qname
        return "%sDNS %s%s" % (
            "m"
            if isinstance(self.underlayer, UDP) and self.underlayer.dport == 5353
            else "",
            kind,
            name,
        )

    def post_build(self, pkt, pay):
        """Fill in the TCP length prefix when it was left to None."""
        if isinstance(self.underlayer, TCP) and self.length is None:
            # The prefix counts the message only, not its own two bytes
            pkt = struct.pack("!H", len(pkt) - 2) + pkt[2:]
        return pkt + pay

    def compress(self):
        """Return the compressed DNS packet (using `dns_compress()`)"""
        return dns_compress(self)

    def pre_dissect(self, s):
        """
        Check that a valid DNS over TCP message can be decoded

        :param s: the bytes about to be dissected
        :return: ``s`` unchanged
        :raise Scapy_Exception: over TCP, when the buffer is shorter than the
            2-byte length prefix, when the advertised length is smaller than
            a DNS header, or when the buffer holds fewer bytes than advertised
        """
        if isinstance(self.underlayer, TCP):

            # Compute the length of the DNS packet
            if len(s) < 2:
                message = "Malformed DNS message: too small!"
                log_runtime.info(message)
                raise Scapy_Exception(message)
            dns_len = struct.unpack("!H", s[:2])[0]

            # Check if the length is valid. The advertised length excludes
            # the prefix itself (see post_build), so it is compared with what
            # follows the prefix, and its lower bound is the 12-byte header.
            if dns_len < 12 or len(s) - 2 < dns_len:
                message = "Malformed DNS message: invalid length!"
                log_runtime.info(message)
                raise Scapy_Exception(message)

        return s

1287 

1288 

# UDP payloads on port 5353 (either direction) are dissected as DNS
bind_layers(UDP, DNS, dport=5353)
bind_layers(UDP, DNS, sport=5353)
# Same for UDP port 53
bind_layers(UDP, DNS, dport=53)
bind_layers(UDP, DNS, sport=53)
# Destination addresses associated with UDP port 5353
DestIPField.bind_addr(UDP, "224.0.0.251", dport=5353)
if conf.ipv6_enabled:
    from scapy.layers.inet6 import DestIP6Field
    DestIP6Field.bind_addr(UDP, "ff02::fb", dport=5353)
# TCP payloads on port 53 (either direction) are dissected as DNS
bind_layers(TCP, DNS, dport=53)
bind_layers(TCP, DNS, sport=53)

# Nameserver config
conf.nameservers = read_nameservers()
# Cache used by dns_resolve()
# NOTE(review): 300 is presumably the entry lifetime in seconds - confirm
_dns_cache = conf.netcache.new_cache("dns_cache", 300)

1303 

1304 

@conf.commands.register
def dns_resolve(qname, qtype="A", raw=False, verbose=1, timeout=3, **kwargs):
    """
    Perform a simple DNS resolution using conf.nameservers with caching

    The nameservers are tried in order over UDP port 53. A server that cannot
    be reached, does not answer or answers with a server failure is skipped.

    :param qname: the name to query
    :param qtype: the type to query (default A)
    :param raw: return the whole DNS packet (default False)
    :param verbose: show verbose errors
    :param timeout: seconds until timeout (per server)
    :param kwargs: extra arguments passed to the socket's sr1()
    :return: the response packet if ``raw`` is set, otherwise the list of
        records of the an/ns/ar sections whose type is ``qtype``
    :raise TimeoutError: if no DNS servers were reached in time.
    """
    # Unify types
    qtype = DNSQR.qtype.any2i_one(None, qtype)
    qname = DNSQR.qname.any2i(None, qname)
    # Check cache. qtype is a 16-bit value: packing it on a single byte would
    # raise struct.error for any type above 255.
    cache_ident = b";".join(
        [qname, struct.pack("!H", qtype)] +
        ([b"raw"] if raw else [])
    )
    result = _dns_cache.get(cache_ident)
    if result:
        return result

    kwargs.setdefault("timeout", timeout)
    kwargs.setdefault("verbose", 0)
    res = None
    for nameserver in conf.nameservers:
        # Try all nameservers
        # Reset on every iteration so that the cleanup below never touches an
        # unbound name or the socket of a previous nameserver when the
        # creation of the socket itself fails.
        sock = None
        try:
            # Spawn a UDP socket, connect to the nameserver on port 53
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
            sock.settimeout(kwargs["timeout"])
            sock.connect((nameserver, 53))
            # Connected. Wrap it with DNS
            sock = StreamSocket(sock, DNS)
            # I/O
            res = sock.sr1(
                DNS(qd=[DNSQR(qname=qname, qtype=qtype)], id=RandShort()),
                **kwargs,
            )
        except IOError as ex:
            if verbose:
                log_runtime.warning(str(ex))
            continue
        finally:
            if sock is not None:
                sock.close()
        if res:
            # We have a response ! Check for failure
            if res[DNS].rcode == 2:  # server failure
                res = None
                if verbose:
                    log_runtime.info(
                        "DNS: %s answered with failure for %s" % (
                            nameserver,
                            qname,
                        )
                    )
            else:
                break
    if res is None:
        raise TimeoutError
    if raw:
        # Raw
        result = res
    else:
        # Find answers
        result = [
            x
            for x in itertools.chain(res.an, res.ns, res.ar)
            if x.type == qtype
        ]
    if result:
        # Cache it
        _dns_cache[cache_ident] = result
    return result

1382 

1383 

@conf.commands.register
def dyndns_add(nameserver, name, rdata, type="A", ttl=10):
    """Send a DNS add message to a nameserver for "name" to have a new "rdata"
dyndns_add(nameserver, name, rdata, type="A", ttl=10) -> result code (0=ok)

The zone is everything located after the first dot of "name".
-1 is returned when no DNS response was received within 5 seconds.

example: dyndns_add("ns1.toto.com", "dyn.toto.com", "127.0.0.1")
RFC2136
"""
    zone = name[name.find(".") + 1:]
    # opcode 5 with the zone in the question section and the record to add
    # in the "ns" section. The record uses the requested "type" instead of
    # always being an "A" record.
    r = sr1(IP(dst=nameserver) / UDP() / DNS(opcode=5,
                                             qd=[DNSQR(qname=zone, qtype="SOA")],  # noqa: E501
                                             ns=[DNSRR(rrname=name, type=type,
                                                       ttl=ttl, rdata=rdata)]),
            verbose=0, timeout=5)
    if r and r.haslayer(DNS):
        return r.getlayer(DNS).rcode
    else:
        return -1

1402 

1403 

@conf.commands.register
def dyndns_del(nameserver, name, type="ALL", ttl=10):
    """Send a DNS delete message to a nameserver for "name"
dyndns_del(nameserver, name, type="ALL", ttl=10) -> result code (0=ok)

The zone is everything located after the first dot of "name".
The "ttl" argument is accepted but not used: the record is always sent
with a ttl of 0.
-1 is returned when no DNS response was received within 5 seconds.

example: dyndns_del("ns1.toto.com", "dyn.toto.com")
RFC2136
"""
    zone = name[name.find(".") + 1:]
    transport = IP(dst=nameserver) / UDP()
    # opcode 5 with the zone in the question section and, in the "ns"
    # section, a record of class "ANY" with an empty rdata
    update = DNS(
        opcode=5,
        qd=[DNSQR(qname=zone, qtype="SOA")],
        ns=[DNSRR(rrname=name, type=type,
                  rclass="ANY", ttl=0, rdata="")],
    )
    reply = sr1(transport / update, verbose=0, timeout=5)
    if reply and reply.haslayer(DNS):
        return reply.getlayer(DNS).rcode
    return -1

1422 

1423 

class DNS_am(AnsweringMachine):
    # Answering machine replying to DNS queries from a static table of
    # names ("match" / "srvmatch"), default "joker" addresses, or by relaying
    # the query to conf.nameservers.
    function_name = "dnsd"
    filter = "udp port 53"
    cls = DNS  # We also use this automaton for llmnrd / mdnsd

    def parse_options(self, joker=None,
                      match=None,
                      srvmatch=None,
                      joker6=False,
                      send_error=False,
                      relay=False,
                      from_ip=None,
                      from_ip6=None,
                      src_ip=None,
                      src_ip6=None,
                      ttl=10,
                      jokerarpa=None):
        """
        :param joker: default IPv4 for unresolved domains. (Default: None)
                      Set to False to disable, None to mirror the interface's IP.
        :param joker6: default IPv6 for unresolved domains (Default: False)
                       set to False to disable, None to mirror the interface's IPv6.
        :param jokerarpa: answer for .in-addr.arpa PTR requests. (Default: None)
        :param relay: relay unresolved domains to conf.nameservers (Default: False).
        :param send_error: send an error message when this server can't answer
                           (Default: False)
        :param match: a dictionary of {name: val} where name is a string representing
                      a domain name (A, AAAA) and val is a tuple of 2 elements, each
                      representing an IP or a list of IPs. If val is a single element,
                      (A, None) is assumed.
        :param srvmatch: a dictionary of {name: (port, target)} used for SRV
        :param from_ip: a source IP to filter. Can contain a netmask
        :param from_ip6: a source IPv6 to filter. Can contain a netmask
        :param ttl: the DNS time to live (in seconds)
        :param src_ip: override the source IP
        :param src_ip6: override the source IPv6

        Example::

            $ sudo iptables -I OUTPUT -p icmp --icmp-type 3/3 -j DROP
            >>> dnsd(match={"google.com": "1.1.1.1"}, joker="192.168.0.2", iface="eth0")
            >>> dnsd(srvmatch={
            ...     "_ldap._tcp.dc._msdcs.DOMAIN.LOCAL.": (389, "srv1.domain.local")
            ... })
        """
        def normv(v):
            # Normalize a value to a 2-element sequence
            if isinstance(v, (tuple, list)) and len(v) == 2:
                return v
            elif isinstance(v, str):
                return (v, None)
            else:
                raise ValueError("Bad match value: '%s'" % repr(v))

        def normk(k):
            # Normalize a name to lowercase bytes ending with a dot, which is
            # the form the lookups in make_reply() are done with
            k = bytes_encode(k).lower()
            if not k.endswith(b"."):
                k += b"."
            return k
        if match is None:
            self.match = {}
        else:
            self.match = {normk(k): normv(v) for k, v in match.items()}
        if srvmatch is None:
            self.srvmatch = {}
        else:
            self.srvmatch = {normk(k): normv(v) for k, v in srvmatch.items()}
        self.joker = joker
        self.joker6 = joker6
        self.jokerarpa = jokerarpa
        self.send_error = send_error
        self.relay = relay
        # String filters are turned into Net objects; anything else is kept
        if isinstance(from_ip, str):
            self.from_ip = Net(from_ip)
        else:
            self.from_ip = from_ip
        if isinstance(from_ip6, str):
            self.from_ip6 = Net(from_ip6)
        else:
            self.from_ip6 = from_ip6
        self.src_ip = src_ip
        self.src_ip6 = src_ip6
        self.ttl = ttl

    def is_request(self, req):
        """Tell whether ``req`` is a query (qr == 0) of type ``self.cls``
        whose source passes the from_ip / from_ip6 filter, if any."""
        from scapy.layers.inet6 import IPv6
        return (
            req.haslayer(self.cls) and
            req.getlayer(self.cls).qr == 0 and (
                (
                    not self.from_ip6 or req[IPv6].src in self.from_ip6
                )
                if IPv6 in req else
                (
                    not self.from_ip or req[IP].src in self.from_ip
                )
            )
        )

    def make_reply(self, req):
        """Build the response to the request ``req``.

        :return: the response packet, or None when nothing has to be sent:
            the request lacks an IP/IPv6, UDP or ``self.cls`` layer, no query
            could be answered, or a query has no answer and ``send_error`` is
            not set.
        """
        mDNS = isinstance(self, mDNS_am)
        llmnr = self.cls != DNS
        # Build reply from the request
        resp = req.copy()
        if Ether in req:
            if mDNS:
                resp[Ether].src, resp[Ether].dst = None, None
            elif llmnr:
                resp[Ether].src, resp[Ether].dst = None, req[Ether].src
            else:
                # Answer from the address that was queried, unless it was the
                # broadcast address. This is an equality test: a substring
                # test would also match an empty or partial address.
                resp[Ether].src, resp[Ether].dst = (
                    None if req[Ether].dst == "ff:ff:ff:ff:ff:ff" else req[Ether].dst,  # noqa: E501
                    req[Ether].src,
                )
        from scapy.layers.inet6 import IPv6
        # NOTE(review): the two branches below use the layer located under
        # IP/IPv6, so they look like they need one (e.g. Ether) - confirm
        if IPv6 in req:
            resp[IPv6].underlayer.remove_payload()
            if mDNS:
                resp /= IPv6(dst="ff02::fb", src=self.src_ip6)
            elif llmnr:
                resp /= IPv6(dst=req[IPv6].src, src=self.src_ip6)
            else:
                resp /= IPv6(dst=req[IPv6].src, src=self.src_ip6 or req[IPv6].dst)
        elif IP in req:
            resp[IP].underlayer.remove_payload()
            if mDNS:
                resp /= IP(dst="224.0.0.251", src=self.src_ip)
            elif llmnr:
                resp /= IP(dst=req[IP].src, src=self.src_ip)
            else:
                resp /= IP(dst=req[IP].src, src=self.src_ip or req[IP].dst)
        else:
            warning("No IP or IPv6 layer in %s", req.command())
            return
        try:
            # Swap the ports of the request
            resp /= UDP(sport=req[UDP].dport, dport=req[UDP].sport)
        except IndexError:
            warning("No UDP layer in %s", req.command(), exc_info=True)
            return
        # Now process each query and store its answer in 'ans'
        ans = []
        try:
            req = req[self.cls]
        except IndexError:
            warning(
                "No %s layer in %s",
                self.cls.__name__,
                req.command(),
                exc_info=True,
            )
            return
        try:
            queries = req.qd
        except AttributeError:
            warning("No qd attribute in %s", req.command(), exc_info=True)
            return
        for rq in queries:
            # For each query
            if isinstance(rq, Raw):
                # No exception is being handled here, hence no exc_info
                warning("Cannot parse qd element %s", rq.command())
                continue
            if rq.qtype in [1, 28]:
                # A or AAAA
                if rq.qtype == 28:
                    # AAAA
                    try:
                        rdata = self.match[rq.qname.lower()][1]
                    except KeyError:
                        if self.relay or self.joker6 is False:
                            rdata = None
                        else:
                            rdata = self.joker6 or get_if_addr6(
                                self.optsniff.get("iface", conf.iface)
                            )
                elif rq.qtype == 1:
                    # A
                    try:
                        rdata = self.match[rq.qname.lower()][0]
                    except KeyError:
                        if self.relay or self.joker is False:
                            rdata = None
                        else:
                            rdata = self.joker or get_if_addr(
                                self.optsniff.get("iface", conf.iface)
                            )
                if rdata is not None:
                    # Common A and AAAA
                    if not isinstance(rdata, list):
                        rdata = [rdata]
                    ans.extend([
                        DNSRR(rrname=rq.qname, ttl=self.ttl, rdata=x, type=rq.qtype)
                        for x in rdata
                    ])
                    continue  # next
            elif rq.qtype == 33:
                # SRV
                try:
                    port, target = self.srvmatch[rq.qname.lower()]
                    ans.append(DNSRRSRV(
                        rrname=rq.qname,
                        port=port,
                        target=target,
                        weight=100,
                        ttl=self.ttl
                    ))
                    continue  # next
                except KeyError:
                    # No result
                    pass
            elif rq.qtype == 12:
                # PTR
                if rq.qname[-14:] == b".in-addr.arpa." and self.jokerarpa:
                    ans.append(DNSRR(
                        rrname=rq.qname,
                        type=rq.qtype,
                        ttl=self.ttl,
                        rdata=self.jokerarpa,
                    ))
                    continue
            # If it arrives here, there is currently no answer
            if self.relay:
                # Relay mode ?
                try:
                    _rslv = dns_resolve(rq.qname, qtype=rq.qtype)
                    if _rslv:
                        ans.extend(_rslv)
                        continue  # next
                except TimeoutError:
                    pass
            # Error
            break
        else:
            # The loop ended without 'break'
            if not ans:
                # No rq was actually answered, as none was valid. Discard.
                return
            # All rq were answered
            if mDNS:
                # in mDNS mode, don't repeat the question
                resp /= self.cls(id=req.id, qr=1, qd=[], an=ans)
            else:
                resp /= self.cls(id=req.id, qr=1, qd=req.qd, an=ans)
            return resp
        # An error happened
        if self.send_error:
            resp /= self.cls(id=req.id, qr=1, qd=req.qd, rcode=3)
            return resp

1669 

1670 

1671class mDNS_am(DNS_am): 

1672 """ 

1673 mDNS answering machine. 

1674 

1675 This has the same arguments as DNS_am. See help(DNS_am) 

1676 

1677 Example:: 

1678 

1679 >>> mdnsd(joker="192.168.0.2", iface="eth0") 

1680 >>> mdnsd(match={"TEST.local": "192.168.0.2"}) 

1681 """ 

1682 function_name = "mdnsd" 

1683 filter = "udp port 5353"