Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/config.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

661 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Implementation of the configuration object. 

8""" 

9 

10import atexit 

11import copy 

12import functools 

13import os 

14import pathlib 

15import re 

16import socket 

17import sys 

18import time 

19import warnings 

20 

21from dataclasses import dataclass 

22from enum import Enum 

23 

24import importlib 

25import importlib.abc 

26import importlib.util 

27 

28import scapy 

29from scapy import VERSION 

30from scapy.base_classes import BasePacket 

31from scapy.consts import DARWIN, WINDOWS, LINUX, BSD, SOLARIS 

32from scapy.error import ( 

33 log_loading, 

34 log_scapy, 

35 ScapyInvalidPlatformException, 

36 warning, 

37) 

38from scapy.themes import ColorTheme, NoTheme, apply_ipython_style 

39 

40# Typing imports 

41from typing import ( 

42 cast, 

43 Any, 

44 Callable, 

45 Dict, 

46 Iterator, 

47 List, 

48 NoReturn, 

49 Optional, 

50 Set, 

51 Tuple, 

52 Type, 

53 Union, 

54 overload, 

55 TYPE_CHECKING, 

56) 

57from types import ModuleType 

58from scapy.compat import DecoratorCallable 

59 

60if TYPE_CHECKING: 

61 # Do not import at runtime 

62 import scapy.as_resolvers 

63 from scapy.modules.nmap import NmapKnowledgeBase 

64 from scapy.packet import Packet 

65 from scapy.supersocket import SuperSocket # noqa: F401 

66 import scapy.asn1.asn1 

67 import scapy.asn1.mib 

68 

69############ 

70# Config # 

71############ 

72 

73 

class ConfClass(object):
    """Base class for configuration holders with a tabular text dump."""

    def configure(self, cnf):
        # type: (ConfClass) -> None
        """Replace this object's attributes with a shallow copy of cnf's."""
        self.__dict__ = cnf.__dict__.copy()

    def __repr__(self):
        # type: () -> str
        return str(self)

    def __str__(self):
        # type: () -> str
        """Return one ``name = value`` line per public attribute.

        Attributes come from the class dictionary and the instance
        dictionary, are sorted by name, and names starting with an
        underscore are left out.
        """
        # Instance attributes take precedence over class-level ones.
        merged = dict(self.__class__.__dict__)
        merged.update(self.__dict__)
        lines = []
        for attr in sorted(merged):
            if attr[0] == "_":
                continue
            # Collapse whitespace runs so that each value stays on one line.
            value = " ".join(repr(getattr(self, attr)).split())
            room = 76 - max(len(attr), 10)
            if len(value) > room:
                value = value[:room - 3] + "..."
            lines.append("%-10s = %s" % (attr, value))
        return "\n".join(lines)

98 

99 

class Interceptor(object):
    """Data descriptor that routes every assignment through a hook.

    The current value is stored on the owning instance under the attribute
    ``_intercepted_<name>``. On assignment, ``hook(name, val, old, *args,
    **kargs)`` is called and whatever it returns is stored.

    :param name: name of the attribute as declared on the owner class
    :param default: value reported until the attribute is first set
    :param hook: callable invoked on each assignment
    :param args: extra positional arguments passed to the hook
    :param kargs: extra keyword arguments passed to the hook
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: Any
                 hook,  # type: Callable[..., Any]
                 args=None,  # type: Optional[List[Any]]
                 kargs=None  # type: Optional[Dict[str, Any]]
                 ):
        # type: (...) -> None
        self.name = name
        self.intname = "_intercepted_%s" % name
        self.default = default
        self.hook = hook
        self.args = args if args is not None else []
        self.kargs = kargs if kargs is not None else {}

    def __get__(self, obj, typ=None):
        # type: (Optional[Conf], Optional[type]) -> Any
        """Return the stored value, initialising it to the default if unset.

        When looked up on the class (``obj`` is None), return the descriptor
        itself instead of failing while trying to store the default on None.
        """
        if obj is None:
            return self
        if not hasattr(obj, self.intname):
            setattr(obj, self.intname, self.default)
        return getattr(obj, self.intname)

    @staticmethod
    def set_from_hook(obj, name, val):
        # type: (Conf, str, bool) -> None
        """Store ``val`` for attribute ``name`` without calling the hook."""
        int_name = "_intercepted_%s" % name
        setattr(obj, int_name, val)

    def __set__(self, obj, val):
        # type: (Conf, Any) -> None
        """Pass the new and previous values to the hook, store its result."""
        old = getattr(obj, self.intname, self.default)
        val = self.hook(self.name, val, old, *self.args, **self.kargs)
        setattr(obj, self.intname, val)

133 

134 

def _readonly(name):
    # type: (str) -> NoReturn
    """Reject a write to the attribute ``name`` of ``conf``.

    The value stored on ``conf`` is put back to the default declared on
    ``Conf`` before ValueError is raised.
    """
    descriptor = Conf.__dict__[name]
    Interceptor.set_from_hook(conf, name, descriptor.default)
    raise ValueError("Read-only value !")

140 

141 

# Interceptor factory whose hook ignores the assigned value and delegates to
# _readonly(), so that any assignment ends with a ValueError.
ReadOnlyAttribute = functools.partial(
    Interceptor,
    hook=lambda name, *_args, **_kwargs: _readonly(name),
)
ReadOnlyAttribute.__doc__ = "Read-only class attribute"

147 

148 

class ProgPath(ConfClass):
    """Names or paths of the external programs referenced by the config.

    Every value is a plain string (or a list of strings for
    ``extcap_folders``).
    """
    _default: str = "<System default>"
    # Generic opener: "open" on macOS (DARWIN), "xdg-open" otherwise.
    universal_open: str = "open" if DARWIN else "xdg-open"
    # The three document readers default to the generic opener.
    pdfreader: str = universal_open
    psreader: str = universal_open
    svgreader: str = universal_open
    dot: str = "dot"
    display: str = "display"
    tcpdump: str = "tcpdump"
    tcpreplay: str = "tcpreplay"
    hexedit: str = "hexer"
    tshark: str = "tshark"
    wireshark: str = "wireshark"
    ifconfig: str = "ifconfig"
    # Wireshark extcap folders: the per-user one under the home directory,
    # then a fixed system-wide location.
    extcap_folders: List[str] = [
        os.path.join(os.path.expanduser("~"), ".config", "wireshark", "extcap"),
        "/usr/lib/x86_64-linux-gnu/wireshark/extcap",
    ]

