Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/dhcp.py: 47%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

323 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7DHCP (Dynamic Host Configuration Protocol) and BOOTP 

8 

9Implements: 

10- rfc951 - BOOTSTRAP PROTOCOL (BOOTP) 

11- rfc1542 - Clarifications and Extensions for the Bootstrap Protocol 

12- rfc1533 - DHCP Options and BOOTP Vendor Extensions 

13""" 

14 

15try: 

16 from collections.abc import Iterable 

17except ImportError: 

18 # For backwards compatibility. This was removed in Python 3.8 

19 from collections import Iterable 

20import random 

21import struct 

22 

23import socket 

24import re 

25 

26from scapy.ansmachine import AnsweringMachine 

27from scapy.base_classes import Net 

28from scapy.compat import chb, orb, bytes_encode 

29from scapy.fields import ( 

30 ByteEnumField, 

31 ByteField, 

32 Field, 

33 FieldListField, 

34 FlagsField, 

35 IntField, 

36 IPField, 

37 MACField, 

38 ShortField, 

39 StrEnumField, 

40 StrField, 

41 StrFixedLenField, 

42 XIntField, 

43) 

44from scapy.layers.inet import UDP, IP 

45from scapy.layers.l2 import Ether, HARDWARE_TYPES 

46from scapy.packet import bind_layers, bind_bottom_up, Packet 

47from scapy.utils import atol, itom, ltoa, sane, str2mac, mac2str 

48from scapy.volatile import ( 

49 RandBin, 

50 RandByte, 

51 RandField, 

52 RandIP, 

53 RandInt, 

54 RandNum, 

55 RandNumExpo, 

56 VolatileValue, 

57) 

58 

59from scapy.arch import get_if_hwaddr 

60from scapy.sendrecv import srp1 

61from scapy.error import warning 

62from scapy.config import conf 

63 

64# Typing imports 

65from typing import ( 

66 List, 

67 Optional, 

68 Union, 

69) 

70 

71dhcpmagic = b"c\x82Sc" 

72 

73 

74class _BOOTP_chaddr(StrFixedLenField): 

75 def i2m(self, pkt, x): 

76 if isinstance(x, VolatileValue): 

77 x = x._fix() 

78 return MACField.i2m(self, pkt, x) 

79 

80 def i2repr(self, pkt, v): 

81 if isinstance(v, VolatileValue): 

82 return repr(v) 

83 if pkt.htype == 1: # Ethernet 

84 if v[6:] == b"\x00" * 10: # Default padding 

85 return "%s (+ 10 nul pad)" % str2mac(v[:6]) 

86 else: 

87 return "%s (pad: %s)" % (str2mac(v[:6]), v[6:]) 

88 return super(_BOOTP_chaddr, self).i2repr(pkt, v) 

89 

90 

91class BOOTP(Packet): 

92 name = "BOOTP" 

93 fields_desc = [ 

94 ByteEnumField("op", 1, {1: "BOOTREQUEST", 2: "BOOTREPLY"}), 

95 ByteEnumField("htype", 1, HARDWARE_TYPES), 

96 ByteField("hlen", 6), 

97 ByteField("hops", 0), 

98 XIntField("xid", 0), 

99 ShortField("secs", 0), 

100 FlagsField("flags", 0, 16, "???????????????B"), 

101 IPField("ciaddr", "0.0.0.0"), 

102 IPField("yiaddr", "0.0.0.0"), 

103 IPField("siaddr", "0.0.0.0"), 

104 IPField("giaddr", "0.0.0.0"), 

105 _BOOTP_chaddr("chaddr", b"", length=16), 

106 StrFixedLenField("sname", b"", length=64), 

107 StrFixedLenField("file", b"", length=128), 

108 StrEnumField("options", b"", {dhcpmagic: "DHCP magic"})] 

109 

110 def guess_payload_class(self, payload): 

111 if self.options[:len(dhcpmagic)] == dhcpmagic: 

112 return DHCP 

113 else: 

114 return Packet.guess_payload_class(self, payload) 

115 

116 def extract_padding(self, s): 

117 if self.options[:len(dhcpmagic)] == dhcpmagic: 

118 # set BOOTP options to DHCP magic cookie and make rest a payload of DHCP options # noqa: E501 

119 payload = self.options[len(dhcpmagic):] 

120 self.options = self.options[:len(dhcpmagic)] 

121 return payload, None 

122 else: 

123 return b"", None 

124 

125 def hashret(self): 

126 return struct.pack("!I", self.xid) 

127 

128 def answers(self, other): 

129 if not isinstance(other, BOOTP): 

130 return 0 

131 return self.xid == other.xid 

132 

133 

134class _DHCPByteFieldListField(FieldListField): 

135 def randval(self): 

136 class _RandByteFieldList(RandField): 

137 def _fix(self): 

138 return [RandByte()] * int(RandByte()) 

139 return _RandByteFieldList() 

140 

141 

142class RandClasslessStaticRoutesField(RandField): 

143 """ 

