Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/config.py: 53%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Implementation of the configuration object.
8"""
10import atexit
11import copy
12import functools
13import os
14import pathlib
15import re
16import socket
17import sys
18import time
19import warnings
21from dataclasses import dataclass
22from enum import Enum
24import importlib
25import importlib.abc
26import importlib.util
28import scapy
29from scapy import VERSION
30from scapy.base_classes import BasePacket
31from scapy.consts import DARWIN, WINDOWS, LINUX, BSD, SOLARIS
32from scapy.error import (
33 log_loading,
34 log_scapy,
35 ScapyInvalidPlatformException,
36 warning,
37)
38from scapy.themes import ColorTheme, NoTheme, apply_ipython_style
40# Typing imports
41from typing import (
42 cast,
43 Any,
44 Callable,
45 Dict,
46 Iterator,
47 List,
48 NoReturn,
49 Optional,
50 Set,
51 Tuple,
52 Type,
53 Union,
54 overload,
55 TYPE_CHECKING,
56)
57from types import ModuleType
58from scapy.compat import DecoratorCallable
60if TYPE_CHECKING:
61 # Do not import at runtime
62 import scapy.as_resolvers
63 from scapy.modules.nmap import NmapKnowledgeBase
64 from scapy.packet import Packet
65 from scapy.supersocket import SuperSocket # noqa: F401
66 import scapy.asn1.asn1
67 import scapy.asn1.mib
69############
70# Config #
71############
class ConfClass(object):
    """Base class for configuration holders with a tabular text rendering."""

    def configure(self, cnf):
        # type: (ConfClass) -> None
        """Replace this object's attributes with a shallow copy of cnf's."""
        self.__dict__ = cnf.__dict__.copy()

    def __repr__(self):
        # type: () -> str
        return str(self)

    def __str__(self):
        # type: () -> str
        """Return one ``name = value`` line per public attribute, sorted."""
        # Start from the class-level attributes, then let instance-level
        # ones take precedence.
        names = dict(self.__class__.__dict__)
        names.update(self.__dict__)
        rows = []
        for name in sorted(names):
            if name[0] == "_":
                # private / dunder entries are not displayed
                continue
            # Collapse any whitespace run in the repr into a single space
            text = " ".join(repr(getattr(self, name)).split())
            room = 76 - max(len(name), 10)
            if len(text) > room:
                text = text[:room - 3] + "..."
            rows.append("%-10s = %s" % (name, text))
        return "\n".join(rows)
class Interceptor(object):
    """Data descriptor that passes every assignment through a hook.

    The value is stored on the owning instance under
    ``_intercepted_<name>``. On assignment, ``hook(name, val, old, *args,
    **kargs)`` is called and its return value is what gets stored.
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: Any
                 hook,  # type: Callable[..., Any]
                 args=None,  # type: Optional[List[Any]]
                 kargs=None  # type: Optional[Dict[str, Any]]
                 ):
        # type: (...) -> None
        self.name = name
        self.intname = "_intercepted_%s" % name
        self.default = default
        self.hook = hook
        self.args = args if args is not None else []
        self.kargs = kargs if kargs is not None else {}

    def __get__(self, obj, typ=None):
        # type: (Optional[Conf], Optional[type]) -> Any
        """Return the stored value, initialising it to the default if unset.

        When accessed on the class itself (``obj`` is None) the descriptor
        is returned, instead of failing while trying to store the default
        on ``None``.
        """
        if obj is None:
            return self
        if not hasattr(obj, self.intname):
            setattr(obj, self.intname, self.default)
        return getattr(obj, self.intname)

    @staticmethod
    def set_from_hook(obj, name, val):
        # type: (Conf, str, bool) -> None
        """Store val for attribute name on obj without calling the hook."""
        int_name = "_intercepted_%s" % name
        setattr(obj, int_name, val)

    def __set__(self, obj, val):
        # type: (Conf, Any) -> None
        """Run the hook with the new and old values, then store its result."""
        old = getattr(obj, self.intname, self.default)
        val = self.hook(self.name, val, old, *self.args, **self.kargs)
        setattr(obj, self.intname, val)
def _readonly(name):
    # type: (str) -> NoReturn
    """Hook for read-only attributes: restore the default, then refuse.

    Always raises ValueError.
    """
    # Read the descriptor from the class dict to reach its default value
    descriptor = Conf.__dict__[name]
    Interceptor.set_from_hook(conf, name, descriptor.default)
    raise ValueError("Read-only value !")
# Interceptor factory whose hook ignores the proposed value and delegates to
# _readonly(), which always raises.
ReadOnlyAttribute = functools.partial(
    Interceptor,
    hook=lambda name, *_args, **_kwargs: _readonly(name),
)
ReadOnlyAttribute.__doc__ = "Read-only class attribute"
class ProgPath(ConfClass):
    """Default command names and folders for external helper programs."""
    _default: str = "<System default>"
    # Generic "open this file" command: "open" on Darwin, "xdg-open" elsewhere
    universal_open: str = "xdg-open" if not DARWIN else "open"
    # The document readers all default to the generic opener
    pdfreader: str = universal_open
    psreader: str = universal_open
    svgreader: str = universal_open
    dot: str = "dot"
    display: str = "display"
    tcpdump: str = "tcpdump"
    tcpreplay: str = "tcpreplay"
    hexedit: str = "hexer"
    tshark: str = "tshark"
    wireshark: str = "wireshark"
    ifconfig: str = "ifconfig"
    # Folders listed for extcap: a per-user one and a system-wide one
    extcap_folders: List[str] = [
        os.path.join(
            os.path.expanduser("~"), ".config", "wireshark", "extcap"
        ),
        "/usr/lib/x86_64-linux-gnu/wireshark/extcap",
    ]
