Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/config.py: 54%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

649 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Implementation of the configuration object. 

8""" 

9 

10 

11import atexit 

12import copy 

13import functools 

14import os 

15import re 

16import socket 

17import sys 

18import time 

19import warnings 

20 

21from dataclasses import dataclass 

22from enum import Enum 

23 

24import importlib 

25import importlib.abc 

26import importlib.util 

27 

28import scapy 

29from scapy import VERSION 

30from scapy.base_classes import BasePacket 

31from scapy.consts import DARWIN, WINDOWS, LINUX, BSD, SOLARIS 

32from scapy.error import ( 

33 log_loading, 

34 log_scapy, 

35 ScapyInvalidPlatformException, 

36 warning, 

37) 

38from scapy.themes import ColorTheme, NoTheme, apply_ipython_style 

39 

40# Typing imports 

41from typing import ( 

42 cast, 

43 Any, 

44 Callable, 

45 Dict, 

46 Iterator, 

47 List, 

48 NoReturn, 

49 Optional, 

50 Set, 

51 Tuple, 

52 Type, 

53 Union, 

54 overload, 

55 TYPE_CHECKING, 

56) 

57from types import ModuleType 

58from scapy.compat import DecoratorCallable 

59 

60if TYPE_CHECKING: 

61 # Do not import at runtime 

62 import scapy.as_resolvers 

63 from scapy.modules.nmap import NmapKnowledgeBase 

64 from scapy.packet import Packet 

65 from scapy.supersocket import SuperSocket # noqa: F401 

66 import scapy.asn1.asn1 

67 import scapy.asn1.mib 

68 

69############ 

70# Config # 

71############ 

72 

73 

74class ConfClass(object): 

75 def configure(self, cnf): 

76 # type: (ConfClass) -> None 

77 self.__dict__ = cnf.__dict__.copy() 

78 

79 def __repr__(self): 

80 # type: () -> str 

81 return str(self) 

82 

83 def __str__(self): 

84 # type: () -> str 

85 s = "" 

86 dkeys = self.__class__.__dict__.copy() 

87 dkeys.update(self.__dict__) 

88 keys = sorted(dkeys) 

89 for i in keys: 

90 if i[0] != "_": 

91 r = repr(getattr(self, i)) 

92 r = " ".join(r.split()) 

93 wlen = 76 - max(len(i), 10) 

94 if len(r) > wlen: 

95 r = r[:wlen - 3] + "..." 

96 s += "%-10s = %s\n" % (i, r) 

97 return s[:-1] 

98 

99 

100class Interceptor(object): 

101 def __init__(self, 

102 name, # type: str 

103 default, # type: Any 

104 hook, # type: Callable[..., Any] 

105 args=None, # type: Optional[List[Any]] 

106 kargs=None # type: Optional[Dict[str, Any]] 

107 ): 

108 # type: (...) -> None 

109 self.name = name 

110 self.intname = "_intercepted_%s" % name 

111 self.default = default 

112 self.hook = hook 

113 self.args = args if args is not None else [] 

114 self.kargs = kargs if kargs is not None else {} 

115 

116 def __get__(self, obj, typ=None): 

117 # type: (Conf, Optional[type]) -> Any 

118 if not hasattr(obj, self.intname): 

119 setattr(obj, self.intname, self.default) 

120 return getattr(obj, self.intname) 

121 

122 @staticmethod 

123 def set_from_hook(obj, name, val): 

124 # type: (Conf, str, bool) -> None 

125 int_name = "_intercepted_%s" % name 

126 setattr(obj, int_name, val) 

127 

128 def __set__(self, obj, val): 

129 # type: (Conf, Any) -> None 

130 old = getattr(obj, self.intname, self.default) 

131 val = self.hook(self.name, val, old, *self.args, **self.kargs) 

132 setattr(obj, self.intname, val) 

133 

134 

135def _readonly(name): 

136 # type: (str) -> NoReturn 

137 default = Conf.__dict__[name].default 

138 Interceptor.set_from_hook(conf, name, default) 

139 raise ValueError("Read-only value !") 

140 

141 

142ReadOnlyAttribute = functools.partial( 

143 Interceptor, 

144 hook=(lambda name, *args, **kwargs: _readonly(name)) 

145) 

146ReadOnlyAttribute.__doc__ = "Read-only class attribute" 

147 

148 

149class ProgPath(ConfClass): 

150 _default: str = "<System default>" 

151 universal_open: str = "open" if DARWIN else "xdg-open" 

152 pdfreader: str = universal_open 

153 psreader: str = universal_open 

154 svgreader: str = universal_open 

155 dot: str = "dot" 

156 display: str = "display" 

157 tcpdump: str = "tcpdump" 

158 tcpreplay: str = "tcpreplay" 

159 hexedit: str = "hexer" 

160 tshark: str = "tshark" 

161 wireshark: str = "wireshark" 

162 ifconfig: str = "ifconfig" 

163 extcap_folders: List[str] = [ 

164 os.path.join(os.path.expanduser("~"), ".config", "wireshark", "extcap"), 

165 "/usr/lib/x86_64-linux-gnu/wireshark/extcap", 

166 ] 

167 

168 

169class ConfigFieldList: 

170 def __init__(self): 

171 # type: () -> None 

172 self.fields = set() # type: Set[Any] 

173 self.layers = set() # type: Set[Any] 

174 

175 @staticmethod 

176 def _is_field(f): 

177 # type: (Any) -> bool 

178 return hasattr(f, "owners") 

179 

180 def _recalc_layer_list(self): 

181 # type: () -> None 

182 self.layers = {owner for f in self.fields for owner in f.owners} 

183 

184 def add(self, *flds): 

185 # type: (*Any) -> None 

186 self.fields |= {f for f in flds if self._is_field(f)} 

187 self._recalc_layer_list() 

188 

189 def remove(self, *flds): 

190 # type: (*Any) -> None 

191 self.fields -= set(flds) 

192 self._recalc_layer_list() 

193 

194 def __contains__(self, elt): 

195 # type: (Any) -> bool 

196 if isinstance(elt, BasePacket): 

197 return elt in self.layers 

198 return elt in self.fields 

199 

200 def __repr__(self): 

201 # type: () -> str 

202 return "<%s [%s]>" % (self.__class__.__name__, " ".join(str(x) for x in self.fields)) # noqa: E501 

203 

204 

205class Emphasize(ConfigFieldList): 

206 pass 

207 

208 

209class Resolve(ConfigFieldList): 

210 pass 

211 

212 

213class Num2Layer: 

214 def __init__(self): 

215 # type: () -> None 

216 self.num2layer = {} # type: Dict[int, Type[Packet]] 

217 self.layer2num = {} # type: Dict[Type[Packet], int] 

218 

219 def register(self, num, layer): 

220 # type: (int, Type[Packet]) -> None 

221 self.register_num2layer(num, layer) 

222 self.register_layer2num(num, layer) 

223 

224 def register_num2layer(self, num, layer): 

225 # type: (int, Type[Packet]) -> None 

226 self.num2layer[num] = layer 

227 

228 def register_layer2num(self, num, layer): 

229 # type: (int, Type[Packet]) -> None 

230 self.layer2num[layer] = num 

231 

232 @overload 

233 def __getitem__(self, item): 

234 # type: (Type[Packet]) -> int 

235 pass 

236 

237 @overload 

238 def __getitem__(self, item): # noqa: F811 

239 # type: (int) -> Type[Packet] 

240 pass 

241 

242 def __getitem__(self, item): # noqa: F811 

243 # type: (Union[int, Type[Packet]]) -> Union[int, Type[Packet]] 

244 if isinstance(item, int): 

245 return self.num2layer[item] 

246 else: 

247 return self.layer2num[item] 

248 

249 def __contains__(self, item): 

250 # type: (Union[int, Type[Packet]]) -> bool 

251 if isinstance(item, int): 

252 return item in self.num2layer 

253 else: 

254 return item in self.layer2num 

255 

256 def get(self, 

257 item, # type: Union[int, Type[Packet]] 

258 default=None, # type: Optional[Type[Packet]] 

259 ): 

260 # type: (...) -> Optional[Union[int, Type[Packet]]] 

261 return self[item] if item in self else default 

262 

263 def __repr__(self): 

264 # type: () -> str 

265 lst = [] 

266 for num, layer in self.num2layer.items(): 

267 if layer in self.layer2num and self.layer2num[layer] == num: 

268 dir = "<->" 

269 else: 

270 dir = " ->" 

271 lst.append((num, "%#6x %s %-20s (%s)" % (num, dir, layer.__name__, 

272 layer._name))) 

273 for layer, num in self.layer2num.items(): 

274 if num not in self.num2layer or self.num2layer[num] != layer: 

275 lst.append((num, "%#6x <- %-20s (%s)" % (num, layer.__name__, 

276 layer._name))) 

277 lst.sort() 

278 return "\n".join(y for x, y in lst) 

279 

280 

281class LayersList(List[Type['scapy.packet.Packet']]): 

282 def __init__(self): 

283 # type: () -> None 

284 list.__init__(self) 

285 self.ldict = {} # type: Dict[str, List[Type[Packet]]] 

286 self.filtered = False 

287 self._backup_dict = {} # type: Dict[Type[Packet], List[Tuple[Dict[str, Any], Type[Packet]]]] # noqa: E501 

288 

289 def __repr__(self): 

290 # type: () -> str 

291 return "\n".join("%-20s: %s" % (layer.__name__, layer.name) 

292 for layer in self) 

293 

294 def register(self, layer): 

295 # type: (Type[Packet]) -> None 

296 self.append(layer) 

297 if layer.__module__ not in self.ldict: 

298 self.ldict[layer.__module__] = [] 

299 self.ldict[layer.__module__].append(layer) 

300 

301 def layers(self): 

302 # type: () -> List[Tuple[str, str]] 

303 result = [] 

304 # This import may feel useless, but it is required for the eval below 

305 import scapy # noqa: F401 

306 try: 

307 import builtins # noqa: F401 

308 except ImportError: 

309 import __builtin__ # noqa: F401 

310 for lay in self.ldict: 

311 doc = eval(lay).__doc__ 

312 result.append((lay, doc.strip().split("\n")[0] if doc else lay)) 

313 return result 

314 

315 def filter(self, items): 

316 # type: (List[Type[Packet]]) -> None 

317 """Disable dissection of unused layers to speed up dissection""" 

318 if self.filtered: 

319 raise ValueError("Already filtered. Please disable it first") 

320 for lay in self.ldict.values(): 

321 for cls in lay: 

322 if cls not in self._backup_dict: 

323 self._backup_dict[cls] = cls.payload_guess[:] 

324 cls.payload_guess = [ 

325 y for y in cls.payload_guess if y[1] in items 

326 ] 

327 self.filtered = True 

328 

329 def unfilter(self): 

330 # type: () -> None 

331 """Re-enable dissection for all layers""" 

332 if not self.filtered: 

333 raise ValueError("Not filtered. Please filter first") 

334 for lay in self.ldict.values(): 

335 for cls in lay: 

336 cls.payload_guess = self._backup_dict[cls] 

337 self._backup_dict.clear() 

338 self.filtered = False 

339 

340 

341class CommandsList(List[Callable[..., Any]]): 

342 def __repr__(self): 

343 # type: () -> str 

344 s = [] 

345 for li in sorted(self, key=lambda x: x.__name__): 

346 doc = li.__doc__ if li.__doc__ else "--" 

347 doc = doc.lstrip().split('\n', 1)[0] 

348 s.append("%-22s: %s" % (li.__name__, doc)) 

349 return "\n".join(s) 

350 

351 def register(self, cmd): 

352 # type: (DecoratorCallable) -> DecoratorCallable 

353 self.append(cmd) 

354 return cmd # return cmd so that method can be used as a decorator 

355 

356 

357def lsc(): 

358 # type: () -> None 

359 """Displays Scapy's default commands""" 