144 A RandValue for classless static routes 

145 """ 

146 

147 def _fix(self): 

148 return "%s/%d:%s" % (RandIP(), RandNum(0, 32), RandIP()) 

149 

150 

151class ClasslessFieldListField(FieldListField): 

152 def randval(self): 

153 class _RandClasslessField(RandField): 

154 def _fix(self): 

155 return [RandClasslessStaticRoutesField()] * int(RandNum(1, 28)) 

156 return _RandClasslessField() 

157 

158 

159class ClasslessStaticRoutesField(Field): 

160 """ 

161 RFC 3442 defines classless static routes as up to 9 bytes per entry: 

162 

163 # Code Len Destination 1 Router 1 

164 +-----+---+----+-----+----+----+----+----+----+ 

165 | 121 | n | d1 | ... | dN | r1 | r2 | r3 | r4 | 

166 +-----+---+----+-----+----+----+----+----+----+ 

167 

168 Destination first byte contains one octet describing the width followed 

169 by all the significant octets of the subnet. 

170 """ 

171 

172 def m2i(self, pkt, x): 

173 # type: (Packet, bytes) -> str 

174 # b'\x20\x01\x02\x03\x04\t\x08\x07\x06' -> (1.2.3.4/32:9.8.7.6) 

175 prefix = orb(x[0]) 

176 

177 octets = (prefix + 7) // 8 

178 # Create the destination IP by using the number of octets 

179 # and padding up to 4 bytes to ensure a valid IP. 

180 dest = x[1:1 + octets] 

181 dest = socket.inet_ntoa(dest.ljust(4, b'\x00')) 

182 

183 router = x[1 + octets:5 + octets] 

184 router = socket.inet_ntoa(router) 

185 

186 return dest + "/" + str(prefix) + ":" + router 

187 

188 def i2m(self, pkt, x): 

189 # type: (Packet, str) -> bytes 

190 # (1.2.3.4/32:9.8.7.6) -> b'\x20\x01\x02\x03\x04\t\x08\x07\x06' 

191 if not x: 

192 return b'' 

193 

194 spx = re.split('/|:', str(x)) 

195 prefix = int(spx[1]) 

196 # if prefix is invalid value ( 0 > prefix > 32 ) then break 

197 if prefix > 32 or prefix < 0: 

198 warning("Invalid prefix value: %d (0x%x)", prefix, prefix) 

199 return b'' 

200 octets = (prefix + 7) // 8 

201 dest = socket.inet_aton(spx[0])[:octets] 

202 router = socket.inet_aton(spx[2]) 

203 return struct.pack('b', prefix) + dest + router 

204 

205 def getfield(self, pkt, s): 

206 prefix = orb(s[0]) 

207 route_len = 5 + (prefix + 7) // 8 

208 return s[route_len:], self.m2i(pkt, s[:route_len]) 

209 

210 def addfield(self, pkt, s, val): 

211 return s + self.i2m(pkt, val) 

212 

213 def randval(self): 

214 return RandClasslessStaticRoutesField() 

215 

216 

217# DHCP_UNKNOWN, DHCP_IP, DHCP_IPLIST, DHCP_TYPE \ 

218# = range(4) 

219# 

220 

221# DHCP Options and BOOTP Vendor Extensions 

222 

223 

224DHCPTypes = { 

225 1: "discover", 

226 2: "offer", 

227 3: "request", 

228 4: "decline", 

229 5: "ack", 

230 6: "nak", 

231 7: "release", 

232 8: "inform", 

233 9: "force_renew", 

234 10: "lease_query", 

235 11: "lease_unassigned", 

236 12: "lease_unknown", 

237 13: "lease_active", 

238} 

239 

240DHCPOptions = { 

241 0: "pad", 

242 1: IPField("subnet_mask", "0.0.0.0"), 

243 2: IntField("time_zone", 500), 

244 3: IPField("router", "0.0.0.0"), 

245 4: IPField("time_server", "0.0.0.0"), 

246 5: IPField("IEN_name_server", "0.0.0.0"), 

247 6: IPField("name_server", "0.0.0.0"), 

248 7: IPField("log_server", "0.0.0.0"), 

249 8: IPField("cookie_server", "0.0.0.0"), 

250 9: IPField("lpr_server", "0.0.0.0"), 

251 10: IPField("impress-servers", "0.0.0.0"), 

252 11: IPField("resource-location-servers", "0.0.0.0"), 

253 12: "hostname", 

254 13: ShortField("boot-size", 1000), 

255 14: "dump_path", 

256 15: "domain", 

257 16: IPField("swap-server", "0.0.0.0"), 

258 17: "root_disk_path", 

259 18: "extensions-path", 

260 19: ByteField("ip-forwarding", 0), 

261 20: ByteField("non-local-source-routing", 0), 

262 21: IPField("policy-filter", "0.0.0.0"), 

263 22: ShortField("max_dgram_reass_size", 300), 

264 23: ByteField("default_ttl", 50), 

265 24: IntField("pmtu_timeout", 1000), 

266 25: ShortField("path-mtu-plateau-table", 1000), 

267 26: ShortField("interface-mtu", 50), 

268 27: ByteField("all-subnets-local", 0), 

269 28: IPField("broadcast_address", "0.0.0.0"), 

270 29: ByteField("perform-mask-discovery", 0), 

271 30: ByteField("mask-supplier", 0), 

272 31: ByteField("router-discovery", 0), 

273 32: IPField("router-solicitation-address", "0.0.0.0"), 

274 33: IPField("static-routes", "0.0.0.0"), 

275 34: ByteField("trailer-encapsulation", 0), 

276 35: IntField("arp_cache_timeout", 1000), 

277 36: ByteField("ieee802-3-encapsulation", 0), 

278 37: ByteField("tcp_ttl", 100), 

279 38: IntField("tcp_keepalive_interval", 1000), 

280 39: ByteField("tcp_keepalive_garbage", 0), 

281 40: StrField("NIS_domain", "www.example.com"), 

282 41: IPField("NIS_server", "0.0.0.0"), 

283 42: IPField("NTP_server", "0.0.0.0"), 

284 43: "vendor_specific", 

285 44: IPField("NetBIOS_server", "0.0.0.0"), 

286 45: IPField("NetBIOS_dist_server", "0.0.0.0"), 

287 46: ByteField("NetBIOS_node_type", 100), 

288 47: "netbios-scope", 

289 48: IPField("font-servers", "0.0.0.0"), 

290 49: IPField("x-display-manager", "0.0.0.0"), 

291 50: IPField("requested_addr", "0.0.0.0"), 

292 51: IntField("lease_time", 43200), 

293 52: ByteField("dhcp-option-overload", 100), 

294 53: ByteEnumField("message-type", 1, DHCPTypes), 

295 54: IPField("server_id", "0.0.0.0"), 

296 55: _DHCPByteFieldListField( 

297 "param_req_list", [], 

298 ByteField("opcode", 0)), 

299 56: "error_message", 

300 57: ShortField("max_dhcp_size", 1500), 

301 58: IntField("renewal_time", 21600), 

302 59: IntField("rebinding_time", 37800), 

303 60: StrField("vendor_class_id", "id"), 

304 61: StrField("client_id", ""), 

305 62: "nwip-domain-name", 

306 64: "NISplus_domain", 

307 65: IPField("NISplus_server", "0.0.0.0"), 

308 66: "tftp_server_name", 

309 67: StrField("boot-file-name", ""), 

310 68: IPField("mobile-ip-home-agent", "0.0.0.0"), 

311 69: IPField("SMTP_server", "0.0.0.0"), 

312 70: IPField("POP3_server", "0.0.0.0"), 

313 71: IPField("NNTP_server", "0.0.0.0"), 

314 72: IPField("WWW_server", "0.0.0.0"), 

315 73: IPField("Finger_server", "0.0.0.0"), 

316 74: IPField("IRC_server", "0.0.0.0"), 

317 75: IPField("StreetTalk_server", "0.0.0.0"), 

318 76: IPField("StreetTalk_Dir_Assistance", "0.0.0.0"), 

319 77: "user_class", 

320 78: "slp_service_agent", 

321 79: "slp_service_scope", 

322 80: "rapid_commit", 

323 81: "client_FQDN", 

324 82: "relay_agent_information", 

325 85: IPField("nds-server", "0.0.0.0"), 

326 86: StrField("nds-tree-name", ""), 

327 87: StrField("nds-context", ""), 

328 88: "bcms-controller-namesi", 

329 89: IPField("bcms-controller-address", "0.0.0.0"), 

330 91: IntField("client-last-transaction-time", 1000), 

331 92: IPField("associated-ip", "0.0.0.0"), 

332 93: "pxe_client_architecture", 

333 94: "pxe_client_network_interface", 

334 97: "pxe_client_machine_identifier", 

335 98: StrField("uap-servers", ""), 

336 100: StrField("pcode", ""), 

337 101: StrField("tcode", ""), 

338 108: IntField("ipv6-only-preferred", 0), 

339 112: IPField("netinfo-server-address", "0.0.0.0"), 

340 113: StrField("netinfo-server-tag", ""), 

341 114: StrField("captive-portal", ""), 

342 116: ByteField("auto-config", 0), 

343 117: ShortField("name-service-search", 0,), 

344 118: IPField("subnet-selection", "0.0.0.0"), 

345 121: ClasslessFieldListField( 

346 "classless_static_routes", 

347 [], 

348 ClasslessStaticRoutesField("route", 0)), 

349 124: "vendor_class", 

350 125: "vendor_specific_information", 

351 128: IPField("tftp_server_ip_address", "0.0.0.0"), 

352 136: IPField("pana-agent", "0.0.0.0"), 

353 137: "v4-lost", 

354 138: IPField("capwap-ac-v4", "0.0.0.0"), 

355 141: "sip_ua_service_domains", 

356 145: _DHCPByteFieldListField( 

357 "forcerenew_nonce_capable", [], 

358 ByteEnumField("algorithm", 1, {1: "HMAC-MD5"})), 

359 146: "rdnss-selection", 

360 150: IPField("tftp_server_address", "0.0.0.0"), 

361 159: "v4-portparams", 

362 160: StrField("v4-captive-portal", ""), 

363 161: StrField("mud-url", ""), 

364 208: "pxelinux_magic", 

365 209: "pxelinux_configuration_file", 

366 210: "pxelinux_path_prefix", 

367 211: "pxelinux_reboot_time", 

368 212: "option-6rd", 

369 213: "v4-access-domain", 

370 255: "end" 

371} 

372 

373DHCPRevOptions = {} 

374 

375for k, v in DHCPOptions.items(): 

376 if isinstance(v, str): 

377 n = v 

378 v = None 

379 else: 

380 n = v.name 

381 DHCPRevOptions[n] = (k, v) 

382del n 

383del v 

384del k 

385 

386 

387class RandDHCPOptions(RandField): 

388 def __init__(self, size=None, rndstr=None): 

389 if size is None: 

390 size = RandNumExpo(0.05) 

391 self.size = size 

392 if rndstr is None: 

393 rndstr = RandBin(RandNum(0, 255)) 

394 self.rndstr = rndstr 

395 self._opts = list(DHCPOptions.values()) 

396 self._opts.remove("pad") 

397 self._opts.remove("end") 

398 

399 def _fix(self): 

400 op = [] 

401 for k in range(self.size): 

402 o = random.choice(self._opts) 

403 if isinstance(o, str): 

404 op.append((o, self.rndstr * 1)) 

405 else: 

406 r = o.randval()._fix() 

407 if isinstance(r, bytes): 

408 r = r[:255] 

409 op.append((o.name, r)) 

410 return op 

411 

412 def __iter__(self): 

413 return iter(self._fix()) 

414 

415 

416class DHCPOptionsField(StrField): 

417 """ 

