Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/config.py: 54%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Implementation of the configuration object.
8"""
11import atexit
12import copy
13import functools
14import os
15import re
16import socket
17import sys
18import time
19import warnings
21from dataclasses import dataclass
22from enum import Enum
24import importlib
25import importlib.abc
26import importlib.util
28import scapy
29from scapy import VERSION
30from scapy.base_classes import BasePacket
31from scapy.consts import DARWIN, WINDOWS, LINUX, BSD, SOLARIS
32from scapy.error import (
33 log_loading,
34 log_scapy,
35 ScapyInvalidPlatformException,
36 warning,
37)
38from scapy.themes import ColorTheme, NoTheme, apply_ipython_style
40# Typing imports
41from typing import (
42 cast,
43 Any,
44 Callable,
45 Dict,
46 Iterator,
47 List,
48 NoReturn,
49 Optional,
50 Set,
51 Tuple,
52 Type,
53 Union,
54 overload,
55 TYPE_CHECKING,
56)
57from types import ModuleType
58from scapy.compat import DecoratorCallable
60if TYPE_CHECKING:
61 # Do not import at runtime
62 import scapy.as_resolvers
63 from scapy.modules.nmap import NmapKnowledgeBase
64 from scapy.packet import Packet
65 from scapy.supersocket import SuperSocket # noqa: F401
66 import scapy.asn1.asn1
67 import scapy.asn1.mib
69############
70# Config #
71############
class ConfClass(object):
    """Base class for configuration holders with a tabular text rendering."""

    def configure(self, cnf):
        # type: (ConfClass) -> None
        """Replace this object's attributes with a shallow copy of ``cnf``'s."""
        self.__dict__ = cnf.__dict__.copy()

    def __repr__(self):
        # type: () -> str
        return str(self)

    def __str__(self):
        # type: () -> str
        """Render every public attribute as one ``name = value`` line."""
        # Class-level names first, then instance attributes on top of them
        merged = dict(self.__class__.__dict__)
        merged.update(self.__dict__)
        rows = []
        for attr in sorted(merged):
            if attr[0] == "_":
                # Private / dunder names are not displayed
                continue
            # Collapse every whitespace run of the repr into a single space
            text = " ".join(repr(getattr(self, attr)).split())
            budget = 76 - max(len(attr), 10)
            if len(text) > budget:
                text = text[:budget - 3] + "..."
            rows.append("%-10s = %s" % (attr, text))
        return "\n".join(rows)
class Interceptor(object):
    """Data descriptor that routes every assignment through a hook.

    The current value is stored on the owning instance under
    ``_intercepted_<name>``. On assignment, ``hook(name, new, old, *args,
    **kargs)`` is called and whatever it returns is what gets stored.
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: Any
                 hook,  # type: Callable[..., Any]
                 args=None,  # type: Optional[List[Any]]
                 kargs=None  # type: Optional[Dict[str, Any]]
                 ):
        # type: (...) -> None
        """
        :param name: public name of the attribute on the owner class
        :param default: value returned until something has been assigned
        :param hook: callable invoked on every assignment
        :param args: extra positional arguments forwarded to ``hook``
        :param kargs: extra keyword arguments forwarded to ``hook``
        """
        self.name = name
        self.intname = "_intercepted_%s" % name
        self.default = default
        self.hook = hook
        self.args = args if args is not None else []
        self.kargs = kargs if kargs is not None else {}

    def __get__(self, obj, typ=None):
        # type: (Conf, Optional[type]) -> Any
        """Return the stored value, seeding it with the default on first use.

        Accessed on the class itself (``obj`` is None), the descriptor is
        returned instead of trying to store the default on ``None``.
        """
        if obj is None:
            return self
        if not hasattr(obj, self.intname):
            setattr(obj, self.intname, self.default)
        return getattr(obj, self.intname)

    @staticmethod
    def set_from_hook(obj, name, val):
        # type: (Conf, str, bool) -> None
        """Store ``val`` directly, bypassing the hook.

        Meant to be used from inside a hook, where a regular assignment
        would call the hook again.
        """
        int_name = "_intercepted_%s" % name
        setattr(obj, int_name, val)

    def __set__(self, obj, val):
        # type: (Conf, Any) -> None
        """Run the hook and store the value it returns."""
        old = getattr(obj, self.intname, self.default)
        val = self.hook(self.name, val, old, *self.args, **self.kargs)
        setattr(obj, self.intname, val)
def _readonly(name):
    # type: (str) -> NoReturn
    """Reject a write to a read-only configuration attribute.

    The attribute is put back to the default of its descriptor, then a
    ValueError is raised.
    """
    descriptor = vars(Conf)[name]
    Interceptor.set_from_hook(conf, name, descriptor.default)
    raise ValueError("Read-only value !")
# Interceptor factory whose hook refuses every assignment: only the attribute
# name is forwarded to _readonly, the new and old values are ignored.
ReadOnlyAttribute = functools.partial(
    Interceptor,
    hook=lambda name, *_args, **_kwargs: _readonly(name),
)
ReadOnlyAttribute.__doc__ = "Read-only class attribute"
class ProgPath(ConfClass):
    """Default names of the external programs referenced by the configuration.

    NOTE(review): how these values are resolved and launched is not visible
    in this file - presumably they are looked up through PATH by the callers.
    """
    _default: str = "<System default>"
    # "open" on macOS (DARWIN), "xdg-open" everywhere else
    universal_open: str = "open" if DARWIN else "xdg-open"
    # The document readers all default to the generic opener above
    pdfreader: str = universal_open
    psreader: str = universal_open
    svgreader: str = universal_open
    dot: str = "dot"
    display: str = "display"
    tcpdump: str = "tcpdump"
    tcpreplay: str = "tcpreplay"
    hexedit: str = "hexer"
    tshark: str = "tshark"
    wireshark: str = "wireshark"
    ifconfig: str = "ifconfig"
    # Candidate extcap directories: the per-user Wireshark configuration
    # folder and one fixed x86_64 Linux system location
    extcap_folders: List[str] = [
        os.path.join(os.path.expanduser("~"), ".config", "wireshark", "extcap"),
        "/usr/lib/x86_64-linux-gnu/wireshark/extcap",
    ]