360 print(repr(conf.commands)) 

361 

362 

363class CacheInstance(Dict[str, Any]): 

364 __slots__ = ["timeout", "name", "_timetable"] 

365 

366 def __init__(self, name="noname", timeout=None): 

367 # type: (str, Optional[int]) -> None 

368 self.timeout = timeout 

369 self.name = name 

370 self._timetable = {} # type: Dict[str, float] 

371 

372 def flush(self): 

373 # type: () -> None 

374 self._timetable.clear() 

375 self.clear() 

376 

377 def __getitem__(self, item): 

378 # type: (str) -> Any 

379 if item in self.__slots__: 

380 return object.__getattribute__(self, item) 

381 if not self.__contains__(item): 

382 raise KeyError(item) 

383 return super(CacheInstance, self).__getitem__(item) 

384 

385 def __contains__(self, item): 

386 if not super(CacheInstance, self).__contains__(item): 

387 return False 

388 if self.timeout is not None: 

389 t = self._timetable[item] 

390 if time.time() - t > self.timeout: 

391 return False 

392 return True 

393 

394 def get(self, item, default=None): 

395 # type: (str, Optional[Any]) -> Any 

396 # overloading this method is needed to force the dict to go through 

397 # the timetable check 

398 try: 

399 return self[item] 

