Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/config.py: 53%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

676 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Implementation of the configuration object. 

8""" 

9 

10import atexit 

11import copy 

12import functools 

13import os 

14import pathlib 

15import re 

16import socket 

17import sys 

18import time 

19import warnings 

20 

21from dataclasses import dataclass 

22from enum import Enum 

23 

24import importlib 

25import importlib.abc 

26import importlib.util 

27 

28import scapy 

29from scapy import VERSION 

30from scapy.base_classes import BasePacket 

31from scapy.consts import DARWIN, WINDOWS, LINUX, BSD, SOLARIS 

32from scapy.error import ( 

33 log_loading, 

34 log_scapy, 

35 ScapyInvalidPlatformException, 

36 warning, 

37) 

38from scapy.themes import ColorTheme, NoTheme, apply_ipython_style 

39 

40# Typing imports 

41from typing import ( 

42 cast, 

43 Any, 

44 Callable, 

45 Dict, 

46 Iterator, 

47 List, 

48 NoReturn, 

49 Optional, 

50 Set, 

51 Tuple, 

52 Type, 

53 Union, 

54 overload, 

55 TYPE_CHECKING, 

56) 

57from types import ModuleType 

58from scapy.compat import DecoratorCallable 

59 

60if TYPE_CHECKING: 

61 # Do not import at runtime 

62 import scapy.as_resolvers 

63 from scapy.modules.nmap import NmapKnowledgeBase 

64 from scapy.packet import Packet 

65 from scapy.supersocket import SuperSocket # noqa: F401 

66 import scapy.asn1.asn1 

67 import scapy.asn1.mib 

68 

69############ 

70# Config # 

71############ 

72 

73 

74class ConfClass(object): 

75 def configure(self, cnf): 

76 # type: (ConfClass) -> None 

77 self.__dict__ = cnf.__dict__.copy() 

78 

79 def __repr__(self): 

80 # type: () -> str 

81 return str(self) 

82 

83 def __str__(self): 

84 # type: () -> str 

85 s = "" 

86 dkeys = self.__class__.__dict__.copy() 

87 dkeys.update(self.__dict__) 

88 keys = sorted(dkeys) 

89 for i in keys: 

90 if i[0] != "_": 

91 r = repr(getattr(self, i)) 

92 r = " ".join(r.split()) 

93 wlen = 76 - max(len(i), 10) 

94 if len(r) > wlen: 

95 r = r[:wlen - 3] + "..." 

96 s += "%-10s = %s\n" % (i, r) 

97 return s[:-1] 

98 

99 

100class Interceptor(object): 

101 def __init__(self, 

102 name, # type: str 

103 default, # type: Any 

104 hook, # type: Callable[..., Any] 

105 args=None, # type: Optional[List[Any]] 

106 kargs=None # type: Optional[Dict[str, Any]] 

107 ): 

108 # type: (...) -> None 

109 self.name = name 

110 self.intname = "_intercepted_%s" % name 

111 self.default = default 

112 self.hook = hook 

113 self.args = args if args is not None else [] 

114 self.kargs = kargs if kargs is not None else {} 

115 

116 def __get__(self, obj, typ=None): 

117 # type: (Conf, Optional[type]) -> Any 

118 if not hasattr(obj, self.intname): 

119 setattr(obj, self.intname, self.default) 

120 return getattr(obj, self.intname) 

121 

122 @staticmethod 

123 def set_from_hook(obj, name, val): 

124 # type: (Conf, str, bool) -> None 

125 int_name = "_intercepted_%s" % name 

126 setattr(obj, int_name, val) 

127 

128 def __set__(self, obj, val): 

129 # type: (Conf, Any) -> None 

130 old = getattr(obj, self.intname, self.default) 

131 val = self.hook(self.name, val, old, *self.args, **self.kargs) 

132 setattr(obj, self.intname, val) 

133 

134 

135def _readonly(name): 

136 # type: (str) -> NoReturn 

137 default = Conf.__dict__[name].default 

138 Interceptor.set_from_hook(conf, name, default) 

139 raise ValueError("Read-only value !") 

140 

141 

142ReadOnlyAttribute = functools.partial( 

143 Interceptor, 

144 hook=(lambda name, *args, **kwargs: _readonly(name)) 

145) 

146ReadOnlyAttribute.__doc__ = "Read-only class attribute" 

147 

148 

149class ProgPath(ConfClass): 

150 _default: str = "<System default>" 

151 universal_open: str = "open" if DARWIN else "xdg-open" 

152 pdfreader: str = universal_open 

153 psreader: str = universal_open 

154 svgreader: str = universal_open 

155 dot: str = "dot" 

156 display: str = "display" 

157 tcpdump: str = "tcpdump" 

158 tcpreplay: str = "tcpreplay" 

159 hexedit: str = "hexer" 

160 tshark: str = "tshark" 

161 wireshark: str = "wireshark" 

162 ifconfig: str = "ifconfig" 

163 extcap_folders: List[str] = [ 

164 os.path.join(os.path.expanduser("~"), ".config", "wireshark", "extcap"), 

165 "/usr/lib/x86_64-linux-gnu/wireshark/extcap", 

166 ] 

167 

168 

169class ConfigFieldList: 

170 def __init__(self): 

171 # type: () -> None 

172 self.fields = set() # type: Set[Any] 

173 self.layers = set() # type: Set[Any] 

174 

175 @staticmethod 

176 def _is_field(f): 

177 # type: (Any) -> bool 

178 return hasattr(f, "owners") 

179 

180 def _recalc_layer_list(self): 

181 # type: () -> None 

182 self.layers = {owner for f in self.fields for owner in f.owners} 

183 

184 def add(self, *flds): 

185 # type: (*Any) -> None 

186 self.fields |= {f for f in flds if self._is_field(f)} 

187 self._recalc_layer_list() 

188 

189 def remove(self, *flds): 

190 # type: (*Any) -> None 

191 self.fields -= set(flds) 

192 self._recalc_layer_list() 

193 

194 def __contains__(self, elt): 

195 # type: (Any) -> bool 

196 if isinstance(elt, BasePacket): 

197 return elt in self.layers 

198 return elt in self.fields 

199 

200 def __repr__(self): 

201 # type: () -> str 

202 return "<%s [%s]>" % (self.__class__.__name__, " ".join(str(x) for x in self.fields)) # noqa: E501 

203 

204 

205class Emphasize(ConfigFieldList): 

206 pass 

207 

208 

209class Resolve(ConfigFieldList): 

210 pass 

211 

212 

213class Num2Layer: 

214 def __init__(self): 

215 # type: () -> None 

216 self.num2layer = {} # type: Dict[int, Type[Packet]] 

217 self.layer2num = {} # type: Dict[Type[Packet], int] 

218 

219 def register(self, num, layer): 

220 # type: (int, Type[Packet]) -> None 

221 self.register_num2layer(num, layer) 

222 self.register_layer2num(num, layer) 

223 

224 def register_num2layer(self, num, layer): 

225 # type: (int, Type[Packet]) -> None 

226 self.num2layer[num] = layer 

227 

228 def register_layer2num(self, num, layer): 

229 # type: (int, Type[Packet]) -> None 

230 self.layer2num[layer] = num 

231 

232 @overload 

233 def __getitem__(self, item): 

234 # type: (Type[Packet]) -> int 

235 pass 

236 

237 @overload 

238 def __getitem__(self, item): # noqa: F811 

239 # type: (int) -> Type[Packet] 

240 pass 

241 

242 def __getitem__(self, item): # noqa: F811 

243 # type: (Union[int, Type[Packet]]) -> Union[int, Type[Packet]] 

244 if isinstance(item, int): 

245 return self.num2layer[item] 

246 else: 

247 return self.layer2num[item] 

248 

249 def __contains__(self, item): 

250 # type: (Union[int, Type[Packet]]) -> bool 

251 if isinstance(item, int): 

252 return item in self.num2layer 

253 else: 

254 return item in self.layer2num 

255 

256 def get(self, 

257 item, # type: Union[int, Type[Packet]] 

258 default=None, # type: Optional[Type[Packet]] 

259 ): 

260 # type: (...) -> Optional[Union[int, Type[Packet]]] 

261 return self[item] if item in self else default 

262 

263 def __repr__(self): 

264 # type: () -> str 

265 lst = [] 

266 for num, layer in self.num2layer.items(): 

267 if layer in self.layer2num and self.layer2num[layer] == num: 

268 dir = "<->" 

269 else: 

270 dir = " ->" 

271 lst.append((num, "%#6x %s %-20s (%s)" % (num, dir, layer.__name__, 

272 layer._name))) 

273 for layer, num in self.layer2num.items(): 

274 if num not in self.num2layer or self.num2layer[num] != layer: 

275 lst.append((num, "%#6x <- %-20s (%s)" % (num, layer.__name__, 

276 layer._name))) 

277 lst.sort() 

278 return "\n".join(y for x, y in lst) 

279 

280 

281class LayersList(List[Type['scapy.packet.Packet']]): 

282 def __init__(self): 

283 # type: () -> None 

284 list.__init__(self) 

285 self.ldict = {} # type: Dict[str, List[Type[Packet]]] 

286 self.filtered = False 

287 self._backup_dict = {} # type: Dict[Type[Packet], List[Tuple[Dict[str, Any], Type[Packet]]]] # noqa: E501 

288 

289 def __repr__(self): 

290 # type: () -> str 

291 return "\n".join("%-20s: %s" % (layer.__name__, layer.name) 

292 for layer in self) 

293 

294 def register(self, layer): 

295 # type: (Type[Packet]) -> None 

296 self.append(layer) 

297 

298 # Skip arch* modules 

299 if layer.__module__.startswith("scapy.arch."): 

300 return 

301 

302 # Register in module 

303 if layer.__module__ not in self.ldict: 

304 self.ldict[layer.__module__] = [] 

305 self.ldict[layer.__module__].append(layer) 

306 

307 def layers(self): 

308 # type: () -> List[Tuple[str, str]] 

309 result = [] 

310 # This import may feel useless, but it is required for the eval below 

311 import scapy # noqa: F401 

312 try: 

313 import builtins # noqa: F401 

314 except ImportError: 

315 import __builtin__ # noqa: F401 

316 for lay in self.ldict: 

317 try: 

318 doc = eval(lay).__doc__ 

319 except AttributeError: 

320 continue 

321 result.append((lay, doc.strip().split("\n")[0] if doc else lay)) 

322 return result 

323 

324 def filter(self, items): 

325 # type: (List[Type[Packet]]) -> None 

326 """Disable dissection of unused layers to speed up dissection""" 

327 if self.filtered: 

328 raise ValueError("Already filtered. Please disable it first") 

329 for lay in self.ldict.values(): 

330 for cls in lay: 

331 if cls not in self._backup_dict: 

332 self._backup_dict[cls] = cls.payload_guess[:] 

333 cls.payload_guess = [ 

334 y for y in cls.payload_guess if y[1] in items 

335 ] 

336 self.filtered = True 

337 

338 def unfilter(self): 

339 # type: () -> None 

340 """Re-enable dissection for all layers""" 

341 if not self.filtered: 

342 raise ValueError("Not filtered. Please filter first") 

343 for lay in self.ldict.values(): 

344 for cls in lay: 

345 cls.payload_guess = self._backup_dict[cls] 

346 self._backup_dict.clear() 

347 self.filtered = False 

348 

349 

350class CommandsList(List[Callable[..., Any]]): 

351 def __repr__(self): 

352 # type: () -> str 

353 s = [] 

354 for li in sorted(self, key=lambda x: x.__name__): 

355 doc = li.__doc__ if li.__doc__ else "--" 

356 doc = doc.lstrip().split('\n', 1)[0] 

357 s.append("%-22s: %s" % (li.__name__, doc)) 

358 return "\n".join(s) 

359 

360 def register(self, cmd): 

361 # type: (DecoratorCallable) -> DecoratorCallable 

362 self.append(cmd) 

363 return cmd # return cmd so that method can be used as a decorator 

364 

365 

366def lsc(): 

367 # type: () -> None 

368 """Displays Scapy's default commands""" 

