Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/config.py: 53%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Implementation of the configuration object.
8"""
10import atexit
11import copy
12import functools
13import os
14import pathlib
15import re
16import socket
17import sys
18import time
19import warnings
21from dataclasses import dataclass
22from enum import Enum
24import importlib
25import importlib.abc
26import importlib.util
28import scapy
29from scapy import VERSION
30from scapy.base_classes import BasePacket
31from scapy.consts import DARWIN, WINDOWS, LINUX, BSD, SOLARIS
32from scapy.error import (
33 log_loading,
34 log_scapy,
35 ScapyInvalidPlatformException,
36 warning,
37)
38from scapy.themes import ColorTheme, NoTheme, apply_ipython_style
40# Typing imports
41from typing import (
42 cast,
43 Any,
44 Callable,
45 Dict,
46 Iterator,
47 List,
48 NoReturn,
49 Optional,
50 Set,
51 Tuple,
52 Type,
53 Union,
54 overload,
55 TYPE_CHECKING,
56)
57from types import ModuleType
58from scapy.compat import DecoratorCallable
60if TYPE_CHECKING:
61 # Do not import at runtime
62 import scapy.as_resolvers
63 from scapy.modules.nmap import NmapKnowledgeBase
64 from scapy.packet import Packet
65 from scapy.supersocket import SuperSocket # noqa: F401
66 import scapy.asn1.asn1
67 import scapy.asn1.mib
69############
70# Config #
71############
class ConfClass(object):
    """Base class for configuration holders that can print themselves."""

    def configure(self, cnf):
        # type: (ConfClass) -> None
        """Replace this object's attributes with a shallow copy of cnf's."""
        self.__dict__ = cnf.__dict__.copy()

    def __repr__(self):
        # type: () -> str
        return str(self)

    def __str__(self):
        # type: () -> str
        """Render every public attribute as one ``name = value`` line."""
        # Names declared on the class itself, completed (and overridden)
        # by the ones set on the instance.
        names = dict(self.__class__.__dict__)
        names.update(self.__dict__)
        lines = []
        for name in sorted(names):
            if name[0] == "_":
                # Private / dunder entries are not displayed
                continue
            # Collapse whitespace runs so that each value stays on one line
            value = " ".join(repr(getattr(self, name)).split())
            room = 76 - max(len(name), 10)
            if len(value) > room:
                value = value[:room - 3] + "..."
            lines.append("%-10s = %s" % (name, value))
        return "\n".join(lines)
class Interceptor(object):
    """
    Data descriptor that routes every assignment through a hook.

    The current value is stored on the owning instance under the private
    name ``_intercepted_<name>``. On assignment, ``hook(name, val, old,
    *args, **kargs)`` is called and whatever it returns is stored.
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: Any
                 hook,  # type: Callable[..., Any]
                 args=None,  # type: Optional[List[Any]]
                 kargs=None  # type: Optional[Dict[str, Any]]
                 ):
        # type: (...) -> None
        """
        :param name: public name of the attribute
        :param default: value returned until something is assigned
        :param hook: callable invoked on assignment (see class docstring)
        :param args: extra positional arguments passed to the hook
        :param kargs: extra keyword arguments passed to the hook
        """
        self.name = name
        self.intname = "_intercepted_%s" % name
        self.default = default
        self.hook = hook
        self.args = args if args is not None else []
        self.kargs = kargs if kargs is not None else {}

    def __get__(self, obj, typ=None):
        # type: (Optional[Conf], Optional[type]) -> Any
        """Return the stored value, initialising it to the default."""
        if obj is None:
            # Accessed on the class rather than on an instance: there is
            # nowhere to store a value, so expose the descriptor itself
            # instead of failing on setattr(None, ...).
            return self
        if not hasattr(obj, self.intname):
            setattr(obj, self.intname, self.default)
        return getattr(obj, self.intname)

    @staticmethod
    def set_from_hook(obj, name, val):
        # type: (Conf, str, Any) -> None
        """Store val for attribute `name` on obj WITHOUT calling the hook.

        Meant to be used from inside a hook, where a regular assignment
        would call the hook again.
        """
        int_name = "_intercepted_%s" % name
        setattr(obj, int_name, val)

    def __set__(self, obj, val):
        # type: (Conf, Any) -> None
        """Pass the new value through the hook and store its result."""
        old = getattr(obj, self.intname, self.default)
        val = self.hook(self.name, val, old, *self.args, **self.kargs)
        setattr(obj, self.intname, val)
def _readonly(name):
    # type: (str) -> NoReturn
    """Reject a write to a read-only attribute of conf.

    The attribute is first put back to the default declared on Conf,
    then a ValueError is raised unconditionally.
    """
    descriptor = Conf.__dict__[name]
    Interceptor.set_from_hook(conf, name, descriptor.default)
    raise ValueError("Read-only value !")
# Interceptor factory whose hook refuses every assignment: the proposed and
# previous values handed to the hook are ignored, only the name is used.
ReadOnlyAttribute = functools.partial(
    Interceptor,
    hook=lambda name, *_args, **_kwargs: _readonly(name),
)
ReadOnlyAttribute.__doc__ = "Read-only class attribute"
class ProgPath(ConfClass):
    """Names of, and search paths for, the external programs used."""

    _default: str = "<System default>"
    # Generic "open this file" launcher; the document readers below
    # all default to it.
    universal_open: str = "open" if DARWIN else "xdg-open"
    pdfreader: str = universal_open
    psreader: str = universal_open
    svgreader: str = universal_open
    dot: str = "dot"
    display: str = "display"
    tcpdump: str = "tcpdump"
    tcpreplay: str = "tcpreplay"
    hexedit: str = "hexer"
    tshark: str = "tshark"
    wireshark: str = "wireshark"
    ifconfig: str = "ifconfig"
    # Wireshark extcap folders: the per-user one, then a system-wide one
    extcap_folders: List[str] = [
        os.path.join(os.path.expanduser("~"), ".config", "wireshark", "extcap"),
        "/usr/lib/x86_64-linux-gnu/wireshark/extcap",
    ]
