Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/layers/dhcp.py: 48%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

316 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7DHCP (Dynamic Host Configuration Protocol) and BOOTP 

8 

9Implements: 

10- rfc951 - BOOTSTRAP PROTOCOL (BOOTP) 

11- rfc1542 - Clarifications and Extensions for the Bootstrap Protocol 

12- rfc1533 - DHCP Options and BOOTP Vendor Extensions 

13""" 

14 

15try: 

16 from collections.abc import Iterable 

17except ImportError: 

18 # For backwards compatibility. This was removed in Python 3.8 

19 from collections import Iterable 

20import random 

21import struct 

22 

23import socket 

24import re 

25 

26from scapy.ansmachine import AnsweringMachine 

27from scapy.base_classes import Net 

28from scapy.compat import chb, orb, bytes_encode 

29from scapy.fields import ( 

30 ByteEnumField, 

31 ByteField, 

32 Field, 

33 FieldListField, 

34 FlagsField, 

35 IntField, 

36 IPField, 

37 MACField, 

38 ShortField, 

39 StrEnumField, 

40 StrField, 

41 StrFixedLenField, 

42 XIntField, 

43) 

44from scapy.layers.inet import UDP, IP 

45from scapy.layers.l2 import Ether, HARDWARE_TYPES 

46from scapy.packet import bind_layers, bind_bottom_up, Packet 

47from scapy.utils import atol, itom, ltoa, sane, str2mac, mac2str 

48from scapy.volatile import ( 

49 RandBin, 

50 RandByte, 

51 RandField, 

52 RandIP, 

53 RandInt, 

54 RandNum, 

55 RandNumExpo, 

56 VolatileValue, 

57) 

58 

59from scapy.arch import get_if_hwaddr 

60from scapy.sendrecv import srp1 

61from scapy.error import warning 

62from scapy.config import conf 

63 

64dhcpmagic = b"c\x82Sc" 

65 

66 

67class _BOOTP_chaddr(StrFixedLenField): 

68 def i2m(self, pkt, x): 

69 if isinstance(x, VolatileValue): 

70 x = x._fix() 

71 return MACField.i2m(self, pkt, x) 

72 

73 def i2repr(self, pkt, v): 

74 if isinstance(v, VolatileValue): 

75 return repr(v) 

76 if pkt.htype == 1: # Ethernet 

77 if v[6:] == b"\x00" * 10: # Default padding 

78 return "%s (+ 10 nul pad)" % str2mac(v[:6]) 

79 else: 

80 return "%s (pad: %s)" % (str2mac(v[:6]), v[6:]) 

81 return super(_BOOTP_chaddr, self).i2repr(pkt, v) 

82 

83 

84class BOOTP(Packet): 

85 name = "BOOTP" 

86 fields_desc = [ 

87 ByteEnumField("op", 1, {1: "BOOTREQUEST", 2: "BOOTREPLY"}), 

88 ByteEnumField("htype", 1, HARDWARE_TYPES), 

89 ByteField("hlen", 6), 

90 ByteField("hops", 0), 

91 XIntField("xid", 0), 

92 ShortField("secs", 0), 

93 FlagsField("flags", 0, 16, "???????????????B"), 

94 IPField("ciaddr", "0.0.0.0"), 

95 IPField("yiaddr", "0.0.0.0"), 

96 IPField("siaddr", "0.0.0.0"), 

97 IPField("giaddr", "0.0.0.0"), 

98 _BOOTP_chaddr("chaddr", b"", length=16), 

99 StrFixedLenField("sname", b"", length=64), 

100 StrFixedLenField("file", b"", length=128), 

101 StrEnumField("options", b"", {dhcpmagic: "DHCP magic"})] 

102 

103 def guess_payload_class(self, payload): 

104 if self.options[:len(dhcpmagic)] == dhcpmagic: 

105 return DHCP 

106 else: 

107 return Packet.guess_payload_class(self, payload) 

108 

109 def extract_padding(self, s): 

110 if self.options[:len(dhcpmagic)] == dhcpmagic: 

111 # set BOOTP options to DHCP magic cookie and make rest a payload of DHCP options # noqa: E501 

112 payload = self.options[len(dhcpmagic):] 

113 self.options = self.options[:len(dhcpmagic)] 

114 return payload, None 

115 else: 

116 return b"", None 

117 

118 def hashret(self): 

119 return struct.pack("!I", self.xid) 

120 

121 def answers(self, other): 

122 if not isinstance(other, BOOTP): 

123 return 0 

124 return self.xid == other.xid 

125 

126 

127class _DHCPByteFieldListField(FieldListField): 

128 def randval(self): 

129 class _RandByteFieldList(RandField): 

130 def _fix(self): 

131 return [RandByte()] * int(RandByte()) 

132 return _RandByteFieldList() 

133 

134 

135class RandClasslessStaticRoutesField(RandField): 

136 """ 