369 print(repr(conf.commands)) 

370 

371 

372class CacheInstance(Dict[str, Any]): 

373 __slots__ = ["timeout", "name", "_timetable"] 

374 

375 def __init__(self, name="noname", timeout=None): 

376 # type: (str, Optional[int]) -> None 

377 self.timeout = timeout 

378 self.name = name 

379 self._timetable = {} # type: Dict[str, float] 

380 

381 def flush(self): 

382 # type: () -> None 

383 self._timetable.clear() 

384 self.clear() 

385 

386 def __getitem__(self, item): 

387 # type: (str) -> Any 

388 if item in self.__slots__: 

389 return object.__getattribute__(self, item) 

390 if not self.__contains__(item): 

391 raise KeyError(item) 

392 return super(CacheInstance, self).__getitem__(item) 

393 

394 def __contains__(self, item): 

395 if not super(CacheInstance, self).__contains__(item): 

396 return False 

397 if self.timeout is not None: 

398 t = self._timetable[item] 

399 if time.time() - t > self.timeout: 

400 return False 

401 return True 

402 

403 def get(self, item, default=None): 

404 # type: (str, Optional[Any]) -> Any 

405 # overloading this method is needed to force the dict to go through 

406 # the timetable check 

407 try: 

408 return self[item] 