167 

168 

class ConfigFieldList:
    """Set of fields plus the derived set of layers owning them."""

    def __init__(self):
        # type: () -> None
        self.fields = set()  # type: Set[Any]
        self.layers = set()  # type: Set[Any]

    @staticmethod
    def _is_field(f):
        # type: (Any) -> bool
        """Tell whether ``f`` exposes the ``owners`` attribute used here."""
        return hasattr(f, "owners")

    def _recalc_layer_list(self):
        # type: () -> None
        """Rebuild ``layers`` as the union of the owners of every field."""
        owners = set()
        for fld in self.fields:
            owners.update(fld.owners)
        self.layers = owners

    def add(self, *flds):
        # type: (*Any) -> None
        """Add the given fields; objects without ``owners`` are ignored."""
        self.fields.update({f for f in flds if self._is_field(f)})
        self._recalc_layer_list()

    def remove(self, *flds):
        # type: (*Any) -> None
        """Remove the given fields; unknown ones are silently ignored."""
        self.fields.difference_update(set(flds))
        self._recalc_layer_list()

    def __contains__(self, elt):
        # type: (Any) -> bool
        # BasePacket instances are looked up among the layers, anything
        # else among the fields.
        pool = self.layers if isinstance(elt, BasePacket) else self.fields
        return elt in pool

    def __repr__(self):
        # type: () -> str
        names = " ".join(map(str, self.fields))
        return "<%s [%s]>" % (self.__class__.__name__, names)

203 

204 

class Emphasize(ConfigFieldList):
    """ConfigFieldList subclass used for ``conf.emph``; adds no behaviour."""
    pass

207 

208 

class Resolve(ConfigFieldList):
    """ConfigFieldList subclass used for ``conf.resolve`` and
    ``conf.noenum``; adds no behaviour."""
    pass

211 

212 

class Num2Layer:
    """Two independent mappings: number -> layer and layer -> number.

    Indexing and membership tests dispatch on the key type: ``int`` keys
    use the number -> layer table, any other key the layer -> number one.
    """

    def __init__(self):
        # type: () -> None
        self.num2layer = {}  # type: Dict[int, Type[Packet]]
        self.layer2num = {}  # type: Dict[Type[Packet], int]

    def register(self, num, layer):
        # type: (int, Type[Packet]) -> None
        """Bind ``num`` and ``layer`` to each other in both directions."""
        self.register_num2layer(num, layer)
        self.register_layer2num(num, layer)

    def register_num2layer(self, num, layer):
        # type: (int, Type[Packet]) -> None
        """Bind ``num`` to ``layer`` in the number -> layer table only."""
        self.num2layer[num] = layer

    def register_layer2num(self, num, layer):
        # type: (int, Type[Packet]) -> None
        """Bind ``layer`` to ``num`` in the layer -> number table only."""
        self.layer2num[layer] = num

    @overload
    def __getitem__(self, item):
        # type: (Type[Packet]) -> int
        pass

    @overload
    def __getitem__(self, item):  # noqa: F811
        # type: (int) -> Type[Packet]
        pass

    def __getitem__(self, item):  # noqa: F811
        # type: (Union[int, Type[Packet]]) -> Union[int, Type[Packet]]
        table = self.num2layer if isinstance(item, int) else self.layer2num
        return table[item]

    def __contains__(self, item):
        # type: (Union[int, Type[Packet]]) -> bool
        table = self.num2layer if isinstance(item, int) else self.layer2num
        return item in table

    def get(self,
            item,  # type: Union[int, Type[Packet]]
            default=None,  # type: Optional[Type[Packet]]
            ):
        # type: (...) -> Optional[Union[int, Type[Packet]]]
        """Return ``self[item]``, or ``default`` when ``item`` is unknown."""
        if item in self:
            return self[item]
        return default

    def __repr__(self):
        # type: () -> str
        """List every binding, sorted by number, with a direction arrow."""
        rows = []
        for num, layer in self.num2layer.items():
            # "<->" when the reverse binding points back to the same number
            mutual = layer in self.layer2num and self.layer2num[layer] == num
            arrow = "<->" if mutual else " ->"
            text = "%#6x %s %-20s (%s)" % (num, arrow, layer.__name__,
                                           layer._name)
            rows.append((num, text))
        # Layer -> number bindings without a matching number -> layer one
        rows.extend(
            (num, "%#6x <- %-20s (%s)" % (num, layer.__name__, layer._name))
            for layer, num in self.layer2num.items()
            if num not in self.num2layer or self.num2layer[num] != layer
        )
        rows.sort()
        return "\n".join(text for _, text in rows)

279 

280 

class LayersList(List[Type['scapy.packet.Packet']]):
    """List of registered layer classes, also indexed by defining module."""

    def __init__(self):
        # type: () -> None
        list.__init__(self)
        # module name -> layers declared in that module
        self.ldict = {}  # type: Dict[str, List[Type[Packet]]]
        self.filtered = False
        # payload_guess lists saved by filter() and restored by unfilter()
        self._backup_dict = {}  # type: Dict[Type[Packet], List[Tuple[Dict[str, Any], Type[Packet]]]]  # noqa: E501

    def __repr__(self):
        # type: () -> str
        return "\n".join("%-20s: %s" % (layer.__name__, layer.name)
                         for layer in self)

    def register(self, layer):
        # type: (Type[Packet]) -> None
        """Append ``layer`` and record it under the module defining it."""
        self.append(layer)

        # Skip arch* modules
        if layer.__module__.startswith("scapy.arch."):
            return

        # Register in module
        self.ldict.setdefault(layer.__module__, []).append(layer)

    def layers(self):
        # type: () -> List[Tuple[str, str]]
        """Return ``(module name, summary)`` for each module with layers.

        The summary is the first line of the module docstring, or the
        module name when the module has no docstring or cannot be found.
        """
        result = []
        for lay in self.ldict:
            # Look the module up by name instead of eval()-uating its dotted
            # path: this also works for modules outside the scapy package
            # and does not execute an arbitrary string.
            module = sys.modules.get(lay)
            doc = module.__doc__ if module is not None else None
            result.append((lay, doc.strip().split("\n")[0] if doc else lay))
        return result

    def filter(self, items):
        # type: (List[Type[Packet]]) -> None
        """Disable dissection of unused layers to speed up dissection"""
        if self.filtered:
            raise ValueError("Already filtered. Please disable it first")
        for lay in self.ldict.values():
            for cls in lay:
                if cls not in self._backup_dict:
                    # Keep a copy so that unfilter() can restore it
                    self._backup_dict[cls] = cls.payload_guess[:]
                    cls.payload_guess = [
                        y for y in cls.payload_guess if y[1] in items
                    ]
        self.filtered = True

    def unfilter(self):
        # type: () -> None
        """Re-enable dissection for all layers"""
        if not self.filtered:
            raise ValueError("Not filtered. Please filter first")
        for lay in self.ldict.values():
            for cls in lay:
                cls.payload_guess = self._backup_dict[cls]
        self._backup_dict.clear()
        self.filtered = False