class ConfigFieldList:
    """Set of field objects, plus the derived set of layers that own them."""

    def __init__(self):
        # type: () -> None
        self.fields = set()  # type: Set[Any]
        self.layers = set()  # type: Set[Any]

    @staticmethod
    def _is_field(f):
        # type: (Any) -> bool
        """Tell whether ``f`` looks like a field, i.e. exposes ``owners``."""
        return hasattr(f, "owners")

    def _recalc_layer_list(self):
        # type: () -> None
        """Rebuild ``layers`` from the owners of the registered fields."""
        owners = set()
        for fld in self.fields:
            owners.update(fld.owners)
        self.layers = owners

    def add(self, *flds):
        # type: (*Any) -> None
        """Register fields; objects without ``owners`` are silently skipped."""
        self.fields.update(f for f in flds if self._is_field(f))
        self._recalc_layer_list()

    def remove(self, *flds):
        # type: (*Any) -> None
        """Unregister fields; unknown ones are ignored."""
        self.fields.difference_update(flds)
        self._recalc_layer_list()

    def __contains__(self, elt):
        # type: (Any) -> bool
        # BasePacket instances are looked up among the layers, anything else
        # among the fields
        haystack = self.layers if isinstance(elt, BasePacket) else self.fields
        return elt in haystack

    def __repr__(self):
        # type: () -> str
        names = " ".join(map(str, self.fields))
        return "<%s [%s]>" % (self.__class__.__name__, names)
class Emphasize(ConfigFieldList):
    # Plain ConfigFieldList under a distinct type name; it adds no behaviour.
    # Conf.emph is an instance of this class.
    pass
class Resolve(ConfigFieldList):
    # Plain ConfigFieldList under a distinct type name; it adds no behaviour.
    # Conf.resolve and Conf.noenum are instances of this class.
    pass
class Num2Layer:
    """Two independent mappings: number -> layer class and layer class -> number.

    Indexing, ``in`` and ``get`` pick the mapping from the key type: ``int``
    keys use ``num2layer``, every other key uses ``layer2num``.
    """

    def __init__(self):
        # type: () -> None
        self.num2layer = {}  # type: Dict[int, Type[Packet]]
        self.layer2num = {}  # type: Dict[Type[Packet], int]

    def register(self, num, layer):
        # type: (int, Type[Packet]) -> None
        """Register the association in both directions."""
        self.register_num2layer(num, layer)
        self.register_layer2num(num, layer)

    def register_num2layer(self, num, layer):
        # type: (int, Type[Packet]) -> None
        """Register only the number -> layer direction."""
        self.num2layer[num] = layer

    def register_layer2num(self, num, layer):
        # type: (int, Type[Packet]) -> None
        """Register only the layer -> number direction."""
        self.layer2num[layer] = num

    @overload
    def __getitem__(self, item):
        # type: (Type[Packet]) -> int
        pass

    @overload
    def __getitem__(self, item):  # noqa: F811
        # type: (int) -> Type[Packet]
        pass

    def __getitem__(self, item):  # noqa: F811
        # type: (Union[int, Type[Packet]]) -> Union[int, Type[Packet]]
        table = self.num2layer if isinstance(item, int) else self.layer2num
        return table[item]

    def __contains__(self, item):
        # type: (Union[int, Type[Packet]]) -> bool
        table = self.num2layer if isinstance(item, int) else self.layer2num
        return item in table

    def get(self,
            item,  # type: Union[int, Type[Packet]]
            default=None,  # type: Optional[Type[Packet]]
            ):
        # type: (...) -> Optional[Union[int, Type[Packet]]]
        """Return ``self[item]``, or ``default`` when the key is unknown."""
        if item in self:
            return self[item]
        return default

    def __repr__(self):
        # type: () -> str
        """List every association, sorted by number, with its direction."""
        rows = []
        for num, layer in self.num2layer.items():
            two_way = layer in self.layer2num and self.layer2num[layer] == num
            arrow = "<->" if two_way else " ->"
            rows.append((num, "%#6x %s %-20s (%s)" % (num, arrow,
                                                      layer.__name__,
                                                      layer._name)))
        for layer, num in self.layer2num.items():
            # Only the associations not already shown by the loop above
            reverse_only = (
                num not in self.num2layer or self.num2layer[num] != layer
            )
            if reverse_only:
                rows.append((num, "%#6x <- %-20s (%s)" % (num, layer.__name__,
                                                          layer._name)))
        return "\n".join(text for _, text in sorted(rows))
class LayersList(List[Type['scapy.packet.Packet']]):
    """List of the registered layer classes, also grouped by defining module.

    Besides being a plain list, it keeps ``ldict`` (module name -> layer
    classes) and can temporarily restrict the ``payload_guess`` table of the
    grouped layers (see :meth:`filter` / :meth:`unfilter`).
    """

    def __init__(self):
        # type: () -> None
        list.__init__(self)
        self.ldict = {}  # type: Dict[str, List[Type[Packet]]]
        self.filtered = False
        # Original payload_guess of every class touched by filter()
        self._backup_dict = {}  # type: Dict[Type[Packet], List[Tuple[Dict[str, Any], Type[Packet]]]]  # noqa: E501

    def __repr__(self):
        # type: () -> str
        return "\n".join("%-20s: %s" % (layer.__name__, layer.name)
                         for layer in self)

    def register(self, layer):
        # type: (Type[Packet]) -> None
        """Add a layer class to the list and to its module group."""
        self.append(layer)

        # Skip arch* modules
        if layer.__module__.startswith("scapy.arch."):
            return

        # Register in module
        self.ldict.setdefault(layer.__module__, []).append(layer)

    def layers(self):
        # type: () -> List[Tuple[str, str]]
        """Return ``(module name, first docstring line)`` for each module.

        The module name itself is used when the module has no docstring or
        cannot be found among the imported modules.
        """
        result = []
        for lay in self.ldict:
            # Look the module up by name rather than eval()-ing the dotted
            # path: eval only resolves names reachable from the ``scapy``
            # package and fails for layers defined anywhere else.
            module = sys.modules.get(lay)
            doc = module.__doc__ if module is not None else None
            result.append((lay, doc.strip().split("\n")[0] if doc else lay))
        return result

    def filter(self, items):
        # type: (List[Type[Packet]]) -> None
        """Disable dissection of unused layers to speed up dissection

        :param items: the layer classes to keep as possible payloads
        :raises ValueError: if a filter is already active
        """
        if self.filtered:
            raise ValueError("Already filtered. Please disable it first")
        for lay in self.ldict.values():
            for cls in lay:
                if cls not in self._backup_dict:
                    self._backup_dict[cls] = cls.payload_guess[:]
                    cls.payload_guess = [
                        y for y in cls.payload_guess if y[1] in items
                    ]
        self.filtered = True

    def unfilter(self):
        # type: () -> None
        """Re-enable dissection for all layers

        :raises ValueError: if no filter is active
        """
        if not self.filtered:
            raise ValueError("Not filtered. Please filter first")
        # Restore only what filter() saved: a layer registered while the
        # filter was active has no backup and was never modified.
        for cls, guesses in self._backup_dict.items():
            cls.payload_guess = guesses
        self._backup_dict.clear()
        self.filtered = False