137 A RandValue for classless static routes 

138 """ 

139 

140 def _fix(self): 

141 return "%s/%d:%s" % (RandIP(), RandNum(0, 32), RandIP()) 

142 

143 

144class ClasslessFieldListField(FieldListField): 

145 def randval(self): 

146 class _RandClasslessField(RandField): 

147 def _fix(self): 

148 return [RandClasslessStaticRoutesField()] * int(RandNum(1, 28)) 

149 return _RandClasslessField() 

150 

151 

152class ClasslessStaticRoutesField(Field): 

153 """ 

154 RFC 3442 defines classless static routes as up to 9 bytes per entry: 

155 

156 # Code Len Destination 1 Router 1 

157 +-----+---+----+-----+----+----+----+----+----+ 

158 | 121 | n | d1 | ... | dN | r1 | r2 | r3 | r4 | 

159 +-----+---+----+-----+----+----+----+----+----+ 

160 

161 Destination first byte contains one octet describing the width followed 

162 by all the significant octets of the subnet. 

163 """ 

164 

165 def m2i(self, pkt, x): 

166 # type: (Packet, bytes) -> str 

167 # b'\x20\x01\x02\x03\x04\t\x08\x07\x06' -> (1.2.3.4/32:9.8.7.6) 

168 prefix = orb(x[0]) 

169 

170 octets = (prefix + 7) // 8 

171 # Create the destination IP by using the number of octets 

172 # and padding up to 4 bytes to ensure a valid IP. 

173 dest = x[1:1 + octets] 

174 dest = socket.inet_ntoa(dest.ljust(4, b'\x00')) 

175 

176 router = x[1 + octets:5 + octets] 

177 router = socket.inet_ntoa(router) 

178 

179 return dest + "/" + str(prefix) + ":" + router 

180 

181 def i2m(self, pkt, x): 

182 # type: (Packet, str) -> bytes 

183 # (1.2.3.4/32:9.8.7.6) -> b'\x20\x01\x02\x03\x04\t\x08\x07\x06' 

184 if not x: 

185 return b'' 

186 

187 spx = re.split('/|:', str(x)) 

188 prefix = int(spx[1]) 

189 # if prefix is invalid value ( 0 > prefix > 32 ) then break 

190 if prefix > 32 or prefix < 0: 

191 warning("Invalid prefix value: %d (0x%x)", prefix, prefix) 

192 return b'' 

193 octets = (prefix + 7) // 8 

194 dest = socket.inet_aton(spx[0])[:octets] 

195 router = socket.inet_aton(spx[2]) 

196 return struct.pack('b', prefix) + dest + router 

197 

198 def getfield(self, pkt, s): 

199 prefix = orb(s[0]) 

200 route_len = 5 + (prefix + 7) // 8 

201 return s[route_len:], self.m2i(pkt, s[:route_len]) 

202 

203 def addfield(self, pkt, s, val): 

204 return s + self.i2m(pkt, val) 

205 

206 def randval(self): 

207 return RandClasslessStaticRoutesField() 

208 

209 

210# DHCP_UNKNOWN, DHCP_IP, DHCP_IPLIST, DHCP_TYPE \ 

211# = range(4) 

212# 

213 

214# DHCP Options and BOOTP Vendor Extensions 

215 

216 

217DHCPTypes = { 

218 1: "discover", 

219 2: "offer", 

220 3: "request", 

221 4: "decline", 

222 5: "ack", 

223 6: "nak", 

224 7: "release", 

225 8: "inform", 

226 9: "force_renew", 

227 10: "lease_query", 

228 11: "lease_unassigned", 

229 12: "lease_unknown", 

230 13: "lease_active", 

231} 

232 

233DHCPOptions = { 

234 0: "pad", 

235 1: IPField("subnet_mask", "0.0.0.0"), 

236 2: IntField("time_zone", 500), 

237 3: IPField("router", "0.0.0.0"), 

238 4: IPField("time_server", "0.0.0.0"), 

239 5: IPField("IEN_name_server", "0.0.0.0"), 

240 6: IPField("name_server", "0.0.0.0"), 

241 7: IPField("log_server", "0.0.0.0"), 

242 8: IPField("cookie_server", "0.0.0.0"), 

243 9: IPField("lpr_server", "0.0.0.0"), 

244 10: IPField("impress-servers", "0.0.0.0"), 

245 11: IPField("resource-location-servers", "0.0.0.0"), 

246 12: "hostname", 

247 13: ShortField("boot-size", 1000), 

248 14: "dump_path", 

249 15: "domain", 

250 16: IPField("swap-server", "0.0.0.0"), 

251 17: "root_disk_path", 

252 18: "extensions-path", 

253 19: ByteField("ip-forwarding", 0), 

254 20: ByteField("non-local-source-routing", 0), 

255 21: IPField("policy-filter", "0.0.0.0"), 

256 22: ShortField("max_dgram_reass_size", 300), 

257 23: ByteField("default_ttl", 50), 

258 24: IntField("pmtu_timeout", 1000), 

259 25: ShortField("path-mtu-plateau-table", 1000), 

260 26: ShortField("interface-mtu", 50), 

261 27: ByteField("all-subnets-local", 0), 

262 28: IPField("broadcast_address", "0.0.0.0"), 

263 29: ByteField("perform-mask-discovery", 0), 

264 30: ByteField("mask-supplier", 0), 

265 31: ByteField("router-discovery", 0), 

266 32: IPField("router-solicitation-address", "0.0.0.0"), 

267 33: IPField("static-routes", "0.0.0.0"), 

268 34: ByteField("trailer-encapsulation", 0), 

269 35: IntField("arp_cache_timeout", 1000), 

270 36: ByteField("ieee802-3-encapsulation", 0), 

271 37: ByteField("tcp_ttl", 100), 

272 38: IntField("tcp_keepalive_interval", 1000), 

273 39: ByteField("tcp_keepalive_garbage", 0), 

274 40: StrField("NIS_domain", "www.example.com"), 

275 41: IPField("NIS_server", "0.0.0.0"), 

276 42: IPField("NTP_server", "0.0.0.0"), 

277 43: "vendor_specific", 

278 44: IPField("NetBIOS_server", "0.0.0.0"), 

279 45: IPField("NetBIOS_dist_server", "0.0.0.0"), 

280 46: ByteField("NetBIOS_node_type", 100), 

281 47: "netbios-scope", 

282 48: IPField("font-servers", "0.0.0.0"), 

283 49: IPField("x-display-manager", "0.0.0.0"), 

284 50: IPField("requested_addr", "0.0.0.0"), 

285 51: IntField("lease_time", 43200), 

286 52: ByteField("dhcp-option-overload", 100), 

287 53: ByteEnumField("message-type", 1, DHCPTypes), 

288 54: IPField("server_id", "0.0.0.0"), 

289 55: _DHCPByteFieldListField( 

290 "param_req_list", [], 

291 ByteField("opcode", 0)), 

292 56: "error_message", 

293 57: ShortField("max_dhcp_size", 1500), 

294 58: IntField("renewal_time", 21600), 

295 59: IntField("rebinding_time", 37800), 

296 60: StrField("vendor_class_id", "id"), 

297 61: StrField("client_id", ""), 

298 62: "nwip-domain-name", 

299 64: "NISplus_domain", 

300 65: IPField("NISplus_server", "0.0.0.0"), 

301 66: "tftp_server_name", 

302 67: StrField("boot-file-name", ""), 

303 68: IPField("mobile-ip-home-agent", "0.0.0.0"), 

304 69: IPField("SMTP_server", "0.0.0.0"), 

305 70: IPField("POP3_server", "0.0.0.0"), 

306 71: IPField("NNTP_server", "0.0.0.0"), 

307 72: IPField("WWW_server", "0.0.0.0"), 

308 73: IPField("Finger_server", "0.0.0.0"), 

309 74: IPField("IRC_server", "0.0.0.0"), 

310 75: IPField("StreetTalk_server", "0.0.0.0"), 

311 76: IPField("StreetTalk_Dir_Assistance", "0.0.0.0"), 

312 77: "user_class", 

313 78: "slp_service_agent", 

314 79: "slp_service_scope", 

315 80: "rapid_commit", 

316 81: "client_FQDN", 

317 82: "relay_agent_information", 

318 85: IPField("nds-server", "0.0.0.0"), 

319 86: StrField("nds-tree-name", ""), 

320 87: StrField("nds-context", ""), 

321 88: "bcms-controller-namesi", 

322 89: IPField("bcms-controller-address", "0.0.0.0"), 

323 91: IntField("client-last-transaction-time", 1000), 

324 92: IPField("associated-ip", "0.0.0.0"), 

325 93: "pxe_client_architecture", 

326 94: "pxe_client_network_interface", 

327 97: "pxe_client_machine_identifier", 

328 98: StrField("uap-servers", ""), 

329 100: StrField("pcode", ""), 

330 101: StrField("tcode", ""), 

331 108: IntField("ipv6-only-preferred", 0), 

332 112: IPField("netinfo-server-address", "0.0.0.0"), 

333 113: StrField("netinfo-server-tag", ""), 

334 114: StrField("captive-portal", ""), 

335 116: ByteField("auto-config", 0), 

336 117: ShortField("name-service-search", 0,), 

337 118: IPField("subnet-selection", "0.0.0.0"), 

338 121: ClasslessFieldListField( 

339 "classless_static_routes", 

340 [], 

341 ClasslessStaticRoutesField("route", 0)), 

342 124: "vendor_class", 

343 125: "vendor_specific_information", 

344 128: IPField("tftp_server_ip_address", "0.0.0.0"), 

345 136: IPField("pana-agent", "0.0.0.0"), 

346 137: "v4-lost", 

347 138: IPField("capwap-ac-v4", "0.0.0.0"), 

348 141: "sip_ua_service_domains", 

349 145: _DHCPByteFieldListField( 

350 "forcerenew_nonce_capable", [], 

351 ByteEnumField("algorithm", 1, {1: "HMAC-MD5"})), 

352 146: "rdnss-selection", 

353 150: IPField("tftp_server_address", "0.0.0.0"), 

354 159: "v4-portparams", 

355 160: StrField("v4-captive-portal", ""), 

356 161: StrField("mud-url", ""), 

357 208: "pxelinux_magic", 

358 209: "pxelinux_configuration_file", 

359 210: "pxelinux_path_prefix", 

360 211: "pxelinux_reboot_time", 

361 212: "option-6rd", 

362 213: "v4-access-domain", 

363 255: "end" 

364} 

365 

366DHCPRevOptions = {} 

367 

368for k, v in DHCPOptions.items(): 

369 if isinstance(v, str): 

370 n = v 

371 v = None 

372 else: 

373 n = v.name 

374 DHCPRevOptions[n] = (k, v) 

375del n 

376del v 

377del k 

378 

379 

380class RandDHCPOptions(RandField): 

381 def __init__(self, size=None, rndstr=None): 

382 if size is None: 

383 size = RandNumExpo(0.05) 

384 self.size = size 

385 if rndstr is None: 

386 rndstr = RandBin(RandNum(0, 255)) 

387 self.rndstr = rndstr 

388 self._opts = list(DHCPOptions.values()) 

389 self._opts.remove("pad") 

390 self._opts.remove("end") 

391 

392 def _fix(self): 

393 op = [] 

394 for k in range(self.size): 

395 o = random.choice(self._opts) 

396 if isinstance(o, str): 

397 op.append((o, self.rndstr * 1)) 

398 else: 

399 r = o.randval()._fix() 

400 if isinstance(r, bytes): 

401 r = r[:255] 

402 op.append((o.name, r)) 

403 return op 

404 

405 

406class DHCPOptionsField(StrField): 

407 """ 