400 except KeyError: 

401 return default 

402 

403 def __setitem__(self, item, v): 

404 # type: (str, str) -> None 

405 if item in self.__slots__: 

406 return object.__setattr__(self, item, v) 

407 self._timetable[item] = time.time() 

408 super(CacheInstance, self).__setitem__(item, v) 

409 

410 def update(self, 

411 other, # type: Any 

412 **kwargs # type: Any 

413 ): 

414 # type: (...) -> None 

415 for key, value in other.items(): 

416 # We only update an element from `other` either if it does 

417 # not exist in `self` or if the entry in `self` is older. 

418 if key not in self or self._timetable[key] < other._timetable[key]: 

419 dict.__setitem__(self, key, value) 

420 self._timetable[key] = other._timetable[key] 

421 

422 def iteritems(self): 

423 # type: () -> Iterator[Tuple[str, Any]] 

424 if self.timeout is None: 

425 return super(CacheInstance, self).items() 

426 t0 = time.time() 

427 return ( 

428 (k, v) 

429 for (k, v) in super(CacheInstance, self).items() 

430 if t0 - self._timetable[k] < self.timeout 

431 ) 

432 

433 def iterkeys(self): 

434 # type: () -> Iterator[str] 

435 if self.timeout is None: 

436 return super(CacheInstance, self).keys() 

437 t0 = time.time() 

438 return ( 

439 k 

440 for k in super(CacheInstance, self).keys() 

441 if t0 - self._timetable[k] < self.timeout 

442 ) 

443 

444 def __iter__(self): 

445 # type: () -> Iterator[str] 

446 return self.iterkeys() 

447 

448 def itervalues(self): 

449 # type: () -> Iterator[Tuple[str, Any]] 

450 if self.timeout is None: 

451 return super(CacheInstance, self).values() 

452 t0 = time.time() 

453 return ( 

454 v 

455 for (k, v) in super(CacheInstance, self).items() 

456 if t0 - self._timetable[k] < self.timeout 

457 ) 

458 

459 def items(self): 

460 # type: () -> Any 

461 return list(self.iteritems()) 

462 

463 def keys(self): 

464 # type: () -> Any 

465 return list(self.iterkeys()) 

466 

467 def values(self): 

468 # type: () -> Any 

469 return list(self.itervalues()) 

470 

471 def __len__(self): 

472 # type: () -> int 

473 if self.timeout is None: 

474 return super(CacheInstance, self).__len__() 

475 return len(self.keys()) 

476 

477 def summary(self): 

478 # type: () -> str 

479 return "%s: %i valid items. Timeout=%rs" % (self.name, len(self), self.timeout) # noqa: E501 

480 

481 def __repr__(self): 

482 # type: () -> str 

483 s = [] 

484 if self: 

485 mk = max(len(k) for k in self) 

486 fmt = "%%-%is %%s" % (mk + 1) 

487 for item in self.items(): 

488 s.append(fmt % item) 

489 return "\n".join(s) 

490 

491 def copy(self): 

492 # type: () -> CacheInstance 

493 return copy.copy(self) 

494 

495 

496class NetCache: 

497 def __init__(self): 

498 # type: () -> None 

