Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/config.py: 54%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Implementation of the configuration object.
8"""
11import atexit
12import copy
13import functools
14import os
15import re
16import socket
17import sys
18import time
19import warnings
21from dataclasses import dataclass
22from enum import Enum
24import importlib
25import importlib.abc
26import importlib.util
28import scapy
29from scapy import VERSION
30from scapy.base_classes import BasePacket
31from scapy.consts import DARWIN, WINDOWS, LINUX, BSD, SOLARIS
32from scapy.error import (
33 log_loading,
34 log_scapy,
35 ScapyInvalidPlatformException,
36 warning,
37)
38from scapy.themes import ColorTheme, NoTheme, apply_ipython_style
40# Typing imports
41from typing import (
42 cast,
43 Any,
44 Callable,
45 Dict,
46 Iterator,
47 List,
48 NoReturn,
49 Optional,
50 Set,
51 Tuple,
52 Type,
53 Union,
54 overload,
55 TYPE_CHECKING,
56)
57from types import ModuleType
58from scapy.compat import DecoratorCallable
60if TYPE_CHECKING:
61 # Do not import at runtime
62 import scapy.as_resolvers
63 from scapy.modules.nmap import NmapKnowledgeBase
64 from scapy.packet import Packet
65 from scapy.supersocket import SuperSocket # noqa: F401
66 import scapy.asn1.asn1
67 import scapy.asn1.mib
69############
70# Config #
71############
74class ConfClass(object):
75 def configure(self, cnf):
76 # type: (ConfClass) -> None
77 self.__dict__ = cnf.__dict__.copy()
79 def __repr__(self):
80 # type: () -> str
81 return str(self)
83 def __str__(self):
84 # type: () -> str
85 s = ""
86 dkeys = self.__class__.__dict__.copy()
87 dkeys.update(self.__dict__)
88 keys = sorted(dkeys)
89 for i in keys:
90 if i[0] != "_":
91 r = repr(getattr(self, i))
92 r = " ".join(r.split())
93 wlen = 76 - max(len(i), 10)
94 if len(r) > wlen:
95 r = r[:wlen - 3] + "..."
96 s += "%-10s = %s\n" % (i, r)
97 return s[:-1]
class Interceptor(object):
    """Data descriptor that passes every assignment through a hook.

    The effective value is stored on the owning instance under
    ``_intercepted_<name>``.
    """

    def __init__(self,
                 name,  # type: str
                 default,  # type: Any
                 hook,  # type: Callable[..., Any]
                 args=None,  # type: Optional[List[Any]]
                 kargs=None  # type: Optional[Dict[str, Any]]
                 ):
        # type: (...) -> None
        self.name = name
        self.intname = "_intercepted_%s" % name
        self.default = default
        self.hook = hook
        self.args = [] if args is None else args
        self.kargs = {} if kargs is None else kargs

    def __get__(self, obj, typ=None):
        # type: (Conf, Optional[type]) -> Any
        """Return the stored value, seeding it with the default on first read."""
        slot = self.intname
        if not hasattr(obj, slot):
            setattr(obj, slot, self.default)
        return getattr(obj, slot)

    @staticmethod
    def set_from_hook(obj, name, val):
        # type: (Conf, str, bool) -> None
        """Store ``val`` directly, without calling the hook."""
        setattr(obj, "_intercepted_%s" % name, val)

    def __set__(self, obj, val):
        # type: (Conf, Any) -> None
        """Call the hook with (name, new, old, *args, **kargs) and store its result."""
        previous = getattr(obj, self.intname, self.default)
        accepted = self.hook(self.name, val, previous, *self.args, **self.kargs)
        setattr(obj, self.intname, accepted)
def _readonly(name):
    # type: (str) -> NoReturn
    """Reject an assignment to the read-only attribute ``name``.

    The default recorded on the ``Conf`` class descriptor is written back to
    the global ``conf`` before raising, so the stored value ends up being the
    default again.

    :raises ValueError: always.
    """
    # Look the descriptor up in the class __dict__ to get the Interceptor
    # object itself rather than triggering its __get__.
    default = Conf.__dict__[name].default
    Interceptor.set_from_hook(conf, name, default)
    raise ValueError("Read-only value !")
# Interceptor factory whose hook ignores the proposed value and delegates to
# _readonly(), which always raises: callers only supply (name, default).
ReadOnlyAttribute = functools.partial(
    Interceptor,
    hook=(lambda name, *args, **kwargs: _readonly(name))
)
ReadOnlyAttribute.__doc__ = "Read-only class attribute"
class ProgPath(ConfClass):
    """Default names of external programs, exposed through ``conf.prog``."""
    _default: str = "<System default>"
    # Generic "open with the default application" command: "open" on Darwin,
    # "xdg-open" everywhere else.
    universal_open: str = "open" if DARWIN else "xdg-open"
    # The three readers below default to the generic opener.
    pdfreader: str = universal_open
    psreader: str = universal_open
    svgreader: str = universal_open
    dot: str = "dot"
    display: str = "display"
    tcpdump: str = "tcpdump"
    tcpreplay: str = "tcpreplay"
    hexedit: str = "hexer"
    tshark: str = "tshark"
    wireshark: str = "wireshark"
    ifconfig: str = "ifconfig"
    # Directories listed for Wireshark extcap: the per-user one under
    # ~/.config and a fixed x86_64 Linux system path.
    extcap_folders: List[str] = [
        os.path.join(os.path.expanduser("~"), ".config", "wireshark", "extcap"),
        "/usr/lib/x86_64-linux-gnu/wireshark/extcap",
    ]
class ConfigFieldList:
    """Set of fields, plus the derived set of layers owning those fields."""

    def __init__(self):
        # type: () -> None
        self.fields = set()  # type: Set[Any]
        self.layers = set()  # type: Set[Any]

    @staticmethod
    def _is_field(f):
        # type: (Any) -> bool
        """A field is anything exposing an ``owners`` attribute."""
        return hasattr(f, "owners")

    def _recalc_layer_list(self):
        # type: () -> None
        """Rebuild ``layers`` as the union of every field's owners."""
        owners = set()
        for fld in self.fields:
            owners.update(fld.owners)
        self.layers = owners

    def add(self, *flds):
        # type: (*Any) -> None
        """Add the given fields; objects that are not fields are ignored."""
        self.fields.update({f for f in flds if self._is_field(f)})
        self._recalc_layer_list()

    def remove(self, *flds):
        # type: (*Any) -> None
        """Remove the given fields; unknown ones are silently skipped."""
        self.fields.difference_update(set(flds))
        self._recalc_layer_list()

    def __contains__(self, elt):
        # type: (Any) -> bool
        # Packets are looked up among the owning layers, anything else
        # among the fields.
        haystack = self.layers if isinstance(elt, BasePacket) else self.fields
        return elt in haystack

    def __repr__(self):
        # type: () -> str
        names = " ".join(str(x) for x in self.fields)
        return "<%s [%s]>" % (self.__class__.__name__, names)