class ConfigFieldList:
    """A set of fields, plus the set of layers that own those fields."""

    def __init__(self):
        # type: () -> None
        self.fields = set()  # type: Set[Any]
        self.layers = set()  # type: Set[Any]

    @staticmethod
    def _is_field(f):
        # type: (Any) -> bool
        """Anything that carries an ``owners`` attribute counts as a field."""
        return hasattr(f, "owners")

    def _recalc_layer_list(self):
        # type: () -> None
        """Rebuild ``layers`` as the union of the owners of all fields."""
        owners = set()
        for fld in self.fields:
            owners.update(fld.owners)
        self.layers = owners

    def add(self, *flds):
        # type: (*Any) -> None
        """Add the given fields; objects that are not fields are ignored."""
        self.fields.update(set(filter(self._is_field, flds)))
        self._recalc_layer_list()

    def remove(self, *flds):
        # type: (*Any) -> None
        """Remove the given fields (unknown ones are ignored)."""
        self.fields.difference_update(set(flds))
        self._recalc_layer_list()

    def __contains__(self, elt):
        # type: (Any) -> bool
        # BasePacket instances are looked up among the layers, anything
        # else among the fields.
        pool = self.layers if isinstance(elt, BasePacket) else self.fields
        return elt in pool

    def __repr__(self):
        # type: () -> str
        names = " ".join(map(str, self.fields))
        return "<%s [%s]>" % (self.__class__.__name__, names)
class Emphasize(ConfigFieldList):
    """ConfigFieldList flavour, adding no behaviour of its own."""
class Resolve(ConfigFieldList):
    """ConfigFieldList flavour, adding no behaviour of its own."""
class Num2Layer:
    """
    Two independent mappings between numbers and layer classes.

    ``num2layer`` maps a number to a layer, ``layer2num`` maps a layer to a
    number. They may be registered together or separately, so the
    association is not necessarily symmetric.
    """

    def __init__(self):
        # type: () -> None
        self.num2layer = {}  # type: Dict[int, Type[Packet]]
        self.layer2num = {}  # type: Dict[Type[Packet], int]

    def register(self, num, layer):
        # type: (int, Type[Packet]) -> None
        """Register the association in both directions."""
        self.register_num2layer(num, layer)
        self.register_layer2num(num, layer)

    def register_num2layer(self, num, layer):
        # type: (int, Type[Packet]) -> None
        """Register only the number -> layer direction."""
        self.num2layer[num] = layer

    def register_layer2num(self, num, layer):
        # type: (int, Type[Packet]) -> None
        """Register only the layer -> number direction."""
        self.layer2num[layer] = num

    @overload
    def __getitem__(self, item):
        # type: (Type[Packet]) -> int
        pass

    @overload
    def __getitem__(self, item):  # noqa: F811
        # type: (int) -> Type[Packet]
        pass

    def __getitem__(self, item):  # noqa: F811
        # type: (Union[int, Type[Packet]]) -> Union[int, Type[Packet]]
        """Look up a layer from a number, or a number from a layer.

        :raises KeyError: if the item is unknown in that direction
        """
        if isinstance(item, int):
            return self.num2layer[item]
        else:
            return self.layer2num[item]

    def __contains__(self, item):
        # type: (Union[int, Type[Packet]]) -> bool
        if isinstance(item, int):
            return item in self.num2layer
        else:
            return item in self.layer2num

    def get(self,
            item,  # type: Union[int, Type[Packet]]
            default=None,  # type: Optional[Type[Packet]]
            ):
        # type: (...) -> Optional[Union[int, Type[Packet]]]
        """Same as ``self[item]`` but returns `default` when unknown."""
        return self[item] if item in self else default

    def __repr__(self):
        # type: () -> str
        """One line per association, sorted by number.

        ``<->`` marks associations registered in both directions, `` ->``
        number-to-layer only and ``<- `` layer-to-number only.
        """
        lst = []
        for num, layer in self.num2layer.items():
            if layer in self.layer2num and self.layer2num[layer] == num:
                arrow = "<->"
            else:
                arrow = " ->"
            lst.append((num, "%#6x %s %-20s (%s)" % (num, arrow, layer.__name__,
                                                     layer._name)))
        for layer, num in self.layer2num.items():
            if num not in self.num2layer or self.num2layer[num] != layer:
                # The arrow is padded to 3 characters so that the layer
                # names line up with the rows produced above.
                lst.append((num, "%#6x <-  %-20s (%s)" % (num, layer.__name__,
                                                          layer._name)))
        lst.sort()
        return "\n".join(y for x, y in lst)
class LayersList(List[Type['scapy.packet.Packet']]):
    """
    List of the registered layer classes.

    Besides the flat list, layers are grouped by defining module in
    ``ldict``. ``filter`` / ``unfilter`` temporarily restrict the
    ``payload_guess`` of those layers.
    """

    def __init__(self):
        # type: () -> None
        list.__init__(self)
        # module name -> layers defined in that module
        self.ldict = {}  # type: Dict[str, List[Type[Packet]]]
        self.filtered = False
        # layer -> its payload_guess as it was before filter()
        self._backup_dict = {}  # type: Dict[Type[Packet], List[Tuple[Dict[str, Any], Type[Packet]]]]  # noqa: E501

    def __repr__(self):
        # type: () -> str
        return "\n".join("%-20s: %s" % (layer.__name__, layer.name)
                         for layer in self)

    def register(self, layer):
        # type: (Type[Packet]) -> None
        """Add a layer to the list and index it by its module."""
        self.append(layer)

        # Skip arch* modules
        if layer.__module__.startswith("scapy.arch."):
            return

        # Register in module
        self.ldict.setdefault(layer.__module__, []).append(layer)

    def layers(self):
        # type: () -> List[Tuple[str, str]]
        """Return ``(module name, first docstring line)`` for each module.

        The module name itself is used when the module has no docstring
        or cannot be found among the loaded modules.
        """
        result = []
        for lay in self.ldict:
            # Look the module up by name rather than eval()-ing its dotted
            # path: this also works for '__main__' and for modules that
            # exist in sys.modules without being an attribute of their
            # parent package.
            module = sys.modules.get(lay)
            doc = module.__doc__ if module is not None else None
            result.append((lay, doc.strip().split("\n")[0] if doc else lay))
        return result

    def filter(self, items):
        # type: (List[Type[Packet]]) -> None
        """Disable dissection of unused layers to speed up dissection"""
        if self.filtered:
            raise ValueError("Already filtered. Please disable it first")
        for lay in self.ldict.values():
            for cls in lay:
                if cls not in self._backup_dict:
                    self._backup_dict[cls] = cls.payload_guess[:]
                    cls.payload_guess = [
                        y for y in cls.payload_guess if y[1] in items
                    ]
        self.filtered = True

    def unfilter(self):
        # type: () -> None
        """Re-enable dissection for all layers"""
        if not self.filtered:
            raise ValueError("Not filtered. Please filter first")
        for lay in self.ldict.values():
            for cls in lay:
                # Layers registered after filter() have no backup and were
                # never restricted: leave them untouched.
                if cls in self._backup_dict:
                    cls.payload_guess = self._backup_dict[cls]
        self._backup_dict.clear()
        self.filtered = False