499 self._caches_list = [] # type: List[CacheInstance] 

500 

501 def add_cache(self, cache): 

502 # type: (CacheInstance) -> None 

503 self._caches_list.append(cache) 

504 setattr(self, cache.name, cache) 

505 

506 def new_cache(self, name, timeout=None): 

507 # type: (str, Optional[int]) -> CacheInstance 

508 c = CacheInstance(name=name, timeout=timeout) 

509 self.add_cache(c) 

510 return c 

511 

512 def __delattr__(self, attr): 

513 # type: (str) -> NoReturn 

514 raise AttributeError("Cannot delete attributes") 

515 

516 def update(self, other): 

517 # type: (NetCache) -> None 

518 for co in other._caches_list: 

519 if hasattr(self, co.name): 

520 getattr(self, co.name).update(co) 

521 else: 

522 self.add_cache(co.copy()) 

523 

524 def flush(self): 

525 # type: () -> None 

526 for c in self._caches_list: 

527 c.flush() 

528 

529 def __repr__(self): 

530 # type: () -> str 

531 return "\n".join(c.summary() for c in self._caches_list) 

532 

533 

534class ScapyExt: 

535 __slots__ = ["specs", "name", "version"] 

536 

537 class MODE(Enum): 

538 LAYERS = "layers" 

539 CONTRIB = "contrib" 

540 MODULES = "modules" 

541 

542 @dataclass 

543 class ScapyExtSpec: 

544 fullname: str 

545 mode: 'ScapyExt.MODE' 

546 spec: Any 

547 default: bool 

548 

549 def __init__(self): 

550 self.specs: Dict[str, 'ScapyExt.ScapyExtSpec'] = {} 

551 

552 def config(self, name, version): 

553 self.name = name 

554 self.version = version 

555 

556 def register(self, name, mode, path, default=None): 

557 assert mode in self.MODE, "mode must be one of ScapyExt.MODE !" 

558 fullname = f"scapy.{mode.value}.{name}" 

559 spec = importlib.util.spec_from_file_location( 

560 fullname, 

561 str(path), 

562 ) 

563 spec = self.ScapyExtSpec( 

564 fullname=fullname, 

565 mode=mode, 

566 spec=spec, 

567 default=default or False, 

568 ) 

569 if default is None: 

570 spec.default = bool(importlib.util.find_spec(spec.fullname)) 

571 self.specs[fullname] = spec 

572 

573 def __repr__(self): 

574 return "<ScapyExt %s %s (%s specs)>" % ( 

575 self.name, 

576 self.version, 

577 len(self.specs), 

578 ) 

579 

580 

581class ExtsManager(importlib.abc.MetaPathFinder): 

582 __slots__ = ["exts", "_loaded", "all_specs"] 

583 

584 SCAPY_PLUGIN_CLASSIFIER = 'Framework :: Scapy' 

585 GPLV2_CLASSIFIERS = [ 

586 'License :: OSI Approved :: GNU General Public License v2 (GPLv2)', 

587 'License :: OSI Approved :: GNU General Public License v2 or later (GPLv2+)', 

588 ] 

589 

590 def __init__(self): 

591 self.exts: List[ScapyExt] = [] 

592 self.all_specs: Dict[str, ScapyExt.ScapyExtSpec] = {} 

593 self._loaded = [] 

594 

595 def find_spec(self, fullname, path, target=None): 

596 if fullname in self.all_specs: 

597 return self.all_specs[fullname].spec 

598 

599 def invalidate_caches(self): 

600 pass 

601 

602 def _register_spec(self, spec): 

603 self.all_specs[spec.fullname] = spec 

604 if spec.default: 

605 loader = importlib.util.LazyLoader(spec.spec.loader) 

606 spec.spec.loader = loader 

607 module = importlib.util.module_from_spec(spec.spec) 

608 sys.modules[spec.fullname] = module 

609 loader.exec_module(module) 

610 

611 def load(self): 

612 try: 

613 import importlib.metadata 

614 except ImportError: 

615 return 

616 for distr in importlib.metadata.distributions(): 

617 if any( 

618 v == self.SCAPY_PLUGIN_CLASSIFIER 

619 for k, v in distr.metadata.items() if k == 'Classifier' 

620 ): 

621 try: 

622 pkg = next( 

623 k 

624 for k, v in importlib.metadata.packages_distributions().items() 

625 if distr.name in v 

626 ) 

627 except KeyError: 

628 pkg = distr.name 

629 if pkg in self._loaded: 

630 continue 

631 if not any( 

632 v in self.GPLV2_CLASSIFIERS 

633 for k, v in distr.metadata.items() if k == 'Classifier' 

634 ): 

635 log_loading.warning( 

636 "'%s' has no GPLv2 classifier therefore cannot be loaded." % pkg # noqa: E501 

637 ) 

638 continue 

639 self._loaded.append(pkg) 

640 ext = ScapyExt() 

641 try: 

642 scapy_ext = importlib.import_module(pkg) 

643 except Exception as ex: 

644 log_loading.warning( 

645 "'%s' failed during import with %s" % ( 

646 pkg, 

647 ex 

648 ) 

649 ) 

650 continue 

651 try: 

652 scapy_ext_func = scapy_ext.scapy_ext 

653 except AttributeError: 

654 log_loading.info( 

655 "'%s' included the Scapy Framework specifier " 

656 "but did not include a scapy_ext" % pkg 

657 ) 

658 continue 

659 try: 

660 scapy_ext_func(ext) 

661 except Exception as ex: 

