Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/data.py: 74%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Global variables and functions for handling external data sets.
8"""
10import calendar
11import hashlib
12import os
13import pickle
14import warnings
16from scapy.dadict import DADict, fixname
17from scapy.consts import FREEBSD, NETBSD, OPENBSD, WINDOWS
18from scapy.error import log_loading
20# Typing imports
21from typing import (
22 Any,
23 Callable,
24 Dict,
25 Iterator,
26 List,
27 Optional,
28 Tuple,
29 Union,
30 cast,
31)
32from scapy.compat import DecoratorCallable
35############
36# Consts #
37############
39ETHER_ANY = b"\x00" * 6
40ETHER_BROADCAST = b"\xff" * 6
42# From bits/socket.h
43SOL_PACKET = 263
44# From asm/socket.h
45SO_ATTACH_FILTER = 26
46SO_TIMESTAMPNS = 35 # SO_TIMESTAMPNS_OLD: not 2038 safe
48ETH_P_ALL = 3
49ETH_P_IP = 0x800
50ETH_P_ARP = 0x806
51ETH_P_IPV6 = 0x86dd
52ETH_P_MACSEC = 0x88e5
54# From net/if_arp.h
55ARPHDR_ETHER = 1
56ARPHDR_METRICOM = 23
57ARPHDR_PPP = 512
58ARPHDR_LOOPBACK = 772
59ARPHDR_TUN = 65534
61# From pcap/dlt.h
62DLT_NULL = 0
63DLT_EN10MB = 1
64DLT_EN3MB = 2
65DLT_AX25 = 3
66DLT_PRONET = 4
67DLT_CHAOS = 5
68DLT_IEEE802 = 6
69DLT_ARCNET = 7
70DLT_SLIP = 8
71DLT_PPP = 9
72DLT_FDDI = 10
73if OPENBSD:
74 DLT_RAW = 14
75else:
76 DLT_RAW = 12
77DLT_RAW_ALT = 101 # At least in Argus
78if FREEBSD or NETBSD:
79 DLT_SLIP_BSDOS = 13
80 DLT_PPP_BSDOS = 14
81else:
82 DLT_SLIP_BSDOS = 15
83 DLT_PPP_BSDOS = 16
84if FREEBSD:
85 DLT_PFSYNC = 121
86else:
87 DLT_PFSYNC = 18
88 DLT_HHDLC = 121
89DLT_ATM_CLIP = 19
90DLT_PPP_SERIAL = 50
91DLT_PPP_ETHER = 51
92DLT_SYMANTEC_FIREWALL = 99
93DLT_C_HDLC = 104
94DLT_IEEE802_11 = 105
95DLT_FRELAY = 107
96if OPENBSD:
97 DLT_LOOP = 12
98 DLT_ENC = 13
99else:
100 DLT_LOOP = 108
101 DLT_ENC = 109
102DLT_LINUX_SLL = 113
103DLT_LTALK = 114
104DLT_PFLOG = 117
105DLT_PRISM_HEADER = 119
106DLT_AIRONET_HEADER = 120
107DLT_IP_OVER_FC = 122
108DLT_IEEE802_11_RADIO = 127
109DLT_ARCNET_LINUX = 129
110DLT_LINUX_IRDA = 144
111DLT_IEEE802_11_RADIO_AVS = 163
112DLT_LINUX_LAPD = 177
113DLT_BLUETOOTH_HCI_H4 = 187
114DLT_USB_LINUX = 189
115DLT_PPI = 192
116DLT_IEEE802_15_4_WITHFCS = 195
117DLT_BLUETOOTH_HCI_H4_WITH_PHDR = 201
118DLT_AX25_KISS = 202
119DLT_PPP_WITH_DIR = 204
120DLT_FC_2 = 224
121DLT_CAN_SOCKETCAN = 227
122if OPENBSD:
123 DLT_IPV4 = DLT_RAW
124 DLT_IPV6 = DLT_RAW
125else:
126 DLT_IPV4 = 228
127 DLT_IPV6 = 229
128DLT_IEEE802_15_4_NOFCS = 230
129DLT_USBPCAP = 249
130DLT_NETLINK = 253
131DLT_USB_DARWIN = 266
132DLT_BLUETOOTH_LE_LL = 251
133DLT_BLUETOOTH_LINUX_MONITOR = 254
134DLT_BLUETOOTH_LE_LL_WITH_PHDR = 256
135DLT_VSOCK = 271
136DLT_NORDIC_BLE = 272
137DLT_ETHERNET_MPACKET = 274
138DLT_LINUX_SLL2 = 276
140# From net/ipv6.h on Linux (+ Additions)
141IPV6_ADDR_UNICAST = 0x01
142IPV6_ADDR_MULTICAST = 0x02
143IPV6_ADDR_CAST_MASK = 0x0F
144IPV6_ADDR_LOOPBACK = 0x10
145IPV6_ADDR_GLOBAL = 0x00
146IPV6_ADDR_LINKLOCAL = 0x20
147IPV6_ADDR_SITELOCAL = 0x40 # deprecated since Sept. 2004 by RFC 3879
148IPV6_ADDR_SCOPE_MASK = 0xF0
149# IPV6_ADDR_COMPATv4 = 0x80 # deprecated; i.e. ::/96
150# IPV6_ADDR_MAPPED = 0x1000 # i.e.; ::ffff:0.0.0.0/96
151IPV6_ADDR_6TO4 = 0x0100 # Added to have more specific info (should be 0x0101 ?) # noqa: E501
152IPV6_ADDR_UNSPECIFIED = 0x10000
154# from if_arp.h
155ARPHRD_ETHER = 1
156ARPHRD_EETHER = 2
157ARPHRD_AX25 = 3
158ARPHRD_PRONET = 4
159ARPHRD_CHAOS = 5
160ARPHRD_IEEE802 = 6
161ARPHRD_ARCNET = 7
162ARPHRD_DLCI = 15
163ARPHRD_ATM = 19
164ARPHRD_METRICOM = 23
165ARPHRD_SLIP = 256
166ARPHRD_CSLIP = 257
167ARPHRD_SLIP6 = 258
168ARPHRD_CSLIP6 = 259
169ARPHRD_ADAPT = 264
170ARPHRD_CAN = 280
171ARPHRD_PPP = 512
172ARPHRD_CISCO = 513
173ARPHRD_RAWHDLC = 518
174ARPHRD_TUNNEL = 768
175ARPHRD_FRAD = 770
176ARPHRD_LOOPBACK = 772
177ARPHRD_LOCALTLK = 773
178ARPHRD_FDDI = 774
179ARPHRD_SIT = 776
180ARPHRD_FCPP = 784
181ARPHRD_FCAL = 785
182ARPHRD_FCPL = 786
183ARPHRD_FCFABRIC = 787
184ARPHRD_IRDA = 783
185ARPHRD_IEEE802_TR = 800
186ARPHRD_IEEE80211 = 801
187ARPHRD_IEEE80211_PRISM = 802
188ARPHRD_IEEE80211_RADIOTAP = 803
189ARPHRD_IEEE802154 = 804
190ARPHRD_NETLINK = 824
191ARPHRD_VSOCKMON = 826 # from pcap/pcap-linux.c
192ARPHRD_LAPD = 8445 # from pcap/pcap-linux.c
193ARPHRD_NONE = 0xFFFE
195ARPHRD_TO_DLT = { # netlink -> datalink
196 ARPHRD_ETHER: DLT_EN10MB,
197 ARPHRD_METRICOM: DLT_EN10MB,
198 ARPHRD_LOOPBACK: DLT_EN10MB,
199 ARPHRD_EETHER: DLT_EN3MB,
200 ARPHRD_AX25: DLT_AX25_KISS,
201 ARPHRD_PRONET: DLT_PRONET,
202 ARPHRD_CHAOS: DLT_CHAOS,
203 ARPHRD_CAN: DLT_LINUX_SLL,
204 ARPHRD_IEEE802_TR: DLT_IEEE802,
205 ARPHRD_IEEE802: DLT_EN10MB,
206 ARPHRD_ARCNET: DLT_ARCNET_LINUX,
207 ARPHRD_FDDI: DLT_FDDI,
208 ARPHRD_ATM: -1,
209 ARPHRD_IEEE80211: DLT_IEEE802_11,
210 ARPHRD_IEEE80211_PRISM: DLT_PRISM_HEADER,
211 ARPHRD_IEEE80211_RADIOTAP: DLT_IEEE802_11_RADIO,
212 ARPHRD_PPP: DLT_RAW,
213 ARPHRD_CISCO: DLT_C_HDLC,
214 ARPHRD_SIT: DLT_RAW,
215 ARPHRD_CSLIP: DLT_RAW,
216 ARPHRD_SLIP6: DLT_RAW,
217 ARPHRD_CSLIP6: DLT_RAW,
218 ARPHRD_ADAPT: DLT_RAW,
219 ARPHRD_SLIP: DLT_RAW,
220 ARPHRD_RAWHDLC: DLT_RAW,
221 ARPHRD_DLCI: DLT_RAW,
222 ARPHRD_FRAD: DLT_FRELAY,
223 ARPHRD_LOCALTLK: DLT_LTALK,
224 18: DLT_IP_OVER_FC,
225 ARPHRD_FCPP: DLT_FC_2,
226 ARPHRD_FCAL: DLT_FC_2,
227 ARPHRD_FCPL: DLT_FC_2,
228 ARPHRD_FCFABRIC: DLT_FC_2,
229 ARPHRD_IRDA: DLT_LINUX_IRDA,
230 ARPHRD_LAPD: DLT_LINUX_LAPD,
231 ARPHRD_NONE: DLT_RAW,
232 ARPHRD_IEEE802154: DLT_IEEE802_15_4_NOFCS,
233 ARPHRD_NETLINK: DLT_NETLINK,
234 ARPHRD_VSOCKMON: DLT_VSOCK,
235}
237# Constants for PPI header types.
238PPI_DOT11COMMON = 2
239PPI_DOT11NMAC = 3
240PPI_DOT11NMACPHY = 4
241PPI_SPECTRUM_MAP = 5
242PPI_PROCESS_INFO = 6
243PPI_CAPTURE_INFO = 7
244PPI_AGGREGATION = 8
245PPI_DOT3 = 9
246PPI_GPS = 30002
247PPI_VECTOR = 30003
248PPI_SENSOR = 30004
249PPI_ANTENNA = 30005
250PPI_BTLE = 30006
252# Human-readable type names for PPI header types.
253PPI_TYPES = {
254 PPI_DOT11COMMON: 'dot11-common',
255 PPI_DOT11NMAC: 'dot11-nmac',
256 PPI_DOT11NMACPHY: 'dot11-nmacphy',
257 PPI_SPECTRUM_MAP: 'spectrum-map',
258 PPI_PROCESS_INFO: 'process-info',
259 PPI_CAPTURE_INFO: 'capture-info',
260 PPI_AGGREGATION: 'aggregation',
261 PPI_DOT3: 'dot3',
262 PPI_GPS: 'gps',
263 PPI_VECTOR: 'vector',
264 PPI_SENSOR: 'sensor',
265 PPI_ANTENNA: 'antenna',
266 PPI_BTLE: 'btle',
267}
270# On windows, epoch is 01/02/1970 at 00:00
271EPOCH = calendar.timegm((1970, 1, 2, 0, 0, 0, 3, 1, 0)) - 86400
273MTU = 0xffff # a.k.a give me all you have
276# In fact, IANA enterprise-numbers file available at
277# http://www.iana.org/assignments/enterprise-numbers
278# is simply huge (more than 2Mo and 600Ko in bz2). I'll
279# add only most common vendors, and encountered values.
280# -- arno
281IANA_ENTERPRISE_NUMBERS = {
282 9: "ciscoSystems",
283 35: "Nortel Networks",
284 43: "3Com",
285 311: "Microsoft",
286 2636: "Juniper Networks, Inc.",
287 4526: "Netgear",
288 5771: "Cisco Systems, Inc.",
289 5842: "Cisco Systems",
290 11129: "Google, Inc",
291 16885: "Nortel Networks",
292}
295def scapy_data_cache(name):
296 # type: (str) -> Callable[[DecoratorCallable], DecoratorCallable]
297 """
298 This decorator caches the loading of 'data' dictionaries, in order to reduce
299 loading times.
300 """
301 from scapy.main import SCAPY_CACHE_FOLDER
302 if SCAPY_CACHE_FOLDER is None:
303 # Cannot cache.
304 return lambda x: x
305 cachepath = SCAPY_CACHE_FOLDER / name
307 def _cached_loader(func, name=name):
308 # type: (DecoratorCallable, str) -> DecoratorCallable
309 def load(filename=None):
310 # type: (Optional[str]) -> Any
311 cache_id = hashlib.sha256((filename or "").encode()).hexdigest()
312 if cachepath.exists():
313 try:
314 with cachepath.open("rb") as fd:
315 data = pickle.load(fd)
316 if data["id"] == cache_id:
317 return data["content"]
318 except Exception as ex:
319 log_loading.info(
320 "Couldn't load cache from %s: %s" % (
321 str(cachepath),
322 str(ex),
323 )
324 )
325 cachepath.unlink(missing_ok=True)
326 # Cache does not exist or is invalid.
327 content = func(filename)
328 data = {
329 "content": content,
330 "id": cache_id,
331 }
332 try:
333 cachepath.parent.mkdir(parents=True, exist_ok=True)
334 with cachepath.open("wb") as fd:
335 pickle.dump(data, fd)
336 return content
337 except Exception as ex:
338 log_loading.info(
339 "Couldn't write cache into %s: %s" % (
340 str(cachepath),
341 str(ex)
342 )
343 )
344 return content
345 return load # type: ignore
346 return _cached_loader
349def load_protocols(filename, _fallback=None, _integer_base=10,
350 _cls=DADict[int, str]):
351 # type: (str, Optional[Callable[[], Iterator[str]]], int, type) -> DADict[int, str]
352 """"
353 Parse /etc/protocols and return values as a dictionary.
354 """
355 dct = _cls(_name=filename) # type: DADict[int, str]
357 def _process_data(fdesc):
358 # type: (Iterator[str]) -> None
359 for line in fdesc:
360 try:
361 shrp = line.find("#")
362 if shrp >= 0:
363 line = line[:shrp]
364 line = line.strip()
365 if not line:
366 continue
367 lt = tuple(line.split())
368 if len(lt) < 2 or not lt[0]:
369 continue
370 dct[int(lt[1], _integer_base)] = fixname(lt[0])
371 except Exception as e:
372 log_loading.info(
373 "Couldn't parse file [%s]: line [%r] (%s)",
374 filename,
375 line,
376 e,
377 )
378 try:
379 if not filename:
380 raise IOError
381 with open(filename, "r", errors="backslashreplace") as fdesc:
382 _process_data(fdesc)
383 except IOError:
384 if _fallback:
385 _process_data(_fallback())
386 else:
387 log_loading.info("Can't open %s file", filename)
388 return dct
391class EtherDA(DADict[int, str]):
392 # Backward compatibility: accept
393 # ETHER_TYPES["MY_GREAT_TYPE"] = 12
394 def __setitem__(self, attr, val):
395 # type: (int, str) -> None
396 if isinstance(attr, str):
397 attr, val = val, attr
398 warnings.warn(
399 "ETHER_TYPES now uses the integer value as key !",
400 DeprecationWarning
401 )
402 super(EtherDA, self).__setitem__(attr, val)
404 def __getitem__(self, attr):
405 # type: (int) -> Any
406 if isinstance(attr, str):
407 warnings.warn(
408 "Please use 'ETHER_TYPES.%s'" % attr,
409 DeprecationWarning
410 )
411 return super(EtherDA, self).__getattr__(attr)
412 return super(EtherDA, self).__getitem__(attr)
415@scapy_data_cache("ethertypes")
416def load_ethertypes(filename=None):
417 # type: (Optional[str]) -> EtherDA
418 """"Parse /etc/ethertypes and return values as a dictionary.
419 If unavailable, use the copy bundled with Scapy."""
420 def _fallback() -> Iterator[str]:
421 # Fallback. Lazy loaded as the file is big.
422 from scapy.libs.ethertypes import DATA
423 return iter(DATA.split("\n"))
424 prot = load_protocols(filename or "scapy/ethertypes",
425 _fallback=_fallback,
426 _integer_base=16,
427 _cls=EtherDA)
428 return cast(EtherDA, prot)
431@scapy_data_cache("services")
432def load_services(filename):
433 # type: (str) -> Tuple[DADict[int, str], DADict[int, str], DADict[int, str]] # noqa: E501
434 tdct = DADict(_name="%s-tcp" % filename) # type: DADict[int, str]
435 udct = DADict(_name="%s-udp" % filename) # type: DADict[int, str]
436 sdct = DADict(_name="%s-sctp" % filename) # type: DADict[int, str]
437 dcts = {
438 b"tcp": tdct,
439 b"udp": udct,
440 b"sctp": sdct,
441 }
442 try:
443 with open(filename, "rb") as fdesc:
444 for line in fdesc:
445 try:
446 shrp = line.find(b"#")
447 if shrp >= 0:
448 line = line[:shrp]
449 line = line.strip()
450 if not line:
451 continue
452 lt = tuple(line.split())
453 if len(lt) < 2 or not lt[0]:
454 continue
455 if b"/" not in lt[1]:
456 continue
457 port, proto = lt[1].split(b"/", 1)
458 try:
459 dtct = dcts[proto]
460 except KeyError:
461 continue
462 name = fixname(lt[0])
463 if b"-" in port:
464 sport, eport = port.split(b"-")
465 for i in range(int(sport), int(eport) + 1):
466 dtct[i] = name
467 else:
468 dtct[int(port)] = name
469 except Exception as e:
470 log_loading.warning(
471 "Couldn't parse file [%s]: line [%r] (%s)",
472 filename,
473 line,
474 e,
475 )
476 except IOError:
477 log_loading.info("Can't open /etc/services file")
478 return tdct, udct, sdct
481class ManufDA(DADict[str, Tuple[str, str]]):
482 def ident(self, v):
483 # type: (Any) -> str
484 return fixname(v[0] if isinstance(v, tuple) else v)
486 def _get_manuf_couple(self, mac):
487 # type: (str) -> Tuple[str, str]
488 oui = ":".join(mac.split(":")[:3]).upper()
489 return self.d.get(oui, (mac, mac))
491 def _get_manuf(self, mac):
492 # type: (str) -> str
493 return self._get_manuf_couple(mac)[1]
495 def _get_short_manuf(self, mac):
496 # type: (str) -> str
497 return self._get_manuf_couple(mac)[0]
499 def _resolve_MAC(self, mac):
500 # type: (str) -> str
501 oui = ":".join(mac.split(":")[:3]).upper()
502 if oui in self:
503 return ":".join([self[oui][0]] + mac.split(":")[3:])
504 return mac
506 def lookup(self, mac):
507 # type: (str) -> Tuple[str, str]
508 """Find OUI name matching to a MAC"""
509 return self._get_manuf_couple(mac)
511 def reverse_lookup(self, name, case_sensitive=False):
512 # type: (str, bool) -> Dict[str, str]
513 """
514 Find all MACs registered to a OUI
516 :param name: the OUI name
517 :param case_sensitive: default to False
518 :returns: a dict of mac:tuples (Name, Extended Name)
519 """
520 if case_sensitive:
521 filtr = lambda x, l: any(x in z for z in l) # type: Callable[[str, Tuple[str, str]], bool] # noqa: E501
522 else:
523 name = name.lower()
524 filtr = lambda x, l: any(x in z.lower() for z in l)
525 return {k: v for k, v in self.d.items() if filtr(name, v)} # type: ignore
527 def __dir__(self):
528 # type: () -> List[str]
529 return [
530 "_get_manuf",
531 "_get_short_manuf",
532 "_resolve_MAC",
533 "loopkup",
534 "reverse_lookup",
535 ] + super(ManufDA, self).__dir__()
538@scapy_data_cache("manufdb")
539def load_manuf(filename=None):
540 # type: (Optional[str]) -> ManufDA
541 """
542 Loads manuf file from Wireshark.
544 :param filename: the file to load the manuf file from
545 :returns: a ManufDA filled object
546 """
547 manufdb = ManufDA(_name=filename or "scapy/manufdb")
549 def _process_data(fdesc):
550 # type: (Iterator[str]) -> None
551 for line in fdesc:
552 try:
553 line = line.strip()
554 if not line or line.startswith("#"):
555 continue
556 parts = line.split(None, 2)
557 oui, shrt = parts[:2]
558 lng = parts[2].lstrip("#").strip() if len(parts) > 2 else ""
559 lng = lng or shrt
560 manufdb[oui] = shrt, lng
561 except Exception:
562 log_loading.warning("Couldn't parse one line from [%s] [%r]",
563 filename, line, exc_info=True)
565 try:
566 if not filename:
567 raise IOError
568 with open(filename, "r", errors="backslashreplace") as fdesc:
569 _process_data(fdesc)
570 except IOError:
571 # Fallback. Lazy loaded as the file is big.
572 from scapy.libs.manuf import DATA
573 _process_data(iter(DATA.split("\n")))
574 return manufdb
577def select_path(directories, filename):
578 # type: (List[str], str) -> Optional[str]
579 """Find filename among several directories"""
580 for directory in directories:
581 path = os.path.join(directory, filename)
582 if os.path.exists(path):
583 return path
584 return None
587if WINDOWS:
588 IP_PROTOS = load_protocols(os.path.join(
589 os.environ["SystemRoot"],
590 "system32",
591 "drivers",
592 "etc",
593 "protocol",
594 ))
595 TCP_SERVICES, UDP_SERVICES, SCTP_SERVICES = load_services(os.path.join(
596 os.environ["SystemRoot"],
597 "system32",
598 "drivers",
599 "etc",
600 "services",
601 ))
602 ETHER_TYPES = load_ethertypes()
603 MANUFDB = load_manuf()
604else:
605 IP_PROTOS = load_protocols("/etc/protocols")
606 TCP_SERVICES, UDP_SERVICES, SCTP_SERVICES = load_services("/etc/services")
607 ETHER_TYPES = load_ethertypes("/etc/ethertypes")
608 MANUFDB = load_manuf(
609 select_path(
610 ['/usr', '/usr/local', '/opt', '/opt/wireshark',
611 '/Applications/Wireshark.app/Contents/Resources'],
612 "share/wireshark/manuf"
613 )
614 )
617#####################
618# knowledge bases #
619#####################
620KBBaseType = Optional[Union[str, List[Tuple[str, Dict[str, Dict[str, str]]]]]]
623class KnowledgeBase(object):
624 def __init__(self, filename):
625 # type: (Optional[Any]) -> None
626 self.filename = filename
627 self.base = None # type: KBBaseType
629 def lazy_init(self):
630 # type: () -> None
631 self.base = ""
633 def reload(self, filename=None):
634 # type: (Optional[Any]) -> None
635 if filename is not None:
636 self.filename = filename
637 oldbase = self.base
638 self.base = None
639 self.lazy_init()
640 if self.base is None:
641 self.base = oldbase
643 def get_base(self):
644 # type: () -> Union[str, List[Tuple[str, Dict[str,Dict[str,str]]]]]
645 if self.base is None:
646 self.lazy_init()
647 return cast(Union[str, List[Tuple[str, Dict[str, Dict[str, str]]]]], self.base)