class CommandsList(List[Callable[..., Any]]):
    """List of registered commands, displayed with their summary line."""

    def __repr__(self):
        # type: () -> str
        """One ``name: first docstring line`` row per command, by name."""
        rows = []
        for cmd in sorted(self, key=lambda c: c.__name__):
            # "--" stands in for commands without a docstring
            summary = (cmd.__doc__ or "--").lstrip().split('\n', 1)[0]
            rows.append("%-22s: %s" % (cmd.__name__, summary))
        return "\n".join(rows)

    def register(self, cmd):
        # type: (DecoratorCallable) -> DecoratorCallable
        """Record cmd and hand it back, so this works as a decorator."""
        self.append(cmd)
        return cmd
def lsc():
    # type: () -> None
    """Displays Scapy's default commands"""
    listing = repr(conf.commands)
    print(listing)
class CacheInstance(Dict[str, Any]):
    """
    Dictionary whose entries can expire.

    Every key gets a timestamp in ``_timetable`` when it is set. When
    ``timeout`` is not None, entries older than ``timeout`` seconds are
    treated as absent by lookups, iteration and ``len()`` (they are not
    physically removed).
    """
    __slots__ = ["timeout", "name", "_timetable"]

    def __init__(self, name="noname", timeout=None):
        # type: (str, Optional[int]) -> None
        """
        :param name: name of the cache
        :param timeout: validity of an entry in seconds, None for no expiry
        """
        self.timeout = timeout
        self.name = name
        self._timetable = {}  # type: Dict[str, float]

    def _is_fresh(self, key, now):
        # type: (str, float) -> bool
        """Tell whether the stored entry `key` is still valid at `now`."""
        if self.timeout is None:
            return True
        return now - self._timetable[key] <= self.timeout

    def flush(self):
        # type: () -> None
        """Drop every entry along with its timestamp."""
        self._timetable.clear()
        self.clear()

    def __getitem__(self, item):
        # type: (str) -> Any
        # The slot names double as keys giving access to the attributes
        if item in self.__slots__:
            return object.__getattribute__(self, item)
        if not self.__contains__(item):
            raise KeyError(item)
        return dict.__getitem__(self, item)

    def __contains__(self, item):
        # type: (Any) -> bool
        if not dict.__contains__(self, item):
            return False
        return self._is_fresh(item, time.time())

    def get(self, item, default=None):
        # type: (str, Optional[Any]) -> Any
        # overloading this method is needed to force the dict to go through
        # the timetable check
        try:
            return self[item]
        except KeyError:
            return default

    def __setitem__(self, item, v):
        # type: (str, Any) -> None
        if item in self.__slots__:
            return object.__setattr__(self, item, v)
        self._timetable[item] = time.time()
        dict.__setitem__(self, item, v)

    def update(self,
               other,  # type: Any
               **kwargs  # type: Any
               ):
        # type: (...) -> None
        """Merge `other` (and keyword items) into this cache.

        When `other` carries timestamps (another CacheInstance), an entry
        is only taken if it is missing here or newer than ours, and it
        keeps its original timestamp. Entries without a timestamp (plain
        mappings, keyword arguments) are stored as if freshly assigned.
        """
        timetable = getattr(other, "_timetable", None)
        for key, value in other.items():
            if timetable is None or key not in timetable:
                self[key] = value
            elif key not in self or self._timetable[key] < timetable[key]:
                dict.__setitem__(self, key, value)
                self._timetable[key] = timetable[key]
        for key, value in kwargs.items():
            self[key] = value

    def iteritems(self):
        # type: () -> Iterator[Tuple[str, Any]]
        """Iterate over the (key, value) pairs of the valid entries."""
        if self.timeout is None:
            return iter(dict.items(self))
        t0 = time.time()
        return (
            (k, v)
            for (k, v) in dict.items(self)
            if self._is_fresh(k, t0)
        )

    def iterkeys(self):
        # type: () -> Iterator[str]
        """Iterate over the keys of the valid entries."""
        if self.timeout is None:
            # A real iterator is required here: __iter__ returns this value
            # and iter() rejects a plain dict view.
            return iter(dict.keys(self))
        t0 = time.time()
        return (
            k
            for k in dict.keys(self)
            if self._is_fresh(k, t0)
        )

    def __iter__(self):
        # type: () -> Iterator[str]
        return self.iterkeys()

    def itervalues(self):
        # type: () -> Iterator[Any]
        """Iterate over the values of the valid entries."""
        if self.timeout is None:
            return iter(dict.values(self))
        t0 = time.time()
        return (
            v
            for (k, v) in dict.items(self)
            if self._is_fresh(k, t0)
        )

    def items(self):
        # type: () -> Any
        return list(self.iteritems())

    def keys(self):
        # type: () -> Any
        return list(self.iterkeys())

    def values(self):
        # type: () -> Any
        return list(self.itervalues())

    def __len__(self):
        # type: () -> int
        if self.timeout is None:
            return dict.__len__(self)
        return len(self.keys())

    def summary(self):
        # type: () -> str
        return "%s: %i valid items. Timeout=%rs" % (self.name, len(self), self.timeout)  # noqa: E501

    def __repr__(self):
        # type: () -> str
        s = []
        if self:
            mk = max(len(k) for k in self)
            fmt = "%%-%is %%s" % (mk + 1)
            for item in self.items():
                s.append(fmt % item)
        return "\n".join(s)

    def copy(self):
        # type: () -> CacheInstance
        """Return an independent shallow copy, timestamps included.

        Built by hand: the generic copy protocol would share ``_timetable``
        with the original and re-insert every item through __setitem__,
        resetting all the timestamps (of both objects) to the current time.
        """
        cls = type(self)
        new = cls.__new__(cls)
        new.timeout = self.timeout
        new.name = self.name
        new._timetable = dict(self._timetable)
        # dict.update() stores the pairs without calling __setitem__
        dict.update(new, dict.items(self))
        return new