class Emphasize(ConfigFieldList):
    """Field list type used for ``Conf.emph``."""
    pass


class Resolve(ConfigFieldList):
    """Field list type used for ``Conf.resolve`` and ``Conf.noenum``."""
    pass
class Num2Layer:
    """Two independent mappings: number -> layer class and layer class -> number."""

    def __init__(self):
        # type: () -> None
        self.num2layer = {}  # type: Dict[int, Type[Packet]]
        self.layer2num = {}  # type: Dict[Type[Packet], int]

    def register(self, num, layer):
        # type: (int, Type[Packet]) -> None
        """Bind ``num`` and ``layer`` in both directions."""
        self.register_num2layer(num, layer)
        self.register_layer2num(num, layer)

    def register_num2layer(self, num, layer):
        # type: (int, Type[Packet]) -> None
        self.num2layer[num] = layer

    def register_layer2num(self, num, layer):
        # type: (int, Type[Packet]) -> None
        self.layer2num[layer] = num

    @overload
    def __getitem__(self, item):
        # type: (Type[Packet]) -> int
        pass

    @overload
    def __getitem__(self, item):  # noqa: F811
        # type: (int) -> Type[Packet]
        pass

    def __getitem__(self, item):  # noqa: F811
        # type: (Union[int, Type[Packet]]) -> Union[int, Type[Packet]]
        # Integers index the number table, everything else the layer table.
        table = self.num2layer if isinstance(item, int) else self.layer2num
        return table[item]

    def __contains__(self, item):
        # type: (Union[int, Type[Packet]]) -> bool
        table = self.num2layer if isinstance(item, int) else self.layer2num
        return item in table

    def get(self,
            item,  # type: Union[int, Type[Packet]]
            default=None,  # type: Optional[Type[Packet]]
            ):
        # type: (...) -> Optional[Union[int, Type[Packet]]]
        """Return ``self[item]``, or ``default`` when ``item`` is unknown."""
        if item in self:
            return self[item]
        return default

    def __repr__(self):
        # type: () -> str
        """List every binding sorted by number, with an arrow for its direction."""
        entries = []
        for num, layer in self.num2layer.items():
            two_way = layer in self.layer2num and self.layer2num[layer] == num
            arrow = "<->" if two_way else " ->"
            entries.append((num, "%#6x %s %-20s (%s)" % (num, arrow,
                                                         layer.__name__,
                                                         layer._name)))
        for layer, num in self.layer2num.items():
            if num in self.num2layer and not self.num2layer[num] != layer:
                # Already reported as a two-way binding above.
                continue
            entries.append((num, "%#6x <- %-20s (%s)" % (num, layer.__name__,
                                                        layer._name)))
        return "\n".join(text for _, text in sorted(entries))
class LayersList(List[Type['scapy.packet.Packet']]):
    """List of registered layer classes, also indexed by defining module.

    ``filter()`` / ``unfilter()`` temporarily restrict each registered class's
    ``payload_guess`` list and restore it afterwards.
    """

    def __init__(self):
        # type: () -> None
        list.__init__(self)
        # module name -> layer classes defined in that module
        self.ldict = {}  # type: Dict[str, List[Type[Packet]]]
        self.filtered = False
        # original payload_guess lists, saved by filter()
        self._backup_dict = {}  # type: Dict[Type[Packet], List[Tuple[Dict[str, Any], Type[Packet]]]]  # noqa: E501

    def __repr__(self):
        # type: () -> str
        return "\n".join("%-20s: %s" % (layer.__name__, layer.name)
                         for layer in self)

    def register(self, layer):
        # type: (Type[Packet]) -> None
        """Append ``layer`` and index it under its ``__module__``."""
        self.append(layer)
        self.ldict.setdefault(layer.__module__, []).append(layer)

    def layers(self):
        # type: () -> List[Tuple[str, str]]
        """Return ``(module name, first docstring line)`` per known module.

        The module name itself is used as description when the module has no
        docstring (or cannot be found in ``sys.modules``).
        """
        result = []
        for lay in self.ldict:
            # Look the module up by name instead of eval()-ing the dotted
            # path: this works for any module, not only those reachable
            # from the ``scapy`` package, and never evaluates a string.
            module = sys.modules.get(lay)
            doc = getattr(module, "__doc__", None)
            result.append((lay, doc.strip().split("\n")[0] if doc else lay))
        return result

    def filter(self, items):
        # type: (List[Type[Packet]]) -> None
        """Disable dissection of unused layers to speed up dissection

        :param items: the layer classes to keep as payload candidates.
        :raises ValueError: if a filter is already active.
        """
        if self.filtered:
            raise ValueError("Already filtered. Please disable it first")
        for lay in self.ldict.values():
            for cls in lay:
                if cls not in self._backup_dict:
                    # Keep a copy so unfilter() can restore it.
                    self._backup_dict[cls] = cls.payload_guess[:]
                    cls.payload_guess = [
                        y for y in cls.payload_guess if y[1] in items
                    ]
        self.filtered = True

    def unfilter(self):
        # type: () -> None
        """Re-enable dissection for all layers

        :raises ValueError: if no filter is active.
        """
        if not self.filtered:
            raise ValueError("Not filtered. Please filter first")
        # Restore only what filter() saved: a layer registered while the
        # filter was active has no backup and must be left untouched.
        for cls, guesses in self._backup_dict.items():
            cls.payload_guess = guesses
        self._backup_dict.clear()
        self.filtered = False