418 A field that builds and dissects DHCP options. 

419 The internal value is a list of tuples with the format 

420 [("option_name", <option_value>), ...] 

421 Where expected names and values can be found using `DHCPOptions` 

422 """ 

423 islist = 1 

424 

425 def i2repr(self, pkt, x): 

426 s = [] 

427 for v in x: 

428 if isinstance(v, tuple) and len(v) >= 2: 

429 if v[0] in DHCPRevOptions and isinstance(DHCPRevOptions[v[0]][1], Field): # noqa: E501 

430 f = DHCPRevOptions[v[0]][1] 

431 vv = ",".join(f.i2repr(pkt, val) for val in v[1:]) 

432 else: 

433 vv = ",".join(repr(val) for val in v[1:]) 

434 s.append("%s=%s" % (v[0], vv)) 

435 else: 

436 s.append(sane(v)) 

437 return "[%s]" % (" ".join(s)) 

438 

439 def getfield(self, pkt, s): 

440 return b"", self.m2i(pkt, s) 

441 

442 def m2i(self, pkt, x): 

443 opt = [] 

444 while x: 

445 o = orb(x[0]) 

446 if o == 255: 

447 opt.append("end") 

448 x = x[1:] 

449 continue 

450 if o == 0: 

451 opt.append("pad") 

452 x = x[1:] 

453 continue 

454 if len(x) < 2 or len(x) < orb(x[1]) + 2: 

455 opt.append(x) 

456 break 

457 elif o in DHCPOptions: 

458 f = DHCPOptions[o] 

459 

460 if isinstance(f, str): 

461 olen = orb(x[1]) 

462 opt.append((f, x[2:olen + 2])) 

463 x = x[olen + 2:] 

464 else: 

465 olen = orb(x[1]) 

466 lval = [f.name] 

467 

468 if olen == 0: 

469 try: 

470 _, val = f.getfield(pkt, b'') 

471 except Exception: 

472 opt.append(x) 

473 break 

474 else: 

475 lval.append(val) 

476 

477 try: 

478 left = x[2:olen + 2] 

479 while left: 

480 left, val = f.getfield(pkt, left) 

481 lval.append(val) 

482 except Exception: 

483 opt.append(x) 

484 break 

485 else: 

486 otuple = tuple(lval) 

487 opt.append(otuple) 

488 x = x[olen + 2:] 

489 else: 

490 olen = orb(x[1]) 

491 opt.append((o, x[2:olen + 2])) 

492 x = x[olen + 2:] 

493 return opt 

494 

495 def i2m(self, pkt, x): 

496 if isinstance(x, str): 

497 return x 

498 s = b"" 

499 for o in x: 

500 if isinstance(o, tuple) and len(o) >= 2: 

501 name = o[0] 

502 lval = o[1:] 

503 

504 if isinstance(name, int): 

505 onum, oval = name, b"".join(lval) 

506 elif name in DHCPRevOptions: 

507 onum, f = DHCPRevOptions[name] 

508 if f is not None: 

509 lval = (f.addfield(pkt, b"", f.any2i(pkt, val)) for val in lval) # noqa: E501 

510 else: 

511 lval = (bytes_encode(x) for x in lval) 

512 oval = b"".join(lval) 

513 else: 

514 warning("Unknown field option %s", name) 

515 continue 

516 

517 s += struct.pack("!BB", onum, len(oval)) 

518 s += oval 

519 

520 elif (isinstance(o, str) and o in DHCPRevOptions and 

521 DHCPRevOptions[o][1] is None): 

522 s += chb(DHCPRevOptions[o][0]) 

523 elif isinstance(o, int): 

524 s += chb(o) + b"\0" 

525 elif isinstance(o, (str, bytes)): 

526 s += bytes_encode(o) 

527 else: 

528 warning("Malformed option %s", o) 

529 return s 

530 

531 def randval(self): 

532 return RandDHCPOptions() 

533 

534 

535class DHCP(Packet): 

536 name = "DHCP options" 

537 fields_desc = [DHCPOptionsField("options", b"")] 

538 

539 def mysummary(self): 

540 for id in self.options: 

541 if isinstance(id, tuple) and id[0] == "message-type": 

542 return "DHCP %s" % DHCPTypes.get(id[1], "").capitalize() 

543 return super(DHCP, self).mysummary() 

544 

545 

546bind_layers(UDP, BOOTP, dport=67, sport=68) 

547bind_layers(UDP, BOOTP, dport=68, sport=67) 

548bind_bottom_up(UDP, BOOTP, dport=67, sport=67) 

549bind_layers(BOOTP, DHCP, options=b'c\x82Sc') 

550 

551 

552@conf.commands.register 

553def dhcp_request(hw=None, 

554 req_type='discover', 

555 server_id=None, 

556 requested_addr=None, 

557 hostname=None, 

558 iface=None, 

559 **kargs): 

560 """ 