class CommandsList(List[Callable[..., Any]]):
    """List of registered command callables with a summary rendering."""

    def __repr__(self):
        # type: () -> str
        """One line per command, sorted by name: name and first doc line."""
        rows = [
            "%-22s: %s" % (
                cmd.__name__,
                # "--" stands for a missing or empty docstring
                (cmd.__doc__ or "--").lstrip().split('\n', 1)[0],
            )
            for cmd in sorted(self, key=lambda x: x.__name__)
        ]
        return "\n".join(rows)

    def register(self, cmd):
        # type: (DecoratorCallable) -> DecoratorCallable
        """Append ``cmd`` and hand it back, so this works as a decorator."""
        self.append(cmd)
        return cmd
def lsc():
    # type: () -> None
    """Displays Scapy's default commands"""
    listing = repr(conf.commands)
    print(listing)
369class CacheInstance(Dict[str, Any]):
370 __slots__ = ["timeout", "name", "_timetable"]
372 def __init__(self, name="noname", timeout=None):
373 # type: (str, Optional[int]) -> None
374 self.timeout = timeout
375 self.name = name
376 self._timetable = {} # type: Dict[str, float]
378 def flush(self):
379 # type: () -> None
380 self._timetable.clear()
381 self.clear()
383 def __getitem__(self, item):
384 # type: (str) -> Any
385 if item in self.__slots__:
386 return object.__getattribute__(self, item)
387 if not self.__contains__(item):
388 raise KeyError(item)
389 return super(CacheInstance, self).__getitem__(item)
391 def __contains__(self, item):
392 if not super(CacheInstance, self).__contains__(item):
393 return False
394 if self.timeout is not None:
395 t = self._timetable[item]
396 if time.time() - t > self.timeout:
397 return False
398 return True
400 def get(self, item, default=None):
401 # type: (str, Optional[Any]) -> Any
402 # overloading this method is needed to force the dict to go through
403 # the timetable check
404 try:
405 return self[item]
406 except KeyError:
407 return default
409 def __setitem__(self, item, v):
410 # type: (str, str) -> None
411 if item in self.__slots__:
412 return object.__setattr__(self, item, v)
413 self._timetable[item] = time.time()
414 super(CacheInstance, self).__setitem__(item, v)
416 def update(self,
417 other, # type: Any
418 **kwargs # type: Any
419 ):
420 # type: (...) -> None
421 for key, value in other.items():
422 # We only update an element from `other` either if it does
423 # not exist in `self` or if the entry in `self` is older.
424 if key not in self or self._timetable[key] < other._timetable[key]:
425 dict.__setitem__(self, key, value)
426 self._timetable[key] = other._timetable[key]
428 def iteritems(self):
429 # type: () -> Iterator[Tuple[str, Any]]
430 if self.timeout is None:
431 return super(CacheInstance, self).items()
432 t0 = time.time()
433 return (
434 (k, v)
435 for (k, v) in super(CacheInstance, self).items()
436 if t0 - self._timetable[k] < self.timeout
437 )
439 def iterkeys(self):
440 # type: () -> Iterator[str]
441 if self.timeout is None:
442 return super(CacheInstance, self).keys()
443 t0 = time.time()
444 return (
445 k
446 for k in super(CacheInstance, self).keys()
447 if t0 - self._timetable[k] < self.timeout
448 )
450 def __iter__(self):
451 # type: () -> Iterator[str]
452 return self.iterkeys()
454 def itervalues(self):
455 # type: () -> Iterator[Tuple[str, Any]]
456 if self.timeout is None:
457 return super(CacheInstance, self).values()
458 t0 = time.time()
459 return (
460 v
461 for (k, v) in super(CacheInstance, self).items()
462 if t0 - self._timetable[k] < self.timeout
463 )
465 def items(self):
466 # type: () -> Any
467 return list(self.iteritems())
469 def keys(self):
470 # type: () -> Any
471 return list(self.iterkeys())
473 def values(self):
474 # type: () -> Any
475 return list(self.itervalues())
477 def __len__(self):
478 # type: () -> int
479 if self.timeout is None:
480 return super(CacheInstance, self).__len__()
481 return len(self.keys())
483 def summary(self):
484 # type: () -> str
485 return "%s: %i valid items. Timeout=%rs" % (self.name, len(self), self.timeout) # noqa: E501
487 def __repr__(self):
488 # type: () -> str
489 s = []
490 if self:
491 mk = max(len(k) for k in self)
492 fmt = "%%-%is %%s" % (mk + 1)
493 for item in self.items():
494 s.append(fmt % item)
495 return "\n".join(s)
497 def copy(self):
498 # type: () -> CacheInstance
499 return copy.copy(self)
class NetCache:
    """Registry of named caches, each also reachable as an attribute."""

    def __init__(self):
        # type: () -> None
        self._caches_list = []  # type: List[CacheInstance]

    def add_cache(self, cache):
        # type: (CacheInstance) -> None
        """Register ``cache`` and expose it as ``self.<cache.name>``."""
        self._caches_list.append(cache)
        setattr(self, cache.name, cache)

    def new_cache(self, name, timeout=None):
        # type: (str, Optional[int]) -> CacheInstance
        """Create a cache, register it and return it."""
        created = CacheInstance(name=name, timeout=timeout)
        self.add_cache(created)
        return created

    def __delattr__(self, attr):
        # type: (str) -> NoReturn
        raise AttributeError("Cannot delete attributes")

    def update(self, other):
        # type: (NetCache) -> None
        """Merge the caches of ``other``: known names are updated in place,
        unknown ones are added as copies."""
        for theirs in other._caches_list:
            if not hasattr(self, theirs.name):
                self.add_cache(theirs.copy())
                continue
            getattr(self, theirs.name).update(theirs)

    def flush(self):
        # type: () -> None
        """Flush every registered cache."""
        for cache in self._caches_list:
            cache.flush()

    def __repr__(self):
        # type: () -> str
        summaries = [cache.summary() for cache in self._caches_list]
        return "\n".join(summaries)