class CommandsList(List[Callable[..., Any]]):
    """List of registered commands, printable as a name/summary table."""

    def __repr__(self):
        # type: () -> str
        """One line per command, sorted by name, with its first docstring line."""
        rows = []
        for cmd in sorted(self, key=lambda c: c.__name__):
            # "--" stands in for a missing or empty docstring.
            summary = (cmd.__doc__ or "--").lstrip().split('\n', 1)[0]
            rows.append("%-22s: %s" % (cmd.__name__, summary))
        return "\n".join(rows)

    def register(self, cmd):
        # type: (DecoratorCallable) -> DecoratorCallable
        """Record ``cmd`` and hand it back, so this works as a decorator."""
        self.append(cmd)
        return cmd
def lsc():
    # type: () -> None
    """Displays Scapy's default commands

    Prints the ``repr`` of ``conf.commands`` to standard output.
    """
    print(repr(conf.commands))
class CacheInstance(Dict[str, Any]):
    """Dictionary whose entries can expire.

    Every key gets an insertion timestamp in ``_timetable``. When ``timeout``
    is not None, entries older than ``timeout`` seconds are hidden from
    lookups, iteration and ``len()``; they are not physically removed.
    """
    __slots__ = ["timeout", "name", "_timetable"]

    def __init__(self, name="noname", timeout=None):
        # type: (str, Optional[int]) -> None
        self.timeout = timeout
        self.name = name
        self._timetable = {}  # type: Dict[str, float]

    def flush(self):
        # type: () -> None
        """Drop every entry together with its timestamp."""
        self._timetable.clear()
        self.clear()

    def __getitem__(self, item):
        # type: (str) -> Any
        # Slot names are served as attributes rather than as cache keys.
        if item in self.__slots__:
            return object.__getattribute__(self, item)
        if not self.__contains__(item):
            raise KeyError(item)
        return super(CacheInstance, self).__getitem__(item)

    def __contains__(self, item):
        # type: (Any) -> bool
        """True when ``item`` is stored and has not expired."""
        if not super(CacheInstance, self).__contains__(item):
            return False
        if self.timeout is not None:
            t = self._timetable[item]
            if time.time() - t > self.timeout:
                return False
        return True

    def get(self, item, default=None):
        # type: (str, Optional[Any]) -> Any
        # overloading this method is needed to force the dict to go through
        # the timetable check
        try:
            return self[item]
        except KeyError:
            return default

    def __setitem__(self, item, v):
        # type: (str, str) -> None
        # Slot names are stored as attributes rather than as cache keys.
        if item in self.__slots__:
            return object.__setattr__(self, item, v)
        self._timetable[item] = time.time()
        super(CacheInstance, self).__setitem__(item, v)

    def update(self,
               other,  # type: Any
               **kwargs  # type: Any
               ):
        # type: (...) -> None
        """Merge ``other`` into this cache, keeping the most recent entries.

        ``other`` must provide ``items()`` and a ``_timetable`` mapping (in
        practice another CacheInstance). ``kwargs`` are accepted but ignored.
        """
        for key, value in other.items():
            # We only update an element from `other` either if it does
            # not exist in `self` or if the entry in `self` is older.
            if key not in self or self._timetable[key] < other._timetable[key]:
                dict.__setitem__(self, key, value)
                self._timetable[key] = other._timetable[key]

    def iteritems(self):
        # type: () -> Iterator[Tuple[str, Any]]
        if self.timeout is None:
            return super(CacheInstance, self).items()
        t0 = time.time()
        return (
            (k, v)
            for (k, v) in super(CacheInstance, self).items()
            if t0 - self._timetable[k] < self.timeout
        )

    def iterkeys(self):
        # type: () -> Iterator[str]
        if self.timeout is None:
            return super(CacheInstance, self).keys()
        t0 = time.time()
        return (
            k
            for k in super(CacheInstance, self).keys()
            if t0 - self._timetable[k] < self.timeout
        )

    def __iter__(self):
        # type: () -> Iterator[str]
        return self.iterkeys()

    def itervalues(self):
        # type: () -> Iterator[Any]
        if self.timeout is None:
            return super(CacheInstance, self).values()
        t0 = time.time()
        return (
            v
            for (k, v) in super(CacheInstance, self).items()
            if t0 - self._timetable[k] < self.timeout
        )

    def items(self):
        # type: () -> Any
        return list(self.iteritems())

    def keys(self):
        # type: () -> Any
        return list(self.iterkeys())

    def values(self):
        # type: () -> Any
        return list(self.itervalues())

    def __len__(self):
        # type: () -> int
        if self.timeout is None:
            return super(CacheInstance, self).__len__()
        return len(self.keys())

    def summary(self):
        # type: () -> str
        return "%s: %i valid items. Timeout=%rs" % (self.name, len(self), self.timeout)  # noqa: E501

    def __repr__(self):
        # type: () -> str
        s = []
        if self:
            # Pad keys to the width of the longest one.
            mk = max(len(k) for k in self)
            fmt = "%%-%is %%s" % (mk + 1)
            for item in self.items():
                s.append(fmt % item)
        return "\n".join(s)

    def copy(self):
        # type: () -> CacheInstance
        """Return an independent copy holding the currently valid entries.

        The copy gets its own timetable and keeps the original insertion
        timestamps. A generic ``copy.copy()`` would share ``_timetable`` with
        the original and re-insert every item through ``__setitem__``,
        resetting the timestamps of both objects.
        """
        cls = type(self)
        dup = cls.__new__(cls)
        dup.timeout = self.timeout
        dup.name = self.name
        dup._timetable = {}
        for key, value in self.items():
            dict.__setitem__(dup, key, value)
            dup._timetable[key] = self._timetable[key]
        return dup