345 

346 

class CommandsList(List[Callable[..., Any]]):
    """List of registered command callables."""

    def __repr__(self):
        # type: () -> str
        """One ``name: summary`` line per command, sorted by name.

        The summary is the first line of the docstring, or "--" if none.
        """
        rows = []
        for cmd in sorted(self, key=lambda x: x.__name__):
            text = cmd.__doc__ or "--"
            summary = text.lstrip().split('\n', 1)[0]
            rows.append("%-22s: %s" % (cmd.__name__, summary))
        return "\n".join(rows)

    def register(self, cmd):
        # type: (DecoratorCallable) -> DecoratorCallable
        """Add ``cmd`` to the list and hand it back unchanged."""
        self.append(cmd)
        # Returning cmd lets this method be used as a decorator
        return cmd

361 

362 

def lsc():
    # type: () -> None
    """Displays Scapy's default commands"""
    # The listing itself is produced by CommandsList.__repr__
    listing = repr(conf.commands)
    print(listing)

367 

368 

369class CacheInstance(Dict[str, Any]): 

370 __slots__ = ["timeout", "name", "_timetable"] 

371 

372 def __init__(self, name="noname", timeout=None): 

373 # type: (str, Optional[int]) -> None 

374 self.timeout = timeout 

375 self.name = name 

376 self._timetable = {} # type: Dict[str, float] 

377 

378 def flush(self): 

379 # type: () -> None 

380 self._timetable.clear() 

381 self.clear() 

382 

383 def __getitem__(self, item): 

384 # type: (str) -> Any 

385 if item in self.__slots__: 

386 return object.__getattribute__(self, item) 

387 if not self.__contains__(item): 

388 raise KeyError(item) 

389 return super(CacheInstance, self).__getitem__(item) 

390 

391 def __contains__(self, item): 

392 if not super(CacheInstance, self).__contains__(item): 

393 return False 

394 if self.timeout is not None: 

395 t = self._timetable[item] 

396 if time.time() - t > self.timeout: 

397 return False 

398 return True 

399 

400 def get(self, item, default=None): 

401 # type: (str, Optional[Any]) -> Any 

402 # overloading this method is needed to force the dict to go through 

403 # the timetable check 

404 try: 

405 return self[item] 

406 except KeyError: 

407 return default 

408 

409 def __setitem__(self, item, v): 

410 # type: (str, str) -> None 

411 if item in self.__slots__: 

412 return object.__setattr__(self, item, v) 

413 self._timetable[item] = time.time() 

414 super(CacheInstance, self).__setitem__(item, v) 

415 

416 def update(self, 

417 other, # type: Any 

418 **kwargs # type: Any 

419 ): 

420 # type: (...) -> None 

421 for key, value in other.items(): 

422 # We only update an element from `other` either if it does 

423 # not exist in `self` or if the entry in `self` is older. 

424 if key not in self or self._timetable[key] < other._timetable[key]: 

425 dict.__setitem__(self, key, value) 

426 self._timetable[key] = other._timetable[key] 

427 

428 def iteritems(self): 

429 # type: () -> Iterator[Tuple[str, Any]] 

430 if self.timeout is None: 

431 return super(CacheInstance, self).items() 

432 t0 = time.time() 

433 return ( 

434 (k, v) 

435 for (k, v) in super(CacheInstance, self).items() 

436 if t0 - self._timetable[k] < self.timeout 

437 ) 

438 

439 def iterkeys(self): 

440 # type: () -> Iterator[str] 

441 if self.timeout is None: 

442 return super(CacheInstance, self).keys() 

443 t0 = time.time() 

444 return ( 

445 k 

446 for k in super(CacheInstance, self).keys() 

447 if t0 - self._timetable[k] < self.timeout 

448 ) 

449 

450 def __iter__(self): 

451 # type: () -> Iterator[str] 

452 return self.iterkeys() 

453 

454 def itervalues(self): 

455 # type: () -> Iterator[Tuple[str, Any]] 

456 if self.timeout is None: 

457 return super(CacheInstance, self).values() 

458 t0 = time.time() 

459 return ( 

460 v 

461 for (k, v) in super(CacheInstance, self).items() 

462 if t0 - self._timetable[k] < self.timeout 

463 ) 

464 

465 def items(self): 

466 # type: () -> Any 

467 return list(self.iteritems()) 

468 

469 def keys(self): 

470 # type: () -> Any 

471 return list(self.iterkeys()) 

472 

473 def values(self): 

474 # type: () -> Any 

475 return list(self.itervalues()) 

476 

477 def __len__(self): 

478 # type: () -> int 

479 if self.timeout is None: 

480 return super(CacheInstance, self).__len__() 

481 return len(self.keys()) 

482 

483 def summary(self): 

484 # type: () -> str 

485 return "%s: %i valid items. Timeout=%rs" % (self.name, len(self), self.timeout) # noqa: E501 

486 

487 def __repr__(self): 

488 # type: () -> str 

489 s = [] 

490 if self: 

491 mk = max(len(k) for k in self) 

492 fmt = "%%-%is %%s" % (mk + 1) 

493 for item in self.items(): 

494 s.append(fmt % item) 

495 return "\n".join(s) 

496 

497 def copy(self): 

498 # type: () -> CacheInstance 

499 return copy.copy(self) 

500 

501 

