1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter
5
6"""
7Wireshark extcap API utils
8https://www.wireshark.org/docs/wsdg_html_chunked/ChCaptureExtcap.html
9"""
10
11import collections
12import functools
13import pathlib
14import re
15import subprocess
16
17from scapy.config import conf
18from scapy.consts import WINDOWS
19from scapy.data import MTU
20from scapy.error import warning
21from scapy.interfaces import (
22 network_name,
23 resolve_iface,
24 InterfaceProvider,
25 NetworkInterface,
26)
27from scapy.packet import Packet
28from scapy.supersocket import SuperSocket
29from scapy.utils import PcapReader, _create_fifo, _open_fifo
30
31# Typing
32from typing import (
33 cast,
34 Any,
35 Dict,
36 List,
37 NoReturn,
38 Optional,
39 Tuple,
40 Type,
41 Union,
42)
43
44
def _extcap_call(prog: str,
                 args: List[str],
                 format: Dict[str, List[str]],
                 ) -> Dict[str, List[Tuple[str, ...]]]:
    """
    Run a program that speaks the extcap format and parse what it prints.

    :param prog: the executable to run
    :param args: the command line arguments handed to ``prog``
    :param format: maps a sentence keyword (the text an output line starts
        with) to the names of the ``{name=value}`` fields to pull out of
        the lines starting with that keyword
    :returns: for every keyword seen in the output, one tuple per matching
        line holding the requested field values, in the order they are
        listed in ``format``. A field missing from a line yields ``""``.
    :raises OSError: if the program exits with a non-zero return code
    """

    def _field(name: str, line: str) -> str:
        # Content of the first "{name=...}" group of the line, "" if absent
        found = re.search(r"{%s=([^}]*)}" % name, line)
        return found.group(1) if found else ""

    proc = subprocess.Popen(
        [prog] + args,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        text=True,
    )
    stdout, stderr = proc.communicate()
    if proc.returncode != 0:
        raise OSError(
            "%s returned with error code %s: %s" % (prog, proc.returncode, stderr)
        )

    parsed = collections.defaultdict(list)
    for raw_line in stdout.split("\n"):
        line = raw_line.strip()
        # Only the first keyword (in dict order) that prefixes the line counts
        keyword = next((kw for kw in format if line.startswith(kw)), None)
        if keyword is None:
            continue
        parsed[keyword].append(
            tuple(_field(name, line) for name in format[keyword])
        )
    return cast(Dict[str, List[Tuple[str, ...]]], parsed)
80
81
class _ExtcapNetworkInterface(NetworkInterface):
    """
    Extcap NetworkInterface
    """

    def get_extcap_config(self) -> Dict[str, List[Tuple[str, ...]]]:
        """
        Return the configuration options available on an extcap interface.

        The provider's extcap program is called with ``--extcap-config``
        for this interface and its output is parsed.

        :returns: a dict holding, under ``"arg"``, one
            ``(number, call, display, default, required)`` tuple per ``arg``
            sentence and, under ``"value"``, one
            ``(arg, value, display, default)`` tuple per ``value`` sentence.
            Fields missing from a sentence are empty strings.
        :raises OSError: if the extcap program exits with an error
        """
        return _extcap_call(
            self.provider.cmdprog,  # type: ignore
            ["--extcap-interface", self.network_name, "--extcap-config"],
            {
                "arg": ["number", "call", "display", "default", "required"],
                "value": ["arg", "value", "display", "default"],
            },
        )

    def get_extcap_cmd(self, **kwarg: Any) -> List[str]:
        """
        Return the extcap command line options

        Each keyword argument is matched against the options advertised by
        the interface: the option ``--some-opt`` is set through the keyword
        ``some_opt``. Options that are not passed fall back to the default
        advertised by the extcap program, if any.

        :param kwarg: option values, converted with ``str()``
        :returns: the list of command line arguments
        :raises ValueError: if an option flagged as required is not passed
        """
        cmds = []
        for _, call, _, default, required in self.get_extcap_config()["arg"]:
            # "--some-opt" is addressed through the keyword "some_opt"
            key = call.strip("-").replace("-", "_")
            if key in kwarg:
                # Apply argument
                cmds += [call, str(kwarg[key])]
                continue
            # Not provided by the caller: apply default
            if required == "true":
                raise ValueError(
                    "Missing required argument: '%s' on iface %s." % (
                        key,
                        self.network_name,
                    )
                )
            if not default or default == "false":
                # no default (or false): leave the option out
                continue
            if default == "true":
                # a default of "true" is emitted as a bare flag, no value
                cmds += [call]
            else:
                cmds += [call, default]
        return cmds