class ConfigFieldList:
    """Set of fields, kept together with the set of layers that own them."""

    def __init__(self):
        # type: () -> None
        self.fields = set()  # type: Set[Any]
        self.layers = set()  # type: Set[Any]

    @staticmethod
    def _is_field(f):
        # type: (Any) -> bool
        """An object counts as a field when it has an ``owners`` attribute."""
        return hasattr(f, "owners")

    def _recalc_layer_list(self):
        # type: () -> None
        """Rebuild ``layers`` as the union of the owners of every field."""
        owners = set()
        for fld in self.fields:
            owners.update(fld.owners)
        self.layers = owners

    def add(self, *flds):
        # type: (*Any) -> None
        """Add the given fields; arguments that are not fields are ignored."""
        self.fields.update(f for f in flds if self._is_field(f))
        self._recalc_layer_list()

    def remove(self, *flds):
        # type: (*Any) -> None
        """Remove the given fields, if present."""
        self.fields.difference_update(set(flds))
        self._recalc_layer_list()

    def __contains__(self, elt):
        # type: (Any) -> bool
        # BasePacket instances are looked up among the layers, anything
        # else among the fields.
        haystack = self.layers if isinstance(elt, BasePacket) else self.fields
        return elt in haystack

    def __repr__(self):
        # type: () -> str
        names = " ".join(map(str, self.fields))
        return "<%s [%s]>" % (self.__class__.__name__, names)
class Emphasize(ConfigFieldList):
    """ConfigFieldList subclass with no additional behaviour."""
class Resolve(ConfigFieldList):
    """ConfigFieldList subclass with no additional behaviour."""
class Num2Layer:
    """Two independent mappings: number -> layer and layer -> number."""

    def __init__(self):
        # type: () -> None
        self.num2layer = {}  # type: Dict[int, Type[Packet]]
        self.layer2num = {}  # type: Dict[Type[Packet], int]

    def register(self, num, layer):
        # type: (int, Type[Packet]) -> None
        """Bind num and layer together in both directions."""
        self.register_num2layer(num, layer)
        self.register_layer2num(num, layer)

    def register_num2layer(self, num, layer):
        # type: (int, Type[Packet]) -> None
        """Bind num -> layer only."""
        self.num2layer[num] = layer

    def register_layer2num(self, num, layer):
        # type: (int, Type[Packet]) -> None
        """Bind layer -> num only."""
        self.layer2num[layer] = num

    @overload
    def __getitem__(self, item):
        # type: (Type[Packet]) -> int
        pass

    @overload
    def __getitem__(self, item):  # noqa: F811
        # type: (int) -> Type[Packet]
        pass

    def __getitem__(self, item):  # noqa: F811
        # type: (Union[int, Type[Packet]]) -> Union[int, Type[Packet]]
        # Integers are looked up as numbers, everything else as layers
        if not isinstance(item, int):
            return self.layer2num[item]
        return self.num2layer[item]

    def __contains__(self, item):
        # type: (Union[int, Type[Packet]]) -> bool
        if not isinstance(item, int):
            return item in self.layer2num
        return item in self.num2layer

    def get(self,
            item,  # type: Union[int, Type[Packet]]
            default=None,  # type: Optional[Type[Packet]]
            ):
        # type: (...) -> Optional[Union[int, Type[Packet]]]
        """Return self[item] when item is known, default otherwise."""
        if item in self:
            return self[item]
        return default

    def __repr__(self):
        # type: () -> str
        """List every binding sorted by number, with an arrow for its direction."""
        rows = []
        for num, layer in self.num2layer.items():
            two_way = (
                layer in self.layer2num and self.layer2num[layer] == num
            )
            arrow = "<->" if two_way else " ->"
            rows.append((num, "%#6x %s %-20s (%s)" % (num, arrow,
                                                      layer.__name__,
                                                      layer._name)))
        # Bindings that only exist in the layer -> number direction
        for layer, num in self.layer2num.items():
            if num not in self.num2layer or self.num2layer[num] != layer:
                rows.append((num, "%#6x <-  %-20s (%s)" % (num,
                                                           layer.__name__,
                                                           layer._name)))
        return "\n".join(text for _, text in sorted(rows))
class LayersList(List[Type['scapy.packet.Packet']]):
    """List of registered layers, also indexed by defining module.

    It can also temporarily restrict the ``payload_guess`` of every
    registered layer (see filter() / unfilter()).
    """

    def __init__(self):
        # type: () -> None
        list.__init__(self)
        # module name -> layers defined in that module
        self.ldict = {}  # type: Dict[str, List[Type[Packet]]]
        self.filtered = False
        # Original payload_guess of each layer while a filter is active
        self._backup_dict = {}  # type: Dict[Type[Packet], List[Tuple[Dict[str, Any], Type[Packet]]]]  # noqa: E501

    def __repr__(self):
        # type: () -> str
        return "\n".join("%-20s: %s" % (layer.__name__, layer.name)
                         for layer in self)

    def register(self, layer):
        # type: (Type[Packet]) -> None
        """Append layer to the list and index it under its module name."""
        self.append(layer)

        # Skip arch* modules
        if layer.__module__.startswith("scapy.arch."):
            return

        # Register in module
        self.ldict.setdefault(layer.__module__, []).append(layer)

    def layers(self):
        # type: () -> List[Tuple[str, str]]
        """Return (module name, first docstring line) for each known module.

        The module name itself is used when the module has no docstring.
        """
        result = []
        # These imports may feel useless, but they are required for the
        # eval below.
        import scapy  # noqa: F401
        import builtins  # noqa: F401
        for lay in self.ldict:
            try:
                # NOTE(review): eval() of a dotted module path taken from
                # layer.__module__; do not feed externally supplied names
                # into ldict.
                doc = eval(lay).__doc__
            except AttributeError:
                continue
            result.append((lay, doc.strip().split("\n")[0] if doc else lay))
        return result

    def filter(self, items):
        # type: (List[Type[Packet]]) -> None
        """Disable dissection of unused layers to speed up dissection"""
        if self.filtered:
            raise ValueError("Already filtered. Please disable it first")
        for lay in self.ldict.values():
            for cls in lay:
                if cls not in self._backup_dict:
                    self._backup_dict[cls] = cls.payload_guess[:]
                    cls.payload_guess = [
                        y for y in cls.payload_guess if y[1] in items
                    ]
        self.filtered = True

    def unfilter(self):
        # type: () -> None
        """Re-enable dissection for all layers"""
        if not self.filtered:
            raise ValueError("Not filtered. Please filter first")
        # Restore exactly what filter() saved. Walking ldict instead would
        # raise KeyError for any layer registered after filter() was called.
        for cls, payload_guess in self._backup_dict.items():
            cls.payload_guess = payload_guess
        self._backup_dict.clear()
        self.filtered = False