409 except KeyError: 

410 return default 

411 

412 def __setitem__(self, item, v): 

413 # type: (str, str) -> None 

414 if item in self.__slots__: 

415 return object.__setattr__(self, item, v) 

416 self._timetable[item] = time.time() 

417 super(CacheInstance, self).__setitem__(item, v) 

418 

419 def update(self, 

420 other, # type: Any 

421 **kwargs # type: Any 

422 ): 

423 # type: (...) -> None 

424 for key, value in other.items(): 

425 # We only update an element from `other` either if it does 

426 # not exist in `self` or if the entry in `self` is older. 

427 if key not in self or self._timetable[key] < other._timetable[key]: 

428 dict.__setitem__(self, key, value) 

429 self._timetable[key] = other._timetable[key] 

430 

431 def iteritems(self): 

432 # type: () -> Iterator[Tuple[str, Any]] 

433 if self.timeout is None: 

434 return super(CacheInstance, self).items() 

435 t0 = time.time() 

436 return ( 

437 (k, v) 

438 for (k, v) in super(CacheInstance, self).items() 

439 if t0 - self._timetable[k] < self.timeout 

440 ) 

441 

442 def iterkeys(self): 

443 # type: () -> Iterator[str] 

444 if self.timeout is None: 

445 return super(CacheInstance, self).keys() 

446 t0 = time.time() 

447 return ( 

448 k 

449 for k in super(CacheInstance, self).keys() 

450 if t0 - self._timetable[k] < self.timeout 

451 ) 

452 

453 def __iter__(self): 

454 # type: () -> Iterator[str] 

455 return self.iterkeys() 

456 

457 def itervalues(self): 

458 # type: () -> Iterator[Tuple[str, Any]] 

459 if self.timeout is None: 

460 return super(CacheInstance, self).values() 

461 t0 = time.time() 

462 return ( 

463 v 

464 for (k, v) in super(CacheInstance, self).items() 

465 if t0 - self._timetable[k] < self.timeout 

466 ) 

467 

468 def items(self): 

469 # type: () -> Any 

470 return list(self.iteritems()) 

471 

472 def keys(self): 

473 # type: () -> Any 

474 return list(self.iterkeys()) 

475 

476 def values(self): 

477 # type: () -> Any 

478 return list(self.itervalues()) 

479 

480 def __len__(self): 

481 # type: () -> int 

482 if self.timeout is None: 

483 return super(CacheInstance, self).__len__() 

484 return len(self.keys()) 

485 

486 def summary(self): 

487 # type: () -> str 

488 return "%s: %i valid items. Timeout=%rs" % (self.name, len(self), self.timeout) # noqa: E501 

489 

490 def __repr__(self): 

491 # type: () -> str 

492 s = [] 

493 if self: 

494 mk = max(len(k) for k in self) 

495 fmt = "%%-%is %%s" % (mk + 1) 

496 for item in self.items(): 

497 s.append(fmt % item) 

498 return "\n".join(s) 

499 

500 def copy(self): 

501 # type: () -> CacheInstance 

502 return copy.copy(self) 

503 

504 

505class NetCache: 

506 def __init__(self): 

507 # type: () -> None 

508 self._caches_list = [] # type: List[CacheInstance] 

509 

510 def add_cache(self, cache): 

511 # type: (CacheInstance) -> None 

512 self._caches_list.append(cache) 

513 setattr(self, cache.name, cache) 

514 

515 def new_cache(self, name, timeout=None): 

516 # type: (str, Optional[int]) -> CacheInstance 

517 c = CacheInstance(name=name, timeout=timeout) 

518 self.add_cache(c) 

519 return c 

520 

521 def __delattr__(self, attr): 

522 # type: (str) -> NoReturn 

523 raise AttributeError("Cannot delete attributes") 

524 

525 def update(self, other): 

526 # type: (NetCache) -> None 

527 for co in other._caches_list: 

528 if hasattr(self, co.name): 