126
127
class _ExtcapSocket(SuperSocket):
    """
    Read packets at layer 2 using an extcap command

    The extcap program is started in capture mode with a fifo to write to,
    and that fifo is read back through a PcapReader. The socket cannot be
    used to send.
    """

    nonblocking_socket = True

    @staticmethod
    def select(sockets: List[SuperSocket],
               remain: Optional[float] = None) -> List[SuperSocket]:
        """
        Return all the sockets as-is: nothing is polled and ``remain`` is
        not used.
        """
        return sockets

    def __init__(self, *_: Any, **kwarg: Any) -> None:
        """
        Start the extcap capture on an interface.

        :param cmdprog: (keyword, mandatory) the extcap program to run
        :param iface: (keyword, mandatory) the extcap interface to capture on
        :param kwarg: any other keyword is handed to
            ``_ExtcapNetworkInterface.get_extcap_cmd`` to build the extcap
            options
        :raises NameError: if no interface is given
        :raises ValueError: if the interface is not an extcap interface
        """
        cmdprog = kwarg.pop("cmdprog")
        iface = kwarg.pop("iface", None)
        if iface is None:
            raise NameError("Must select an interface for a extcap socket !")
        iface = resolve_iface(iface)
        if not isinstance(iface, _ExtcapNetworkInterface):
            raise ValueError("Interface should be an _ExtcapNetworkInterface")
        args = iface.get_extcap_cmd(**kwarg)
        iface = network_name(iface)
        self.outs = None  # extcap sockets can't write
        # open fifo
        fifo, fd = _create_fifo()
        args = ["--extcap-interface", iface, "--capture", "--fifo", fifo] + args
        self.proc = subprocess.Popen(
            [cmdprog] + args,
        )
        try:
            self.fd = _open_fifo(fd)
            self.reader = PcapReader(self.fd)  # type: ignore
        except BaseException:
            # The capture process is already running: do not leave it (or
            # the fifo) behind when the reading side cannot be set up.
            # BaseException is caught on purpose so that an interruption
            # also triggers the cleanup; the exception is always re-raised.
            try:
                self._kill_capture()
            finally:
                self._close_fifo()
            raise
        self.ins = self.reader  # type: ignore

    def recv(self, x: int = MTU, **kwargs: Any) -> Packet:
        """
        Receive a packet by delegating to the underlying PcapReader.
        """
        return self.reader.recv(x, **kwargs)

    def _kill_capture(self) -> None:
        """Kill the extcap process, if it was started, and wait for it."""
        proc = getattr(self, "proc", None)
        if proc is None:
            return
        proc.kill()
        proc.wait(timeout=2)

    def _close_fifo(self) -> None:
        """Close the fifo file object, if it was opened."""
        fd = getattr(self, "fd", None)
        if fd is not None:
            fd.close()

    def close(self) -> None:
        """
        Stop the extcap process, then close the reader and the fifo.

        Safe to call on an object whose ``__init__`` did not complete
        (NOTE(review): presumably reachable through a finalizer of
        SuperSocket - confirm). The reader and the fifo are released even if
        waiting for the process fails; that failure is still propagated.
        """
        try:
            self._kill_capture()
        finally:
            try:
                SuperSocket.close(self)
            finally:
                self._close_fifo()