class CommandsList(List[Callable[..., Any]]):
    """List of registered commands, rendered as a name/summary table."""

    def __repr__(self):
        # type: () -> str
        rows = []
        for cmd in sorted(self, key=lambda x: x.__name__):
            # First line of the docstring, or "--" when there is none
            summary = (cmd.__doc__ or "--").lstrip().split('\n', 1)[0]
            rows.append("%-22s: %s" % (cmd.__name__, summary))
        return "\n".join(rows)

    def register(self, cmd):
        # type: (DecoratorCallable) -> DecoratorCallable
        """Record cmd and hand it back, so this works as a decorator."""
        self.append(cmd)
        return cmd
def lsc():
    # type: () -> None
    """Displays Scapy's default commands"""
    listing = repr(conf.commands)
    print(listing)
class CacheInstance(Dict[str, Any]):
    """Dictionary whose entries optionally expire after ``timeout`` seconds.

    Each key's insertion time is kept in ``_timetable``. When ``timeout``
    is not None, entries older than that are reported as absent by lookups
    and iteration. They are not physically removed.
    """
    __slots__ = ["timeout", "name", "_timetable"]

    def __init__(self, name="noname", timeout=None):
        # type: (str, Optional[int]) -> None
        self.timeout = timeout
        self.name = name
        self._timetable = {}  # type: Dict[str, float]

    def flush(self):
        # type: () -> None
        """Drop every entry and every timestamp."""
        self._timetable.clear()
        self.clear()

    def __getitem__(self, item):
        # type: (str) -> Any
        # The slot names are also readable through item access
        if item in self.__slots__:
            return object.__getattribute__(self, item)
        if not self.__contains__(item):
            raise KeyError(item)
        return super(CacheInstance, self).__getitem__(item)

    def __contains__(self, item):
        # type: (Any) -> bool
        if not super(CacheInstance, self).__contains__(item):
            return False
        if self.timeout is not None:
            t = self._timetable[item]
            if time.time() - t > self.timeout:
                return False
        return True

    def get(self, item, default=None):
        # type: (str, Optional[Any]) -> Any
        # overloading this method is needed to force the dict to go through
        # the timetable check
        try:
            return self[item]
        except KeyError:
            return default

    def __setitem__(self, item, v):
        # type: (str, str) -> None
        # The slot names are also writable through item access
        if item in self.__slots__:
            return object.__setattr__(self, item, v)
        self._timetable[item] = time.time()
        super(CacheInstance, self).__setitem__(item, v)

    def update(self,
               other,  # type: Any
               **kwargs  # type: Any
               ):
        # type: (...) -> None
        """Merge another cache into this one, keeping the newest entries.

        ``other`` must expose ``items()`` and ``_timetable``. The keyword
        arguments are accepted but not used.
        """
        for key, value in other.items():
            # We only update an element from `other` either if it does
            # not exist in `self` or if the entry in `self` is older.
            if key not in self or self._timetable[key] < other._timetable[key]:
                dict.__setitem__(self, key, value)
                self._timetable[key] = other._timetable[key]

    def iteritems(self):
        # type: () -> Iterator[Tuple[str, Any]]
        if self.timeout is None:
            return super(CacheInstance, self).items()
        t0 = time.time()
        return (
            (k, v)
            for (k, v) in super(CacheInstance, self).items()
            if t0 - self._timetable[k] < self.timeout
        )

    def iterkeys(self):
        # type: () -> Iterator[str]
        if self.timeout is None:
            return super(CacheInstance, self).keys()
        t0 = time.time()
        return (
            k
            for k in super(CacheInstance, self).keys()
            if t0 - self._timetable[k] < self.timeout
        )

    def __iter__(self):
        # type: () -> Iterator[str]
        return self.iterkeys()

    def itervalues(self):
        # type: () -> Iterator[Any]
        if self.timeout is None:
            return super(CacheInstance, self).values()
        t0 = time.time()
        return (
            v
            for (k, v) in super(CacheInstance, self).items()
            if t0 - self._timetable[k] < self.timeout
        )

    def items(self):
        # type: () -> Any
        return list(self.iteritems())

    def keys(self):
        # type: () -> Any
        return list(self.iterkeys())

    def values(self):
        # type: () -> Any
        return list(self.itervalues())

    def __len__(self):
        # type: () -> int
        if self.timeout is None:
            return super(CacheInstance, self).__len__()
        return len(self.keys())

    def summary(self):
        # type: () -> str
        return "%s: %i valid items. Timeout=%rs" % (self.name, len(self), self.timeout)  # noqa: E501

    def __repr__(self):
        # type: () -> str
        s = []
        if self:
            mk = max(len(k) for k in self)
            fmt = "%%-%is %%s" % (mk + 1)
            for item in self.items():
                s.append(fmt % item)
        return "\n".join(s)

    def copy(self):
        # type: () -> CacheInstance
        """Return an independent copy with its own timestamps.

        copy.copy() is deliberately not used: it would hand the same
        ``_timetable`` dict to the copy, and rebuild the entries through
        __setitem__, which rewrites the timestamps of the original.
        """
        duplicate = self.__class__.__new__(self.__class__)
        duplicate.timeout = self.timeout
        duplicate.name = self.name
        duplicate._timetable = dict(self._timetable)
        # Copy the raw entries, bypassing the expiry-aware overrides so the
        # stored timestamps stay untouched.
        dict.update(duplicate, dict.items(self))
        return duplicate