529 getattr(self, co.name).update(co) 

530 else: 

531 self.add_cache(co.copy()) 

532 

533 def flush(self): 

534 # type: () -> None 

535 for c in self._caches_list: 

536 c.flush() 

537 

538 def __repr__(self): 

539 # type: () -> str 

540 return "\n".join(c.summary() for c in self._caches_list) 

541 

542 

543class ScapyExt: 

544 __slots__ = ["specs", "name", "version", "bash_completions"] 

545 

546 class MODE(Enum): 

547 LAYERS = "layers" 

548 CONTRIB = "contrib" 

549 MODULES = "modules" 

550 

551 @dataclass 

552 class ScapyExtSpec: 

553 fullname: str 

554 mode: 'ScapyExt.MODE' 

555 spec: Any 

556 default: bool 

557 

558 def __init__(self): 

559 self.specs: Dict[str, 'ScapyExt.ScapyExtSpec'] = {} 

560 self.bash_completions = {} 

561 

562 def config(self, name, version): 

563 self.name = name 

564 self.version = version 

565 

566 def register(self, name, mode, path, default=None): 

567 assert mode in self.MODE, "mode must be one of ScapyExt.MODE !" 

568 fullname = f"scapy.{mode.value}.{name}" 

569 spec = importlib.util.spec_from_file_location( 

570 fullname, 

571 str(path), 

572 ) 

573 spec = self.ScapyExtSpec( 

574 fullname=fullname, 

575 mode=mode, 

576 spec=spec, 

577 default=default or False, 

578 ) 

579 if default is None: 

580 spec.default = bool(importlib.util.find_spec(spec.fullname)) 

581 self.specs[fullname] = spec 

582 

583 def register_bashcompletion(self, script: pathlib.Path): 

584 self.bash_completions[script.name] = script 

585 

586 def __repr__(self): 

587 return "<ScapyExt %s %s (%s specs)>" % ( 

588 self.name, 

589 self.version, 

590 len(self.specs), 

591 ) 

592 

593 

594class ExtsManager(importlib.abc.MetaPathFinder): 

595 __slots__ = ["exts", "all_specs"] 

596 

597 GPLV2_LICENCES = [ 

598 "GPL-2.0-only", 

599 "GPL-2.0-or-later", 

600 ] 

601 

602 def __init__(self): 

603 self.exts: List[ScapyExt] = [] 

604 self.all_specs: Dict[str, ScapyExt.ScapyExtSpec] = {} 

605 self._loaded: List[str] = [] 

606 # Add to meta_path as we are an import provider 

607 if self not in sys.meta_path: 

608 sys.meta_path.append(self) 

609 

610 def find_spec(self, fullname, path, target=None): 

611 if fullname in self.all_specs: 

612 return self.all_specs[fullname].spec 

613 

614 def invalidate_caches(self): 

615 pass 

616 

617 def _register_spec(self, spec): 

618 # Register to known specs 

619 self.all_specs[spec.fullname] = spec 

620 

621 # If default=True, inject it in the currently loaded modules 

622 if spec.default: 

623 loader = importlib.util.LazyLoader(spec.spec.loader) 

624 spec.spec.loader = loader 

625 module = importlib.util.module_from_spec(spec.spec) 

626 sys.modules[spec.fullname] = module 

627 loader.exec_module(module) 

628 

629 def load(self, extension: str): 

630 """ 

631 Load a scapy extension. 

632 

633 :param extension: the name of the extension, as installed. 

634 """ 

635 if extension in self._loaded: 

636 return 

637 

638 try: 

639 import importlib.metadata 

640 except ImportError: 

641 log_loading.warning( 

642 "'%s' not loaded. " 

643 "Scapy extensions require at least Python 3.8+ !" % extension 

644 ) 

645 return 

646 

647 # Get extension distribution 

648 try: 

649 distr = importlib.metadata.distribution(extension) 

650 except importlib.metadata.PackageNotFoundError: 

651 log_loading.warning("The extension '%s' was not found !" % extension) 

652 return 

653 

654 # Check the classifiers 

655 if ( 

656 distr.metadata.get('License-Expression', None) not in self.GPLV2_LICENCES 

657 and distr.metadata.get('License', None) not in self.GPLV2_LICENCES 

658 ): 

659 log_loading.warning( 

660 "'%s' has no GPLv2 classifier therefore cannot be loaded." % extension 

661 ) 

662 return 

663 

664 # Create the extension 

665 ext = ScapyExt() 

666 

667 # Get the top-level declared "import packages" 

668 # HACK: not available nicely in importlib :/ 

669 packages = distr.read_text("top_level.txt").split() 

670 

671 for package in packages: 

672 scapy_ext = importlib.import_module(package) 

673 

674 # We initialize the plugin by calling it's 'scapy_ext' function 

675 try: 

676 scapy_ext_func = scapy_ext.scapy_ext 

677 except AttributeError: 

678 log_loading.warning( 

679 "'%s' does not look like a Scapy plugin !" % extension 

680 ) 

681 return 

682 try: 

683 scapy_ext_func(ext) 

684 except Exception as ex: 

685 log_loading.warning( 

686 "'%s' failed during initialization with %s" % ( 

687 extension, 

688 ex 

689 ) 

690 ) 

691 return 

692 

693 # Register all the specs provided by this extension 

694 for spec in ext.specs.values(): 

695 self._register_spec(spec) 

696 

697 # Add to the extension list 

698 self.exts.append(ext) 

699 self._loaded.append(extension) 

700 

701 # If there are bash autocompletions, add them 

702 if ext.bash_completions: 

703 from scapy.main import _add_bash_autocompletion 

704 

705 for name, script in ext.bash_completions.items(): 

706 _add_bash_autocompletion(name, script) 

707 

708 def loadall(self) -> None: 

709 """ 

710 Load all extensions registered in conf. 

711 """ 