662 log_loading.warning( 

663 "'%s' failed during initialization with %s" % ( 

664 pkg, 

665 ex 

666 ) 

667 ) 

668 continue 

669 for spec in ext.specs.values(): 

670 self._register_spec(spec) 

671 self.exts.append(ext) 

672 if self not in sys.meta_path: 

673 sys.meta_path.append(self) 

674 

675 def __repr__(self): 

676 from scapy.utils import pretty_list 

677 return pretty_list( 

678 [ 

679 (x.name, x.version, [y.fullname for y in x.specs.values()]) 

680 for x in self.exts 

681 ], 

682 [("Name", "Version", "Specs")], 

683 sortBy=0, 

684 ) 

685 

686 

687def _version_checker(module, minver): 

688 # type: (ModuleType, Tuple[int, ...]) -> bool 

689 """Checks that module has a higher version that minver. 

690 

691 params: 

692 - module: a module to test 

693 - minver: a tuple of versions 

694 """ 

695 # We could use LooseVersion, but distutils imports imp which is deprecated 

696 version_regexp = r'[a-z]?((?:\d|\.)+\d+)(?:\.dev[0-9]+)?' 

697 version_tags_r = re.match( 

698 version_regexp, 

699 getattr(module, "__version__", "") 

700 ) 

701 if not version_tags_r: 

702 return False 

703 version_tags_i = version_tags_r.group(1).split(".") 

704 version_tags = tuple(int(x) for x in version_tags_i) 

705 return bool(version_tags >= minver) 

706 

707 

708def isCryptographyValid(): 

709 # type: () -> bool 

710 """ 

711 Check if the cryptography module >= 2.0.0 is present. This is the minimum 

712 version for most usages in Scapy. 

713 """ 

714 try: 

715 import cryptography 

716 except ImportError: 

717 return False 

718 return _version_checker(cryptography, (2, 0, 0)) 

719 

720 

721def isCryptographyAdvanced(): 

722 # type: () -> bool 

723 """ 

724 Check if the cryptography module is present, and if it supports X25519, 

725 ChaCha20Poly1305 and such. 

726 

727 Notes: 

728 - cryptography >= 2.0 is required 

729 - OpenSSL >= 1.1.0 is required 

730 """ 

731 try: 

732 from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey # noqa: E501 

733 X25519PrivateKey.generate() 

734 except Exception: 

735 return False 

736 else: 

737 return True 

738 

739 

740def isPyPy(): 

741 # type: () -> bool 

742 """Returns either scapy is running under PyPy or not""" 

743 try: 

744 import __pypy__ # noqa: F401 

745 return True 

746 except ImportError: 

747 return False 

748 

749 

750def _prompt_changer(attr, val, old): 

751 # type: (str, Any, Any) -> Any 

752 """Change the current prompt theme""" 

753 Interceptor.set_from_hook(conf, attr, val) 

754 try: 

755 sys.ps1 = conf.color_theme.prompt(conf.prompt) 

756 except Exception: 

757 pass 

758 try: 

759 apply_ipython_style( 

760 get_ipython() # type: ignore 

761 ) 

762 except NameError: 

763 pass 

764 return getattr(conf, attr, old) 

765 

766 

767def _set_conf_sockets(): 

768 # type: () -> None 

769 """Populate the conf.L2Socket and conf.L3Socket 

770 according to the various use_* parameters 

771 """ 

772 if conf.use_bpf and not BSD: 

773 Interceptor.set_from_hook(conf, "use_bpf", False) 

774 raise ScapyInvalidPlatformException("BSD-like (OSX, *BSD...) only !") 

775 if not conf.use_pcap and SOLARIS: 

776 Interceptor.set_from_hook(conf, "use_pcap", True) 

777 raise ScapyInvalidPlatformException( 

778 "Scapy only supports libpcap on Solaris !" 

779 ) 

780 # we are already in an Interceptor hook, use Interceptor.set_from_hook 

781 if conf.use_pcap: 

782 try: 

783 from scapy.arch.libpcap import L2pcapListenSocket, L2pcapSocket, \ 

784 L3pcapSocket 

785 except (OSError, ImportError): 

786 log_loading.warning("No libpcap provider available ! pcap won't be used") 

787 Interceptor.set_from_hook(conf, "use_pcap", False) 

788 else: 

789 conf.L3socket = L3pcapSocket 

790 conf.L3socket6 = functools.partial( 

791 L3pcapSocket, filter="ip6") 

792 conf.L2socket = L2pcapSocket 

793 conf.L2listen = L2pcapListenSocket 

794 elif conf.use_bpf: 

795 from scapy.arch.bpf.supersocket import L2bpfListenSocket, \ 

796 L2bpfSocket, L3bpfSocket 

797 conf.L3socket = L3bpfSocket 

798 conf.L3socket6 = functools.partial( 

799 L3bpfSocket, filter="ip6") 

800 conf.L2socket = L2bpfSocket 

801 conf.L2listen = L2bpfListenSocket 

802 elif LINUX: 

803 from scapy.arch.linux import L3PacketSocket, L2Socket, L2ListenSocket 

804 conf.L3socket = L3PacketSocket 

805 conf.L3socket6 = cast( 

806 "Type[SuperSocket]", 

807 functools.partial( 

808 L3PacketSocket, 

809 filter="ip6" 

810 ) 

811 ) 

812 conf.L2socket = L2Socket 

813 conf.L2listen = L2ListenSocket 

814 elif WINDOWS: 

815 from scapy.arch.windows import _NotAvailableSocket 