class NetCache:
    """Registry of named caches, each also reachable as an attribute."""

    def __init__(self):
        # type: () -> None
        self._caches_list = []  # type: List[CacheInstance]

    def add_cache(self, cache):
        # type: (CacheInstance) -> None
        """Track ``cache`` and expose it as ``self.<cache.name>``."""
        self._caches_list.append(cache)
        setattr(self, cache.name, cache)

    def new_cache(self, name, timeout=None):
        # type: (str, Optional[int]) -> CacheInstance
        """Create a cache, register it and return it."""
        created = CacheInstance(name=name, timeout=timeout)
        self.add_cache(created)
        return created

    def __delattr__(self, attr):
        # type: (str) -> NoReturn
        raise AttributeError("Cannot delete attributes")

    def update(self, other):
        # type: (NetCache) -> None
        """Merge the caches of ``other``; unknown ones are added as copies."""
        for theirs in other._caches_list:
            if not hasattr(self, theirs.name):
                self.add_cache(theirs.copy())
                continue
            getattr(self, theirs.name).update(theirs)

    def flush(self):
        # type: () -> None
        """Flush every registered cache."""
        for cache in self._caches_list:
            cache.flush()

    def __repr__(self):
        # type: () -> str
        summaries = [cache.summary() for cache in self._caches_list]
        return "\n".join(summaries)
class ScapyExt:
    """Description of one extension: a name, a version and its module specs."""
    __slots__ = ["specs", "name", "version"]

    class MODE(Enum):
        # The value is used as the middle component of the module name
        # built by register(): "scapy.<value>.<name>".
        LAYERS = "layers"
        CONTRIB = "contrib"
        MODULES = "modules"

    @dataclass
    class ScapyExtSpec:
        fullname: str
        mode: 'ScapyExt.MODE'
        spec: Any
        default: bool

    def __init__(self):
        self.specs: Dict[str, 'ScapyExt.ScapyExtSpec'] = {}

    def config(self, name, version):
        """Record the extension's name and version."""
        self.name = name
        self.version = version

    def register(self, name, mode, path, default=None):
        """Declare the module ``scapy.<mode>.<name>`` as provided by ``path``.

        When ``default`` is None, the flag is derived from whether the
        import system can already find a module under that name.
        """
        assert mode in self.MODE, "mode must be one of ScapyExt.MODE !"
        fullname = "scapy.{}.{}".format(mode.value, name)
        module_spec = importlib.util.spec_from_file_location(
            fullname,
            str(path),
        )
        entry = self.ScapyExtSpec(
            fullname=fullname,
            mode=mode,
            spec=module_spec,
            default=default or False,
        )
        if default is None:
            entry.default = bool(importlib.util.find_spec(entry.fullname))
        self.specs[fullname] = entry

    def __repr__(self):
        details = (self.name, self.version, len(self.specs))
        return "<ScapyExt %s %s (%s specs)>" % details
class ExtsManager(importlib.abc.MetaPathFinder):
    """Discovers installed Scapy extensions and serves their module specs.

    Once ``load()`` has run, the manager sits on ``sys.meta_path`` and answers
    imports for every module name an extension registered.
    """
    __slots__ = ["exts", "_loaded", "all_specs"]

    SCAPY_PLUGIN_CLASSIFIER = 'Framework :: Scapy'
    GPLV2_CLASSIFIERS = [
        'License :: OSI Approved :: GNU General Public License v2 (GPLv2)',
        'License :: OSI Approved :: GNU General Public License v2 or later (GPLv2+)',
    ]

    def __init__(self):
        self.exts: List[ScapyExt] = []
        self.all_specs: Dict[str, ScapyExt.ScapyExtSpec] = {}
        # Packages already handled by load(), successfully or not.
        self._loaded = []

    def find_spec(self, fullname, path, target=None):
        """Return the registered module spec for ``fullname``, else None."""
        if fullname in self.all_specs:
            return self.all_specs[fullname].spec
        return None

    def invalidate_caches(self):
        pass

    def _register_spec(self, spec):
        """Record ``spec``; pre-load its module lazily when it is a default."""
        self.all_specs[spec.fullname] = spec
        if spec.default:
            # LazyLoader defers the real execution of the module until one
            # of its attributes is first accessed.
            loader = importlib.util.LazyLoader(spec.spec.loader)
            spec.spec.loader = loader
            module = importlib.util.module_from_spec(spec.spec)
            sys.modules[spec.fullname] = module
            loader.exec_module(module)

    @staticmethod
    def _find_package_name(distr_name, packages_map):
        """Return the import package mapped to ``distr_name``.

        ``packages_map`` maps import package names to lists of distribution
        names. The distribution name itself is returned when nothing maps
        to it.
        """
        return next(
            (pkg for pkg, distrs in packages_map.items()
             if distr_name in distrs),
            distr_name,
        )

    def load(self):
        """Find, import and register every installed Scapy extension.

        A distribution is considered when it carries the Scapy framework
        classifier; it is only loaded if it also carries a GPLv2 classifier.
        Failures of individual extensions are logged and skipped.
        """
        try:
            import importlib.metadata
        except ImportError:
            return
        # Built on first use only: computing the mapping is costly and is
        # pointless when no extension is installed.
        packages_map = None
        for distr in importlib.metadata.distributions():
            classifiers = [
                v for k, v in distr.metadata.items() if k == 'Classifier'
            ]
            if self.SCAPY_PLUGIN_CLASSIFIER not in classifiers:
                continue
            if packages_map is None:
                # packages_distributions() only exists on Python >= 3.10.
                lookup = getattr(
                    importlib.metadata, "packages_distributions", None
                )
                packages_map = lookup() if lookup is not None else {}
            # metadata["Name"] is used rather than distr.name, which is
            # missing on Python < 3.10. A generator exhausted by next()
            # raises StopIteration (not KeyError), hence the helper with an
            # explicit fallback to the distribution name.
            pkg = self._find_package_name(
                distr.metadata["Name"], packages_map
            )
            if pkg in self._loaded:
                continue
            if not any(v in self.GPLV2_CLASSIFIERS for v in classifiers):
                log_loading.warning(
                    "'%s' has no GPLv2 classifier therefore cannot be loaded." % pkg  # noqa: E501
                )
                continue
            self._loaded.append(pkg)
            ext = ScapyExt()
            try:
                scapy_ext = importlib.import_module(pkg)
            except Exception as ex:
                log_loading.warning(
                    "'%s' failed during import with %s" % (
                        pkg,
                        ex
                    )
                )
                continue
            try:
                scapy_ext_func = scapy_ext.scapy_ext
            except AttributeError:
                log_loading.info(
                    "'%s' included the Scapy Framework specifier "
                    "but did not include a scapy_ext" % pkg
                )
                continue
            try:
                scapy_ext_func(ext)
            except Exception as ex:
                log_loading.warning(
                    "'%s' failed during initialization with %s" % (
                        pkg,
                        ex
                    )
                )
                continue
            for spec in ext.specs.values():
                self._register_spec(spec)
            self.exts.append(ext)
        if self not in sys.meta_path:
            sys.meta_path.append(self)

    def __repr__(self):
        from scapy.utils import pretty_list
        return pretty_list(
            [
                (x.name, x.version, [y.fullname for y in x.specs.values()])
                for x in self.exts
            ],
            [("Name", "Version", "Specs")],
            sortBy=0,
        )