712 for extension in conf.load_extensions: 

713 self.load(extension) 

714 

715 def __repr__(self): 

716 from scapy.utils import pretty_list 

717 return pretty_list( 

718 [ 

719 (x.name, x.version, [y.fullname for y in x.specs.values()]) 

720 for x in self.exts 

721 ], 

722 [("Name", "Version", "Specs")], 

723 sortBy=0, 

724 ) 

725 

726 

727def _version_checker(module, minver): 

728 # type: (ModuleType, Tuple[int, ...]) -> bool 

729 """Checks that module has a higher version that minver. 

730 

731 params: 

732 - module: a module to test 

733 - minver: a tuple of versions 

734 """ 

735 # We could use LooseVersion, but distutils imports imp which is deprecated 

736 version_regexp = r'[a-z]?((?:\d|\.)+\d+)(?:\.dev[0-9]+)?' 

737 version_tags_r = re.match( 

738 version_regexp, 

739 getattr(module, "__version__", "") 

740 ) 

741 if not version_tags_r: 

742 return False 

743 version_tags_i = version_tags_r.group(1).split(".") 

744 version_tags = tuple(int(x) for x in version_tags_i) 

745 return bool(version_tags >= minver) 

746 

747 

748def isCryptographyValid(): 

749 # type: () -> bool 

750 """ 

751 Check if the cryptography module >= 2.0.0 is present. This is the minimum 

752 version for most usages in Scapy. 

753 """ 

754 # Check import 

755 try: 

756 import cryptography 

757 except ImportError: 

758 return False 

759 

760 # Check minimum version 

761 return _version_checker(cryptography, (2, 0, 0)) 

762 

763 

764def isCryptographyAdvanced(): 

765 # type: () -> bool 

766 """ 

767 Check if the cryptography module is present, and if it supports X25519, 

768 ChaCha20Poly1305 and such. 

769 

770 Notes: 

771 - cryptography >= 2.0 is required 

772 - OpenSSL >= 1.1.0 is required 

773 """ 

774 try: 

775 from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey # noqa: E501 

776 X25519PrivateKey.generate() 

777 except Exception: 

778 return False 

779 else: 

780 return True 

781 

782 

783def isCryptographyBackendCompatible() -> bool: 

784 """ 

785 Check if the cryptography backend is compatible 

786 """ 

787 # Check for LibreSSL 

788 try: 

789 from cryptography.hazmat.backends import default_backend 

790 if "LibreSSL" in default_backend().openssl_version_text(): 

791 # BUG: LibreSSL - https://marc.info/?l=libressl&m=173846028619304&w=2 

792 # It takes 5 whole minutes to import RFC3526's modp parameters. This is 

793 # not okay. 

794 return False 

795 return True 

796 except Exception: 

797 return True 

798 

799 

800def isPyPy(): 

801 # type: () -> bool 

802 """Returns either scapy is running under PyPy or not""" 

803 try: 

804 import __pypy__ # noqa: F401 

805 return True 

806 except ImportError: 

807 return False 

808 

809 

810def _prompt_changer(attr, val, old): 

811 # type: (str, Any, Any) -> Any 

812 """Change the current prompt theme""" 

813 Interceptor.set_from_hook(conf, attr, val) 

814 try: 

815 sys.ps1 = conf.color_theme.prompt(conf.prompt) 

816 except Exception: 

817 pass 

818 try: 

819 apply_ipython_style( 

820 get_ipython() # type: ignore 

821 ) 

822 except NameError: 

823 pass 

824 return getattr(conf, attr, old) 

825 

826 

827def _set_conf_sockets(): 

828 # type: () -> None 

829 """Populate the conf.L2Socket and conf.L3Socket 

830 according to the various use_* parameters 

831 """ 

832 if conf.use_bpf and not BSD: 

833 Interceptor.set_from_hook(conf, "use_bpf", False) 

834 raise ScapyInvalidPlatformException("BSD-like (OSX, *BSD...) only !") 

835 if not conf.use_pcap and SOLARIS: 

836 Interceptor.set_from_hook(conf, "use_pcap", True) 

837 raise ScapyInvalidPlatformException( 

838 "Scapy only supports libpcap on Solaris !" 

839 ) 

840 # we are already in an Interceptor hook, use Interceptor.set_from_hook 

841 if conf.use_pcap: 

842 try: 

843 from scapy.arch.libpcap import L2pcapListenSocket, L2pcapSocket, \ 

844 L3pcapSocket 

845 except (OSError, ImportError): 

846 log_loading.warning("No libpcap provider available ! pcap won't be used") 

847 Interceptor.set_from_hook(conf, "use_pcap", False) 

848 else: 

849 conf.L3socket = L3pcapSocket 

850 conf.L3socket6 = functools.partial( 

851 L3pcapSocket, filter="ip6") 

852 conf.L2socket = L2pcapSocket 

853 conf.L2listen = L2pcapListenSocket 

854 elif conf.use_bpf: 

855 from scapy.arch.bpf.supersocket import L2bpfListenSocket, \ 

856 L2bpfSocket, L3bpfSocket 

857 conf.L3socket = L3bpfSocket 

858 conf.L3socket6 = functools.partial( 

859 L3bpfSocket, filter="ip6") 

860 conf.L2socket = L2bpfSocket 

861 conf.L2listen = L2bpfListenSocket 

862 elif LINUX: 

863 from scapy.arch.linux import L3PacketSocket, L2Socket, L2ListenSocket 

864 conf.L3socket = L3PacketSocket 

865 conf.L3socket6 = cast( 

866 "Type[SuperSocket]", 

867 functools.partial( 

868 L3PacketSocket, 

869 filter="ip6" 

870 ) 

871 ) 

872 conf.L2socket = L2Socket 

873 conf.L2listen = L2ListenSocket 