169
170
class _ExtcapInterfaceProvider(InterfaceProvider):
    """
    Interface provider made to hook on a extcap binary
    """

    headers = ("Index", "Name", "Address")
    header_sort = 1

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        :param cmdprog: (keyword, mandatory) path of the extcap program this
            provider wraps. All other arguments go to InterfaceProvider.
        """
        self.cmdprog = kwargs.pop("cmdprog")
        super(_ExtcapInterfaceProvider, self).__init__(*args, **kwargs)

    def load(self) -> Dict[str, NetworkInterface]:
        """
        List the interfaces advertised by the extcap program.

        :returns: the interfaces, indexed by their network name. If the
            program cannot be run or exits with an error, a warning is
            emitted and an empty dict is returned.
        """
        data: Dict[str, NetworkInterface] = {}
        try:
            interfaces = _extcap_call(
                self.cmdprog,
                ["--extcap-interfaces"],
                {"interface": ["value", "display"]},
            )["interface"]
        except OSError as ex:
            # Only report the last line of the error
            warning(
                "extcap %s failed to load: %s",
                self.name,
                str(ex).strip().split("\n")[-1]
            )
            return {}
        for netw_name, name in interfaces:
            # The index is 100 + the last number found in the display name
            # (100 when there is none). The whole run of digits is captured:
            # a leading greedy ".*" would leave only the final digit to the
            # group, turning "12" into 2.
            _index = re.search(r"(\d+)\D*$", name)
            if _index:
                index = int(_index.group(1)) + 100
            else:
                index = 100
            if_data = {
                "name": name,
                "network_name": netw_name,
                "description": name,
                "index": index,
            }
            data[netw_name] = _ExtcapNetworkInterface(self, if_data)
        return data

    def _l2listen(self, _: Any) -> Type[SuperSocket]:
        """Return the _ExtcapSocket constructor, bound to this program."""
        return functools.partial(_ExtcapSocket, cmdprog=self.cmdprog)  # type: ignore

    def _l3socket(self, *_: Any) -> NoReturn:
        """Always raise ValueError: sending is not supported."""
        raise ValueError("Only sniffing is available for an extcap provider !")

    _l2socket = _l3socket  # type: ignore

    def _is_valid(self, dev: NetworkInterface) -> bool:
        """Every interface is reported as valid."""
        return True

    def _format(self,
                dev: NetworkInterface,
                **kwargs: Any
                ) -> Tuple[Union[str, List[str]], ...]:
        """Returns a tuple of the elements used by show()"""
        return (str(dev.index), dev.name, dev.network_name)
230
231
def load_extcap() -> None:
    """
    Load extcap folder from wireshark and populate providers

    Every regular file found directly in one of the folders listed in
    ``conf.prog.extcap_folders`` whose name has no extension (or a ``.sh``
    one; ``.bat``/``.exe`` on Windows) is registered as an interface
    provider, named after the file without its extension.
    """
    if WINDOWS:
        pattern = re.compile(r"^[^.]+(?:\.bat|\.exe)?$")
    else:
        pattern = re.compile(r"^[^.]+(?:\.sh)?$")
    for fld in conf.prog.extcap_folders:
        for _cmdprog in pathlib.Path(fld).glob("*"):
            if not _cmdprog.is_file() or not pattern.match(_cmdprog.name):
                continue
            # glob() yields paths that already include the folder: joining
            # them to the folder again would duplicate it whenever the
            # folder is a relative path.
            cmdprog = str(_cmdprog.absolute())
            # success
            provname = _cmdprog.name.rsplit(".", 1)[0]

            # One provider class per program, so that each has its own name
            class _prov(_ExtcapInterfaceProvider):
                name = provname

            conf.ifaces.register_provider(
                functools.partial(_prov, cmdprog=cmdprog)  # type: ignore
            )