561 Send a DHCP discover request and return the answer. 

562 

563 Usage:: 

564 

565 >>> dhcp_request() # send DHCP discover 

566 >>> dhcp_request(req_type='request', 

567 ... requested_addr='10.53.4.34') # send DHCP request 

568 """ 

569 if conf.checkIPaddr: 

570 warning( 

571 "conf.checkIPaddr is enabled, may not be able to match the answer" 

572 ) 

573 if hw is None: 

574 if iface is None: 

575 iface = conf.iface 

576 hw = get_if_hwaddr(iface) 

577 dhcp_options = [ 

578 ('message-type', req_type), 

579 ('client_id', b'\x01' + mac2str(hw)), 

580 ] 

581 if requested_addr is not None: 

582 dhcp_options.append(('requested_addr', requested_addr)) 

583 elif req_type == 'request': 

584 warning("DHCP Request without requested_addr will likely be ignored") 

585 if server_id is not None: 

586 dhcp_options.append(('server_id', server_id)) 

587 if hostname is not None: 

588 dhcp_options.extend([ 

589 ('hostname', hostname), 

590 ('client_FQDN', b'\x00\x00\x00' + bytes_encode(hostname)), 

591 ]) 

592 dhcp_options.extend([ 

593 ('vendor_class_id', b'MSFT 5.0'), 

594 ('param_req_list', [ 

595 1, 3, 6, 15, 31, 33, 43, 44, 46, 47, 119, 121, 249, 252 

596 ]), 

597 'end' 

598 ]) 

599 return srp1( 

600 Ether(dst="ff:ff:ff:ff:ff:ff", src=hw) / 

601 IP(src="0.0.0.0", dst="255.255.255.255") / 

602 UDP(sport=68, dport=67) / 

603 BOOTP(chaddr=hw, xid=RandInt(), flags="B") / 

604 DHCP(options=dhcp_options), 

605 iface=iface, **kargs 

606 ) 

607 

608 

609class BOOTP_am(AnsweringMachine): 

610 function_name = "bootpd" 

611 filter = "udp and port 68 and port 67" 

612 

613 def parse_options(self, 

614 pool: Union[Net, List[str]] = Net("192.168.1.128/25"), 

615 network: str = "192.168.1.0/24", 

616 gw: str = "192.168.1.1", 

617 nameserver: Union[str, List[str]] = None, 

618 domain: Optional[str] = None, 

619 renewal_time: int = 60, 

620 lease_time: int = 1800, 

621 **kwargs): 

622 """ 