687def _version_checker(module, minver):
688 # type: (ModuleType, Tuple[int, ...]) -> bool
689 """Checks that module has a higher version that minver.
691 params:
692 - module: a module to test
693 - minver: a tuple of versions
694 """
695 # We could use LooseVersion, but distutils imports imp which is deprecated
696 version_regexp = r'[a-z]?((?:\d|\.)+\d+)(?:\.dev[0-9]+)?'
697 version_tags_r = re.match(
698 version_regexp,
699 getattr(module, "__version__", "")
700 )
701 if not version_tags_r:
702 return False
703 version_tags_i = version_tags_r.group(1).split(".")
704 version_tags = tuple(int(x) for x in version_tags_i)
705 return bool(version_tags >= minver)
def isCryptographyValid():
    # type: () -> bool
    """
    Tell whether the cryptography module is importable with a version of at
    least 2.0.0, the minimum for most usages in Scapy.
    """
    minimum = (2, 0, 0)
    try:
        import cryptography
    except ImportError:
        return False
    else:
        return _version_checker(cryptography, minimum)
def isCryptographyAdvanced():
    # type: () -> bool
    """
    Tell whether the cryptography module is present and supports X25519,
    ChaCha20Poly1305 and such.

    Notes:
    - cryptography >= 2.0 is required
    - OpenSSL >= 1.1.0 is required
    """
    # Generating an X25519 key is the probe: any failure, from a missing
    # module to an unsupported backend, means "not advanced".
    try:
        from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey  # noqa: E501
        X25519PrivateKey.generate()
        return True
    except Exception:
        return False
def isPyPy():
    # type: () -> bool
    """Tell whether Scapy is running under PyPy"""
    # The __pypy__ module can only be imported on PyPy.
    try:
        import __pypy__  # noqa: F401
    except ImportError:
        return False
    return True
def _prompt_changer(attr, val, old):
    # type: (str, Any, Any) -> Any
    """Change the current prompt theme

    Interceptor hook: stores ``val`` for ``attr`` on the global ``conf``, then
    refreshes ``sys.ps1`` and the IPython style on a best-effort basis.

    :param attr: name of the attribute being assigned.
    :param val: the new value.
    :param old: the previous value, used as fallback for the return value.
    :returns: the value now readable from ``conf`` for ``attr``.
    """
    # Store first, so that conf.color_theme / conf.prompt below already see
    # the new value.
    Interceptor.set_from_hook(conf, attr, val)
    try:
        sys.ps1 = conf.color_theme.prompt(conf.prompt)
    except Exception:
        pass
    try:
        # get_ipython is not defined in this module: the lookup raises
        # NameError unless the name has been made available as a builtin.
        apply_ipython_style(
            get_ipython()  # type: ignore
        )
    except NameError:
        pass
    return getattr(conf, attr, old)