class NetCache:
    """Registry of caches, each one also reachable as an attribute."""

    def __init__(self):
        # type: () -> None
        self._caches_list = []  # type: List[CacheInstance]

    def add_cache(self, cache):
        # type: (CacheInstance) -> None
        """Register cache and expose it as ``self.<cache.name>``."""
        self._caches_list.append(cache)
        setattr(self, cache.name, cache)

    def new_cache(self, name, timeout=None):
        # type: (str, Optional[int]) -> CacheInstance
        """Create a CacheInstance, register it and return it."""
        cache = CacheInstance(name=name, timeout=timeout)
        self.add_cache(cache)
        return cache

    def __delattr__(self, attr):
        # type: (str) -> NoReturn
        raise AttributeError("Cannot delete attributes")

    def update(self, other):
        # type: (NetCache) -> None
        """Merge the caches of another NetCache into this one.

        Caches known by name are updated in place, the others are copied.
        """
        for theirs in other._caches_list:
            if not hasattr(self, theirs.name):
                self.add_cache(theirs.copy())
                continue
            getattr(self, theirs.name).update(theirs)

    def flush(self):
        # type: () -> None
        """Flush every registered cache."""
        for cache in self._caches_list:
            cache.flush()

    def __repr__(self):
        # type: () -> str
        summaries = [cache.summary() for cache in self._caches_list]
        return "\n".join(summaries)
class ScapyExt:
    """
    Description of an extension: its name, version, the module specs it
    provides and its bash completion scripts.
    """
    __slots__ = ["specs", "name", "version", "bash_completions"]

    class MODE(Enum):
        """Sub-package of ``scapy`` under which a module is exposed."""
        LAYERS = "layers"
        CONTRIB = "contrib"
        MODULES = "modules"

    @dataclass
    class ScapyExtSpec:
        """A module provided by an extension."""
        fullname: str  # dotted name, 'scapy.<mode>.<name>'
        mode: 'ScapyExt.MODE'
        spec: Any  # what importlib.util.spec_from_file_location returned
        default: bool

    def __init__(self):
        self.specs: Dict[str, 'ScapyExt.ScapyExtSpec'] = {}
        self.bash_completions = {}
        # Always defined, so that repr() works before config() is called
        self.name: Optional[str] = None
        self.version: Optional[str] = None

    def config(self, name, version):
        """Set the name and version of the extension."""
        self.name = name
        self.version = version

    def register(self, name, mode, path, default=None):
        """
        Declare a module provided by the extension.

        :param name: module name, exposed as ``scapy.<mode>.<name>``
        :param mode: a ScapyExt.MODE member (or the value of one)
        :param path: path of the file implementing the module
        :param default: the ``default`` flag of the spec. If None, it is
            True when ``importlib.util.find_spec`` already finds a module
            with the same full name.
        :raises ValueError: if mode is not a valid ScapyExt.MODE
        """
        # Explicit check rather than an assert (stripped under -O). Enum
        # membership tests on non-members also behave differently across
        # Python versions.
        try:
            mode = self.MODE(mode)
        except ValueError:
            raise ValueError("mode must be one of ScapyExt.MODE !") from None
        fullname = f"scapy.{mode.value}.{name}"
        module_spec = importlib.util.spec_from_file_location(
            fullname,
            str(path),
        )
        ext_spec = self.ScapyExtSpec(
            fullname=fullname,
            mode=mode,
            spec=module_spec,
            default=default or False,
        )
        if default is None:
            ext_spec.default = bool(importlib.util.find_spec(ext_spec.fullname))
        self.specs[fullname] = ext_spec

    def register_bashcompletion(self, script: pathlib.Path):
        """Record a bash completion script, keyed by its file name."""
        self.bash_completions[script.name] = script

    def __repr__(self):
        return "<ScapyExt %s %s (%s specs)>" % (
            self.name,
            self.version,
            len(self.specs),
        )