class NetCache:
    """Registry of caches, each one also reachable as an attribute."""

    def __init__(self):
        # type: () -> None
        self._caches_list = []  # type: List[CacheInstance]

    def add_cache(self, cache):
        # type: (CacheInstance) -> None
        """Register ``cache`` and expose it as the attribute ``cache.name``."""
        self._caches_list.append(cache)
        setattr(self, cache.name, cache)

    def new_cache(self, name, timeout=None):
        # type: (str, Optional[int]) -> CacheInstance
        """Create a CacheInstance, register it and return it."""
        created = CacheInstance(name=name, timeout=timeout)
        self.add_cache(created)
        return created

    def __delattr__(self, attr):
        # type: (str) -> NoReturn
        raise AttributeError("Cannot delete attributes")

    def update(self, other):
        # type: (NetCache) -> None
        """Merge the caches of ``other`` into this registry.

        A cache whose name already exists here is updated in place, any
        other cache is copied and registered.
        """
        missing = object()
        for theirs in other._caches_list:
            ours = getattr(self, theirs.name, missing)
            if ours is missing:
                self.add_cache(theirs.copy())
            else:
                ours.update(theirs)

    def flush(self):
        # type: () -> None
        """Flush every registered cache."""
        for cache in self._caches_list:
            cache.flush()

    def __repr__(self):
        # type: () -> str
        summaries = [cache.summary() for cache in self._caches_list]
        return "\n".join(summaries)

538 

539 

class ScapyExt:
    """Description of one extension: its name, version and module specs."""
    __slots__ = ["specs", "name", "version", "bash_completions"]

    class MODE(Enum):
        # The value is the scapy sub-package the module is registered under
        LAYERS = "layers"
        CONTRIB = "contrib"
        MODULES = "modules"

    @dataclass
    class ScapyExtSpec:
        # Dotted module name, "scapy.<mode>.<name>"
        fullname: str
        mode: 'ScapyExt.MODE'
        # Import spec built from the file path
        spec: Any
        # Whether the module is made available as soon as it is registered
        default: bool

    def __init__(self):
        self.specs: Dict[str, 'ScapyExt.ScapyExtSpec'] = {}
        self.bash_completions = {}

    def config(self, name, version):
        """Set the name and version reported for this extension."""
        self.name = name
        self.version = version

    def register(self, name, mode, path, default=None):
        """Declare the module ``scapy.<mode>.<name>`` located at ``path``.

        :param name: last component of the module name
        :param mode: a ScapyExt.MODE member, or the value of one
        :param path: path of the file implementing the module
        :param default: whether the module is loaded by default. If None,
            it is True only when a module of that name can already be found.
        :raises ValueError: if ``mode`` is not a valid mode
        """
        # Explicit validation: an assert is stripped under -O, and enum
        # containment of non-members behaves differently across versions.
        try:
            mode = self.MODE(mode)
        except ValueError:
            raise ValueError("mode must be one of ScapyExt.MODE !") from None
        fullname = f"scapy.{mode.value}.{name}"
        module_spec = importlib.util.spec_from_file_location(
            fullname,
            str(path),
        )
        ext_spec = self.ScapyExtSpec(
            fullname=fullname,
            mode=mode,
            spec=module_spec,
            default=default or False,
        )
        if default is None:
            ext_spec.default = bool(importlib.util.find_spec(ext_spec.fullname))
        self.specs[fullname] = ext_spec

    def register_bashcompletion(self, script: pathlib.Path):
        """Record a bash completion script, keyed by its file name."""
        self.bash_completions[script.name] = script

    def __repr__(self):
        # name and version are slots that stay unset until config() is
        # called; fall back to None rather than raising from repr().
        return "<ScapyExt %s %s (%s specs)>" % (
            getattr(self, "name", None),
            getattr(self, "version", None),
            len(self.specs),
        )

589 

590 

class ExtsManager(importlib.abc.MetaPathFinder):
    """Loads extensions and serves the modules they register to the import
    system, as an entry of ``sys.meta_path``."""
    __slots__ = ["exts", "all_specs"]

    # Accepted values of the 'License-Expression' metadata field
    GPLV2_LICENCES = [
        "GPL-2.0-only",
        "GPL-2.0-or-later",
    ]

    def __init__(self):
        self.exts: List[ScapyExt] = []
        self.all_specs: Dict[str, ScapyExt.ScapyExtSpec] = {}
        self._loaded: List[str] = []
        # Add to meta_path as we are an import provider
        if self not in sys.meta_path:
            sys.meta_path.append(self)

    def find_spec(self, fullname, path, target=None):
        """Return the spec registered for ``fullname``, or None."""
        if fullname in self.all_specs:
            return self.all_specs[fullname].spec
        return None

    def invalidate_caches(self):
        pass

    def _register_spec(self, spec):
        """Record ``spec`` and, if it is a default one, pre-register its
        module in ``sys.modules`` behind a lazy loader."""
        # Register to known specs
        self.all_specs[spec.fullname] = spec

        # If default=True, inject it in the currently loaded modules
        if spec.default:
            loader = importlib.util.LazyLoader(spec.spec.loader)
            spec.spec.loader = loader
            module = importlib.util.module_from_spec(spec.spec)
            sys.modules[spec.fullname] = module
            loader.exec_module(module)

    def load(self, extension: str):
        """
        Load a scapy extension.

        Every failure is reported through ``log_loading`` and leaves the
        extension unloaded.

        :param extension: the name of the extension, as installed.
        """
        if extension in self._loaded:
            return

        try:
            import importlib.metadata
        except ImportError:
            log_loading.warning(
                "'%s' not loaded. "
                "Scapy extensions require at least Python 3.8+ !" % extension
            )
            return

        # Get extension distribution
        try:
            distr = importlib.metadata.distribution(extension)
        except importlib.metadata.PackageNotFoundError:
            log_loading.warning("The extension '%s' was not found !" % extension)
            return

        # Check the declared license expression
        if distr.metadata.get('License-Expression', None) not in self.GPLV2_LICENCES:
            log_loading.warning(
                "'%s' has no GPLv2 classifier therefore cannot be loaded." % extension
            )
            return

        # Create the extension
        ext = ScapyExt()

        # Get the top-level declared "import packages"
        # HACK: not available nicely in importlib :/
        top_level = distr.read_text("top_level.txt")
        if top_level is None:
            # read_text() returns None when the distribution does not ship
            # that file; without it there is nothing to import.
            log_loading.warning(
                "'%s' does not declare its top-level packages "
                "(no top_level.txt) !" % extension
            )
            return
        packages = top_level.split()

        for package in packages:
            try:
                scapy_ext = importlib.import_module(package)
            except ImportError as ex:
                log_loading.warning(
                    "'%s' could not be imported: %s" % (
                        extension,
                        ex
                    )
                )
                return

            # We initialize the plugin by calling its 'scapy_ext' function
            try:
                scapy_ext_func = scapy_ext.scapy_ext
            except AttributeError:
                log_loading.warning(
                    "'%s' does not look like a Scapy plugin !" % extension
                )
                return
            try:
                scapy_ext_func(ext)
            except Exception as ex:
                log_loading.warning(
                    "'%s' failed during initialization with %s" % (
                        extension,
                        ex
                    )
                )
                return

        # Register all the specs provided by this extension
        for spec in ext.specs.values():
            self._register_spec(spec)

        # Add to the extension list
        self.exts.append(ext)
        self._loaded.append(extension)

        # If there are bash autocompletions, add them
        if ext.bash_completions:
            from scapy.main import _add_bash_autocompletion

            for name, script in ext.bash_completions.items():
                _add_bash_autocompletion(name, script)

    def loadall(self) -> None:
        """
        Load all extensions registered in conf.
        """
        for extension in conf.load_extensions:
            self.load(extension)

    def __repr__(self):
        from scapy.utils import pretty_list
        return pretty_list(
            [
                (x.name, x.version, [y.fullname for y in x.specs.values()])
                for x in self.exts
            ],
            [("Name", "Version", "Specs")],
            sortBy=0,
        )