class ScapyExt:
    """Description of one extension: a name, a version and module specs.

    An instance is handed to the extension's ``scapy_ext`` entry point, which
    is expected to call :meth:`config` and :meth:`register`.
    """
    __slots__ = ["specs", "name", "version"]

    class MODE(Enum):
        """Sub-package of ``scapy`` under which a module is exposed."""
        LAYERS = "layers"
        CONTRIB = "contrib"
        MODULES = "modules"

    @dataclass
    class ScapyExtSpec:
        # Dotted module name, "scapy.<mode>.<name>"
        fullname: str
        mode: 'ScapyExt.MODE'
        # Result of importlib.util.spec_from_file_location (may be None)
        spec: Any
        # Whether the module is flagged as loaded by default
        default: bool

    def __init__(self):
        self.specs: Dict[str, 'ScapyExt.ScapyExtSpec'] = {}
        # Placeholders so that repr() still works when the extension never
        # calls config()
        self.name = "unknown"
        self.version = "unknown"

    def config(self, name, version):
        """Record the name and version of the extension."""
        self.name = name
        self.version = version

    def register(self, name, mode, path, default=None):
        """Declare a module provided by the extension.

        :param name: module name, without the ``scapy.<mode>.`` prefix
        :param mode: a :class:`ScapyExt.MODE` member
        :param path: location of the module file
        :param default: value of the spec's ``default`` flag. When None, the
            flag is set to whether ``importlib.util.find_spec`` already finds
            a module under the full name.
        :raises ValueError: if ``mode`` is not a ScapyExt.MODE member
        """
        # Explicit check rather than ``assert mode in MODE``: asserts vanish
        # under -O, and ``in`` on an Enum class behaves differently across
        # Python versions for non-members.
        if not isinstance(mode, self.MODE):
            raise ValueError("mode must be one of ScapyExt.MODE !")
        fullname = f"scapy.{mode.value}.{name}"
        module_spec = importlib.util.spec_from_file_location(
            fullname,
            str(path),
        )
        ext_spec = self.ScapyExtSpec(
            fullname=fullname,
            mode=mode,
            spec=module_spec,
            default=default or False,
        )
        if default is None:
            ext_spec.default = bool(importlib.util.find_spec(fullname))
        self.specs[fullname] = ext_spec

    def __repr__(self):
        return "<ScapyExt %s %s (%s specs)>" % (
            self.name,
            self.version,
            len(self.specs),
        )