class NetCache:
    """Collection of caches, each also reachable as an attribute by name."""

    def __init__(self):
        # type: () -> None
        self._caches_list = []  # type: List[CacheInstance]

    def add_cache(self, cache):
        # type: (CacheInstance) -> None
        """Track cache and expose it as ``self.<cache.name>``."""
        self._caches_list.append(cache)
        setattr(self, cache.name, cache)

    def new_cache(self, name, timeout=None):
        # type: (str, Optional[int]) -> CacheInstance
        """Create a CacheInstance, add it to this collection and return it."""
        created = CacheInstance(name=name, timeout=timeout)
        self.add_cache(created)
        return created

    def __delattr__(self, attr):
        # type: (str) -> NoReturn
        raise AttributeError("Cannot delete attributes")

    def update(self, other):
        # type: (NetCache) -> None
        """Merge the caches of other into this collection.

        A cache whose name already exists here is updated in place; any
        other one is added as a copy.
        """
        for theirs in other._caches_list:
            if not hasattr(self, theirs.name):
                self.add_cache(theirs.copy())
                continue
            getattr(self, theirs.name).update(theirs)

    def flush(self):
        # type: () -> None
        """Flush every cache of the collection."""
        for cache in self._caches_list:
            cache.flush()

    def __repr__(self):
        # type: () -> str
        summaries = [cache.summary() for cache in self._caches_list]
        return "\n".join(summaries)
class ScapyExt:
    """Description of one extension: identity, module specs, completions."""
    __slots__ = ["specs", "name", "version", "bash_completions"]

    class MODE(Enum):
        LAYERS = "layers"
        CONTRIB = "contrib"
        MODULES = "modules"

    @dataclass
    class ScapyExtSpec:
        fullname: str
        mode: 'ScapyExt.MODE'
        spec: Any
        default: bool

    def __init__(self):
        self.specs: Dict[str, 'ScapyExt.ScapyExtSpec'] = {}
        self.bash_completions = {}

    def config(self, name, version):
        """Set the name and version of the extension."""
        self.name = name
        self.version = version

    def register(self, name, mode, path, default=None):
        """Register the file at path as module ``scapy.<mode>.<name>``.

        :param name: module name below the mode's package
        :param mode: a ScapyExt.MODE member
        :param path: location of the module's source file
        :param default: stored as the spec's ``default`` flag. When None,
            the flag is set according to whether importlib can already
            find a module of that full name.
        """
        assert mode in self.MODE, "mode must be one of ScapyExt.MODE !"
        fullname = f"scapy.{mode.value}.{name}"
        module_spec = importlib.util.spec_from_file_location(
            fullname,
            str(path),
        )
        spec = self.ScapyExtSpec(
            fullname=fullname,
            mode=mode,
            spec=module_spec,
            default=default or False,
        )
        if default is None:
            spec.default = bool(importlib.util.find_spec(spec.fullname))
        self.specs[fullname] = spec

    def register_bashcompletion(self, script: pathlib.Path):
        """Record a bash completion script, keyed by its file name."""
        self.bash_completions[script.name] = script

    def __repr__(self):
        # name and version are slots that only exist once config() has been
        # called; fall back to None so repr() never raises before that.
        return "<ScapyExt %s %s (%s specs)>" % (
            getattr(self, "name", None),
            getattr(self, "version", None),
            len(self.specs),
        )
class ExtsManager(importlib.abc.MetaPathFinder):
    """Loads Scapy extensions and serves their modules to the import system.

    An instance appends itself to ``sys.meta_path`` on creation and answers
    find_spec() for the module names registered by loaded extensions.
    """
    __slots__ = ["exts", "all_specs"]

    GPLV2_LICENCES = [
        "GPL-2.0-only",
        "GPL-2.0-or-later",
    ]

    def __init__(self):
        self.exts: List[ScapyExt] = []
        self.all_specs: Dict[str, ScapyExt.ScapyExtSpec] = {}
        self._loaded: List[str] = []
        # Add to meta_path as we are an import provider
        if self not in sys.meta_path:
            sys.meta_path.append(self)

    def find_spec(self, fullname, path, target=None):
        """Return the registered module spec for fullname, or None."""
        if fullname in self.all_specs:
            return self.all_specs[fullname].spec
        return None

    def invalidate_caches(self):
        pass

    def _register_spec(self, spec):
        """Record spec and, if it is a default one, pre-load it lazily."""
        # Register to known specs
        self.all_specs[spec.fullname] = spec

        # If default=True, inject it in the currently loaded modules
        if spec.default:
            loader = importlib.util.LazyLoader(spec.spec.loader)
            spec.spec.loader = loader
            module = importlib.util.module_from_spec(spec.spec)
            sys.modules[spec.fullname] = module
            loader.exec_module(module)

    def load(self, extension: str):
        """
        Load a scapy extension.

        Every failure is reported through log_loading as a warning and
        leaves the extension unloaded; nothing is raised for them.

        :param extension: the name of the extension, as installed.
        """
        if extension in self._loaded:
            return

        try:
            import importlib.metadata
        except ImportError:
            log_loading.warning(
                "'%s' not loaded. "
                "Scapy extensions require at least Python 3.8+ !" % extension
            )
            return

        # Get extension distribution
        try:
            distr = importlib.metadata.distribution(extension)
        except importlib.metadata.PackageNotFoundError:
            log_loading.warning("The extension '%s' was not found !" % extension)
            return

        # Check the classifiers
        if (
            distr.metadata.get('License-Expression', None) not in self.GPLV2_LICENCES
            and distr.metadata.get('License', None) not in self.GPLV2_LICENCES
        ):
            log_loading.warning(
                "'%s' has no GPLv2 classifier therefore cannot be loaded." % extension
            )
            return

        # Create the extension
        ext = ScapyExt()

        # Get the top-level declared "import packages"
        # HACK: not available nicely in importlib :/
        top_level = distr.read_text("top_level.txt")
        if top_level is None:
            # read_text() returns None when the distribution does not ship
            # the file; without it the packages to import are unknown.
            log_loading.warning(
                "'%s' does not declare its top-level packages "
                "(no top_level.txt) !" % extension
            )
            return
        packages = top_level.split()

        for package in packages:
            scapy_ext = importlib.import_module(package)

            # We initialize the plugin by calling its 'scapy_ext' function
            try:
                scapy_ext_func = scapy_ext.scapy_ext
            except AttributeError:
                log_loading.warning(
                    "'%s' does not look like a Scapy plugin !" % extension
                )
                return
            try:
                scapy_ext_func(ext)
            except Exception as ex:
                log_loading.warning(
                    "'%s' failed during initialization with %s" % (
                        extension,
                        ex
                    )
                )
                return

        # Register all the specs provided by this extension
        for spec in ext.specs.values():
            self._register_spec(spec)

        # Add to the extension list
        self.exts.append(ext)
        self._loaded.append(extension)

        # If there are bash autocompletions, add them
        if ext.bash_completions:
            from scapy.main import _add_bash_autocompletion

            for name, script in ext.bash_completions.items():
                _add_bash_autocompletion(name, script)

    def loadall(self) -> None:
        """
        Load all extensions registered in conf.
        """
        for extension in conf.load_extensions:
            self.load(extension)

    def __repr__(self):
        from scapy.utils import pretty_list
        return pretty_list(
            [
                (x.name, x.version, [y.fullname for y in x.specs.values()])
                for x in self.exts
            ],
            [("Name", "Version", "Specs")],
            sortBy=0,
        )