719 

720 

721def _version_checker(module, minver): 

722 # type: (ModuleType, Tuple[int, ...]) -> bool 

723 """Checks that module has a higher version that minver. 

724 

725 params: 

726 - module: a module to test 

727 - minver: a tuple of versions 

728 """ 

729 # We could use LooseVersion, but distutils imports imp which is deprecated 

730 version_regexp = r'[a-z]?((?:\d|\.)+\d+)(?:\.dev[0-9]+)?' 

731 version_tags_r = re.match( 

732 version_regexp, 

733 getattr(module, "__version__", "") 

734 ) 

735 if not version_tags_r: 

736 return False 

737 version_tags_i = version_tags_r.group(1).split(".") 

738 version_tags = tuple(int(x) for x in version_tags_i) 

739 return bool(version_tags >= minver) 

740 

741 

def isCryptographyValid():
    # type: () -> bool
    """
    Tell whether the cryptography module is installed in version 2.0.0 or
    later, the minimum for most usages in Scapy.
    """
    try:
        import cryptography
    except ImportError:
        return False
    else:
        minimum = (2, 0, 0)
        return _version_checker(cryptography, minimum)

753 

754 

def isCryptographyAdvanced():
    # type: () -> bool
    """
    Tell whether the cryptography module is present and supports X25519,
    ChaCha20Poly1305 and such.

    The check consists in generating an X25519 private key. Any exception
    raised while importing or generating counts as "not supported".

    Notes:
    - cryptography >= 2.0 is required
    - OpenSSL >= 1.1.0 is required
    """
    try:
        from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey  # noqa: E501
        X25519PrivateKey.generate()
    except Exception:
        return False
    return True

772 

773 

def isPyPy():
    # type: () -> bool
    """Returns either scapy is running under PyPy or not"""
    # The check is whether the __pypy__ module can be imported
    try:
        import __pypy__  # noqa: F401
    except ImportError:
        return False
    else:
        return True

782 

783 

def _prompt_changer(attr, val, old):
    # type: (str, Any, Any) -> Any
    """Interceptor hook: refresh the prompt when ``attr`` changes.

    Returns the value now held by ``conf`` for ``attr``, or ``old`` if it
    cannot be read.
    """
    # Store the value first: the code below reads it back through conf.
    Interceptor.set_from_hook(conf, attr, val)
    try:
        themed_prompt = conf.color_theme.prompt(conf.prompt)
        sys.ps1 = themed_prompt
    except Exception:
        # Best effort: any failure to build the prompt is ignored
        pass
    try:
        # get_ipython is only defined when running inside IPython
        apply_ipython_style(
            get_ipython()  # type: ignore
        )
    except NameError:
        pass
    return getattr(conf, attr, old)

799 

800 

def _set_conf_sockets():
    # type: () -> None
    """Populate the conf.L2Socket and conf.L3Socket
    according to the various use_* parameters

    Exactly one branch is taken, in this order: libpcap if conf.use_pcap,
    BPF if conf.use_bpf, then the Linux, Windows and generic raw socket
    implementations. The interface list is reloaded at the end.

    :raises ScapyInvalidPlatformException: if use_bpf is set on a non-BSD
        platform, or if use_pcap is unset on Solaris. The offending flag
        is put back to its only valid value before raising.
    """
    if conf.use_bpf and not BSD:
        Interceptor.set_from_hook(conf, "use_bpf", False)
        raise ScapyInvalidPlatformException("BSD-like (OSX, *BSD...) only !")
    if not conf.use_pcap and SOLARIS:
        Interceptor.set_from_hook(conf, "use_pcap", True)
        raise ScapyInvalidPlatformException(
            "Scapy only supports libpcap on Solaris !"
        )
    # we are already in an Interceptor hook, use Interceptor.set_from_hook
    if conf.use_pcap:
        try:
            from scapy.arch.libpcap import L2pcapListenSocket, L2pcapSocket, \
                L3pcapSocket
        except (OSError, ImportError):
            # NOTE(review): in this case use_pcap is switched off but none
            # of the branches below runs, so the conf.L* sockets keep their
            # previous values - confirm this is intended.
            log_loading.warning("No libpcap provider available ! pcap won't be used")
            Interceptor.set_from_hook(conf, "use_pcap", False)
        else:
            conf.L3socket = L3pcapSocket
            # Same socket class with the filter argument preset to "ip6"
            conf.L3socket6 = functools.partial(
                L3pcapSocket, filter="ip6")
            conf.L2socket = L2pcapSocket
            conf.L2listen = L2pcapListenSocket
    elif conf.use_bpf:
        from scapy.arch.bpf.supersocket import L2bpfListenSocket, \
            L2bpfSocket, L3bpfSocket
        conf.L3socket = L3bpfSocket
        conf.L3socket6 = functools.partial(
            L3bpfSocket, filter="ip6")
        conf.L2socket = L2bpfSocket
        conf.L2listen = L2bpfListenSocket
    elif LINUX:
        from scapy.arch.linux import L3PacketSocket, L2Socket, L2ListenSocket
        conf.L3socket = L3PacketSocket
        # cast() only informs the type checker; the value is the partial
        conf.L3socket6 = cast(
            "Type[SuperSocket]",
            functools.partial(
                L3PacketSocket,
                filter="ip6"
            )
        )
        conf.L2socket = L2Socket
        conf.L2listen = L2ListenSocket
    elif WINDOWS:
        from scapy.arch.windows import _NotAvailableSocket
        from scapy.arch.windows.native import L3WinSocket, L3WinSocket6
        conf.L3socket = L3WinSocket
        conf.L3socket6 = L3WinSocket6
        conf.L2socket = _NotAvailableSocket
        conf.L2listen = _NotAvailableSocket
    else:
        # Only the layer 3 sockets are set in this fallback branch
        from scapy.supersocket import L3RawSocket, L3RawSocket6
        conf.L3socket = L3RawSocket
        conf.L3socket6 = L3RawSocket6
    # Reload the interfaces
    conf.ifaces.reload()