408 A field that builds and dissects DHCP options. 

409 The internal value is a list of tuples with the format 

410 [("option_name", <option_value>), ...] 

411 Where expected names and values can be found using `DHCPOptions` 

412 """ 

413 islist = 1 

414 

415 def i2repr(self, pkt, x): 

416 s = [] 

417 for v in x: 

418 if isinstance(v, tuple) and len(v) >= 2: 

419 if v[0] in DHCPRevOptions and isinstance(DHCPRevOptions[v[0]][1], Field): # noqa: E501 

420 f = DHCPRevOptions[v[0]][1] 

421 vv = ",".join(f.i2repr(pkt, val) for val in v[1:]) 

422 else: 

423 vv = ",".join(repr(val) for val in v[1:]) 

424 s.append("%s=%s" % (v[0], vv)) 

425 else: 

426 s.append(sane(v)) 

427 return "[%s]" % (" ".join(s)) 

428 

429 def getfield(self, pkt, s): 

430 return b"", self.m2i(pkt, s) 

431 

432 def m2i(self, pkt, x): 

433 opt = [] 

434 while x: 

435 o = orb(x[0]) 

436 if o == 255: 

437 opt.append("end") 

438 x = x[1:] 

439 continue 

440 if o == 0: 

441 opt.append("pad") 

442 x = x[1:] 

443 continue 

444 if len(x) < 2 or len(x) < orb(x[1]) + 2: 

445 opt.append(x) 

446 break 

447 elif o in DHCPOptions: 

448 f = DHCPOptions[o] 

449 

450 if isinstance(f, str): 

451 olen = orb(x[1]) 

452 opt.append((f, x[2:olen + 2])) 

453 x = x[olen + 2:] 

454 else: 

455 olen = orb(x[1]) 

456 lval = [f.name] 

457 

458 if olen == 0: 

459 try: 

460 _, val = f.getfield(pkt, b'') 

461 except Exception: 

462 opt.append(x) 

463 break 

464 else: 

465 lval.append(val) 

466 

467 try: 

468 left = x[2:olen + 2] 

469 while left: 

470 left, val = f.getfield(pkt, left) 

471 lval.append(val) 

472 except Exception: 

473 opt.append(x) 

474 break 

475 else: 

476 otuple = tuple(lval) 

477 opt.append(otuple) 

478 x = x[olen + 2:] 

479 else: 

480 olen = orb(x[1]) 

481 opt.append((o, x[2:olen + 2])) 

482 x = x[olen + 2:] 

483 return opt 

484 

485 def i2m(self, pkt, x): 

486 if isinstance(x, str): 

487 return x 

488 s = b"" 

489 for o in x: 

490 if isinstance(o, tuple) and len(o) >= 2: 

491 name = o[0] 

492 lval = o[1:] 

493 

494 if isinstance(name, int): 

495 onum, oval = name, b"".join(lval) 

496 elif name in DHCPRevOptions: 

497 onum, f = DHCPRevOptions[name] 

498 if f is not None: 

499 lval = (f.addfield(pkt, b"", f.any2i(pkt, val)) for val in lval) # noqa: E501 

500 else: 

501 lval = (bytes_encode(x) for x in lval) 

502 oval = b"".join(lval) 

503 else: 

504 warning("Unknown field option %s", name) 

505 continue 

506 

507 s += struct.pack("!BB", onum, len(oval)) 

508 s += oval 

509 

510 elif (isinstance(o, str) and o in DHCPRevOptions and 

511 DHCPRevOptions[o][1] is None): 

512 s += chb(DHCPRevOptions[o][0]) 

513 elif isinstance(o, int): 

514 s += chb(o) + b"\0" 

515 elif isinstance(o, (str, bytes)): 

516 s += bytes_encode(o) 

517 else: 

518 warning("Malformed option %s", o) 

519 return s 

520 

521 def randval(self): 

522 return RandDHCPOptions() 

523 

524 

525class DHCP(Packet): 

526 name = "DHCP options" 

527 fields_desc = [DHCPOptionsField("options", b"")] 

528 

529 def mysummary(self): 

530 for id in self.options: 

531 if isinstance(id, tuple) and id[0] == "message-type": 

532 return "DHCP %s" % DHCPTypes.get(id[1], "").capitalize() 

533 return super(DHCP, self).mysummary() 

534 

535 

536bind_layers(UDP, BOOTP, dport=67, sport=68) 

537bind_layers(UDP, BOOTP, dport=68, sport=67) 

538bind_bottom_up(UDP, BOOTP, dport=67, sport=67) 

539bind_layers(BOOTP, DHCP, options=b'c\x82Sc') 

540 

541 

542@conf.commands.register 

543def dhcp_request(hw=None, 

544 req_type='discover', 

545 server_id=None, 

546 requested_addr=None, 

547 hostname=None, 

548 iface=None, 

549 **kargs): 

550 """ 