class ExtsManager(importlib.abc.MetaPathFinder):
    """
    Loads Scapy extensions and serves the modules they provide.

    The manager adds itself to ``sys.meta_path`` and answers import
    requests for the module specs registered by loaded extensions.
    """
    __slots__ = ["exts", "all_specs", "_loaded"]

    GPLV2_LICENCES = [
        "GPL-2.0-only",
        "GPL-2.0-or-later",
    ]

    def __init__(self):
        self.exts: List[ScapyExt] = []
        self.all_specs: Dict[str, ScapyExt.ScapyExtSpec] = {}
        # Names of the extensions already loaded
        self._loaded: List[str] = []
        # Add to meta_path as we are an import provider
        if self not in sys.meta_path:
            sys.meta_path.append(self)

    def find_spec(self, fullname, path, target=None):
        """Return the registered module spec for fullname, None if unknown."""
        if fullname in self.all_specs:
            return self.all_specs[fullname].spec
        return None

    def invalidate_caches(self):
        pass

    def _register_spec(self, spec):
        """Make a spec importable, pre-loading it lazily if it is a default."""
        # Register to known specs
        self.all_specs[spec.fullname] = spec

        # If default=True, inject it in the currently loaded modules
        if spec.default:
            loader = importlib.util.LazyLoader(spec.spec.loader)
            spec.spec.loader = loader
            module = importlib.util.module_from_spec(spec.spec)
            sys.modules[spec.fullname] = module
            loader.exec_module(module)

    def load(self, extension: str):
        """
        Load a scapy extension.

        Every failure is reported through a log_loading warning and leaves
        the extension unloaded.

        :param extension: the name of the extension, as installed.
        """
        if extension in self._loaded:
            return

        try:
            import importlib.metadata
        except ImportError:
            log_loading.warning(
                "'%s' not loaded. "
                "Scapy extensions require at least Python 3.8+ !" % extension
            )
            return

        # Get extension distribution
        try:
            distr = importlib.metadata.distribution(extension)
        except importlib.metadata.PackageNotFoundError:
            log_loading.warning("The extension '%s' was not found !" % extension)
            return

        # Check the classifiers
        if distr.metadata.get('License-Expression', None) not in self.GPLV2_LICENCES:
            log_loading.warning(
                "'%s' has no GPLv2 classifier therefore cannot be loaded." % extension
            )
            return

        # Create the extension
        ext = ScapyExt()

        # Get the top-level declared "import packages"
        # HACK: not available nicely in importlib :/
        top_level = distr.read_text("top_level.txt")
        if top_level is None:
            # read_text() returns None when the distribution does not ship
            # that metadata file.
            log_loading.warning(
                "'%s' does not declare its top-level packages "
                "(no top_level.txt) therefore cannot be loaded." % extension
            )
            return
        packages = top_level.split()

        for package in packages:
            scapy_ext = importlib.import_module(package)

            # We initialize the plugin by calling its 'scapy_ext' function
            try:
                scapy_ext_func = scapy_ext.scapy_ext
            except AttributeError:
                log_loading.warning(
                    "'%s' does not look like a Scapy plugin !" % extension
                )
                return
            try:
                scapy_ext_func(ext)
            except Exception as ex:
                log_loading.warning(
                    "'%s' failed during initialization with %s" % (
                        extension,
                        ex
                    )
                )
                return

        # An extension that never called ext.config() would break __repr__:
        # fall back on the distribution's own name and version.
        if getattr(ext, "name", None) is None:
            ext.config(extension, distr.version)

        # Register all the specs provided by this extension
        for spec in ext.specs.values():
            self._register_spec(spec)

        # Add to the extension list
        self.exts.append(ext)
        self._loaded.append(extension)

        # If there are bash autocompletions, add them
        if ext.bash_completions:
            from scapy.main import _add_bash_autocompletion

            for name, script in ext.bash_completions.items():
                _add_bash_autocompletion(name, script)

    def loadall(self) -> None:
        """
        Load all extensions registered in conf.
        """
        for extension in conf.load_extensions:
            self.load(extension)

    def __repr__(self):
        from scapy.utils import pretty_list
        return pretty_list(
            [
                (x.name, x.version, [y.fullname for y in x.specs.values()])
                for x in self.exts
            ],
            [("Name", "Version", "Specs")],
            sortBy=0,
        )
721def _version_checker(module, minver):
722 # type: (ModuleType, Tuple[int, ...]) -> bool
723 """Checks that module has a higher version that minver.
725 params:
726 - module: a module to test
727 - minver: a tuple of versions
728 """
729 # We could use LooseVersion, but distutils imports imp which is deprecated
730 version_regexp = r'[a-z]?((?:\d|\.)+\d+)(?:\.dev[0-9]+)?'
731 version_tags_r = re.match(
732 version_regexp,
733 getattr(module, "__version__", "")
734 )
735 if not version_tags_r:
736 return False
737 version_tags_i = version_tags_r.group(1).split(".")
738 version_tags = tuple(int(x) for x in version_tags_i)
739 return bool(version_tags >= minver)
def isCryptographyValid():
    # type: () -> bool
    """
    Check if the cryptography module >= 2.0.0 is present. This is the minimum
    version for most usages in Scapy.
    """
    try:
        import cryptography
    except ImportError:
        # Not installed at all
        return False
    else:
        # Installed: make sure it is recent enough
        return _version_checker(cryptography, (2, 0, 0))
def isCryptographyAdvanced():
    # type: () -> bool
    """
    Check if the cryptography module is present, and if it supports X25519,
    ChaCha20Poly1305 and such.

    Notes:
    - cryptography >= 2.0 is required
    - OpenSSL >= 1.1.0 is required
    """
    # Generating an X25519 key is the probe: any failure, from the import
    # to the generation itself, means "not supported".
    try:
        from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey  # noqa: E501
        X25519PrivateKey.generate()
        return True
    except Exception:
        return False
def isCryptographyBackendCompatible() -> bool:
    """
    Check if the cryptography backend is compatible
    """
    try:
        from cryptography.hazmat.backends import default_backend
        backend_version = default_backend().openssl_version_text()
        # BUG: LibreSSL - https://marc.info/?l=libressl&m=173846028619304&w=2
        # It takes 5 whole minutes to import RFC3526's modp parameters. This is
        # not okay. LibreSSL is therefore the only backend reported as
        # incompatible.
        return "LibreSSL" not in backend_version
    except Exception:
        # The backend could not be inspected: reported as compatible
        return True
def isPyPy():
    # type: () -> bool
    """Returns either scapy is running under PyPy or not"""
    # The __pypy__ module can only be imported on PyPy
    try:
        import __pypy__  # noqa: F401
    except ImportError:
        return False
    return True
def _prompt_changer(attr, val, old):
    # type: (str, Any, Any) -> Any
    """Change the current prompt theme"""
    # Store the new value first: the prompt below is computed from conf
    Interceptor.set_from_hook(conf, attr, val)
    # Refreshing the interpreter prompt is best effort
    try:
        sys.ps1 = conf.color_theme.prompt(conf.prompt)
    except Exception:
        pass
    # get_ipython is not a defined name when not running under IPython
    try:
        apply_ipython_style(
            get_ipython()  # type: ignore
        )
    except NameError:
        pass
    return getattr(conf, attr, old)
