Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/data.py: 74%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
6"""
7Global variables and functions for handling external data sets.
8"""
10import calendar
11import os
12import pickle
13import warnings
15from scapy.dadict import DADict, fixname
16from scapy.consts import FREEBSD, NETBSD, OPENBSD, WINDOWS
17from scapy.error import log_loading
19# Typing imports
20from typing import (
21 Any,
22 Callable,
23 Dict,
24 Iterator,
25 List,
26 Optional,
27 Tuple,
28 Union,
29 cast,
30)
31from scapy.compat import DecoratorCallable
34############
35# Consts #
36############
38ETHER_ANY = b"\x00" * 6
39ETHER_BROADCAST = b"\xff" * 6
41# From bits/socket.h
42SOL_PACKET = 263
43# From asm/socket.h
44SO_ATTACH_FILTER = 26
45SO_TIMESTAMPNS = 35 # SO_TIMESTAMPNS_OLD: not 2038 safe
47ETH_P_ALL = 3
48ETH_P_IP = 0x800
49ETH_P_ARP = 0x806
50ETH_P_IPV6 = 0x86dd
51ETH_P_MACSEC = 0x88e5
53# From net/if_arp.h
54ARPHDR_ETHER = 1
55ARPHDR_METRICOM = 23
56ARPHDR_PPP = 512
57ARPHDR_LOOPBACK = 772
58ARPHDR_TUN = 65534
60# From pcap/dlt.h
61DLT_NULL = 0
62DLT_EN10MB = 1
63DLT_EN3MB = 2
64DLT_AX25 = 3
65DLT_PRONET = 4
66DLT_CHAOS = 5
67DLT_IEEE802 = 6
68DLT_ARCNET = 7
69DLT_SLIP = 8
70DLT_PPP = 9
71DLT_FDDI = 10
72if OPENBSD:
73 DLT_RAW = 14
74else:
75 DLT_RAW = 12
76DLT_RAW_ALT = 101 # At least in Argus
77if FREEBSD or NETBSD:
78 DLT_SLIP_BSDOS = 13
79 DLT_PPP_BSDOS = 14
80else:
81 DLT_SLIP_BSDOS = 15
82 DLT_PPP_BSDOS = 16
83if FREEBSD:
84 DLT_PFSYNC = 121
85else:
86 DLT_PFSYNC = 18
87 DLT_HHDLC = 121
88DLT_ATM_CLIP = 19
89DLT_PPP_SERIAL = 50
90DLT_PPP_ETHER = 51
91DLT_SYMANTEC_FIREWALL = 99
92DLT_C_HDLC = 104
93DLT_IEEE802_11 = 105
94DLT_FRELAY = 107
95if OPENBSD:
96 DLT_LOOP = 12
97 DLT_ENC = 13
98else:
99 DLT_LOOP = 108
100 DLT_ENC = 109
101DLT_LINUX_SLL = 113
102DLT_LTALK = 114
103DLT_PFLOG = 117
104DLT_PRISM_HEADER = 119
105DLT_AIRONET_HEADER = 120
106DLT_IP_OVER_FC = 122
107DLT_IEEE802_11_RADIO = 127
108DLT_ARCNET_LINUX = 129
109DLT_LINUX_IRDA = 144
110DLT_IEEE802_11_RADIO_AVS = 163
111DLT_LINUX_LAPD = 177
112DLT_BLUETOOTH_HCI_H4 = 187
113DLT_USB_LINUX = 189
114DLT_PPI = 192
115DLT_IEEE802_15_4_WITHFCS = 195
116DLT_BLUETOOTH_HCI_H4_WITH_PHDR = 201
117DLT_AX25_KISS = 202
118DLT_PPP_WITH_DIR = 204
119DLT_FC_2 = 224
120DLT_CAN_SOCKETCAN = 227
121if OPENBSD:
122 DLT_IPV4 = DLT_RAW
123 DLT_IPV6 = DLT_RAW
124else:
125 DLT_IPV4 = 228
126 DLT_IPV6 = 229
127DLT_IEEE802_15_4_NOFCS = 230
128DLT_USBPCAP = 249
129DLT_NETLINK = 253
130DLT_USB_DARWIN = 266
131DLT_BLUETOOTH_LE_LL = 251
132DLT_BLUETOOTH_LINUX_MONITOR = 254
133DLT_BLUETOOTH_LE_LL_WITH_PHDR = 256
134DLT_VSOCK = 271
135DLT_NORDIC_BLE = 272
136DLT_ETHERNET_MPACKET = 274
137DLT_LINUX_SLL2 = 276
139# From net/ipv6.h on Linux (+ Additions)
140IPV6_ADDR_UNICAST = 0x01
141IPV6_ADDR_MULTICAST = 0x02
142IPV6_ADDR_CAST_MASK = 0x0F
143IPV6_ADDR_LOOPBACK = 0x10
144IPV6_ADDR_GLOBAL = 0x00
145IPV6_ADDR_LINKLOCAL = 0x20
146IPV6_ADDR_SITELOCAL = 0x40 # deprecated since Sept. 2004 by RFC 3879
147IPV6_ADDR_SCOPE_MASK = 0xF0
148# IPV6_ADDR_COMPATv4 = 0x80 # deprecated; i.e. ::/96
149# IPV6_ADDR_MAPPED = 0x1000 # i.e.; ::ffff:0.0.0.0/96
150IPV6_ADDR_6TO4 = 0x0100 # Added to have more specific info (should be 0x0101 ?) # noqa: E501
151IPV6_ADDR_UNSPECIFIED = 0x10000
153# from if_arp.h
154ARPHRD_ETHER = 1
155ARPHRD_EETHER = 2
156ARPHRD_AX25 = 3
157ARPHRD_PRONET = 4
158ARPHRD_CHAOS = 5
159ARPHRD_IEEE802 = 6
160ARPHRD_ARCNET = 7
161ARPHRD_DLCI = 15
162ARPHRD_ATM = 19
163ARPHRD_METRICOM = 23
164ARPHRD_SLIP = 256
165ARPHRD_CSLIP = 257
166ARPHRD_SLIP6 = 258
167ARPHRD_CSLIP6 = 259
168ARPHRD_ADAPT = 264
169ARPHRD_CAN = 280
170ARPHRD_PPP = 512
171ARPHRD_CISCO = 513
172ARPHRD_RAWHDLC = 518
173ARPHRD_TUNNEL = 768
174ARPHRD_FRAD = 770
175ARPHRD_LOOPBACK = 772
176ARPHRD_LOCALTLK = 773
177ARPHRD_FDDI = 774
178ARPHRD_SIT = 776
179ARPHRD_FCPP = 784
180ARPHRD_FCAL = 785
181ARPHRD_FCPL = 786
182ARPHRD_FCFABRIC = 787
183ARPHRD_IRDA = 783
184ARPHRD_IEEE802_TR = 800
185ARPHRD_IEEE80211 = 801
186ARPHRD_IEEE80211_PRISM = 802
187ARPHRD_IEEE80211_RADIOTAP = 803
188ARPHRD_IEEE802154 = 804
189ARPHRD_NETLINK = 824
190ARPHRD_VSOCKMON = 826 # from pcap/pcap-linux.c
191ARPHRD_LAPD = 8445 # from pcap/pcap-linux.c
192ARPHRD_NONE = 0xFFFE
194ARPHRD_TO_DLT = { # netlink -> datalink
195 ARPHRD_ETHER: DLT_EN10MB,
196 ARPHRD_METRICOM: DLT_EN10MB,
197 ARPHRD_LOOPBACK: DLT_EN10MB,
198 ARPHRD_EETHER: DLT_EN3MB,
199 ARPHRD_AX25: DLT_AX25_KISS,
200 ARPHRD_PRONET: DLT_PRONET,
201 ARPHRD_CHAOS: DLT_CHAOS,
202 ARPHRD_CAN: DLT_LINUX_SLL,
203 ARPHRD_IEEE802_TR: DLT_IEEE802,
204 ARPHRD_IEEE802: DLT_IEEE802,
205 ARPHRD_ARCNET: DLT_ARCNET_LINUX,
206 ARPHRD_FDDI: DLT_FDDI,
207 ARPHRD_ATM: -1,
208 ARPHRD_IEEE80211: DLT_IEEE802_11,
209 ARPHRD_IEEE80211_PRISM: DLT_PRISM_HEADER,
210 ARPHRD_IEEE80211_RADIOTAP: DLT_IEEE802_11_RADIO,
211 ARPHRD_PPP: DLT_RAW,
212 ARPHRD_CISCO: DLT_C_HDLC,
213 ARPHRD_SIT: DLT_RAW,
214 ARPHRD_CSLIP: DLT_RAW,
215 ARPHRD_SLIP6: DLT_RAW,
216 ARPHRD_CSLIP6: DLT_RAW,
217 ARPHRD_ADAPT: DLT_RAW,
218 ARPHRD_SLIP: DLT_RAW,
219 ARPHRD_RAWHDLC: DLT_RAW,
220 ARPHRD_DLCI: DLT_RAW,
221 ARPHRD_FRAD: DLT_FRELAY,
222 ARPHRD_LOCALTLK: DLT_LTALK,
223 18: DLT_IP_OVER_FC,
224 ARPHRD_FCPP: DLT_FC_2,
225 ARPHRD_FCAL: DLT_FC_2,
226 ARPHRD_FCPL: DLT_FC_2,
227 ARPHRD_FCFABRIC: DLT_FC_2,
228 ARPHRD_IRDA: DLT_LINUX_IRDA,
229 ARPHRD_LAPD: DLT_LINUX_LAPD,
230 ARPHRD_NONE: DLT_RAW,
231 ARPHRD_IEEE802154: DLT_IEEE802_15_4_NOFCS,
232 ARPHRD_NETLINK: DLT_NETLINK,
233 ARPHRD_VSOCKMON: DLT_VSOCK,
234}
236# Constants for PPI header types.
237PPI_DOT11COMMON = 2
238PPI_DOT11NMAC = 3
239PPI_DOT11NMACPHY = 4
240PPI_SPECTRUM_MAP = 5
241PPI_PROCESS_INFO = 6
242PPI_CAPTURE_INFO = 7
243PPI_AGGREGATION = 8
244PPI_DOT3 = 9
245PPI_GPS = 30002
246PPI_VECTOR = 30003
247PPI_SENSOR = 30004
248PPI_ANTENNA = 30005
249PPI_BTLE = 30006
251# Human-readable type names for PPI header types.
252PPI_TYPES = {
253 PPI_DOT11COMMON: 'dot11-common',
254 PPI_DOT11NMAC: 'dot11-nmac',
255 PPI_DOT11NMACPHY: 'dot11-nmacphy',
256 PPI_SPECTRUM_MAP: 'spectrum-map',
257 PPI_PROCESS_INFO: 'process-info',
258 PPI_CAPTURE_INFO: 'capture-info',
259 PPI_AGGREGATION: 'aggregation',
260 PPI_DOT3: 'dot3',
261 PPI_GPS: 'gps',
262 PPI_VECTOR: 'vector',
263 PPI_SENSOR: 'sensor',
264 PPI_ANTENNA: 'antenna',
265 PPI_BTLE: 'btle',
266}
269# On windows, epoch is 01/02/1970 at 00:00
270EPOCH = calendar.timegm((1970, 1, 2, 0, 0, 0, 3, 1, 0)) - 86400
272MTU = 0xffff # a.k.a give me all you have
275# In fact, IANA enterprise-numbers file available at
276# http://www.iana.org/assignments/enterprise-numbers
277# is simply huge (more than 2Mo and 600Ko in bz2). I'll
278# add only most common vendors, and encountered values.
279# -- arno
280IANA_ENTERPRISE_NUMBERS = {
281 9: "ciscoSystems",
282 35: "Nortel Networks",
283 43: "3Com",
284 311: "Microsoft",
285 2636: "Juniper Networks, Inc.",
286 4526: "Netgear",
287 5771: "Cisco Systems, Inc.",
288 5842: "Cisco Systems",
289 11129: "Google, Inc",
290 16885: "Nortel Networks",
291}
294def scapy_data_cache(name):
295 # type: (str) -> Callable[[DecoratorCallable], DecoratorCallable]
296 """
297 This decorator caches the loading of 'data' dictionaries, in order to reduce
298 loading times.
299 """
300 from scapy.main import SCAPY_CACHE_FOLDER
301 if SCAPY_CACHE_FOLDER is None:
302 # Cannot cache.
303 return lambda x: x
304 cachepath = SCAPY_CACHE_FOLDER / name
306 def _cached_loader(func, name=name):
307 # type: (DecoratorCallable, str) -> DecoratorCallable
308 def load(filename=None):
309 # type: (Optional[str]) -> Any
310 cache_id = hash(filename)
311 if cachepath.exists():
312 try:
313 with cachepath.open("rb") as fd:
314 data = pickle.load(fd)
315 if data["id"] == cache_id:
316 return data["content"]
317 except Exception:
318 log_loading.warning(
319 "Couldn't load cache from %s" % str(cachepath),
320 exc_info=True,
321 )
322 cachepath.unlink()
323 # Cache does not exist or is invalid.
324 content = func(filename)
325 data = {
326 "content": content,
327 "id": cache_id,
328 }
329 try:
330 cachepath.parent.mkdir(parents=True, exist_ok=True)
331 with cachepath.open("wb") as fd:
332 pickle.dump(data, fd)
333 return content
334 except Exception:
335 log_loading.warning(
336 "Couldn't cache %s into %s" % (repr(func), str(cachepath)),
337 exc_info=True,
338 )
339 return content
340 return load # type: ignore
341 return _cached_loader
344def load_protocols(filename, _fallback=None, _integer_base=10,
345 _cls=DADict[int, str]):
346 # type: (str, Optional[Callable[[], Iterator[str]]], int, type) -> DADict[int, str]
347 """"
348 Parse /etc/protocols and return values as a dictionary.
349 """
350 dct = _cls(_name=filename) # type: DADict[int, str]
352 def _process_data(fdesc):
353 # type: (Iterator[str]) -> None
354 for line in fdesc:
355 try:
356 shrp = line.find("#")
357 if shrp >= 0:
358 line = line[:shrp]
359 line = line.strip()
360 if not line:
361 continue
362 lt = tuple(line.split())
363 if len(lt) < 2 or not lt[0]:
364 continue
365 dct[int(lt[1], _integer_base)] = fixname(lt[0])
366 except Exception as e:
367 log_loading.info(
368 "Couldn't parse file [%s]: line [%r] (%s)",
369 filename,
370 line,
371 e,
372 )
373 try:
374 if not filename:
375 raise IOError
376 with open(filename, "r", errors="backslashreplace") as fdesc:
377 _process_data(fdesc)
378 except IOError:
379 if _fallback:
380 _process_data(_fallback())
381 else:
382 log_loading.info("Can't open %s file", filename)
383 return dct
386class EtherDA(DADict[int, str]):
387 # Backward compatibility: accept
388 # ETHER_TYPES["MY_GREAT_TYPE"] = 12
389 def __setitem__(self, attr, val):
390 # type: (int, str) -> None
391 if isinstance(attr, str):
392 attr, val = val, attr
393 warnings.warn(
394 "ETHER_TYPES now uses the integer value as key !",
395 DeprecationWarning
396 )
397 super(EtherDA, self).__setitem__(attr, val)
399 def __getitem__(self, attr):
400 # type: (int) -> Any
401 if isinstance(attr, str):
402 warnings.warn(
403 "Please use 'ETHER_TYPES.%s'" % attr,
404 DeprecationWarning
405 )
406 return super(EtherDA, self).__getattr__(attr)
407 return super(EtherDA, self).__getitem__(attr)
410@scapy_data_cache("ethertypes")
411def load_ethertypes(filename=None):
412 # type: (Optional[str]) -> EtherDA
413 """"Parse /etc/ethertypes and return values as a dictionary.
414 If unavailable, use the copy bundled with Scapy."""
415 def _fallback() -> Iterator[str]:
416 # Fallback. Lazy loaded as the file is big.
417 from scapy.libs.ethertypes import DATA
418 return iter(DATA.split("\n"))
419 prot = load_protocols(filename or "scapy/ethertypes",
420 _fallback=_fallback,
421 _integer_base=16,
422 _cls=EtherDA)
423 return cast(EtherDA, prot)
426@scapy_data_cache("services")
427def load_services(filename):
428 # type: (str) -> Tuple[DADict[int, str], DADict[int, str], DADict[int, str]] # noqa: E501
429 tdct = DADict(_name="%s-tcp" % filename) # type: DADict[int, str]
430 udct = DADict(_name="%s-udp" % filename) # type: DADict[int, str]
431 sdct = DADict(_name="%s-sctp" % filename) # type: DADict[int, str]
432 dcts = {
433 b"tcp": tdct,
434 b"udp": udct,
435 b"sctp": sdct,
436 }
437 try:
438 with open(filename, "rb") as fdesc:
439 for line in fdesc:
440 try:
441 shrp = line.find(b"#")
442 if shrp >= 0:
443 line = line[:shrp]
444 line = line.strip()
445 if not line:
446 continue
447 lt = tuple(line.split())
448 if len(lt) < 2 or not lt[0]:
449 continue
450 if b"/" not in lt[1]:
451 continue
452 port, proto = lt[1].split(b"/", 1)
453 try:
454 dtct = dcts[proto]
455 except KeyError:
456 continue
457 name = fixname(lt[0])
458 if b"-" in port:
459 sport, eport = port.split(b"-")
460 for i in range(int(sport), int(eport) + 1):
461 dtct[i] = name
462 else:
463 dtct[int(port)] = name
464 except Exception as e:
465 log_loading.warning(
466 "Couldn't parse file [%s]: line [%r] (%s)",
467 filename,
468 line,
469 e,
470 )
471 except IOError:
472 log_loading.info("Can't open /etc/services file")
473 return tdct, udct, sdct
476class ManufDA(DADict[str, Tuple[str, str]]):
477 def ident(self, v):
478 # type: (Any) -> str
479 return fixname(v[0] if isinstance(v, tuple) else v)
481 def _get_manuf_couple(self, mac):
482 # type: (str) -> Tuple[str, str]
483 oui = ":".join(mac.split(":")[:3]).upper()
484 return self.d.get(oui, (mac, mac))
486 def _get_manuf(self, mac):
487 # type: (str) -> str
488 return self._get_manuf_couple(mac)[1]
490 def _get_short_manuf(self, mac):
491 # type: (str) -> str
492 return self._get_manuf_couple(mac)[0]
494 def _resolve_MAC(self, mac):
495 # type: (str) -> str
496 oui = ":".join(mac.split(":")[:3]).upper()
497 if oui in self:
498 return ":".join([self[oui][0]] + mac.split(":")[3:])
499 return mac
501 def lookup(self, mac):
502 # type: (str) -> Tuple[str, str]
503 """Find OUI name matching to a MAC"""
504 return self._get_manuf_couple(mac)
506 def reverse_lookup(self, name, case_sensitive=False):
507 # type: (str, bool) -> Dict[str, str]
508 """
509 Find all MACs registered to a OUI
511 :param name: the OUI name
512 :param case_sensitive: default to False
513 :returns: a dict of mac:tuples (Name, Extended Name)
514 """
515 if case_sensitive:
516 filtr = lambda x, l: any(x in z for z in l) # type: Callable[[str, Tuple[str, str]], bool] # noqa: E501
517 else:
518 name = name.lower()
519 filtr = lambda x, l: any(x in z.lower() for z in l)
520 return {k: v for k, v in self.d.items() if filtr(name, v)} # type: ignore
522 def __dir__(self):
523 # type: () -> List[str]
524 return [
525 "_get_manuf",
526 "_get_short_manuf",
527 "_resolve_MAC",
528 "loopkup",
529 "reverse_lookup",
530 ] + super(ManufDA, self).__dir__()
533@scapy_data_cache("manufdb")
534def load_manuf(filename=None):
535 # type: (Optional[str]) -> ManufDA
536 """
537 Loads manuf file from Wireshark.
539 :param filename: the file to load the manuf file from
540 :returns: a ManufDA filled object
541 """
542 manufdb = ManufDA(_name=filename or "scapy/manufdb")
544 def _process_data(fdesc):
545 # type: (Iterator[str]) -> None
546 for line in fdesc:
547 try:
548 line = line.strip()
549 if not line or line.startswith("#"):
550 continue
551 parts = line.split(None, 2)
552 oui, shrt = parts[:2]
553 lng = parts[2].lstrip("#").strip() if len(parts) > 2 else ""
554 lng = lng or shrt
555 manufdb[oui] = shrt, lng
556 except Exception:
557 log_loading.warning("Couldn't parse one line from [%s] [%r]",
558 filename, line, exc_info=True)
560 try:
561 if not filename:
562 raise IOError
563 with open(filename, "r", errors="backslashreplace") as fdesc:
564 _process_data(fdesc)
565 except IOError:
566 # Fallback. Lazy loaded as the file is big.
567 from scapy.libs.manuf import DATA
568 _process_data(iter(DATA.split("\n")))
569 return manufdb
572def select_path(directories, filename):
573 # type: (List[str], str) -> Optional[str]
574 """Find filename among several directories"""
575 for directory in directories:
576 path = os.path.join(directory, filename)
577 if os.path.exists(path):
578 return path
579 return None
582if WINDOWS:
583 IP_PROTOS = load_protocols(os.path.join(
584 os.environ["SystemRoot"],
585 "system32",
586 "drivers",
587 "etc",
588 "protocol",
589 ))
590 TCP_SERVICES, UDP_SERVICES, SCTP_SERVICES = load_services(os.path.join(
591 os.environ["SystemRoot"],
592 "system32",
593 "drivers",
594 "etc",
595 "services",
596 ))
597 ETHER_TYPES = load_ethertypes()
598 MANUFDB = load_manuf()
599else:
600 IP_PROTOS = load_protocols("/etc/protocols")
601 TCP_SERVICES, UDP_SERVICES, SCTP_SERVICES = load_services("/etc/services")
602 ETHER_TYPES = load_ethertypes("/etc/ethertypes")
603 MANUFDB = load_manuf(
604 select_path(
605 ['/usr', '/usr/local', '/opt', '/opt/wireshark',
606 '/Applications/Wireshark.app/Contents/Resources'],
607 "share/wireshark/manuf"
608 )
609 )
612#####################
613# knowledge bases #
614#####################
615KBBaseType = Optional[Union[str, List[Tuple[str, Dict[str, Dict[str, str]]]]]]
618class KnowledgeBase(object):
619 def __init__(self, filename):
620 # type: (Optional[Any]) -> None
621 self.filename = filename
622 self.base = None # type: KBBaseType
624 def lazy_init(self):
625 # type: () -> None
626 self.base = ""
628 def reload(self, filename=None):
629 # type: (Optional[Any]) -> None
630 if filename is not None:
631 self.filename = filename
632 oldbase = self.base
633 self.base = None
634 self.lazy_init()
635 if self.base is None:
636 self.base = oldbase
638 def get_base(self):
639 # type: () -> Union[str, List[Tuple[str, Dict[str,Dict[str,str]]]]]
640 if self.base is None:
641 self.lazy_init()
642 return cast(Union[str, List[Tuple[str, Dict[str, Dict[str, str]]]]], self.base)