874 elif WINDOWS: 

875 from scapy.arch.windows import _NotAvailableSocket 

876 from scapy.arch.windows.native import L3WinSocket, L3WinSocket6 

877 conf.L3socket = L3WinSocket 

878 conf.L3socket6 = L3WinSocket6 

879 conf.L2socket = _NotAvailableSocket 

880 conf.L2listen = _NotAvailableSocket 

881 else: 

882 from scapy.supersocket import L3RawSocket, L3RawSocket6 

883 conf.L3socket = L3RawSocket 

884 conf.L3socket6 = L3RawSocket6 

885 # Reload the interfaces 

886 conf.ifaces.reload() 

887 

888 

889def _socket_changer(attr, val, old): 

890 # type: (str, bool, bool) -> Any 

891 if not isinstance(val, bool): 

892 raise TypeError("This argument should be a boolean") 

893 Interceptor.set_from_hook(conf, attr, val) 

894 dependencies = { # Things that will be turned off 

895 "use_pcap": ["use_bpf"], 

896 "use_bpf": ["use_pcap"], 

897 } 

898 restore = {k: getattr(conf, k) for k in dependencies} 

899 del restore[attr] # This is handled directly by _set_conf_sockets 

900 if val: # Only if True 

901 for param in dependencies[attr]: 

902 Interceptor.set_from_hook(conf, param, False) 

903 try: 

904 _set_conf_sockets() 

905 except (ScapyInvalidPlatformException, ImportError) as e: 

906 for key, value in restore.items(): 

907 Interceptor.set_from_hook(conf, key, value) 

908 if isinstance(e, ScapyInvalidPlatformException): 

909 raise 

910 return getattr(conf, attr) 

911 

912 

913def _loglevel_changer(attr, val, old): 

914 # type: (str, int, int) -> int 

915 """Handle a change of conf.logLevel""" 

916 log_scapy.setLevel(val) 

917 return val 

918 

919 

920def _iface_changer(attr, val, old): 

921 # type: (str, Any, Any) -> 'scapy.interfaces.NetworkInterface' 

922 """Resolves the interface in conf.iface""" 

923 if isinstance(val, str): 

924 from scapy.interfaces import resolve_iface 

925 iface = resolve_iface(val) 

926 if old and iface.dummy: 

927 warning( 

928 "This interface is not specified in any provider ! " 

929 "See conf.ifaces output" 

930 ) 

931 return iface 

932 return val 

933 

934 

935def _reset_tls_nss_keys(attr, val, old): 

936 # type: (str, Any, Any) -> Any 

937 """Reset conf.tls_nss_keys when conf.tls_nss_filename changes""" 

938 conf.tls_nss_keys = None 

939 return val 

940 

941 

942class Conf(ConfClass): 

943 """ 

944 This object contains the configuration of Scapy. 

945 """ 

946 version: str = ReadOnlyAttribute("version", VERSION) 

947 session: str = "" #: filename where the session will be saved 

948 interactive = False 

949 #: can be "ipython", "bpython", "ptpython", "ptipython", "python" or "auto". 

950 #: Default: Auto 

951 interactive_shell = "auto" 

952 #: Configuration for "ipython" to use jedi (disabled by default) 

953 ipython_use_jedi = False 

954 #: if 1, prevents any unwanted packet to go out (ARP, DNS, ...) 

955 stealth = "not implemented" 

956 #: selects the default output interface for srp() and sendp(). 

957 iface = Interceptor("iface", None, _iface_changer) # type: 'scapy.interfaces.NetworkInterface' # noqa: E501 

958 layers: LayersList = LayersList() 

959 commands = CommandsList() # type: CommandsList 

960 #: Codec used by default for ASN1 objects 

961 ASN1_default_codec = None # type: 'scapy.asn1.asn1.ASN1Codec' 

962 #: Default size for ASN1 objects 

963 ASN1_default_long_size = 0 

964 #: choose the AS resolver class to use 

965 AS_resolver = None # type: scapy.as_resolvers.AS_resolver 

966 dot15d4_protocol = None # Used in dot15d4.py 

967 logLevel: int = Interceptor("logLevel", log_scapy.level, _loglevel_changer) 

968 #: if 0, doesn't check that IPID matches between IP sent and 

969 #: ICMP IP citation received 

970 #: if 1, checks that they either are equal or byte swapped 

971 #: equals (bug in some IP stacks) 

972 #: if 2, strictly checks that they are equals 

973 checkIPID = False 

974 #: if 1, checks IP src in IP and ICMP IP citation match 

975 #: (bug in some NAT stacks) 

976 checkIPsrc = True 

977 checkIPaddr = True 

978 #: if True, checks that IP-in-IP layers match. If False, do 

979 #: not check IP layers that encapsulates another IP layer 

980 checkIPinIP = True 

981 #: if 1, also check that TCP seq and ack match the 

982 #: ones in ICMP citation 

983 check_TCPerror_seqack = False 

984 verb = 2 #: level of verbosity, from 0 (almost mute) to 3 (verbose) 

985 prompt: str = Interceptor("prompt", ">>> ", _prompt_changer) 

986 #: default mode for the promiscuous mode of a socket (to get answers if you 

987 #: spoof on a lan) 

988 sniff_promisc = True # type: bool 

989 raw_layer = None # type: Type[Packet] 

990 raw_summary = False # type: Union[bool, Callable[[bytes], Any]] 

991 padding_layer = None # type: Type[Packet] 

992 default_l2 = None # type: Type[Packet] 

993 l2types: Num2Layer = Num2Layer() 

994 l3types: Num2Layer = Num2Layer() 

995 L3socket = None # type: Type[scapy.supersocket.SuperSocket] 

996 L3socket6 = None # type: Type[scapy.supersocket.SuperSocket] 

997 L2socket = None # type: Type[scapy.supersocket.SuperSocket] 