551 Send a DHCP discover request and return the answer. 

552 

553 Usage:: 

554 

555 >>> dhcp_request() # send DHCP discover 

556 >>> dhcp_request(req_type='request', 

557 ... requested_addr='10.53.4.34') # send DHCP request 

558 """ 

559 if conf.checkIPaddr: 

560 warning( 

561 "conf.checkIPaddr is enabled, may not be able to match the answer" 

562 ) 

563 if hw is None: 

564 if iface is None: 

565 iface = conf.iface 

566 hw = get_if_hwaddr(iface) 

567 dhcp_options = [ 

568 ('message-type', req_type), 

569 ('client_id', b'\x01' + mac2str(hw)), 

570 ] 

571 if requested_addr is not None: 

572 dhcp_options.append(('requested_addr', requested_addr)) 

573 elif req_type == 'request': 

574 warning("DHCP Request without requested_addr will likely be ignored") 

575 if server_id is not None: 

576 dhcp_options.append(('server_id', server_id)) 

577 if hostname is not None: 

578 dhcp_options.extend([ 

579 ('hostname', hostname), 

580 ('client_FQDN', b'\x00\x00\x00' + bytes_encode(hostname)), 

581 ]) 

582 dhcp_options.extend([ 

583 ('vendor_class_id', b'MSFT 5.0'), 

584 ('param_req_list', [ 

585 1, 3, 6, 15, 31, 33, 43, 44, 46, 47, 119, 121, 249, 252 

586 ]), 

587 'end' 

588 ]) 

589 return srp1( 

590 Ether(dst="ff:ff:ff:ff:ff:ff", src=hw) / 

591 IP(src="0.0.0.0", dst="255.255.255.255") / 

592 UDP(sport=68, dport=67) / 

593 BOOTP(chaddr=hw, xid=RandInt(), flags="B") / 

594 DHCP(options=dhcp_options), 

595 iface=iface, **kargs 

596 ) 

597 

598 

599class BOOTP_am(AnsweringMachine): 

600 function_name = "bootpd" 

601 filter = "udp and port 68 and port 67" 

602 

603 def parse_options(self, 

604 pool=Net("192.168.1.128/25"), 

605 network="192.168.1.0/24", 

606 gw="192.168.1.1", 

607 nameserver=None, 

608 domain=None, 

609 renewal_time=60, 

610 lease_time=1800, 

611 **kwargs): 

612 """ 