861 

862 

def _socket_changer(attr, val, old):
    # type: (str, bool, bool) -> Any
    """Interceptor hook for conf.use_pcap and conf.use_bpf.

    Stores the new flag, switches off the conflicting one when enabling,
    then recomputes the conf sockets. If that fails, the other flag is
    put back to the value it had on entry.

    :raises TypeError: if ``val`` is not a boolean
    :raises ScapyInvalidPlatformException: re-raised from
        _set_conf_sockets(); an ImportError is swallowed instead
    """
    if not isinstance(val, bool):
        raise TypeError("This argument should be a boolean")
    Interceptor.set_from_hook(conf, attr, val)
    turned_off = {  # Things that will be turned off
        "use_pcap": ["use_bpf"],
        "use_bpf": ["use_pcap"],
    }
    rollback = {flag: getattr(conf, flag) for flag in turned_off}
    # attr itself is handled directly by _set_conf_sockets
    rollback.pop(attr)
    if val:  # Only if True
        for conflicting in turned_off[attr]:
            Interceptor.set_from_hook(conf, conflicting, False)
    try:
        _set_conf_sockets()
    except (ScapyInvalidPlatformException, ImportError) as exc:
        for flag, previous in rollback.items():
            Interceptor.set_from_hook(conf, flag, previous)
        if isinstance(exc, ScapyInvalidPlatformException):
            raise
    return getattr(conf, attr)

885 

886 

def _loglevel_changer(attr, val, old):
    # type: (str, int, int) -> int
    """Interceptor hook for conf.logLevel: apply the level to log_scapy."""
    # The value is stored unchanged once the logger has accepted it
    log_scapy.setLevel(val)
    return val

892 

893 

894def _iface_changer(attr, val, old): 

895 # type: (str, Any, Any) -> 'scapy.interfaces.NetworkInterface' 

896 """Resolves the interface in conf.iface""" 

897 if isinstance(val, str): 

898 from scapy.interfaces import resolve_iface 

899 iface = resolve_iface(val) 

900 if old and iface.dummy: 

901 warning( 

902 "This interface is not specified in any provider ! " 

903 "See conf.ifaces output" 

904 ) 

905 return iface 

906 return val 

907 

908 

def _reset_tls_nss_keys(attr, val, old):
    # type: (str, Any, Any) -> Any
    """Interceptor hook: clear conf.tls_nss_keys and accept the new value.

    Meant for conf.tls_nss_filename, so that the keys are reset whenever
    the file name changes.
    """
    conf.tls_nss_keys = None
    return val

914 

915 

916class Conf(ConfClass): 

917 """ 

918 This object contains the configuration of Scapy. 

919 """ 

920 version: str = ReadOnlyAttribute("version", VERSION) 

921 session: str = "" #: filename where the session will be saved 

922 interactive = False 

923 #: can be "ipython", "bpython", "ptpython", "ptipython", "python" or "auto". 

924 #: Default: Auto 

925 interactive_shell = "auto" 

926 #: Configuration for "ipython" to use jedi (disabled by default) 

927 ipython_use_jedi = False 

928 #: if 1, prevents any unwanted packet to go out (ARP, DNS, ...) 

929 stealth = "not implemented" 

930 #: selects the default output interface for srp() and sendp(). 

931 iface = Interceptor("iface", None, _iface_changer) # type: 'scapy.interfaces.NetworkInterface' # noqa: E501 

932 layers: LayersList = LayersList() 

933 commands = CommandsList() # type: CommandsList 

934 #: Codec used by default for ASN1 objects 

935 ASN1_default_codec = None # type: 'scapy.asn1.asn1.ASN1Codec' 

936 #: Default size for ASN1 objects 

937 ASN1_default_long_size = 0 

938 #: choose the AS resolver class to use 

939 AS_resolver = None # type: scapy.as_resolvers.AS_resolver 

940 dot15d4_protocol = None # Used in dot15d4.py 

941 logLevel: int = Interceptor("logLevel", log_scapy.level, _loglevel_changer) 

942 #: if 0, doesn't check that IPID matches between IP sent and 

943 #: ICMP IP citation received 

944 #: if 1, checks that they either are equal or byte swapped 

945 #: equals (bug in some IP stacks) 

946 #: if 2, strictly checks that they are equals 

947 checkIPID = False 

948 #: if 1, checks IP src in IP and ICMP IP citation match 

949 #: (bug in some NAT stacks) 

950 checkIPsrc = True 

951 checkIPaddr = True 

952 #: if True, checks that IP-in-IP layers match. If False, do 

953 #: not check IP layers that encapsulates another IP layer 

954 checkIPinIP = True 

955 #: if 1, also check that TCP seq and ack match the 

956 #: ones in ICMP citation 

957 check_TCPerror_seqack = False 

958 verb = 2 #: level of verbosity, from 0 (almost mute) to 3 (verbose) 

959 prompt: str = Interceptor("prompt", ">>> ", _prompt_changer) 

960 #: default mode for the promiscuous mode of a socket (to get answers if you 

961 #: spoof on a lan) 

962 sniff_promisc = True # type: bool 

963 raw_layer = None # type: Type[Packet] 

964 raw_summary = False # type: Union[bool, Callable[[bytes], Any]] 