816 from scapy.arch.windows.native import L3WinSocket, L3WinSocket6 

817 conf.L3socket = L3WinSocket 

818 conf.L3socket6 = L3WinSocket6 

819 conf.L2socket = _NotAvailableSocket 

820 conf.L2listen = _NotAvailableSocket 

821 else: 

822 from scapy.supersocket import L3RawSocket, L3RawSocket6 

823 conf.L3socket = L3RawSocket 

824 conf.L3socket6 = L3RawSocket6 

825 # Reload the interfaces 

826 conf.ifaces.reload() 

827 

828 

829def _socket_changer(attr, val, old): 

830 # type: (str, bool, bool) -> Any 

831 if not isinstance(val, bool): 

832 raise TypeError("This argument should be a boolean") 

833 Interceptor.set_from_hook(conf, attr, val) 

834 dependencies = { # Things that will be turned off 

835 "use_pcap": ["use_bpf"], 

836 "use_bpf": ["use_pcap"], 

837 } 

838 restore = {k: getattr(conf, k) for k in dependencies} 

839 del restore[attr] # This is handled directly by _set_conf_sockets 

840 if val: # Only if True 

841 for param in dependencies[attr]: 

842 Interceptor.set_from_hook(conf, param, False) 

843 try: 

844 _set_conf_sockets() 

845 except (ScapyInvalidPlatformException, ImportError) as e: 

846 for key, value in restore.items(): 

847 Interceptor.set_from_hook(conf, key, value) 

848 if isinstance(e, ScapyInvalidPlatformException): 

849 raise 

850 return getattr(conf, attr) 

851 

852 

853def _loglevel_changer(attr, val, old): 

854 # type: (str, int, int) -> int 

855 """Handle a change of conf.logLevel""" 

856 log_scapy.setLevel(val) 

857 return val 

858 

859 

860def _iface_changer(attr, val, old): 

861 # type: (str, Any, Any) -> 'scapy.interfaces.NetworkInterface' 

862 """Resolves the interface in conf.iface""" 

863 if isinstance(val, str): 

864 from scapy.interfaces import resolve_iface 

865 iface = resolve_iface(val) 

866 if old and iface.dummy: 

867 warning( 

868 "This interface is not specified in any provider ! " 

869 "See conf.ifaces output" 

870 ) 

871 return iface 

872 return val 

873 

874 

875def _reset_tls_nss_keys(attr, val, old): 

876 # type: (str, Any, Any) -> Any 

877 """Reset conf.tls_nss_keys when conf.tls_nss_filename changes""" 

878 conf.tls_nss_keys = None 

879 return val 

880 

881 

882class Conf(ConfClass): 

883 """ 

884 This object contains the configuration of Scapy. 

885 """ 

886 version: str = ReadOnlyAttribute("version", VERSION) 

887 session: str = "" #: filename where the session will be saved 

888 interactive = False 

889 #: can be "ipython", "bpython", "ptpython", "ptipython", "python" or "auto". 

890 #: Default: Auto 

891 interactive_shell = "auto" 

892 #: Configuration for "ipython" to use jedi (disabled by default) 

893 ipython_use_jedi = False 

894 #: if 1, prevents any unwanted packet to go out (ARP, DNS, ...) 

895 stealth = "not implemented" 

896 #: selects the default output interface for srp() and sendp(). 

897 iface = Interceptor("iface", None, _iface_changer) # type: 'scapy.interfaces.NetworkInterface' # noqa: E501 

898 layers: LayersList = LayersList() 

899 commands = CommandsList() # type: CommandsList 

900 #: Codec used by default for ASN1 objects 

901 ASN1_default_codec = None # type: 'scapy.asn1.asn1.ASN1Codec' 

902 #: Default size for ASN1 objects 

903 ASN1_default_long_size = 0 

904 #: choose the AS resolver class to use 

905 AS_resolver = None # type: scapy.as_resolvers.AS_resolver 

906 dot15d4_protocol = None # Used in dot15d4.py 

907 logLevel: int = Interceptor("logLevel", log_scapy.level, _loglevel_changer) 

908 #: if 0, doesn't check that IPID matches between IP sent and 

909 #: ICMP IP citation received 

910 #: if 1, checks that they either are equal or byte swapped 

911 #: equals (bug in some IP stacks) 

912 #: if 2, strictly checks that they are equals 

913 checkIPID = False 

914 #: if 1, checks IP src in IP and ICMP IP citation match 

915 #: (bug in some NAT stacks) 

916 checkIPsrc = True 

917 checkIPaddr = True 

918 #: if True, checks that IP-in-IP layers match. If False, do 

919 #: not check IP layers that encapsulates another IP layer 

920 checkIPinIP = True 

921 #: if 1, also check that TCP seq and ack match the 

922 #: ones in ICMP citation 

923 check_TCPerror_seqack = False 

924 verb = 2 #: level of verbosity, from 0 (almost mute) to 3 (verbose) 

925 prompt: str = Interceptor("prompt", ">>> ", _prompt_changer) 

926 #: default mode for the promiscuous mode of a socket (to get answers if you 

927 #: spoof on a lan) 

928 sniff_promisc = True # type: bool 

929 raw_layer = None # type: Type[Packet] 

930 raw_summary = False # type: Union[bool, Callable[[bytes], Any]] 

931 padding_layer = None # type: Type[Packet] 

932 default_l2 = None # type: Type[Packet] 

933 l2types: Num2Layer = Num2Layer() 

934 l3types: Num2Layer = Num2Layer() 