def _set_conf_sockets():
    # type: () -> None
    """Populate the conf.L2Socket and conf.L3Socket
    according to the various use_* parameters

    Selection order: libpcap (``conf.use_pcap``), BPF (``conf.use_bpf``),
    native Linux sockets, native Windows sockets, then raw sockets.
    The interface list is reloaded at the end.

    :raises ScapyInvalidPlatformException: if BPF is requested on a non-BSD
        platform, or libpcap is disabled on Solaris. The offending flag is
        reset before raising.
    """
    if conf.use_bpf and not BSD:
        Interceptor.set_from_hook(conf, "use_bpf", False)
        raise ScapyInvalidPlatformException("BSD-like (OSX, *BSD...) only !")
    if not conf.use_pcap and SOLARIS:
        Interceptor.set_from_hook(conf, "use_pcap", True)
        raise ScapyInvalidPlatformException(
            "Scapy only supports libpcap on Solaris !"
        )
    # we are already in an Interceptor hook, use Interceptor.set_from_hook
    if conf.use_pcap:
        try:
            from scapy.arch.libpcap import L2pcapListenSocket, L2pcapSocket, \
                L3pcapSocket
        except (OSError, ImportError):
            # NOTE(review): on this path use_pcap is turned off but none of
            # the branches below runs, so the socket classes keep whatever
            # value they had before this call.
            log_loading.warning("No libpcap provider available ! pcap won't be used")
            Interceptor.set_from_hook(conf, "use_pcap", False)
        else:
            conf.L3socket = L3pcapSocket
            conf.L3socket6 = functools.partial(
                L3pcapSocket, filter="ip6")
            conf.L2socket = L2pcapSocket
            conf.L2listen = L2pcapListenSocket
    elif conf.use_bpf:
        from scapy.arch.bpf.supersocket import L2bpfListenSocket, \
            L2bpfSocket, L3bpfSocket
        conf.L3socket = L3bpfSocket
        conf.L3socket6 = functools.partial(
            L3bpfSocket, filter="ip6")
        conf.L2socket = L2bpfSocket
        conf.L2listen = L2bpfListenSocket
    elif LINUX:
        from scapy.arch.linux import L3PacketSocket, L2Socket, L2ListenSocket
        conf.L3socket = L3PacketSocket
        conf.L3socket6 = cast(
            "Type[SuperSocket]",
            functools.partial(
                L3PacketSocket,
                filter="ip6"
            )
        )
        conf.L2socket = L2Socket
        conf.L2listen = L2ListenSocket
    elif WINDOWS:
        from scapy.arch.windows import _NotAvailableSocket
        from scapy.arch.windows.native import L3WinSocket, L3WinSocket6
        conf.L3socket = L3WinSocket
        conf.L3socket6 = L3WinSocket6
        conf.L2socket = _NotAvailableSocket
        conf.L2listen = _NotAvailableSocket
    else:
        # Only the layer 3 sockets are assigned in this fallback.
        from scapy.supersocket import L3RawSocket, L3RawSocket6
        conf.L3socket = L3RawSocket
        conf.L3socket6 = L3RawSocket6
    # Reload the interfaces
    conf.ifaces.reload()
def _socket_changer(attr, val, old):
    # type: (str, bool, bool) -> Any
    """Interceptor hook for ``conf.use_pcap`` and ``conf.use_bpf``.

    Stores the new flag, switches off the mutually exclusive one when the
    flag is being enabled, then recomputes the socket classes.

    :param attr: "use_pcap" or "use_bpf".
    :param val: the new boolean value.
    :param old: the previous value (unused).
    :returns: the value of ``attr`` as stored on ``conf`` afterwards, which
        may differ from ``val`` if _set_conf_sockets() reverted it.
    :raises TypeError: if ``val`` is not a bool.
    :raises ScapyInvalidPlatformException: re-raised from
        _set_conf_sockets() after the other flags have been restored.
    """
    if not isinstance(val, bool):
        raise TypeError("This argument should be a boolean")
    Interceptor.set_from_hook(conf, attr, val)
    dependencies = {  # Things that will be turned off
        "use_pcap": ["use_bpf"],
        "use_bpf": ["use_pcap"],
    }
    # Snapshot of the flags, taken before any of them is switched off.
    restore = {k: getattr(conf, k) for k in dependencies}
    del restore[attr]  # This is handled directly by _set_conf_sockets
    if val:  # Only if True
        for param in dependencies[attr]:
            Interceptor.set_from_hook(conf, param, False)
    try:
        _set_conf_sockets()
    except (ScapyInvalidPlatformException, ImportError) as e:
        for key, value in restore.items():
            Interceptor.set_from_hook(conf, key, value)
        # An ImportError is swallowed after the rollback; only the platform
        # error is propagated to the caller.
        if isinstance(e, ScapyInvalidPlatformException):
            raise
    return getattr(conf, attr)
def _loglevel_changer(attr, val, old):
    # type: (str, int, int) -> int
    """Handle a change of conf.logLevel

    Applies ``val`` to the ``log_scapy`` logger and returns it unchanged so
    that it gets stored. ``attr`` and ``old`` are unused.
    """
    log_scapy.setLevel(val)
    return val
860def _iface_changer(attr, val, old):
861 # type: (str, Any, Any) -> 'scapy.interfaces.NetworkInterface'
862 """Resolves the interface in conf.iface"""
863 if isinstance(val, str):
864 from scapy.interfaces import resolve_iface
865 iface = resolve_iface(val)
866 if old and iface.dummy:
867 warning(
868 "This interface is not specified in any provider ! "
869 "See conf.ifaces output"
870 )
871 return iface
872 return val
def _reset_tls_nss_keys(attr, val, old):
    # type: (str, Any, Any) -> Any
    """Reset conf.tls_nss_keys when conf.tls_nss_filename changes

    Sets ``conf.tls_nss_keys`` to None and returns ``val`` unchanged so that
    it gets stored. ``attr`` and ``old`` are unused.
    """
    conf.tls_nss_keys = None
    return val