998 L2listen = None # type: Type[scapy.supersocket.SuperSocket] 

999 BTsocket = None # type: Type[scapy.supersocket.SuperSocket] 

1000 min_pkt_size = 60 

1001 #: holds MIB direct access dictionary 

1002 mib = None # type: 'scapy.asn1.mib.MIBDict' 

1003 bufsize = 2**16 

1004 #: history file 

1005 histfile: str = os.getenv( 

1006 'SCAPY_HISTFILE', 

1007 os.path.join( 

1008 os.path.expanduser("~"), 

1009 ".config", "scapy", "history" 

1010 ) 

1011 ) 

1012 #: includes padding in disassembled packets 

1013 padding = 1 

1014 #: BPF filter for packets to ignore 

1015 except_filter = "" 

1016 #: bpf filter added to every sniffing socket to exclude traffic 

1017 #: from analysis 

1018 filter = "" 

1019 #: when 1, store received packet that are not matched into `debug.recv` 

1020 debug_match = False 

1021 #: When 1, print some TLS session secrets when they are computed, and 

1022 #: warn about the session recognition. 

1023 debug_tls = False 

1024 wepkey = "" 

1025 #: holds the Scapy interface list and manager 

1026 ifaces = None # type: 'scapy.interfaces.NetworkInterfaceDict' 

1027 #: holds the cache of interfaces loaded from Libpcap 

1028 cache_pcapiflist = {} # type: Dict[str, Tuple[str, List[str], Any, str, int]] 

1029 # `neighbor` will be filed by scapy.layers.l2 

1030 neighbor = None # type: 'scapy.layers.l2.Neighbor' 

1031 #: holds the name servers IP/hosts used for custom DNS resolution 

1032 nameservers = None # type: str 

1033 #: automatically load IPv4 routes on startup. Disable this if your 

1034 #: routing table is too big. 

1035 route_autoload = True 

1036 #: automatically load IPv6 routes on startup. Disable this if your 

1037 #: routing table is too big. 

1038 route6_autoload = True 

1039 #: holds the Scapy IPv4 routing table and provides methods to 

1040 #: manipulate it 

1041 route = None # type: 'scapy.route.Route' 

1042 # `route` will be filed by route.py 

1043 #: holds the Scapy IPv6 routing table and provides methods to 

1044 #: manipulate it 

1045 route6 = None # type: 'scapy.route6.Route6' 

1046 manufdb = None # type: 'scapy.data.ManufDA' 

1047 ethertypes = None # type: 'scapy.data.EtherDA' 

1048 protocols = None # type: 'scapy.dadict.DADict[int, str]' 

1049 services_udp = None # type: 'scapy.dadict.DADict[int, str]' 

1050 services_tcp = None # type: 'scapy.dadict.DADict[int, str]' 

1051 services_sctp = None # type: 'scapy.dadict.DADict[int, str]' 

1052 # 'route6' will be filed by route6.py 

1053 teredoPrefix = "" # type: str 

1054 teredoServerPort = None # type: int 

1055 auto_fragment = True 

1056 #: raise exception when a packet dissector raises an exception 

1057 debug_dissector = False 

1058 color_theme: ColorTheme = Interceptor("color_theme", NoTheme(), _prompt_changer) 

1059 #: how much time between warnings from the same place 

1060 warning_threshold = 5 

1061 prog: ProgPath = ProgPath() 

1062 #: holds list of fields for which resolution should be done 

1063 resolve: Resolve = Resolve() 

1064 #: holds list of enum fields for which conversion to string 

1065 #: should NOT be done 

1066 noenum: Resolve = Resolve() 

1067 emph: Emphasize = Emphasize() 

1068 #: read only attribute to show if PyPy is in use 

1069 use_pypy: bool = ReadOnlyAttribute("use_pypy", isPyPy()) 

1070 #: use libpcap integration or not. Changing this value will update 

1071 #: the conf.L[2/3] sockets 

1072 use_pcap: bool = Interceptor( 

1073 "use_pcap", 

1074 os.getenv("SCAPY_USE_LIBPCAP", "").lower().startswith("y"), 

1075 _socket_changer 

1076 ) 

1077 use_bpf: bool = Interceptor("use_bpf", False, _socket_changer) 

1078 use_npcap = False 

1079 ipv6_enabled: bool = socket.has_ipv6 

1080 stats_classic_protocols = [] # type: List[Type[Packet]] 

1081 stats_dot11_protocols = [] # type: List[Type[Packet]] 

1082 temp_files = [] # type: List[str] 

1083 #: netcache holds time-based caches for net operations 

1084 netcache: NetCache = NetCache() 

1085 geoip_city = None 

1086 #: Scapy extensions that are loaded automatically on load 

1087 load_extensions: List[str] = [] 

1088 # can, tls, http and a few others are not loaded by default 

1089 load_layers: List[str] = [ 

1090 'bluetooth', 

1091 'bluetooth4LE', 

1092 'dcerpc', 

1093 'dhcp', 

1094 'dhcp6', 

1095 'dns', 

1096 'dot11', 

1097 'dot15d4', 

1098 'eap', 

1099 'gprs', 

1100 'gssapi', 

1101 'hsrp', 

1102 'inet', 

1103 'inet6', 

1104 'ipsec', 

1105 'ir', 

1106 'isakmp', 

1107 'kerberos', 

1108 'l2', 

1109 'l2tp', 

1110 'ldap', 

1111 'llmnr', 

1112 'lltd', 

1113 'mgcp', 

1114 'msrpce.rpcclient', 

1115 'msrpce.rpcserver', 

1116 'mobileip', 

1117 'netbios', 

1118 'netflow', 

1119 'ntlm', 

1120 'ntp', 

1121 'ppi', 

1122 'ppp', 

1123 'pptp', 

1124 'radius', 

1125 'rip', 

1126 'rtp', 

1127 'sctp', 

1128 'sixlowpan', 

1129 'skinny', 

1130 'smb', 

1131 'smb2', 

1132 'smbclient', 

1133 'smbserver', 

1134 'snmp', 

1135 'spnego', 

1136 'tftp', 

1137 'vrrp', 

1138 'vxlan', 

1139 'x509', 

1140 'zigbee' 

1141 ] 