class ExtsManager(importlib.abc.MetaPathFinder):
    """Discover installed Scapy extensions and make their modules importable.

    The manager is a meta path finder: once :meth:`load` has run, it is in
    ``sys.meta_path`` and resolves the module names registered by extensions.
    """
    __slots__ = ["exts", "_loaded", "all_specs"]

    SCAPY_PLUGIN_CLASSIFIER = 'Framework :: Scapy'
    GPLV2_CLASSIFIERS = [
        'License :: OSI Approved :: GNU General Public License v2 (GPLv2)',
        'License :: OSI Approved :: GNU General Public License v2 or later (GPLv2+)',  # noqa: E501
    ]

    def __init__(self):
        self.exts: List[ScapyExt] = []
        self.all_specs: Dict[str, ScapyExt.ScapyExtSpec] = {}
        # Names of the packages already processed by load()
        self._loaded = []

    def find_spec(self, fullname, path, target=None):
        """Return the registered module spec for ``fullname``, else None."""
        ext_spec = self.all_specs.get(fullname)
        if ext_spec is None:
            return None
        return ext_spec.spec

    def invalidate_caches(self):
        pass

    def _register_spec(self, spec):
        """Record ``spec``; when flagged as default, install it lazily.

        The module object is placed in ``sys.modules`` behind a LazyLoader,
        so its code only runs on first attribute access.
        """
        self.all_specs[spec.fullname] = spec
        if not spec.default:
            return
        if spec.spec is None or spec.spec.loader is None:
            # spec_from_file_location() found no loader for that path
            log_loading.warning(
                "'%s' has no usable module spec and cannot be preloaded." % spec.fullname  # noqa: E501
            )
            return
        loader = importlib.util.LazyLoader(spec.spec.loader)
        spec.spec.loader = loader
        module = importlib.util.module_from_spec(spec.spec)
        sys.modules[spec.fullname] = module
        loader.exec_module(module)

    @staticmethod
    def _packages_distributions():
        """Return the import-package -> distribution-names mapping.

        An empty mapping is returned when ``packages_distributions`` is not
        available or fails with KeyError.
        """
        import importlib.metadata
        finder = getattr(importlib.metadata, "packages_distributions", None)
        if finder is None:
            return {}
        try:
            # Python 3.13 raises an internal warning when calling this
            with warnings.catch_warnings():
                warnings.filterwarnings("ignore", category=DeprecationWarning)
                return finder()
        except KeyError:
            return {}

    def load(self):
        """Find, import and initialise every installed Scapy extension.

        A distribution is considered when its metadata carries the
        ``Framework :: Scapy`` classifier, and only loaded when it also
        carries a GPLv2 classifier. Import or initialisation failures of an
        extension are logged and the extension is skipped.
        """
        try:
            import importlib.metadata
        except ImportError:
            return
        pkg_map = None  # computed once, and only if a plugin is found
        for distr in importlib.metadata.distributions():
            classifiers = [
                v for k, v in distr.metadata.items() if k == 'Classifier'
            ]
            if self.SCAPY_PLUGIN_CLASSIFIER not in classifiers:
                continue
            dist_name = distr.metadata["Name"]
            if pkg_map is None:
                pkg_map = self._packages_distributions()
            # Fall back to the distribution name when no import package maps
            # to it: a bare next() would raise StopIteration out of load().
            pkg = next(
                (k for k, v in pkg_map.items() if dist_name in v),
                dist_name,
            )
            if pkg in self._loaded:
                continue
            if not any(c in self.GPLV2_CLASSIFIERS for c in classifiers):
                log_loading.warning(
                    "'%s' has no GPLv2 classifier therefore cannot be loaded." % pkg  # noqa: E501
                )
                continue
            self._loaded.append(pkg)
            ext = ScapyExt()
            try:
                scapy_ext = importlib.import_module(pkg)
            except Exception as ex:
                log_loading.warning(
                    "'%s' failed during import with %s" % (
                        pkg,
                        ex
                    )
                )
                continue
            try:
                scapy_ext_func = scapy_ext.scapy_ext
            except AttributeError:
                log_loading.info(
                    "'%s' included the Scapy Framework specifier "
                    "but did not include a scapy_ext" % pkg
                )
                continue
            try:
                scapy_ext_func(ext)
            except Exception as ex:
                log_loading.warning(
                    "'%s' failed during initialization with %s" % (
                        pkg,
                        ex
                    )
                )
                continue
            for spec in ext.specs.values():
                self._register_spec(spec)
            self.exts.append(ext)
        if self not in sys.meta_path:
            sys.meta_path.append(self)

    def __repr__(self):
        from scapy.utils import pretty_list
        return pretty_list(
            [
                (x.name, x.version, [y.fullname for y in x.specs.values()])
                for x in self.exts
            ],
            [("Name", "Version", "Specs")],
            sortBy=0,
        )
697def _version_checker(module, minver):
698 # type: (ModuleType, Tuple[int, ...]) -> bool
699 """Checks that module has a higher version that minver.
701 params:
702 - module: a module to test
703 - minver: a tuple of versions
704 """
705 # We could use LooseVersion, but distutils imports imp which is deprecated
706 version_regexp = r'[a-z]?((?:\d|\.)+\d+)(?:\.dev[0-9]+)?'
707 version_tags_r = re.match(
708 version_regexp,
709 getattr(module, "__version__", "")
710 )
711 if not version_tags_r:
712 return False
713 version_tags_i = version_tags_r.group(1).split(".")
714 version_tags = tuple(int(x) for x in version_tags_i)
715 return bool(version_tags >= minver)
def isCryptographyValid():
    # type: () -> bool
    """
    Tell whether the cryptography module can be imported and is at least
    version 2.0.0, the minimum for most usages in Scapy.
    """
    try:
        import cryptography
    except ImportError:
        is_valid = False
    else:
        is_valid = _version_checker(cryptography, (2, 0, 0))
    return is_valid
def isCryptographyAdvanced():
    # type: () -> bool
    """
    Tell whether the cryptography module is present and supports X25519,
    ChaCha20Poly1305 and such.

    The probe generates a throwaway X25519 private key; any failure while
    importing or generating it counts as "not supported".

    Notes:
    - cryptography >= 2.0 is required
    - OpenSSL >= 1.1.0 is required
    """
    try:
        from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey  # noqa: E501
        X25519PrivateKey.generate()
    except Exception:
        return False
    return True
def isPyPy():
    # type: () -> bool
    """Tell whether Scapy is running under PyPy"""
    # The __pypy__ module can only be imported on PyPy
    try:
        import __pypy__  # noqa: F401
    except ImportError:
        return False
    return True
def _prompt_changer(attr, val, old):
    # type: (str, Any, Any) -> Any
    """Change the current prompt theme

    Interceptor hook used by both ``conf.prompt`` and ``conf.color_theme``.

    :param attr: name of the configuration attribute being assigned
    :param val: the new value
    :param old: the previous value, only used as fallback for the return
    :returns: the value of ``conf.<attr>`` once updated
    """
    # Store the new value first, so that the conf.color_theme / conf.prompt
    # lookups below already see it
    Interceptor.set_from_hook(conf, attr, val)
    try:
        sys.ps1 = conf.color_theme.prompt(conf.prompt)
    except Exception:
        # Best effort: a failure while building the prompt is ignored
        pass
    try:
        apply_ipython_style(
            get_ipython()  # type: ignore
        )
    except NameError:
        # get_ipython is not defined - presumably not running under IPython
        pass
    return getattr(conf, attr, old)