727def _version_checker(module, minver):
728 # type: (ModuleType, Tuple[int, ...]) -> bool
729 """Checks that module has a higher version that minver.
731 params:
732 - module: a module to test
733 - minver: a tuple of versions
734 """
735 # We could use LooseVersion, but distutils imports imp which is deprecated
736 version_regexp = r'[a-z]?((?:\d|\.)+\d+)(?:\.dev[0-9]+)?'
737 version_tags_r = re.match(
738 version_regexp,
739 getattr(module, "__version__", "")
740 )
741 if not version_tags_r:
742 return False
743 version_tags_i = version_tags_r.group(1).split(".")
744 version_tags = tuple(int(x) for x in version_tags_i)
745 return bool(version_tags >= minver)
def isCryptographyValid():
    # type: () -> bool
    """
    Tell whether the cryptography module is importable and is at least
    version 2.0.0, the minimum for most usages in Scapy.
    """
    try:
        import cryptography
    except ImportError:
        # Not installed at all
        return False
    else:
        minimum = (2, 0, 0)
        return _version_checker(cryptography, minimum)
def isCryptographyAdvanced():
    # type: () -> bool
    """
    Check if the cryptography module is present, and if it supports X25519,
    ChaCha20Poly1305 and such.

    The check consists in importing X25519PrivateKey and generating a key;
    any exception along the way yields False.

    Notes:
    - cryptography >= 2.0 is required
    - OpenSSL >= 1.1.0 is required
    """
    try:
        from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey  # noqa: E501
        X25519PrivateKey.generate()
        return True
    except Exception:
        return False
def isCryptographyBackendCompatible() -> bool:
    """
    Check if the cryptography backend is compatible

    Returns False only when the backend reports a LibreSSL version text.
    If the check itself fails for any reason, True is returned.
    """
    try:
        from cryptography.hazmat.backends import default_backend
        version_text = default_backend().openssl_version_text()
        # BUG: LibreSSL - https://marc.info/?l=libressl&m=173846028619304&w=2
        # It takes 5 whole minutes to import RFC3526's modp parameters. This is
        # not okay.
        return "LibreSSL" not in version_text
    except Exception:
        return True
def isPyPy():
    # type: () -> bool
    """Returns either scapy is running under PyPy or not"""
    # The __pypy__ module can only be imported under PyPy
    try:
        import __pypy__  # noqa: F401
    except ImportError:
        return False
    else:
        return True
def _prompt_changer(attr, val, old):
    # type: (str, Any, Any) -> Any
    """Change the current prompt theme"""
    # Store the new value first so conf reflects it while re-rendering
    Interceptor.set_from_hook(conf, attr, val)
    try:
        themed_prompt = conf.color_theme.prompt(conf.prompt)
        sys.ps1 = themed_prompt
    except Exception:
        # Best effort: a failure to render the prompt is ignored
        pass
    try:
        # get_ipython only exists as a name when running under IPython
        shell = get_ipython()  # type: ignore
        apply_ipython_style(shell)
    except NameError:
        pass
    return getattr(conf, attr, old)
def _set_conf_sockets():
    # type: () -> None
    """Populate the conf.L2Socket and conf.L3Socket
    according to the various use_* parameters

    Raises ScapyInvalidPlatformException when use_bpf is set on a non-BSD
    platform, or when use_pcap is unset on Solaris. In both cases the
    offending flag is reset before raising.
    """
    if conf.use_bpf and not BSD:
        Interceptor.set_from_hook(conf, "use_bpf", False)
        raise ScapyInvalidPlatformException("BSD-like (OSX, *BSD...) only !")
    if not conf.use_pcap and SOLARIS:
        Interceptor.set_from_hook(conf, "use_pcap", True)
        raise ScapyInvalidPlatformException(
            "Scapy only supports libpcap on Solaris !"
        )
    # we are already in an Interceptor hook, use Interceptor.set_from_hook
    if conf.use_pcap:
        try:
            from scapy.arch.libpcap import (
                L2pcapListenSocket,
                L2pcapSocket,
                L3pcapSocket,
            )
        except (OSError, ImportError):
            log_loading.warning("No libpcap provider available ! pcap won't be used")
            Interceptor.set_from_hook(conf, "use_pcap", False)
        else:
            conf.L3socket = L3pcapSocket
            conf.L3socket6 = functools.partial(L3pcapSocket, filter="ip6")
            conf.L2socket = L2pcapSocket
            conf.L2listen = L2pcapListenSocket
    elif conf.use_bpf:
        from scapy.arch.bpf.supersocket import (
            L2bpfListenSocket,
            L2bpfSocket,
            L3bpfSocket,
        )
        conf.L3socket = L3bpfSocket
        conf.L3socket6 = functools.partial(L3bpfSocket, filter="ip6")
        conf.L2socket = L2bpfSocket
        conf.L2listen = L2bpfListenSocket
    elif LINUX:
        from scapy.arch.linux import L3PacketSocket, L2Socket, L2ListenSocket
        conf.L3socket = L3PacketSocket
        ip6_socket = functools.partial(L3PacketSocket, filter="ip6")
        conf.L3socket6 = cast("Type[SuperSocket]", ip6_socket)
        conf.L2socket = L2Socket
        conf.L2listen = L2ListenSocket
    elif WINDOWS:
        from scapy.arch.windows import _NotAvailableSocket
        from scapy.arch.windows.native import L3WinSocket, L3WinSocket6
        conf.L3socket = L3WinSocket
        conf.L3socket6 = L3WinSocket6
        conf.L2socket = _NotAvailableSocket
        conf.L2listen = _NotAvailableSocket
    else:
        # Fallback: only the L3 sockets are assigned here
        from scapy.supersocket import L3RawSocket, L3RawSocket6
        conf.L3socket = L3RawSocket
        conf.L3socket6 = L3RawSocket6
    # Reload the interfaces
    conf.ifaces.reload()