def _set_conf_sockets():
    # type: () -> None
    """Populate the conf.L2Socket and conf.L3Socket
    according to the various use_* parameters
    """
    # Impossible combinations: revert the offending flag, then fail
    if conf.use_bpf and not BSD:
        Interceptor.set_from_hook(conf, "use_bpf", False)
        raise ScapyInvalidPlatformException("BSD-like (OSX, *BSD...) only !")
    if not conf.use_pcap and SOLARIS:
        Interceptor.set_from_hook(conf, "use_pcap", True)
        raise ScapyInvalidPlatformException(
            "Scapy only supports libpcap on Solaris !"
        )
    # we are already in an Interceptor hook, use Interceptor.set_from_hook
    if conf.use_pcap:
        try:
            from scapy.arch.libpcap import (
                L2pcapListenSocket,
                L2pcapSocket,
                L3pcapSocket,
            )
        except (OSError, ImportError):
            log_loading.warning("No libpcap provider available ! pcap won't be used")
            Interceptor.set_from_hook(conf, "use_pcap", False)
            # NOTE(review): no socket class is assigned on this path, the
            # branches below are not evaluated - confirm this is intended.
        else:
            conf.L3socket = L3pcapSocket
            conf.L3socket6 = functools.partial(
                L3pcapSocket, filter="ip6")
            conf.L2socket = L2pcapSocket
            conf.L2listen = L2pcapListenSocket
    elif conf.use_bpf:
        from scapy.arch.bpf.supersocket import (
            L2bpfListenSocket,
            L2bpfSocket,
            L3bpfSocket,
        )
        conf.L3socket = L3bpfSocket
        conf.L3socket6 = functools.partial(
            L3bpfSocket, filter="ip6")
        conf.L2socket = L2bpfSocket
        conf.L2listen = L2bpfListenSocket
    elif LINUX:
        from scapy.arch.linux import L3PacketSocket, L2Socket, L2ListenSocket
        conf.L3socket = L3PacketSocket
        conf.L3socket6 = cast(
            "Type[SuperSocket]",
            functools.partial(
                L3PacketSocket,
                filter="ip6"
            )
        )
        conf.L2socket = L2Socket
        conf.L2listen = L2ListenSocket
    elif WINDOWS:
        from scapy.arch.windows import _NotAvailableSocket
        from scapy.arch.windows.native import L3WinSocket, L3WinSocket6
        conf.L3socket = L3WinSocket
        conf.L3socket6 = L3WinSocket6
        conf.L2socket = _NotAvailableSocket
        conf.L2listen = _NotAvailableSocket
    else:
        # Other platforms: only the layer 3 sockets are assigned
        from scapy.supersocket import L3RawSocket, L3RawSocket6
        conf.L3socket = L3RawSocket
        conf.L3socket6 = L3RawSocket6
    # Reload the interfaces
    conf.ifaces.reload()
def _socket_changer(attr, val, old):
    # type: (str, bool, bool) -> Any
    """Handle a change of conf.use_pcap or conf.use_bpf.

    Enabling one of them disables the other, then the socket classes are
    recomputed. If that fails, the other flag gets its previous value back.

    :raises TypeError: if val is not a boolean
    :raises ScapyInvalidPlatformException: re-raised from _set_conf_sockets
    """
    if not isinstance(val, bool):
        raise TypeError("This argument should be a boolean")
    Interceptor.set_from_hook(conf, attr, val)
    # Things that will be turned off
    dependencies = {
        "use_pcap": ["use_bpf"],
        "use_bpf": ["use_pcap"],
    }
    # Remember the current flags so that they can be put back on failure
    restore = {}
    for flag in dependencies:
        restore[flag] = getattr(conf, flag)
    del restore[attr]  # This is handled directly by _set_conf_sockets
    if val:  # Only if True
        for param in dependencies[attr]:
            Interceptor.set_from_hook(conf, param, False)
    try:
        _set_conf_sockets()
    except (ScapyInvalidPlatformException, ImportError) as e:
        for key, value in restore.items():
            Interceptor.set_from_hook(conf, key, value)
        # An ImportError is swallowed, a platform error is not
        if isinstance(e, ScapyInvalidPlatformException):
            raise
    return getattr(conf, attr)
def _loglevel_changer(attr, val, old):
    # type: (str, int, int) -> int
    """Handle a change of conf.logLevel"""
    # Forward the level to the log_scapy logger, then store it unchanged
    log_scapy.setLevel(val)
    return val
914def _iface_changer(attr, val, old):
915 # type: (str, Any, Any) -> 'scapy.interfaces.NetworkInterface'
916 """Resolves the interface in conf.iface"""
917 if isinstance(val, str):
918 from scapy.interfaces import resolve_iface
919 iface = resolve_iface(val)
920 if old and iface.dummy:
921 warning(
922 "This interface is not specified in any provider ! "
923 "See conf.ifaces output"
924 )
925 return iface
926 return val
def _reset_tls_nss_keys(attr, val, old):
    # type: (str, Any, Any) -> Any
    """Reset conf.tls_nss_keys when conf.tls_nss_filename changes"""
    # Clear the keys, then accept the new value unchanged
    conf.tls_nss_keys = None
    return val