1142 #: a dict which can be used by contrib layers to store local 

1143 #: configuration 

1144 contribs = dict() # type: Dict[str, Any] 

1145 exts: ExtsManager = ExtsManager() 

1146 crypto_valid = isCryptographyValid() 

1147 crypto_valid_advanced = isCryptographyAdvanced() 

1148 #: controls whether or not to display the fancy banner 

1149 fancy_banner = True 

1150 #: controls whether tables (conf.iface, conf.route...) should be cropped 

1151 #: to fit the terminal 

1152 auto_crop_tables = True 

1153 #: how often to check for new packets. 

1154 #: Defaults to 0.05s. 

1155 recv_poll_rate = 0.05 

1156 #: When True, raise exception if no dst MAC found otherwise broadcast. 

1157 #: Default is False. 

1158 raise_no_dst_mac = False 

1159 loopback_name: str = "lo" if LINUX else "lo0" 

1160 nmap_base = "" # type: str 

1161 nmap_kdb = None # type: Optional[NmapKnowledgeBase] 

1162 #: a safety mechanism: the maximum amount of items included in a PacketListField 

1163 #: or a FieldListField 

1164 max_list_count = 100 

1165 #: When the TLS module is loaded (not by default), the following turns on sessions 

1166 tls_session_enable = False 

1167 #: Filename containing NSS Keys Log 

1168 tls_nss_filename = Interceptor( 

1169 "tls_nss_filename", 

1170 None, 

1171 _reset_tls_nss_keys 

1172 ) 

1173 #: Dictionary containing parsed NSS Keys 

1174 tls_nss_keys: Dict[str, bytes] = None 

1175 #: Whether to use NDR64 by default instead of NDR 32 

1176 ndr64: bool = True 

1177 #: When TCPSession is used, parse DCE/RPC sessions automatically. 

1178 #: This should be used for passive sniffing. 

1179 dcerpc_session_enable = False 

1180 #: If a capture is missing the first DCE/RPC binding message, we might incorrectly 

1181 #: assume that header signing isn't used. This forces it on. 

1182 dcerpc_force_header_signing = False 

1183 #: Windows SSPs for sniffing. This is used with 

1184 #: dcerpc_session_enable 

1185 winssps_passive = [] 

1186 #: Disables auto-stripping of StrFixedLenField for debugging purposes 

1187 debug_strfixedlenfield = False 

1188 

1189 def __getattribute__(self, attr): 

1190 # type: (str) -> Any 

1191 # Those are loaded on runtime to avoid import loops 

1192 if attr == "manufdb": 

1193 from scapy.data import MANUFDB 

1194 return MANUFDB 

1195 if attr == "ethertypes": 

1196 from scapy.data import ETHER_TYPES 

1197 return ETHER_TYPES 

1198 if attr == "protocols": 

1199 from scapy.data import IP_PROTOS 

1200 return IP_PROTOS 

1201 if attr == "services_udp": 

1202 from scapy.data import UDP_SERVICES 

1203 return UDP_SERVICES 

1204 if attr == "services_tcp": 

1205 from scapy.data import TCP_SERVICES 

1206 return TCP_SERVICES 

1207 if attr == "services_sctp": 

1208 from scapy.data import SCTP_SERVICES 

1209 return SCTP_SERVICES 

1210 if attr == "iface6": 

1211 warnings.warn( 

1212 "conf.iface6 is deprecated in favor of conf.iface", 

1213 DeprecationWarning 

1214 ) 

1215 attr = "iface" 

1216 return object.__getattribute__(self, attr) 

1217 

1218 

1219if not Conf.ipv6_enabled: 

1220 log_scapy.warning("IPv6 support disabled in Python. Cannot load Scapy IPv6 layers.") # noqa: E501 

1221 for m in ["inet6", "dhcp6", "sixlowpan"]: 

1222 if m in Conf.load_layers: 

1223 Conf.load_layers.remove(m) 

1224 

1225conf = Conf() # type: Conf 

1226 

1227 

1228if not isCryptographyBackendCompatible(): 

1229 conf.crypto_valid = False 

1230 conf.crypto_valid_advanced = False 

1231 log_scapy.error( 

1232 "Scapy does not support LibreSSL as a backend to cryptography ! " 

1233 "See https://cryptography.io/en/latest/installation/#static-wheels " 

1234 "for instructions on how to recompile cryptography with another " 

1235 "backend." 

1236 ) 

1237 

1238 

1239def crypto_validator(func): 

1240 # type: (DecoratorCallable) -> DecoratorCallable 

1241 """ 

1242 This a decorator to be used for any method relying on the cryptography library. # noqa: E501 

1243 Its behaviour depends on the 'crypto_valid' attribute of the global 'conf'. 

1244 """ 

1245 def func_in(*args, **kwargs): 

1246 # type: (*Any, **Any) -> Any 

1247 if not conf.crypto_valid: 

1248 raise ImportError("Cannot execute crypto-related method! " 

1249 "Please install python-cryptography v2.0 or later.") # noqa: E501 

1250 return func(*args, **kwargs) 

1251 return func_in 

1252 

1253 

1254def scapy_delete_temp_files(): 

1255 # type: () -> None 

1256 for f in conf.temp_files: 

1257 try: 

1258 os.unlink(f) 

1259 except Exception: 

1260 pass 

1261 del conf.temp_files[:] 

1262 

1263 

1264atexit.register(scapy_delete_temp_files)