def _set_conf_sockets():
    # type: () -> None
    """Populate the conf.L2Socket and conf.L3Socket
    according to the various use_* parameters

    The first matching case wins: libpcap (``conf.use_pcap``), BPF
    (``conf.use_bpf``), Linux, Windows, then the raw-socket fallback. The
    interface list is reloaded at the end.

    :raises ScapyInvalidPlatformException: if ``use_bpf`` is set on a
        non-BSD platform, or ``use_pcap`` is unset on Solaris. The offending
        flag is reset before raising.
    """
    if conf.use_bpf and not BSD:
        Interceptor.set_from_hook(conf, "use_bpf", False)
        raise ScapyInvalidPlatformException("BSD-like (OSX, *BSD...) only !")
    if not conf.use_pcap and SOLARIS:
        Interceptor.set_from_hook(conf, "use_pcap", True)
        raise ScapyInvalidPlatformException(
            "Scapy only supports libpcap on Solaris !"
        )
    # we are already in an Interceptor hook, use Interceptor.set_from_hook
    if conf.use_pcap:
        try:
            from scapy.arch.libpcap import L2pcapListenSocket, L2pcapSocket, \
                L3pcapSocket
        except (OSError, ImportError):
            # No provider: only the flag is reset, the socket classes are
            # left as they were
            log_loading.warning("No libpcap provider available ! pcap won't be used")
            Interceptor.set_from_hook(conf, "use_pcap", False)
        else:
            conf.L3socket = L3pcapSocket
            # The IPv6 flavour is the same class with a preset "ip6" filter
            conf.L3socket6 = functools.partial(
                L3pcapSocket, filter="ip6")
            conf.L2socket = L2pcapSocket
            conf.L2listen = L2pcapListenSocket
    elif conf.use_bpf:
        from scapy.arch.bpf.supersocket import L2bpfListenSocket, \
            L2bpfSocket, L3bpfSocket
        conf.L3socket = L3bpfSocket
        conf.L3socket6 = functools.partial(
            L3bpfSocket, filter="ip6")
        conf.L2socket = L2bpfSocket
        conf.L2listen = L2bpfListenSocket
    elif LINUX:
        from scapy.arch.linux import L3PacketSocket, L2Socket, L2ListenSocket
        conf.L3socket = L3PacketSocket
        conf.L3socket6 = cast(
            "Type[SuperSocket]",
            functools.partial(
                L3PacketSocket,
                filter="ip6"
            )
        )
        conf.L2socket = L2Socket
        conf.L2listen = L2ListenSocket
    elif WINDOWS:
        from scapy.arch.windows import _NotAvailableSocket
        from scapy.arch.windows.native import L3WinSocket, L3WinSocket6
        conf.L3socket = L3WinSocket
        conf.L3socket6 = L3WinSocket6
        conf.L2socket = _NotAvailableSocket
        conf.L2listen = _NotAvailableSocket
    else:
        # Fallback: only the layer 3 sockets are set, L2socket and L2listen
        # are left untouched
        from scapy.supersocket import L3RawSocket, L3RawSocket6
        conf.L3socket = L3RawSocket
        conf.L3socket6 = L3RawSocket6
    # Reload the interfaces
    conf.ifaces.reload()
def _socket_changer(attr, val, old):
    # type: (str, bool, bool) -> Any
    """Interceptor hook for the ``use_pcap`` / ``use_bpf`` switches.

    Stores the new value, turns off the switch it excludes when enabling,
    then recomputes the conf sockets. If that fails, the other switch gets
    its previous value back; only ScapyInvalidPlatformException is
    propagated.

    :raises TypeError: if ``val`` is not a boolean
    """
    if not isinstance(val, bool):
        raise TypeError("This argument should be a boolean")
    Interceptor.set_from_hook(conf, attr, val)
    # Things that will be turned off
    exclusive_with = {
        "use_pcap": ["use_bpf"],
        "use_bpf": ["use_pcap"],
    }
    saved = {}
    for key in exclusive_with:
        saved[key] = getattr(conf, key)
    # This is handled directly by _set_conf_sockets
    saved.pop(attr)
    if val:  # Only if True
        for other in exclusive_with[attr]:
            Interceptor.set_from_hook(conf, other, False)
    try:
        _set_conf_sockets()
    except ScapyInvalidPlatformException:
        for key, value in saved.items():
            Interceptor.set_from_hook(conf, key, value)
        raise
    except ImportError:
        for key, value in saved.items():
            Interceptor.set_from_hook(conf, key, value)
    return getattr(conf, attr)
def _loglevel_changer(attr, val, old):
    # type: (str, int, int) -> int
    """Handle a change of conf.logLevel

    Interceptor hook: applies the new level to the ``log_scapy`` logger and
    returns it unchanged, so that it becomes the stored value. ``attr`` and
    ``old`` are not used.
    """
    log_scapy.setLevel(val)
    return val
870def _iface_changer(attr, val, old):
871 # type: (str, Any, Any) -> 'scapy.interfaces.NetworkInterface'
872 """Resolves the interface in conf.iface"""
873 if isinstance(val, str):
874 from scapy.interfaces import resolve_iface
875 iface = resolve_iface(val)
876 if old and iface.dummy:
877 warning(
878 "This interface is not specified in any provider ! "
879 "See conf.ifaces output"
880 )
881 return iface
882 return val
def _reset_tls_nss_keys(attr, val, old):
    # type: (str, Any, Any) -> Any
    """Reset conf.tls_nss_keys when conf.tls_nss_filename changes

    Interceptor hook: clears ``conf.tls_nss_keys`` and returns the new
    filename unchanged, so that it becomes the stored value.
    """
    conf.tls_nss_keys = None
    return val