class Conf(ConfClass):
    """
    This object contains the configuration of Scapy.
    """
    #: Scapy version (read-only)
    version: str = ReadOnlyAttribute("version", VERSION)
    session: str = ""  #: filename where the session will be saved
    interactive = False
    #: can be "ipython", "bpython", "ptpython", "ptipython", "python" or "auto".
    #: Default: Auto
    interactive_shell = "auto"
    #: Configuration for "ipython" to use jedi (disabled by default)
    ipython_use_jedi = False
    #: if 1, prevents any unwanted packet to go out (ARP, DNS, ...)
    stealth = "not implemented"
    #: selects the default output interface for srp() and sendp().
    iface = Interceptor("iface", None, _iface_changer)  # type: 'scapy.interfaces.NetworkInterface'  # noqa: E501
    layers: LayersList = LayersList()
    commands = CommandsList()  # type: CommandsList
    #: Codec used by default for ASN1 objects
    ASN1_default_codec = None  # type: 'scapy.asn1.asn1.ASN1Codec'
    #: Default size for ASN1 objects
    ASN1_default_long_size = 0
    #: choose the AS resolver class to use
    AS_resolver = None  # type: scapy.as_resolvers.AS_resolver
    dot15d4_protocol = None  # Used in dot15d4.py
    #: logging level; assignments are applied to the log_scapy logger
    logLevel: int = Interceptor("logLevel", log_scapy.level, _loglevel_changer)
    #: if 0, doesn't check that IPID matches between IP sent and
    #: ICMP IP citation received
    #: if 1, checks that they either are equal or byte swapped
    #: equals (bug in some IP stacks)
    #: if 2, strictly checks that they are equal
    checkIPID = False
    #: if 1, checks IP src in IP and ICMP IP citation match
    #: (bug in some NAT stacks)
    checkIPsrc = True
    checkIPaddr = True
    #: if True, checks that IP-in-IP layers match. If False, do
    #: not check IP layers that encapsulate another IP layer
    checkIPinIP = True
    #: if 1, also check that TCP seq and ack match the
    #: ones in ICMP citation
    check_TCPerror_seqack = False
    verb = 2  #: level of verbosity, from 0 (almost mute) to 3 (verbose)
    prompt: str = Interceptor("prompt", ">>> ", _prompt_changer)
    #: default mode for the promiscuous mode of a socket (to get answers if you
    #: spoof on a lan)
    sniff_promisc = True  # type: bool
    raw_layer = None  # type: Type[Packet]
    raw_summary = False  # type: Union[bool, Callable[[bytes], Any]]
    padding_layer = None  # type: Type[Packet]
    default_l2 = None  # type: Type[Packet]
    l2types: Num2Layer = Num2Layer()
    l3types: Num2Layer = Num2Layer()
    # The socket classes below are assigned by _set_conf_sockets().
    L3socket = None  # type: Type[scapy.supersocket.SuperSocket]
    L3socket6 = None  # type: Type[scapy.supersocket.SuperSocket]
    L2socket = None  # type: Type[scapy.supersocket.SuperSocket]
    L2listen = None  # type: Type[scapy.supersocket.SuperSocket]
    BTsocket = None  # type: Type[scapy.supersocket.SuperSocket]
    min_pkt_size = 60
    #: holds MIB direct access dictionary
    mib = None  # type: 'scapy.asn1.mib.MIBDict'
    bufsize = 2**16
    #: history file (the SCAPY_HISTFILE environment variable overrides the
    #: default location)
    histfile: str = os.getenv(
        'SCAPY_HISTFILE',
        os.path.join(
            os.path.expanduser("~"),
            ".config", "scapy", "history"
        )
    )
    #: includes padding in disassembled packets
    padding = 1
    #: BPF filter for packets to ignore
    except_filter = ""
    #: bpf filter added to every sniffing socket to exclude traffic
    #: from analysis
    filter = ""
    #: when 1, store received packets that are not matched into `debug.recv`
    debug_match = False
    #: When 1, print some TLS session secrets when they are computed, and
    #: warn about the session recognition.
    debug_tls = False
    wepkey = ""
    #: holds the Scapy interface list and manager
    ifaces = None  # type: 'scapy.interfaces.NetworkInterfaceDict'
    #: holds the cache of interfaces loaded from Libpcap
    cache_pcapiflist = {}  # type: Dict[str, Tuple[str, List[str], Any, str]]
    # `neighbor` will be filled by scapy.layers.l2
    neighbor = None  # type: 'scapy.layers.l2.Neighbor'
    #: holds the name servers IP/hosts used for custom DNS resolution
    nameservers = None  # type: str
    #: automatically load IPv4 routes on startup. Disable this if your
    #: routing table is too big.
    route_autoload = True
    #: automatically load IPv6 routes on startup. Disable this if your
    #: routing table is too big.
    route6_autoload = True
    #: holds the Scapy IPv4 routing table and provides methods to
    #: manipulate it
    route = None  # type: 'scapy.route.Route'
    # `route` will be filled by route.py
    #: holds the Scapy IPv6 routing table and provides methods to
    #: manipulate it
    route6 = None  # type: 'scapy.route6.Route6'
    # The six attributes below are placeholders: __getattribute__ serves
    # them from scapy.data at access time.
    manufdb = None  # type: 'scapy.data.ManufDA'
    ethertypes = None  # type: 'scapy.data.EtherDA'
    protocols = None  # type: 'scapy.dadict.DADict[int, str]'
    services_udp = None  # type: 'scapy.dadict.DADict[int, str]'
    services_tcp = None  # type: 'scapy.dadict.DADict[int, str]'
    services_sctp = None  # type: 'scapy.dadict.DADict[int, str]'
    # 'route6' will be filled by route6.py
    teredoPrefix = ""  # type: str
    teredoServerPort = None  # type: int
    auto_fragment = True
    #: raise exception when a packet dissector raises an exception
    debug_dissector = False
    color_theme: ColorTheme = Interceptor("color_theme", NoTheme(), _prompt_changer)
    #: how much time between warnings from the same place
    warning_threshold = 5
    prog: ProgPath = ProgPath()
    #: holds list of fields for which resolution should be done
    resolve: Resolve = Resolve()
    #: holds list of enum fields for which conversion to string
    #: should NOT be done
    noenum: Resolve = Resolve()
    emph: Emphasize = Emphasize()
    #: read only attribute to show if PyPy is in use
    use_pypy: bool = ReadOnlyAttribute("use_pypy", isPyPy())
    #: use libpcap integration or not. Changing this value will update
    #: the conf.L[2/3] sockets
    #: (the default is True when the SCAPY_USE_LIBPCAP environment variable
    #: starts with "y", case-insensitively)
    use_pcap: bool = Interceptor(
        "use_pcap",
        os.getenv("SCAPY_USE_LIBPCAP", "").lower().startswith("y"),
        _socket_changer
    )
    use_bpf: bool = Interceptor("use_bpf", False, _socket_changer)
    use_npcap = False
    ipv6_enabled: bool = socket.has_ipv6
    stats_classic_protocols = []  # type: List[Type[Packet]]
    stats_dot11_protocols = []  # type: List[Type[Packet]]
    #: temporary files, removed at exit by scapy_delete_temp_files()
    temp_files = []  # type: List[str]
    #: netcache holds time-based caches for net operations
    netcache: NetCache = NetCache()
    geoip_city = None
    # can, tls, http and a few others are not loaded by default
    load_layers: List[str] = [
        'bluetooth',
        'bluetooth4LE',
        'dcerpc',
        'dhcp',
        'dhcp6',
        'dns',
        'dot11',
        'dot15d4',
        'eap',
        'gprs',
        'gssapi',
        'hsrp',
        'inet',
        'inet6',
        'ipsec',
        'ir',
        'isakmp',
        'kerberos',
        'l2',
        'l2tp',
        'ldap',
        'llmnr',
        'lltd',
        'mgcp',
        'mobileip',
        'netbios',
        'netflow',
        'ntlm',
        'ntp',
        'ppi',
        'ppp',
        'pptp',
        'radius',
        'rip',
        'rtp',
        'sctp',
        'sixlowpan',
        'skinny',
        'smb',
        'smb2',
        'smbclient',
        'smbserver',
        'snmp',
        'spnego',
        'tftp',
        'vrrp',
        'vxlan',
        'x509',
        'zigbee'
    ]
    #: a dict which can be used by contrib layers to store local
    #: configuration
    contribs = dict()  # type: Dict[str, Any]
    exts: ExtsManager = ExtsManager()
    # Both flags are computed once, when this class body is executed.
    crypto_valid = isCryptographyValid()
    crypto_valid_advanced = isCryptographyAdvanced()
    #: controls whether or not to display the fancy banner
    fancy_banner = True
    #: controls whether tables (conf.iface, conf.route...) should be cropped
    #: to fit the terminal
    auto_crop_tables = True
    #: how often to check for new packets.
    #: Defaults to 0.05s.
    recv_poll_rate = 0.05
    #: When True, raise exception if no dst MAC found otherwise broadcast.
    #: Default is False.
    raise_no_dst_mac = False
    loopback_name: str = "lo" if LINUX else "lo0"
    nmap_base = ""  # type: str
    nmap_kdb = None  # type: Optional[NmapKnowledgeBase]
    #: a safety mechanism: the maximum amount of items included in a PacketListField
    #: or a FieldListField
    max_list_count = 100
    #: When the TLS module is loaded (not by default), the following turns on sessions
    tls_session_enable = False
    #: Filename containing NSS Keys Log
    #: (assigning it resets tls_nss_keys to None)
    tls_nss_filename = Interceptor(
        "tls_nss_filename",
        None,
        _reset_tls_nss_keys
    )
    #: Dictionary containing parsed NSS Keys
    tls_nss_keys: Optional[Dict[str, bytes]] = None
    #: When TCPSession is used, parse DCE/RPC sessions automatically.
    #: This should be used for passive sniffing.
    dcerpc_session_enable = False
    #: If a capture is missing the first DCE/RPC binding message, we might incorrectly
    #: assume that header signing isn't used. This forces it on.
    dcerpc_force_header_signing = False
    #: Windows SSPs for sniffing. This is used with
    #: dcerpc_session_enable
    winssps_passive = []

    def __getattribute__(self, attr):
        # type: (str) -> Any
        """Serve a few attributes dynamically, everything else normally.

        The scapy.data tables are imported on access, and the deprecated
        ``iface6`` is redirected to ``iface`` with a DeprecationWarning.
        """
        # Those are loaded on runtime to avoid import loops
        if attr == "manufdb":
            from scapy.data import MANUFDB
            return MANUFDB
        if attr == "ethertypes":
            from scapy.data import ETHER_TYPES
            return ETHER_TYPES
        if attr == "protocols":
            from scapy.data import IP_PROTOS
            return IP_PROTOS
        if attr == "services_udp":
            from scapy.data import UDP_SERVICES
            return UDP_SERVICES
        if attr == "services_tcp":
            from scapy.data import TCP_SERVICES
            return TCP_SERVICES
        if attr == "services_sctp":
            from scapy.data import SCTP_SERVICES
            return SCTP_SERVICES
        if attr == "iface6":
            warnings.warn(
                "conf.iface6 is deprecated in favor of conf.iface",
                DeprecationWarning
            )
            attr = "iface"
        return object.__getattribute__(self, attr)