623 :param pool: the range of addresses to distribute. Can be a Net, 

624 a list of IPs or a string (always gives the same IP). 

625 :param network: the subnet range 

626 :param gw: the gateway IP (can be None) 

627 :param nameserver: the DNS server IP (by default, same than gw). 

628 This can also be a list. 

629 :param domain: the domain to advertise (can be None) 

630 

631 Other DHCP parameters can be passed as kwargs. See DHCPOptions in dhcp.py. 

632 For instance:: 

633 

634 dhcpd(pool=Net("10.0.10.0/24"), network="10.0.0.0/8", gw="10.0.10.1", 

635 classless_static_routes=["1.2.3.4/32:9.8.7.6"]) 

636 

637 Other example with different options:: 

638 

639 dhcpd(pool=Net("10.0.10.0/24"), network="10.0.0.0/8", gw="10.0.10.1", 

640 nameserver=["8.8.8.8", "4.4.4.4"], domain="DOMAIN.LOCAL") 

641 """ 

642 self.domain = domain 

643 netw, msk = (network.split("/") + ["32"])[:2] 

644 msk = itom(int(msk)) 

645 self.netmask = ltoa(msk) 

646 self.network = ltoa(atol(netw) & msk) 

647 self.broadcast = ltoa(atol(self.network) | (0xffffffff & ~msk)) 

648 self.gw = gw 

649 if nameserver is None: 

650 self.nameserver = (gw,) 

651 elif isinstance(nameserver, str): 

652 self.nameserver = (nameserver,) 

653 else: 

654 self.nameserver = tuple(nameserver) 

655 

656 if isinstance(pool, str): 

657 pool = Net(pool) 

658 if isinstance(pool, Iterable): 

659 pool = [k for k in pool if k not in [gw, self.network, self.broadcast]] 

660 pool.reverse() 

661 if len(pool) == 1: 

662 pool, = pool 

663 self.pool = pool 

664 self.lease_time = lease_time 

665 self.renewal_time = renewal_time 

666 self.leases = {} 

667 self.kwargs = kwargs 

668 

669 def is_request(self, req): 

670 if not req.haslayer(BOOTP): 

671 return 0 

672 reqb = req.getlayer(BOOTP) 

673 if reqb.op != 1: 

674 return 0 

675 return 1 

676 

677 def print_reply(self, _, reply): 

678 print("Reply %s to %s" % (reply.getlayer(IP).dst, reply.dst)) 

679 

680 def make_reply(self, req): 

681 mac = req[Ether].src 

682 if isinstance(self.pool, list): 

683 if mac not in self.leases: 

684 self.leases[mac] = self.pool.pop() 

685 ip = self.leases[mac] 

686 else: 

687 ip = self.pool 

688 

689 repb = req.getlayer(BOOTP).copy() 

690 repb.op = "BOOTREPLY" 

691 repb.yiaddr = ip 

692 repb.siaddr = self.gw 

693 repb.ciaddr = self.gw 

694 repb.giaddr = self.gw 

695 del repb.payload 

696 rep = Ether(dst=mac) / IP(dst=ip) / UDP(sport=req.dport, dport=req.sport) / repb # noqa: E501 

697 return rep 

698 

699 

700class DHCP_am(BOOTP_am): 

701 function_name = "dhcpd" 

702 

703 def make_reply(self, req): 

704 resp = BOOTP_am.make_reply(self, req) 

705 if DHCP in req: 

706 dhcp_options = [ 

707 (op[0], {1: 2, 3: 5}.get(op[1], op[1])) 

708 for op in req[DHCP].options 

709 if isinstance(op, tuple) and op[0] == "message-type" 

710 ] 

711 dhcp_options += [ 

712 x for x in [ 

713 ("server_id", self.gw), 

714 ("domain", self.domain), 

715 ("router", self.gw), 

716 ("name_server", *self.nameserver), 

717 ("broadcast_address", self.broadcast), 

718 ("subnet_mask", self.netmask), 

719 ("renewal_time", self.renewal_time), 

720 ("lease_time", self.lease_time), 

721 ] 

722 if x[1] is not None 

723 ] 

724 if self.kwargs: 

725 dhcp_options += self.kwargs.items() 

726 dhcp_options.append("end") 

727 resp /= DHCP(options=dhcp_options) 

728 return resp