965 padding_layer = None # type: Type[Packet] 

966 default_l2 = None # type: Type[Packet] 

967 l2types: Num2Layer = Num2Layer() 

968 l3types: Num2Layer = Num2Layer() 

969 L3socket = None # type: Type[scapy.supersocket.SuperSocket] 

970 L3socket6 = None # type: Type[scapy.supersocket.SuperSocket] 

971 L2socket = None # type: Type[scapy.supersocket.SuperSocket] 

972 L2listen = None # type: Type[scapy.supersocket.SuperSocket] 

973 BTsocket = None # type: Type[scapy.supersocket.SuperSocket] 

974 min_pkt_size = 60 

975 #: holds MIB direct access dictionary 

976 mib = None # type: 'scapy.asn1.mib.MIBDict' 

977 bufsize = 2**16 

978 #: history file 

979 histfile: str = os.getenv( 

980 'SCAPY_HISTFILE', 

981 os.path.join( 

982 os.path.expanduser("~"), 

983 ".config", "scapy", "history" 

984 ) 

985 ) 

986 #: includes padding in disassembled packets 

987 padding = 1 

988 #: BPF filter for packets to ignore 

989 except_filter = "" 

990 #: bpf filter added to every sniffing socket to exclude traffic 

991 #: from analysis 

992 filter = "" 

993 #: when 1, store received packet that are not matched into `debug.recv` 

994 debug_match = False 

995 #: When 1, print some TLS session secrets when they are computed, and 

996 #: warn about the session recognition. 

997 debug_tls = False 

998 wepkey = "" 

999 #: holds the Scapy interface list and manager 

1000 ifaces = None # type: 'scapy.interfaces.NetworkInterfaceDict' 

1001 #: holds the cache of interfaces loaded from Libpcap 

1002 cache_pcapiflist = {} # type: Dict[str, Tuple[str, List[str], Any, str, int]] 

1003 # `neighbor` will be filed by scapy.layers.l2 

1004 neighbor = None # type: 'scapy.layers.l2.Neighbor' 

1005 #: holds the name servers IP/hosts used for custom DNS resolution 

1006 nameservers = None # type: str 

1007 #: automatically load IPv4 routes on startup. Disable this if your 

1008 #: routing table is too big. 

1009 route_autoload = True 

1010 #: automatically load IPv6 routes on startup. Disable this if your 

1011 #: routing table is too big. 

1012 route6_autoload = True 

1013 #: holds the Scapy IPv4 routing table and provides methods to 

1014 #: manipulate it 

1015 route = None # type: 'scapy.route.Route' 

1016 # `route` will be filed by route.py 

1017 #: holds the Scapy IPv6 routing table and provides methods to 

1018 #: manipulate it 

1019 route6 = None # type: 'scapy.route6.Route6' 

1020 manufdb = None # type: 'scapy.data.ManufDA' 

1021 ethertypes = None # type: 'scapy.data.EtherDA' 

1022 protocols = None # type: 'scapy.dadict.DADict[int, str]' 

1023 services_udp = None # type: 'scapy.dadict.DADict[int, str]' 

1024 services_tcp = None # type: 'scapy.dadict.DADict[int, str]' 

1025 services_sctp = None # type: 'scapy.dadict.DADict[int, str]' 

1026 # 'route6' will be filled by route6.py

1027 teredoPrefix = "" # type: str 

1028 teredoServerPort = None # type: int 

1029 auto_fragment = True 

1030 #: raise exception when a packet dissector raises an exception 

1031 debug_dissector = False 

1032 color_theme: ColorTheme = Interceptor("color_theme", NoTheme(), _prompt_changer) 

1033 #: how much time between warnings from the same place 

1034 warning_threshold = 5 

1035 prog: ProgPath = ProgPath() 

1036 #: holds list of fields for which resolution should be done 

1037 resolve: Resolve = Resolve() 

1038 #: holds list of enum fields for which conversion to string 

1039 #: should NOT be done 

1040 noenum: Resolve = Resolve() 

1041 emph: Emphasize = Emphasize() 

1042 #: read only attribute to show if PyPy is in use 

1043 use_pypy: bool = ReadOnlyAttribute("use_pypy", isPyPy()) 

1044 #: use libpcap integration or not. Changing this value will update 

1045 #: the conf.L[2/3] sockets 

1046 use_pcap: bool = Interceptor( 

1047 "use_pcap", 

1048 os.getenv("SCAPY_USE_LIBPCAP", "").lower().startswith("y"), 

1049 _socket_changer 

1050 ) 

1051 use_bpf: bool = Interceptor("use_bpf", False, _socket_changer) 

1052 use_npcap = False 

1053 ipv6_enabled: bool = socket.has_ipv6 

1054 stats_classic_protocols = [] # type: List[Type[Packet]] 

1055 stats_dot11_protocols = [] # type: List[Type[Packet]] 

1056 temp_files = [] # type: List[str] 

1057 #: netcache holds time-based caches for net operations 

1058 netcache: NetCache = NetCache() 

1059 geoip_city = None 

1060 #: Scapy extensions that are loaded automatically on load 

1061 load_extensions: List[str] = [] 

1062 # can, tls, http and a few others are not loaded by default 

1063 load_layers: List[str] = [ 

1064 'bluetooth', 

1065 'bluetooth4LE', 

1066 'dcerpc', 

1067 'dhcp', 

1068 'dhcp6', 

1069 'dns', 

1070 'dot11', 

1071 'dot15d4', 

1072 'eap', 

1073 'gprs', 

1074 'gssapi', 

1075 'hsrp', 

1076 'inet', 

1077 'inet6', 

1078 'ipsec', 

1079 'ir', 

1080 'isakmp', 

1081 'kerberos', 

1082 'l2', 

1083 'l2tp', 

1084 'ldap', 

1085 'llmnr', 

1086 'lltd', 

1087 'mgcp', 

1088 'msrpce.rpcclient', 

1089 'msrpce.rpcserver', 

1090 'mobileip', 

1091 'netbios', 

1092 'netflow', 

1093 'ntlm', 

1094 'ntp', 

1095 'ppi', 

1096 'ppp', 

1097 'pptp', 

1098 'radius', 

1099 'rip', 

1100 'rtp', 

1101 'sctp', 

1102 'sixlowpan', 

1103 'skinny', 

1104 'smb', 

1105 'smb2', 

1106 'smbclient', 

1107 'smbserver', 

1108 'snmp', 

1109 'spnego', 

1110 'tftp', 

1111 'vrrp', 

1112 'vxlan', 

1113 'x509', 

1114 'zigbee' 

1115 ] 