935 L3socket = None # type: Type[scapy.supersocket.SuperSocket] 

936 L3socket6 = None # type: Type[scapy.supersocket.SuperSocket] 

937 L2socket = None # type: Type[scapy.supersocket.SuperSocket] 

938 L2listen = None # type: Type[scapy.supersocket.SuperSocket] 

939 BTsocket = None # type: Type[scapy.supersocket.SuperSocket] 

940 min_pkt_size = 60 

941 #: holds MIB direct access dictionary 

942 mib = None # type: 'scapy.asn1.mib.MIBDict' 

943 bufsize = 2**16 

944 #: history file 

945 histfile: str = os.getenv( 

946 'SCAPY_HISTFILE', 

947 os.path.join( 

948 os.path.expanduser("~"), 

949 ".config", "scapy", "history" 

950 ) 

951 ) 

952 #: includes padding in disassembled packets 

953 padding = 1 

954 #: BPF filter for packets to ignore 

955 except_filter = "" 

956 #: bpf filter added to every sniffing socket to exclude traffic 

957 #: from analysis 

958 filter = "" 

959 #: when 1, store received packet that are not matched into `debug.recv` 

960 debug_match = False 

961 #: When 1, print some TLS session secrets when they are computed, and 

962 #: warn about the session recognition. 

963 debug_tls = False 

964 wepkey = "" 

965 #: holds the Scapy interface list and manager 

966 ifaces = None # type: 'scapy.interfaces.NetworkInterfaceDict' 

967 #: holds the cache of interfaces loaded from Libpcap 

968 cache_pcapiflist = {} # type: Dict[str, Tuple[str, List[str], Any, str]] 

969 # `neighbor` will be filed by scapy.layers.l2 

970 neighbor = None # type: 'scapy.layers.l2.Neighbor' 

971 #: holds the name servers IP/hosts used for custom DNS resolution 

972 nameservers = None # type: str 

973 #: automatically load IPv4 routes on startup. Disable this if your 

974 #: routing table is too big. 

975 route_autoload = True 

976 #: automatically load IPv6 routes on startup. Disable this if your 

977 #: routing table is too big. 

978 route6_autoload = True 

979 #: holds the Scapy IPv4 routing table and provides methods to 

980 #: manipulate it 

981 route = None # type: 'scapy.route.Route' 

982 # `route` will be filed by route.py 

983 #: holds the Scapy IPv6 routing table and provides methods to 

984 #: manipulate it 

985 route6 = None # type: 'scapy.route6.Route6' 

986 manufdb = None # type: 'scapy.data.ManufDA' 

987 ethertypes = None # type: 'scapy.data.EtherDA' 

988 protocols = None # type: 'scapy.dadict.DADict[int, str]' 

989 services_udp = None # type: 'scapy.dadict.DADict[int, str]' 

990 services_tcp = None # type: 'scapy.dadict.DADict[int, str]' 

991 services_sctp = None # type: 'scapy.dadict.DADict[int, str]' 

992 # 'route6' will be filed by route6.py 

993 teredoPrefix = "" # type: str 

994 teredoServerPort = None # type: int 

995 auto_fragment = True 

996 #: raise exception when a packet dissector raises an exception 

997 debug_dissector = False 

998 color_theme: ColorTheme = Interceptor("color_theme", NoTheme(), _prompt_changer) 

999 #: how much time between warnings from the same place 

1000 warning_threshold = 5 

1001 prog: ProgPath = ProgPath() 

1002 #: holds list of fields for which resolution should be done 

1003 resolve: Resolve = Resolve() 

1004 #: holds list of enum fields for which conversion to string 

1005 #: should NOT be done 

1006 noenum: Resolve = Resolve() 

1007 emph: Emphasize = Emphasize() 

1008 #: read only attribute to show if PyPy is in use 

1009 use_pypy: bool = ReadOnlyAttribute("use_pypy", isPyPy()) 

1010 #: use libpcap integration or not. Changing this value will update 

1011 #: the conf.L[2/3] sockets 

1012 use_pcap: bool = Interceptor( 

1013 "use_pcap", 

1014 os.getenv("SCAPY_USE_LIBPCAP", "").lower().startswith("y"), 

1015 _socket_changer 

1016 ) 

1017 use_bpf: bool = Interceptor("use_bpf", False, _socket_changer) 

1018 use_npcap = False 

1019 ipv6_enabled: bool = socket.has_ipv6 

1020 stats_classic_protocols = [] # type: List[Type[Packet]] 

1021 stats_dot11_protocols = [] # type: List[Type[Packet]] 

1022 temp_files = [] # type: List[str] 

1023 #: netcache holds time-based caches for net operations 

1024 netcache: NetCache = NetCache() 

1025 geoip_city = None 

1026 # can, tls, http and a few others are not loaded by default 

1027 load_layers: List[str] = [ 

1028 'bluetooth', 

1029 'bluetooth4LE', 

1030 'dcerpc', 

1031 'dhcp', 

1032 'dhcp6', 

1033 'dns', 

1034 'dot11', 

1035 'dot15d4', 

1036 'eap', 

1037 'gprs', 

1038 'gssapi', 

1039 'hsrp', 

1040 'inet', 

1041 'inet6', 

1042 'ipsec', 

1043 'ir', 

1044 'isakmp', 

1045 'kerberos', 

1046 'l2', 

1047 'l2tp', 

1048 'ldap', 

1049 'llmnr', 

1050 'lltd', 

1051 'mgcp', 

1052 'mobileip', 

1053 'netbios', 

1054 'netflow', 

1055 'ntlm', 

1056 'ntp', 

1057 'ppi', 

1058 'ppp', 

1059 'pptp', 

1060 'radius', 

1061 'rip', 

1062 'rtp', 

1063 'sctp', 

1064 'sixlowpan', 

1065 'skinny', 

1066 'smb', 

1067 'smb2', 

1068 'smbclient', 

1069 'smbserver', 

1070 'snmp', 

1071 'spnego', 

1072 'tftp', 

1073 'vrrp', 

1074 'vxlan', 

1075 'x509', 

1076 'zigbee' 

1077 ] 