class Conf(ConfClass):
    """
    This object contains the configuration of Scapy.

    Attributes declared through Interceptor / ReadOnlyAttribute run a hook
    on assignment; the others are plain class-level defaults.
    """
    version: str = ReadOnlyAttribute("version", VERSION)
    session: str = ""  #: filename where the session will be saved
    interactive = False
    #: can be "ipython", "bpython", "ptpython", "ptipython", "python" or "auto".
    #: Default: Auto
    interactive_shell = "auto"
    #: Configuration for "ipython" to use jedi (disabled by default)
    ipython_use_jedi = False
    #: if 1, prevents any unwanted packet to go out (ARP, DNS, ...)
    stealth = "not implemented"
    #: selects the default output interface for srp() and sendp().
    iface = Interceptor("iface", None, _iface_changer)  # type: 'scapy.interfaces.NetworkInterface'  # noqa: E501
    layers: LayersList = LayersList()
    commands = CommandsList()  # type: CommandsList
    #: Codec used by default for ASN1 objects
    ASN1_default_codec = None  # type: 'scapy.asn1.asn1.ASN1Codec'
    #: Default size for ASN1 objects
    ASN1_default_long_size = 0
    #: choose the AS resolver class to use
    AS_resolver = None  # type: scapy.as_resolvers.AS_resolver
    dot15d4_protocol = None  # Used in dot15d4.py
    logLevel: int = Interceptor("logLevel", log_scapy.level, _loglevel_changer)
    #: if 0, doesn't check that IPID matches between IP sent and
    #: ICMP IP citation received
    #: if 1, checks that they either are equal or byte swapped
    #: equals (bug in some IP stacks)
    #: if 2, strictly checks that they are equal
    checkIPID = False
    #: if 1, checks IP src in IP and ICMP IP citation match
    #: (bug in some NAT stacks)
    checkIPsrc = True
    checkIPaddr = True
    #: if True, checks that IP-in-IP layers match. If False, do
    #: not check IP layers that encapsulate another IP layer
    checkIPinIP = True
    #: if 1, also check that TCP seq and ack match the
    #: ones in ICMP citation
    check_TCPerror_seqack = False
    verb = 2  #: level of verbosity, from 0 (almost mute) to 3 (verbose)
    prompt: str = Interceptor("prompt", ">>> ", _prompt_changer)
    #: default mode for the promiscuous mode of a socket (to get answers if you
    #: spoof on a lan)
    sniff_promisc = True  # type: bool
    raw_layer = None  # type: Type[Packet]
    raw_summary = False  # type: Union[bool, Callable[[bytes], Any]]
    padding_layer = None  # type: Type[Packet]
    default_l2 = None  # type: Type[Packet]
    l2types: Num2Layer = Num2Layer()
    l3types: Num2Layer = Num2Layer()
    # The socket classes below are filled in by _set_conf_sockets
    # (BTsocket is not set anywhere in this file)
    L3socket = None  # type: Type[scapy.supersocket.SuperSocket]
    L3socket6 = None  # type: Type[scapy.supersocket.SuperSocket]
    L2socket = None  # type: Type[scapy.supersocket.SuperSocket]
    L2listen = None  # type: Type[scapy.supersocket.SuperSocket]
    BTsocket = None  # type: Type[scapy.supersocket.SuperSocket]
    min_pkt_size = 60
    #: holds MIB direct access dictionary
    mib = None  # type: 'scapy.asn1.mib.MIBDict'
    bufsize = 2**16
    #: history file
    #: (the SCAPY_HISTFILE environment variable overrides the default path)
    histfile: str = os.getenv(
        'SCAPY_HISTFILE',
        os.path.join(
            os.path.expanduser("~"),
            ".config", "scapy", "history"
        )
    )
    #: includes padding in disassembled packets
    padding = 1
    #: BPF filter for packets to ignore
    except_filter = ""
    #: bpf filter added to every sniffing socket to exclude traffic
    #: from analysis
    filter = ""
    #: when 1, store received packets that are not matched into `debug.recv`
    debug_match = False
    #: When 1, print some TLS session secrets when they are computed, and
    #: warn about the session recognition.
    debug_tls = False
    wepkey = ""
    #: holds the Scapy interface list and manager
    ifaces = None  # type: 'scapy.interfaces.NetworkInterfaceDict'
    #: holds the cache of interfaces loaded from Libpcap
    cache_pcapiflist = {}  # type: Dict[str, Tuple[str, List[str], Any, str, int]]
    # `neighbor` will be filled by scapy.layers.l2
    neighbor = None  # type: 'scapy.layers.l2.Neighbor'
    #: holds the name servers IP/hosts used for custom DNS resolution
    nameservers = None  # type: str
    #: automatically load IPv4 routes on startup. Disable this if your
    #: routing table is too big.
    route_autoload = True
    #: automatically load IPv6 routes on startup. Disable this if your
    #: routing table is too big.
    route6_autoload = True
    #: holds the Scapy IPv4 routing table and provides methods to
    #: manipulate it
    route = None  # type: 'scapy.route.Route'
    # `route` will be filled by route.py
    #: holds the Scapy IPv6 routing table and provides methods to
    #: manipulate it
    route6 = None  # type: 'scapy.route6.Route6'
    # The six attributes below are placeholders: __getattribute__ serves
    # them from scapy.data at access time
    manufdb = None  # type: 'scapy.data.ManufDA'
    ethertypes = None  # type: 'scapy.data.EtherDA'
    protocols = None  # type: 'scapy.dadict.DADict[int, str]'
    services_udp = None  # type: 'scapy.dadict.DADict[int, str]'
    services_tcp = None  # type: 'scapy.dadict.DADict[int, str]'
    services_sctp = None  # type: 'scapy.dadict.DADict[int, str]'
    # 'route6' will be filled by route6.py
    teredoPrefix = ""  # type: str
    teredoServerPort = None  # type: int
    auto_fragment = True
    #: raise exception when a packet dissector raises an exception
    debug_dissector = False
    color_theme: ColorTheme = Interceptor("color_theme", NoTheme(), _prompt_changer)
    #: how much time between warnings from the same place
    warning_threshold = 5
    prog: ProgPath = ProgPath()
    #: holds list of fields for which resolution should be done
    resolve: Resolve = Resolve()
    #: holds list of enum fields for which conversion to string
    #: should NOT be done
    noenum: Resolve = Resolve()
    emph: Emphasize = Emphasize()
    #: read only attribute to show if PyPy is in use
    use_pypy: bool = ReadOnlyAttribute("use_pypy", isPyPy())
    #: use libpcap integration or not. Changing this value will update
    #: the conf.L[2/3] sockets
    #: (enabled by default when SCAPY_USE_LIBPCAP starts with "y",
    #: case-insensitively)
    use_pcap: bool = Interceptor(
        "use_pcap",
        os.getenv("SCAPY_USE_LIBPCAP", "").lower().startswith("y"),
        _socket_changer
    )
    use_bpf: bool = Interceptor("use_bpf", False, _socket_changer)
    use_npcap = False
    ipv6_enabled: bool = socket.has_ipv6
    stats_classic_protocols = []  # type: List[Type[Packet]]
    stats_dot11_protocols = []  # type: List[Type[Packet]]
    #: files removed at interpreter exit by scapy_delete_temp_files
    temp_files = []  # type: List[str]
    #: netcache holds time-based caches for net operations
    netcache: NetCache = NetCache()
    geoip_city = None
    # can, tls, http and a few others are not loaded by default
    load_layers: List[str] = [
        'bluetooth',
        'bluetooth4LE',
        'dcerpc',
        'dhcp',
        'dhcp6',
        'dns',
        'dot11',
        'dot15d4',
        'eap',
        'gprs',
        'gssapi',
        'hsrp',
        'inet',
        'inet6',
        'ipsec',
        'ir',
        'isakmp',
        'kerberos',
        'l2',
        'l2tp',
        'ldap',
        'llmnr',
        'lltd',
        'mgcp',
        'mobileip',
        'netbios',
        'netflow',
        'ntlm',
        'ntp',
        'ppi',
        'ppp',
        'pptp',
        'radius',
        'rip',
        'rtp',
        'sctp',
        'sixlowpan',
        'skinny',
        'smb',
        'smb2',
        'smbclient',
        'smbserver',
        'snmp',
        'spnego',
        'tftp',
        'vrrp',
        'vxlan',
        'x509',
        'zigbee'
    ]
    #: a dict which can be used by contrib layers to store local
    #: configuration
    contribs = dict()  # type: Dict[str, Any]
    exts: ExtsManager = ExtsManager()
    # Both probes run once, when the class body is executed
    crypto_valid = isCryptographyValid()
    crypto_valid_advanced = isCryptographyAdvanced()
    #: controls whether or not to display the fancy banner
    fancy_banner = True
    #: controls whether tables (conf.iface, conf.route...) should be cropped
    #: to fit the terminal
    auto_crop_tables = True
    #: how often to check for new packets.
    #: Defaults to 0.05s.
    recv_poll_rate = 0.05
    #: When True, raise exception if no dst MAC found otherwise broadcast.
    #: Default is False.
    raise_no_dst_mac = False
    loopback_name: str = "lo" if LINUX else "lo0"
    nmap_base = ""  # type: str
    nmap_kdb = None  # type: Optional[NmapKnowledgeBase]
    #: a safety mechanism: the maximum amount of items included in a PacketListField
    #: or a FieldListField
    max_list_count = 100
    #: When the TLS module is loaded (not by default), the following turns on sessions
    tls_session_enable = False
    #: Filename containing NSS Keys Log
    #: (assigning it resets tls_nss_keys to None)
    tls_nss_filename = Interceptor(
        "tls_nss_filename",
        None,
        _reset_tls_nss_keys
    )
    #: Dictionary containing parsed NSS Keys
    tls_nss_keys: Dict[str, bytes] = None
    #: When TCPSession is used, parse DCE/RPC sessions automatically.
    #: This should be used for passive sniffing.
    dcerpc_session_enable = False
    #: If a capture is missing the first DCE/RPC binding message, we might incorrectly
    #: assume that header signing isn't used. This forces it on.
    dcerpc_force_header_signing = False
    #: Windows SSPs for sniffing. This is used with
    #: dcerpc_session_enable
    winssps_passive = []

    def __getattribute__(self, attr):
        # type: (str) -> Any
        """Serve a few attributes dynamically, the rest normally.

        The ``scapy.data`` tables are imported on access, and the deprecated
        ``iface6`` name is redirected to ``iface`` with a warning.
        """
        # Those are loaded on runtime to avoid import loops
        if attr == "manufdb":
            from scapy.data import MANUFDB
            return MANUFDB
        if attr == "ethertypes":
            from scapy.data import ETHER_TYPES
            return ETHER_TYPES
        if attr == "protocols":
            from scapy.data import IP_PROTOS
            return IP_PROTOS
        if attr == "services_udp":
            from scapy.data import UDP_SERVICES
            return UDP_SERVICES
        if attr == "services_tcp":
            from scapy.data import TCP_SERVICES
            return TCP_SERVICES
        if attr == "services_sctp":
            from scapy.data import SCTP_SERVICES
            return SCTP_SERVICES
        if attr == "iface6":
            warnings.warn(
                "conf.iface6 is deprecated in favor of conf.iface",
                DeprecationWarning
            )
            # Fall through to the regular lookup under the new name
            attr = "iface"
        return object.__getattribute__(self, attr)