936class Conf(ConfClass):
937 """
938 This object contains the configuration of Scapy.
939 """
940 version: str = ReadOnlyAttribute("version", VERSION)
941 session: str = "" #: filename where the session will be saved
942 interactive = False
943 #: can be "ipython", "bpython", "ptpython", "ptipython", "python" or "auto".
944 #: Default: Auto
945 interactive_shell = "auto"
946 #: Configuration for "ipython" to use jedi (disabled by default)
947 ipython_use_jedi = False
948 #: if 1, prevents any unwanted packet to go out (ARP, DNS, ...)
949 stealth = "not implemented"
950 #: selects the default output interface for srp() and sendp().
951 iface = Interceptor("iface", None, _iface_changer) # type: 'scapy.interfaces.NetworkInterface' # noqa: E501
952 layers: LayersList = LayersList()
953 commands = CommandsList() # type: CommandsList
954 #: Codec used by default for ASN1 objects
955 ASN1_default_codec = None # type: 'scapy.asn1.asn1.ASN1Codec'
956 #: Default size for ASN1 objects
957 ASN1_default_long_size = 0
958 #: choose the AS resolver class to use
959 AS_resolver = None # type: scapy.as_resolvers.AS_resolver
960 dot15d4_protocol = None # Used in dot15d4.py
961 logLevel: int = Interceptor("logLevel", log_scapy.level, _loglevel_changer)
962 #: if 0, doesn't check that IPID matches between IP sent and
963 #: ICMP IP citation received
964 #: if 1, checks that they either are equal or byte swapped
965 #: equals (bug in some IP stacks)
966 #: if 2, strictly checks that they are equals
967 checkIPID = False
968 #: if 1, checks IP src in IP and ICMP IP citation match
969 #: (bug in some NAT stacks)
970 checkIPsrc = True
971 checkIPaddr = True
972 #: if True, checks that IP-in-IP layers match. If False, do
973 #: not check IP layers that encapsulates another IP layer
974 checkIPinIP = True
975 #: if 1, also check that TCP seq and ack match the
976 #: ones in ICMP citation
977 check_TCPerror_seqack = False
978 verb = 2 #: level of verbosity, from 0 (almost mute) to 3 (verbose)
979 prompt: str = Interceptor("prompt", ">>> ", _prompt_changer)
980 #: default mode for the promiscuous mode of a socket (to get answers if you
981 #: spoof on a lan)
982 sniff_promisc = True # type: bool
983 raw_layer = None # type: Type[Packet]
984 raw_summary = False # type: Union[bool, Callable[[bytes], Any]]
985 padding_layer = None # type: Type[Packet]
986 default_l2 = None # type: Type[Packet]
987 l2types: Num2Layer = Num2Layer()
988 l3types: Num2Layer = Num2Layer()
989 L3socket = None # type: Type[scapy.supersocket.SuperSocket]
990 L3socket6 = None # type: Type[scapy.supersocket.SuperSocket]
991 L2socket = None # type: Type[scapy.supersocket.SuperSocket]
992 L2listen = None # type: Type[scapy.supersocket.SuperSocket]
993 BTsocket = None # type: Type[scapy.supersocket.SuperSocket]
994 min_pkt_size = 60
995 #: holds MIB direct access dictionary
996 mib = None # type: 'scapy.asn1.mib.MIBDict'
997 bufsize = 2**16
998 #: history file
999 histfile: str = os.getenv(
1000 'SCAPY_HISTFILE',
1001 os.path.join(
1002 os.path.expanduser("~"),
1003 ".config", "scapy", "history"
1004 )
1005 )
1006 #: includes padding in disassembled packets
1007 padding = 1
1008 #: BPF filter for packets to ignore
1009 except_filter = ""
1010 #: bpf filter added to every sniffing socket to exclude traffic
1011 #: from analysis
1012 filter = ""
1013 #: when 1, store received packet that are not matched into `debug.recv`
1014 debug_match = False
1015 #: When 1, print some TLS session secrets when they are computed, and
1016 #: warn about the session recognition.
1017 debug_tls = False
1018 wepkey = ""
1019 #: holds the Scapy interface list and manager
1020 ifaces = None # type: 'scapy.interfaces.NetworkInterfaceDict'
1021 #: holds the cache of interfaces loaded from Libpcap
1022 cache_pcapiflist = {} # type: Dict[str, Tuple[str, List[str], Any, str, int]]
1023 # `neighbor` will be filled by scapy.layers.l2
1024 neighbor = None # type: 'scapy.layers.l2.Neighbor'
1025 #: holds the name servers IP/hosts used for custom DNS resolution
1026 nameservers = None # type: str
1027 #: automatically load IPv4 routes on startup. Disable this if your
1028 #: routing table is too big.
1029 route_autoload = True
1030 #: automatically load IPv6 routes on startup. Disable this if your
1031 #: routing table is too big.
1032 route6_autoload = True
1033 #: holds the Scapy IPv4 routing table and provides methods to
1034 #: manipulate it
1035 route = None # type: 'scapy.route.Route'
1036 # `route` will be filled by route.py
1037 #: holds the Scapy IPv6 routing table and provides methods to
1038 #: manipulate it
1039 route6 = None # type: 'scapy.route6.Route6'
1040 manufdb = None # type: 'scapy.data.ManufDA'
1041 ethertypes = None # type: 'scapy.data.EtherDA'
1042 protocols = None # type: 'scapy.dadict.DADict[int, str]'
1043 services_udp = None # type: 'scapy.dadict.DADict[int, str]'
1044 services_tcp = None # type: 'scapy.dadict.DADict[int, str]'
1045 services_sctp = None # type: 'scapy.dadict.DADict[int, str]'
1046 # 'route6' will be filled by route6.py
1047 teredoPrefix = "" # type: str
1048 teredoServerPort = None # type: int
1049 auto_fragment = True
1050 #: raise exception when a packet dissector raises an exception
1051 debug_dissector = False
1052 color_theme: ColorTheme = Interceptor("color_theme", NoTheme(), _prompt_changer)
1053 #: how much time between warnings from the same place
1054 warning_threshold = 5
1055 prog: ProgPath = ProgPath()
1056 #: holds list of fields for which resolution should be done
1057 resolve: Resolve = Resolve()
1058 #: holds list of enum fields for which conversion to string
1059 #: should NOT be done
1060 noenum: Resolve = Resolve()
1061 emph: Emphasize = Emphasize()
1062 #: read only attribute to show if PyPy is in use
1063 use_pypy: bool = ReadOnlyAttribute("use_pypy", isPyPy())
1064 #: use libpcap integration or not. Changing this value will update
1065 #: the conf.L[2/3] sockets
1066 use_pcap: bool = Interceptor(
1067 "use_pcap",
1068 os.getenv("SCAPY_USE_LIBPCAP", "").lower().startswith("y"),
1069 _socket_changer
1070 )
1071 use_bpf: bool = Interceptor("use_bpf", False, _socket_changer)
1072 use_npcap = False
1073 ipv6_enabled: bool = socket.has_ipv6
1074 stats_classic_protocols = [] # type: List[Type[Packet]]
1075 stats_dot11_protocols = [] # type: List[Type[Packet]]
1076 temp_files = [] # type: List[str]
1077 #: netcache holds time-based caches for net operations
1078 netcache: NetCache = NetCache()
1079 geoip_city = None
1080 #: Scapy extensions that are loaded automatically on load
1081 load_extensions: List[str] = []
1082 # can, tls, http and a few others are not loaded by default
1083 load_layers: List[str] = [
1084 'bluetooth',
1085 'bluetooth4LE',
1086 'dcerpc',
1087 'dhcp',
1088 'dhcp6',
1089 'dns',
1090 'dot11',
1091 'dot15d4',
1092 'eap',
1093 'gprs',
1094 'gssapi',
1095 'hsrp',
1096 'inet',
1097 'inet6',
1098 'ipsec',
1099 'ir',
1100 'isakmp',
1101 'kerberos',
1102 'l2',
1103 'l2tp',
1104 'ldap',
1105 'llmnr',
1106 'lltd',
1107 'mgcp',
1108 'msrpce.rpcclient',
1109 'msrpce.rpcserver',
1110 'mobileip',
1111 'netbios',
1112 'netflow',
1113 'ntlm',
1114 'ntp',
1115 'ppi',
1116 'ppp',
1117 'pptp',
1118 'radius',
1119 'rip',
1120 'rtp',
1121 'sctp',
1122 'sixlowpan',
1123 'skinny',
1124 'smb',
1125 'smb2',
1126 'smbclient',
1127 'smbserver',
1128 'snmp',
1129 'spnego',
1130 'tftp',
1131 'vrrp',
1132 'vxlan',
1133 'x509',
1134 'zigbee'
1135 ]
1136 #: a dict which can be used by contrib layers to store local
1137 #: configuration
1138 contribs = dict() # type: Dict[str, Any]
1139 exts: ExtsManager = ExtsManager()
1140 crypto_valid = isCryptographyValid()
1141 crypto_valid_advanced = isCryptographyAdvanced()
1142 #: controls whether or not to display the fancy banner
1143 fancy_banner = True
1144 #: controls whether tables (conf.iface, conf.route...) should be cropped
1145 #: to fit the terminal
1146 auto_crop_tables = True
1147 #: how often to check for new packets.
1148 #: Defaults to 0.05s.
1149 recv_poll_rate = 0.05
1150 #: When True, raise exception if no dst MAC found otherwise broadcast.
1151 #: Default is False.
1152 raise_no_dst_mac = False
1153 loopback_name: str = "lo" if LINUX else "lo0"
1154 nmap_base = "" # type: str
1155 nmap_kdb = None # type: Optional[NmapKnowledgeBase]
1156 #: a safety mechanism: the maximum number of items included in a PacketListField
1157 #: or a FieldListField
1158 max_list_count = 100
1159 #: When the TLS module is loaded (not by default), the following turns on sessions
1160 tls_session_enable = False
1161 #: Filename containing NSS Keys Log
1162 tls_nss_filename = Interceptor(
1163 "tls_nss_filename",
1164 None,
1165 _reset_tls_nss_keys
1166 )
1167 #: Dictionary containing parsed NSS Keys
1168 tls_nss_keys: Dict[str, bytes] = None
1169 #: Whether to use NDR64 by default instead of NDR 32
1170 ndr64: bool = True
1171 #: When TCPSession is used, parse DCE/RPC sessions automatically.
1172 #: This should be used for passive sniffing.
1173 dcerpc_session_enable = False
1174 #: If a capture is missing the first DCE/RPC binding message, we might incorrectly
1175 #: assume that header signing isn't used. This forces it on.
1176 dcerpc_force_header_signing = False
1177 #: Windows SSPs for sniffing. This is used with
1178 #: dcerpc_session_enable
1179 winssps_passive = []
1180 #: Disables auto-stripping of StrFixedLenField for debugging purposes
1181 debug_strfixedlenfield = False
def __getattribute__(self, attr):
    # type: (str) -> Any
    """
    Resolve attribute reads on the configuration object.

    The data tables (``manufdb``, ``ethertypes``, ``protocols`` and the
    three ``services_*`` tables) are imported from ``scapy.data`` at
    access time. The deprecated name ``iface6`` emits a
    ``DeprecationWarning`` and is served from ``iface``. Every other
    name goes through the default ``object`` lookup.

    :param attr: name of the attribute being read
    :returns: the value bound to that attribute
    """
    # Those are loaded on runtime to avoid import loops
    if attr == "manufdb":
        from scapy.data import MANUFDB
        return MANUFDB
    if attr == "ethertypes":
        from scapy.data import ETHER_TYPES
        return ETHER_TYPES
    if attr == "protocols":
        from scapy.data import IP_PROTOS
        return IP_PROTOS
    if attr == "services_udp":
        from scapy.data import UDP_SERVICES
        return UDP_SERVICES
    if attr == "services_tcp":
        from scapy.data import TCP_SERVICES
        return TCP_SERVICES
    if attr == "services_sctp":
        from scapy.data import SCTP_SERVICES
        return SCTP_SERVICES
    if attr == "iface6":
        warnings.warn(
            "conf.iface6 is deprecated in favor of conf.iface",
            DeprecationWarning,
            # Attribute the warning to the code that read `iface6`
            # rather than to this method.
            stacklevel=2,
        )
        attr = "iface"
    return object.__getattribute__(self, attr)