1078 #: a dict which can be used by contrib layers to store local 

1079 #: configuration 

1080 contribs = dict() # type: Dict[str, Any] 

1081 exts: ExtsManager = ExtsManager() 

1082 crypto_valid = isCryptographyValid() 

1083 crypto_valid_advanced = isCryptographyAdvanced() 

1084 #: controls whether or not to display the fancy banner 

1085 fancy_banner = True 

1086 #: controls whether tables (conf.iface, conf.route...) should be cropped 

1087 #: to fit the terminal 

1088 auto_crop_tables = True 

1089 #: how often to check for new packets. 

1090 #: Defaults to 0.05s. 

1091 recv_poll_rate = 0.05 

1092 #: When True, raise exception if no dst MAC found otherwise broadcast. 

1093 #: Default is False. 

1094 raise_no_dst_mac = False 

1095 loopback_name: str = "lo" if LINUX else "lo0" 

1096 nmap_base = "" # type: str 

1097 nmap_kdb = None # type: Optional[NmapKnowledgeBase] 

1098 #: a safety mechanism: the maximum amount of items included in a PacketListField 

1099 #: or a FieldListField 

1100 max_list_count = 100 

1101 #: When the TLS module is loaded (not by default), the following turns on sessions 

1102 tls_session_enable = False 

1103 #: Filename containing NSS Keys Log 

1104 tls_nss_filename = Interceptor( 

1105 "tls_nss_filename", 

1106 None, 

1107 _reset_tls_nss_keys 

1108 ) 

1109 #: Dictionary containing parsed NSS Keys 

1110 tls_nss_keys: Dict[str, bytes] = None 

1111 #: When TCPSession is used, parse DCE/RPC sessions automatically. 

1112 #: This should be used for passive sniffing. 

1113 dcerpc_session_enable = False 

1114 #: If a capture is missing the first DCE/RPC bindin message, we might incorrectly 

1115 #: assume that header signing isn't used. This forces it on. 

1116 dcerpc_force_header_signing = False 

1117 #: Windows SSPs for sniffing. This is used with 

1118 #: dcerpc_session_enable 

1119 winssps_passive = [] 

1120 

1121 def __getattribute__(self, attr): 

1122 # type: (str) -> Any 

1123 # Those are loaded on runtime to avoid import loops 

1124 if attr == "manufdb": 

1125 from scapy.data import MANUFDB 

1126 return MANUFDB 

1127 if attr == "ethertypes": 

1128 from scapy.data import ETHER_TYPES 

1129 return ETHER_TYPES 

1130 if attr == "protocols": 

1131 from scapy.data import IP_PROTOS 

1132 return IP_PROTOS 

1133 if attr == "services_udp": 

1134 from scapy.data import UDP_SERVICES 

1135 return UDP_SERVICES 

1136 if attr == "services_tcp": 

1137 from scapy.data import TCP_SERVICES 

1138 return TCP_SERVICES 

1139 if attr == "services_sctp": 

1140 from scapy.data import SCTP_SERVICES 

1141 return SCTP_SERVICES 

1142 if attr == "iface6": 

1143 warnings.warn( 

1144 "conf.iface6 is deprecated in favor of conf.iface", 

1145 DeprecationWarning 

1146 ) 

1147 attr = "iface" 

1148 return object.__getattribute__(self, attr) 

1149 

1150 

1151if not Conf.ipv6_enabled: 

1152 log_scapy.warning("IPv6 support disabled in Python. Cannot load Scapy IPv6 layers.") # noqa: E501 

1153 for m in ["inet6", "dhcp6", "sixlowpan"]: 

1154 if m in Conf.load_layers: 

1155 Conf.load_layers.remove(m) 

1156 

1157conf = Conf() # type: Conf 

1158 

1159# Python 3.8 Only 

1160if sys.version_info >= (3, 8): 

1161 conf.exts.load() 

1162 

1163 

1164def crypto_validator(func): 

1165 # type: (DecoratorCallable) -> DecoratorCallable 

1166 """ 

1167 This a decorator to be used for any method relying on the cryptography library. # noqa: E501 

1168 Its behaviour depends on the 'crypto_valid' attribute of the global 'conf'. 

1169 """ 

1170 def func_in(*args, **kwargs): 

1171 # type: (*Any, **Any) -> Any 

1172 if not conf.crypto_valid: 

1173 raise ImportError("Cannot execute crypto-related method! " 

1174 "Please install python-cryptography v1.7 or later.") # noqa: E501 

1175 return func(*args, **kwargs) 

1176 return func_in 

1177 

1178 

1179def scapy_delete_temp_files(): 

1180 # type: () -> None 

1181 for f in conf.temp_files: 

1182 try: 

1183 os.unlink(f) 

1184 except Exception: 

1185 pass 

1186 del conf.temp_files[:] 

1187 

1188 

1189atexit.register(scapy_delete_temp_files)