def _socket_changer(attr, val, old):
    # type: (str, bool, bool) -> Any
    """Hook for conf.use_pcap / conf.use_bpf: switch the socket classes.

    Enabling one flag turns the other off. If _set_conf_sockets() fails,
    the other flag is put back to its previous value;
    ScapyInvalidPlatformException is re-raised, ImportError is not.

    Raises TypeError if val is not a boolean.
    """
    if not isinstance(val, bool):
        raise TypeError("This argument should be a boolean")
    Interceptor.set_from_hook(conf, attr, val)
    dependencies = {  # Things that will be turned off
        "use_pcap": ["use_bpf"],
        "use_bpf": ["use_pcap"],
    }
    restore = {}
    for flag in dependencies:
        restore[flag] = getattr(conf, flag)
    restore.pop(attr)  # This is handled directly by _set_conf_sockets

    def _rollback():
        # type: () -> None
        for flag, previous in restore.items():
            Interceptor.set_from_hook(conf, flag, previous)

    if val:  # Only if True
        for param in dependencies[attr]:
            Interceptor.set_from_hook(conf, param, False)
    try:
        _set_conf_sockets()
    except ScapyInvalidPlatformException:
        _rollback()
        raise
    except ImportError:
        _rollback()
    return getattr(conf, attr)
def _loglevel_changer(attr, val, old):
    # type: (str, int, int) -> int
    """Handle a change of conf.logLevel"""
    # Apply the new level to the log_scapy logger, then keep it as the value
    new_level = val
    log_scapy.setLevel(new_level)
    return new_level
920def _iface_changer(attr, val, old):
921 # type: (str, Any, Any) -> 'scapy.interfaces.NetworkInterface'
922 """Resolves the interface in conf.iface"""
923 if isinstance(val, str):
924 from scapy.interfaces import resolve_iface
925 iface = resolve_iface(val)
926 if old and iface.dummy:
927 warning(
928 "This interface is not specified in any provider ! "
929 "See conf.ifaces output"
930 )
931 return iface
932 return val
def _reset_tls_nss_keys(attr, val, old):
    # type: (str, Any, Any) -> Any
    """Reset conf.tls_nss_keys when conf.tls_nss_filename changes"""
    # Clear conf.tls_nss_keys; the new value is accepted unchanged
    setattr(conf, "tls_nss_keys", None)
    return val