# When Python was built without IPv6 support, drop the IPv6-dependent layers
# from the default load list before the configuration object is created.
if not Conf.ipv6_enabled:
    log_scapy.warning("IPv6 support disabled in Python. Cannot load Scapy IPv6 layers.")  # noqa: E501
    for m in ["inet6", "dhcp6", "sixlowpan"]:
        if m in Conf.load_layers:
            Conf.load_layers.remove(m)

# The global configuration object used throughout this module.
conf = Conf()  # type: Conf

# Extensions are only loaded on Python 3.8 and later
if sys.version_info >= (3, 8):
    conf.exts.load()
def crypto_validator(func):
    # type: (DecoratorCallable) -> DecoratorCallable
    """
    This is a decorator to be used for any method relying on the cryptography library.  # noqa: E501
    Its behaviour depends on the 'crypto_valid' attribute of the global 'conf'.

    The check happens at call time: the decorated callable raises ImportError
    when ``conf.crypto_valid`` is false, and otherwise forwards its arguments
    to ``func`` and returns its result.
    """
    # wraps() keeps the name, docstring and other metadata of ``func`` on
    # the returned wrapper.
    @functools.wraps(func)
    def func_in(*args, **kwargs):
        # type: (*Any, **Any) -> Any
        if not conf.crypto_valid:
            # conf.crypto_valid comes from isCryptographyValid(), which
            # requires cryptography >= 2.0.0.
            raise ImportError("Cannot execute crypto-related method! "
                              "Please install python-cryptography v2.0 or later.")  # noqa: E501
        return func(*args, **kwargs)
    return func_in
def scapy_delete_temp_files():
    # type: () -> None
    """Remove every file listed in ``conf.temp_files``, then empty the list.

    Deletion is best-effort: a file that cannot be removed is skipped.
    """
    pending = conf.temp_files
    for path in pending:
        try:
            os.unlink(path)
        except Exception:
            pass
    del pending[:]


# Clean the temporary files up when the interpreter exits.
atexit.register(scapy_delete_temp_files)