# When Conf.ipv6_enabled is false, the IPv6 layers are pruned in place from
# the class-level default layer list.
if not Conf.ipv6_enabled:
    log_scapy.warning("IPv6 support disabled in Python. Cannot load Scapy IPv6 layers.")  # noqa: E501
    for m in ("inet6", "dhcp6", "sixlowpan"):
        try:
            Conf.load_layers.remove(m)
        except ValueError:
            # This layer was not in the list to begin with.
            pass
# The module-level configuration instance.
conf = Conf()  # type: Conf

# If the cryptography backend is reported as incompatible, both crypto
# capability flags are switched off and the problem is logged as an error.
if not isCryptographyBackendCompatible():
    conf.crypto_valid = conf.crypto_valid_advanced = False
    log_scapy.error(
        "Scapy does not support LibreSSL as a backend to cryptography ! "
        "See https://cryptography.io/en/latest/installation/#static-wheels "
        "for instructions on how to recompile cryptography with another "
        "backend."
    )
def crypto_validator(func):
    # type: (DecoratorCallable) -> DecoratorCallable
    """
    This is a decorator to be used for any method relying on the
    cryptography library.
    Its behaviour depends on the 'crypto_valid' attribute of the global 'conf':
    the check is made on every call, not when the decorator is applied.

    :param func: the callable to guard
    :returns: a wrapper that carries the metadata of ``func`` (name,
        docstring, ...) and forwards all arguments to it
    :raises ImportError: at call time, when ``conf.crypto_valid`` is false
    """
    # Preserve the name and docstring of the decorated callable.
    @functools.wraps(func)
    def func_in(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        if not conf.crypto_valid:
            raise ImportError("Cannot execute crypto-related method! "
                              "Please install python-cryptography v2.0 or later.")  # noqa: E501
        return func(*args, **kwargs)
    return func_in
@atexit.register
def scapy_delete_temp_files():
    # type: () -> None
    """
    Unlink every path listed in ``conf.temp_files``, then empty the list.

    The function is registered with ``atexit``. Deletion is best effort:
    any exception raised while unlinking a path is ignored and the next
    path is tried.
    """
    tracked = conf.temp_files
    for path in tracked:
        try:
            os.unlink(path)
        except Exception:
            # Best effort: ignore the failure and move on to the next path.
            continue
    tracked.clear()