613 :param pool: the range of addresses to distribute. Can be a Net, 

614 a list of IPs or a string (always gives the same IP). 

615 :param network: the subnet range 

616 :param gw: the gateway IP (can be None) 

617 :param nameserver: the DNS server IP (by default, same than gw) 

618 :param domain: the domain to advertise (can be None) 

619 

620 Other DHCP parameters can be passed as kwargs. See DHCPOptions in dhcp.py. 

621 For instance:: 

622 

623 dhcpd(pool=Net("10.0.10.0/24"), network="10.0.0.0/8", gw="10.0.10.1", 

624 classless_static_routes=["1.2.3.4/32:9.8.7.6"]) 

625 """ 

626 self.domain = domain 

627 netw, msk = (network.split("/") + ["32"])[:2] 

628 msk = itom(int(msk)) 

629 self.netmask = ltoa(msk) 

630 self.network = ltoa(atol(netw) & msk) 

631 self.broadcast = ltoa(atol(self.network) | (0xffffffff & ~msk)) 

632 self.gw = gw 

633 self.nameserver = nameserver or gw 

634 if isinstance(pool, str): 

635 pool = Net(pool) 

636 if isinstance(pool, Iterable): 

637 pool = [k for k in pool if k not in [gw, self.network, self.broadcast]] 

638 pool.reverse() 

639 if len(pool) == 1: 

640 pool, = pool 

641 self.pool = pool 

642 self.lease_time = lease_time 

643 self.renewal_time = renewal_time 

644 self.leases = {} 

645 self.kwargs = kwargs 

646 

647 def is_request(self, req): 

648 if not req.haslayer(BOOTP): 

649 return 0 

650 reqb = req.getlayer(BOOTP) 

651 if reqb.op != 1: 

652 return 0 

653 return 1 

654 

655 def print_reply(self, _, reply): 

656 print("Reply %s to %s" % (reply.getlayer(IP).dst, reply.dst)) 

657 

658 def make_reply(self, req): 

659 mac = req[Ether].src 

660 if isinstance(self.pool, list): 

661 if mac not in self.leases: 

662 self.leases[mac] = self.pool.pop() 

663 ip = self.leases[mac] 

664 else: 

665 ip = self.pool 

666 

667 repb = req.getlayer(BOOTP).copy() 

668 repb.op = "BOOTREPLY" 

669 repb.yiaddr = ip 

670 repb.siaddr = self.gw 

671 repb.ciaddr = self.gw 

672 repb.giaddr = self.gw 

673 del repb.payload 

674 rep = Ether(dst=mac) / IP(dst=ip) / UDP(sport=req.dport, dport=req.sport) / repb # noqa: E501 

675 return rep 

676 

677 

678class DHCP_am(BOOTP_am): 

679 function_name = "dhcpd" 

680 

681 def make_reply(self, req): 

682 resp = BOOTP_am.make_reply(self, req) 

683 if DHCP in req: 

684 dhcp_options = [ 

685 (op[0], {1: 2, 3: 5}.get(op[1], op[1])) 

686 for op in req[DHCP].options 

687 if isinstance(op, tuple) and op[0] == "message-type" 

688 ] 

689 dhcp_options += [ 

690 x for x in [ 

691 ("server_id", self.gw), 

692 ("domain", self.domain), 

693 ("router", self.gw), 

694 ("name_server", self.nameserver), 

695 ("broadcast_address", self.broadcast), 

696 ("subnet_mask", self.netmask), 

697 ("renewal_time", self.renewal_time), 

698 ("lease_time", self.lease_time), 

699 ] 

700 if x[1] is not None 

701 ] 

702 if self.kwargs: 

703 dhcp_options += self.kwargs.items() 

704 dhcp_options.append("end") 

705 resp /= DHCP(options=dhcp_options) 

706 return resp