# When Python has no IPv6 support, drop the IPv6-only layers from the default
# load list before the configuration object is created
if not Conf.ipv6_enabled:
    log_scapy.warning("IPv6 support disabled in Python. Cannot load Scapy IPv6 layers.")  # noqa: E501
    for m in ["inet6", "dhcp6", "sixlowpan"]:
        if m in Conf.load_layers:
            Conf.load_layers.remove(m)

# The global configuration object
conf = Conf()  # type: Conf

# Extensions are only loaded on Python 3.8 and later
if sys.version_info >= (3, 8):
    conf.exts.load()
def crypto_validator(func):
    # type: (DecoratorCallable) -> DecoratorCallable
    """
    This is a decorator to be used for any method relying on the
    cryptography library.
    Its behaviour depends on the 'crypto_valid' attribute of the global 'conf':
    when it is false, calling the decorated function raises ImportError
    instead of running it.

    The wrapper keeps the name, docstring and other metadata of ``func``.
    """
    @functools.wraps(func)
    def func_in(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        # Checked on every call, not at decoration time
        if not conf.crypto_valid:
            # The version matches the minimum checked by isCryptographyValid
            raise ImportError("Cannot execute crypto-related method! "
                              "Please install python-cryptography v2.0 or later.")  # noqa: E501
        return func(*args, **kwargs)
    return func_in
# Registered to run at interpreter exit
@atexit.register
def scapy_delete_temp_files():
    # type: () -> None
    """Remove every file listed in ``conf.temp_files``, then empty the list.

    Deletion is best effort: a file that cannot be removed is skipped.
    """
    for path in conf.temp_files:
        try:
            os.unlink(path)
        except Exception:
            pass
    conf.temp_files.clear()