942class Conf(ConfClass):
943 """
944 This object contains the configuration of Scapy.
945 """
946 version: str = ReadOnlyAttribute("version", VERSION)
947 session: str = "" #: filename where the session will be saved
948 interactive = False
949 #: can be "ipython", "bpython", "ptpython", "ptipython", "python" or "auto".
950 #: Default: Auto
951 interactive_shell = "auto"
952 #: Configuration for "ipython" to use jedi (disabled by default)
953 ipython_use_jedi = False
954 #: if 1, prevents any unwanted packet to go out (ARP, DNS, ...)
955 stealth = "not implemented"
956 #: selects the default output interface for srp() and sendp().
957 iface = Interceptor("iface", None, _iface_changer) # type: 'scapy.interfaces.NetworkInterface' # noqa: E501
958 layers: LayersList = LayersList()
959 commands = CommandsList() # type: CommandsList
960 #: Codec used by default for ASN1 objects
961 ASN1_default_codec = None # type: 'scapy.asn1.asn1.ASN1Codec'
962 #: Default size for ASN1 objects
963 ASN1_default_long_size = 0
964 #: choose the AS resolver class to use
965 AS_resolver = None # type: scapy.as_resolvers.AS_resolver
966 dot15d4_protocol = None # Used in dot15d4.py
967 logLevel: int = Interceptor("logLevel", log_scapy.level, _loglevel_changer)
968 #: if 0, doesn't check that IPID matches between IP sent and
969 #: ICMP IP citation received
970 #: if 1, checks that they either are equal or byte swapped
971 #: equals (bug in some IP stacks)
972 #: if 2, strictly checks that they are equals
973 checkIPID = False
974 #: if 1, checks IP src in IP and ICMP IP citation match
975 #: (bug in some NAT stacks)
976 checkIPsrc = True
977 checkIPaddr = True
978 #: if True, checks that IP-in-IP layers match. If False, do
979 #: not check IP layers that encapsulates another IP layer
980 checkIPinIP = True
981 #: if 1, also check that TCP seq and ack match the
982 #: ones in ICMP citation
983 check_TCPerror_seqack = False
984 verb = 2 #: level of verbosity, from 0 (almost mute) to 3 (verbose)
985 prompt: str = Interceptor("prompt", ">>> ", _prompt_changer)
986 #: default mode for the promiscuous mode of a socket (to get answers if you
987 #: spoof on a lan)
988 sniff_promisc = True # type: bool
989 raw_layer = None # type: Type[Packet]
990 raw_summary = False # type: Union[bool, Callable[[bytes], Any]]
991 padding_layer = None # type: Type[Packet]
992 default_l2 = None # type: Type[Packet]
993 l2types: Num2Layer = Num2Layer()
994 l3types: Num2Layer = Num2Layer()
995 L3socket = None # type: Type[scapy.supersocket.SuperSocket]
996 L3socket6 = None # type: Type[scapy.supersocket.SuperSocket]
997 L2socket = None # type: Type[scapy.supersocket.SuperSocket]
998 L2listen = None # type: Type[scapy.supersocket.SuperSocket]
999 BTsocket = None # type: Type[scapy.supersocket.SuperSocket]
1000 min_pkt_size = 60
1001 #: holds MIB direct access dictionary
1002 mib = None # type: 'scapy.asn1.mib.MIBDict'
1003 bufsize = 2**16
1004 #: history file
1005 histfile: str = os.getenv(
1006 'SCAPY_HISTFILE',
1007 os.path.join(
1008 os.path.expanduser("~"),
1009 ".config", "scapy", "history"
1010 )
1011 )
1012 #: includes padding in disassembled packets
1013 padding = 1
1014 #: BPF filter for packets to ignore
1015 except_filter = ""
1016 #: bpf filter added to every sniffing socket to exclude traffic
1017 #: from analysis
1018 filter = ""
1019 #: when 1, store received packet that are not matched into `debug.recv`
1020 debug_match = False
1021 #: When 1, print some TLS session secrets when they are computed, and
1022 #: warn about the session recognition.
1023 debug_tls = False
1024 wepkey = ""
1025 #: holds the Scapy interface list and manager
1026 ifaces = None # type: 'scapy.interfaces.NetworkInterfaceDict'
1027 #: holds the cache of interfaces loaded from Libpcap
1028 cache_pcapiflist = {} # type: Dict[str, Tuple[str, List[str], Any, str, int]]
1029 # `neighbor` will be filed by scapy.layers.l2
1030 neighbor = None # type: 'scapy.layers.l2.Neighbor'
1031 #: holds the name servers IP/hosts used for custom DNS resolution
1032 nameservers = None # type: str
1033 #: automatically load IPv4 routes on startup. Disable this if your
1034 #: routing table is too big.
1035 route_autoload = True
1036 #: automatically load IPv6 routes on startup. Disable this if your
1037 #: routing table is too big.
1038 route6_autoload = True
1039 #: holds the Scapy IPv4 routing table and provides methods to
1040 #: manipulate it
1041 route = None # type: 'scapy.route.Route'
1042 # `route` will be filed by route.py
1043 #: holds the Scapy IPv6 routing table and provides methods to
1044 #: manipulate it
1045 route6 = None # type: 'scapy.route6.Route6'
1046 manufdb = None # type: 'scapy.data.ManufDA'
1047 ethertypes = None # type: 'scapy.data.EtherDA'
1048 protocols = None # type: 'scapy.dadict.DADict[int, str]'
1049 services_udp = None # type: 'scapy.dadict.DADict[int, str]'
1050 services_tcp = None # type: 'scapy.dadict.DADict[int, str]'
1051 services_sctp = None # type: 'scapy.dadict.DADict[int, str]'
1052 # 'route6' will be filed by route6.py
1053 teredoPrefix = "" # type: str
1054 teredoServerPort = None # type: int
1055 auto_fragment = True
1056 #: raise exception when a packet dissector raises an exception
1057 debug_dissector = False
1058 color_theme: ColorTheme = Interceptor("color_theme", NoTheme(), _prompt_changer)
1059 #: how much time between warnings from the same place
1060 warning_threshold = 5
1061 prog: ProgPath = ProgPath()
1062 #: holds list of fields for which resolution should be done
1063 resolve: Resolve = Resolve()
1064 #: holds list of enum fields for which conversion to string
1065 #: should NOT be done
1066 noenum: Resolve = Resolve()
1067 emph: Emphasize = Emphasize()
1068 #: read only attribute to show if PyPy is in use
1069 use_pypy: bool = ReadOnlyAttribute("use_pypy", isPyPy())
1070 #: use libpcap integration or not. Changing this value will update
1071 #: the conf.L[2/3] sockets
1072 use_pcap: bool = Interceptor(
1073 "use_pcap",
1074 os.getenv("SCAPY_USE_LIBPCAP", "").lower().startswith("y"),
1075 _socket_changer
1076 )
1077 use_bpf: bool = Interceptor("use_bpf", False, _socket_changer)
1078 use_npcap = False
1079 ipv6_enabled: bool = socket.has_ipv6
1080 stats_classic_protocols = [] # type: List[Type[Packet]]
1081 stats_dot11_protocols = [] # type: List[Type[Packet]]
1082 temp_files = [] # type: List[str]
1083 #: netcache holds time-based caches for net operations
1084 netcache: NetCache = NetCache()
1085 geoip_city = None
1086 #: Scapy extensions that are loaded automatically on load
1087 load_extensions: List[str] = []
1088 # can, tls, http and a few others are not loaded by default
1089 load_layers: List[str] = [
1090 'bluetooth',
1091 'bluetooth4LE',
1092 'dcerpc',
1093 'dhcp',
1094 'dhcp6',
1095 'dns',
1096 'dot11',
1097 'dot15d4',
1098 'eap',
1099 'gprs',
1100 'gssapi',
1101 'hsrp',
1102 'inet',
1103 'inet6',
1104 'ipsec',
1105 'ir',
1106 'isakmp',
1107 'kerberos',
1108 'l2',
1109 'l2tp',
1110 'ldap',
1111 'llmnr',
1112 'lltd',
1113 'mgcp',
1114 'msrpce.rpcclient',
1115 'msrpce.rpcserver',
1116 'mobileip',
1117 'netbios',
1118 'netflow',
1119 'ntlm',
1120 'ntp',
1121 'ppi',
1122 'ppp',
1123 'pptp',
1124 'radius',
1125 'rip',
1126 'rtp',
1127 'sctp',
1128 'sixlowpan',
1129 'skinny',
1130 'smb',
1131 'smb2',
1132 'smbclient',
1133 'smbserver',
1134 'snmp',
1135 'spnego',
1136 'tftp',
1137 'vrrp',
1138 'vxlan',
1139 'x509',
1140 'zigbee'
1141 ]
1142 #: a dict which can be used by contrib layers to store local
1143 #: configuration
1144 contribs = dict() # type: Dict[str, Any]
1145 exts: ExtsManager = ExtsManager()
1146 crypto_valid = isCryptographyValid()
1147 crypto_valid_advanced = isCryptographyAdvanced()
1148 #: controls whether or not to display the fancy banner
1149 fancy_banner = True
1150 #: controls whether tables (conf.iface, conf.route...) should be cropped
1151 #: to fit the terminal
1152 auto_crop_tables = True
1153 #: how often to check for new packets.
1154 #: Defaults to 0.05s.
1155 recv_poll_rate = 0.05
1156 #: When True, raise exception if no dst MAC found otherwise broadcast.
1157 #: Default is False.
1158 raise_no_dst_mac = False
1159 loopback_name: str = "lo" if LINUX else "lo0"
1160 nmap_base = "" # type: str
1161 nmap_kdb = None # type: Optional[NmapKnowledgeBase]
1162 #: a safety mechanism: the maximum amount of items included in a PacketListField
1163 #: or a FieldListField
1164 max_list_count = 100
1165 #: When the TLS module is loaded (not by default), the following turns on sessions
1166 tls_session_enable = False
1167 #: Filename containing NSS Keys Log
1168 tls_nss_filename = Interceptor(
1169 "tls_nss_filename",
1170 None,
1171 _reset_tls_nss_keys
1172 )
1173 #: Dictionary containing parsed NSS Keys
1174 tls_nss_keys: Dict[str, bytes] = None
1175 #: Whether to use NDR64 by default instead of NDR 32
1176 ndr64: bool = True
1177 #: When TCPSession is used, parse DCE/RPC sessions automatically.
1178 #: This should be used for passive sniffing.
1179 dcerpc_session_enable = False
1180 #: If a capture is missing the first DCE/RPC binding message, we might incorrectly
1181 #: assume that header signing isn't used. This forces it on.
1182 dcerpc_force_header_signing = False
1183 #: Windows SSPs for sniffing. This is used with
1184 #: dcerpc_session_enable
1185 winssps_passive = []
1186 #: Disables auto-stripping of StrFixedLenField for debugging purposes
1187 debug_strfixedlenfield = False
1189 def __getattribute__(self, attr):
1190 # type: (str) -> Any
1191 # Those are loaded on runtime to avoid import loops
1192 if attr == "manufdb":
1193 from scapy.data import MANUFDB
1194 return MANUFDB
1195 if attr == "ethertypes":
1196 from scapy.data import ETHER_TYPES
1197 return ETHER_TYPES
1198 if attr == "protocols":
1199 from scapy.data import IP_PROTOS
1200 return IP_PROTOS
1201 if attr == "services_udp":
1202 from scapy.data import UDP_SERVICES
1203 return UDP_SERVICES
1204 if attr == "services_tcp":
1205 from scapy.data import TCP_SERVICES
1206 return TCP_SERVICES
1207 if attr == "services_sctp":
1208 from scapy.data import SCTP_SERVICES
1209 return SCTP_SERVICES
1210 if attr == "iface6":
1211 warnings.warn(
1212 "conf.iface6 is deprecated in favor of conf.iface",
1213 DeprecationWarning
1214 )
1215 attr = "iface"
1216 return object.__getattribute__(self, attr)
# When Python itself lacks IPv6 support, the IPv6 layers cannot be loaded:
# take them out of the default layer list.
if not Conf.ipv6_enabled:
    log_scapy.warning("IPv6 support disabled in Python. Cannot load Scapy IPv6 layers.")  # noqa: E501
    for m in ("inet6", "dhcp6", "sixlowpan"):
        try:
            Conf.load_layers.remove(m)
        except ValueError:
            # this layer is not in the list, nothing to drop
            pass