1116 #: a dict which can be used by contrib layers to store local 

1117 #: configuration 

1118 contribs = dict() # type: Dict[str, Any] 

1119 exts: ExtsManager = ExtsManager() 

1120 crypto_valid = isCryptographyValid() 

1121 crypto_valid_advanced = isCryptographyAdvanced() 

1122 #: controls whether or not to display the fancy banner 

1123 fancy_banner = True 

1124 #: controls whether tables (conf.iface, conf.route...) should be cropped 

1125 #: to fit the terminal 

1126 auto_crop_tables = True 

1127 #: how often to check for new packets. 

1128 #: Defaults to 0.05s. 

1129 recv_poll_rate = 0.05 

1130 #: When True, raise exception if no dst MAC found otherwise broadcast. 

1131 #: Default is False. 

1132 raise_no_dst_mac = False 

1133 loopback_name: str = "lo" if LINUX else "lo0" 

1134 nmap_base = "" # type: str 

1135 nmap_kdb = None # type: Optional[NmapKnowledgeBase] 

1136 #: a safety mechanism: the maximum amount of items included in a PacketListField 

1137 #: or a FieldListField 

1138 max_list_count = 100 

1139 #: When the TLS module is loaded (not by default), the following turns on sessions 

1140 tls_session_enable = False 

1141 #: Filename containing NSS Keys Log 

1142 tls_nss_filename = Interceptor( 

1143 "tls_nss_filename", 

1144 None, 

1145 _reset_tls_nss_keys 

1146 ) 

1147 #: Dictionary containing parsed NSS Keys 

1148 tls_nss_keys: Dict[str, bytes] = None 

1149 #: Whether to use NDR64 by default instead of NDR 32 

1150 ndr64: bool = True 

1151 #: When TCPSession is used, parse DCE/RPC sessions automatically. 

1152 #: This should be used for passive sniffing. 

1153 dcerpc_session_enable = False 

1154 #: If a capture is missing the first DCE/RPC binding message, we might incorrectly 

1155 #: assume that header signing isn't used. This forces it on. 

1156 dcerpc_force_header_signing = False 

1157 #: Windows SSPs for sniffing. This is used with 

1158 #: dcerpc_session_enable 

1159 winssps_passive = [] 

1160 #: Disables auto-stripping of StrFixedLenField for debugging purposes 

1161 debug_strfixedlenfield = False 

1162 

def __getattribute__(self, attr):
    # type: (str) -> Any
    """
    Resolve ``attr`` on the configuration object.

    ``manufdb``, ``ethertypes``, ``protocols``, ``services_udp``,
    ``services_tcp`` and ``services_sctp`` are imported from
    ``scapy.data`` at access time to avoid import loops.
    ``iface6`` is a deprecated alias of ``iface``.

    :param attr: name of the requested attribute
    :returns: the matching ``scapy.data`` table for the names listed
        above, otherwise the regular attribute value
    :raises AttributeError: if the attribute does not exist
    """
    # Those are loaded on runtime to avoid import loops
    if attr == "manufdb":
        from scapy.data import MANUFDB
        return MANUFDB
    if attr == "ethertypes":
        from scapy.data import ETHER_TYPES
        return ETHER_TYPES
    if attr == "protocols":
        from scapy.data import IP_PROTOS
        return IP_PROTOS
    if attr == "services_udp":
        from scapy.data import UDP_SERVICES
        return UDP_SERVICES
    if attr == "services_tcp":
        from scapy.data import TCP_SERVICES
        return TCP_SERVICES
    if attr == "services_sctp":
        from scapy.data import SCTP_SERVICES
        return SCTP_SERVICES
    if attr == "iface6":
        # stacklevel=2 attributes the warning to the code that accessed
        # conf.iface6 rather than to this method, so that the default
        # warning filters can actually surface it to the caller.
        warnings.warn(
            "conf.iface6 is deprecated in favor of conf.iface",
            DeprecationWarning,
            stacklevel=2,
        )
        attr = "iface"
    return object.__getattribute__(self, attr)

1191 

1192 

# When Python reports no IPv6 support, the IPv6 layers are taken out of the
# default layer list on the class.
if not Conf.ipv6_enabled:
    log_scapy.warning("IPv6 support disabled in Python. Cannot load Scapy IPv6 layers.")  # noqa: E501
    for m in ("inet6", "dhcp6", "sixlowpan"):
        try:
            Conf.load_layers.remove(m)
        except ValueError:
            # Layer was not listed: nothing to remove.
            pass

# The global configuration instance.
conf = Conf()  # type: Conf

1200 

1201 

def crypto_validator(func):
    # type: (DecoratorCallable) -> DecoratorCallable
    """
    This is a decorator to be used for any method relying on the
    cryptography library.
    Its behaviour depends on the 'crypto_valid' attribute of the global 'conf'.

    The check is performed each time the decorated callable is invoked,
    not when the decorator is applied.

    :param func: the callable to guard
    :returns: a wrapper that forwards all arguments to ``func`` and keeps
        its metadata (``__name__``, ``__doc__``, ...)
    :raises ImportError: raised by the wrapper when ``conf.crypto_valid``
        is false
    """
    # functools.wraps keeps the name and docstring of the decorated
    # callable, which would otherwise all show up as 'func_in'.
    @functools.wraps(func)
    def func_in(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        if not conf.crypto_valid:
            raise ImportError("Cannot execute crypto-related method! "
                              "Please install python-cryptography v1.7 or later.")  # noqa: E501
        return func(*args, **kwargs)
    return func_in

1215 

1216 

def scapy_delete_temp_files():
    # type: () -> None
    """
    Remove every file listed in ``conf.temp_files``, then empty the list.

    Removal is best effort: any error raised while unlinking a file is
    ignored and the remaining entries are still processed.
    """
    pending = conf.temp_files
    for path in pending:
        try:
            os.unlink(path)
        except Exception:
            # Deliberately ignored: a failed removal must not stop the rest.
            pass
    # Empty the list in place so that other references to it see the change.
    pending.clear()


# Run the cleanup when the interpreter exits.
atexit.register(scapy_delete_temp_files)