# The global configuration instance
conf = Conf()  # type: Conf

# Turn off both crypto flags when the cryptography backend is reported as
# incompatible, and tell the user why.
if not isCryptographyBackendCompatible():
    conf.crypto_valid = conf.crypto_valid_advanced = False
    log_scapy.error(
        "Scapy does not support LibreSSL as a backend to cryptography ! "
        "See https://cryptography.io/en/latest/installation/#static-wheels "
        "for instructions on how to recompile cryptography with another "
        "backend."
    )
def crypto_validator(func):
    # type: (DecoratorCallable) -> DecoratorCallable
    """
    This is a decorator to be used for any method relying on the
    cryptography library.
    Its behaviour depends on the 'crypto_valid' attribute of the global 'conf',
    which is checked every time the decorated callable is invoked.

    :param func: the callable to guard
    :returns: a wrapper that forwards all arguments to ``func`` and keeps its
        metadata (``__name__``, ``__doc__``, ``__wrapped__``...)
    :raises ImportError: raised by the wrapper, at call time, when
        'conf.crypto_valid' is false
    """
    # functools.wraps keeps the decorated callable's name and docstring so
    # that introspection, help() and tracebacks do not all show 'func_in'.
    @functools.wraps(func)
    def func_in(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        if not conf.crypto_valid:
            raise ImportError("Cannot execute crypto-related method! "
                              "Please install python-cryptography v2.0 or later.")  # noqa: E501
        return func(*args, **kwargs)
    return func_in
@atexit.register
def scapy_delete_temp_files():
    # type: () -> None
    """
    Delete every file listed in conf.temp_files, then empty that list.

    The function is registered with atexit. Deletion is best effort: any
    error raised while removing a file is ignored and the remaining
    entries are still processed.
    """
    for path in conf.temp_files:
        try:
            os.unlink(path)
        except Exception:
            # best effort: move on to the next entry
            pass
    conf.temp_files.clear()