1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Gabriel Potter
5
6"""
7Wireshark extcap API utils
8https://www.wireshark.org/docs/wsdg_html_chunked/ChCaptureExtcap.html
9"""
10
11import collections
12import functools
13import pathlib
14import re
15import subprocess
16
17from scapy.config import conf
18from scapy.consts import WINDOWS
19from scapy.data import MTU
20from scapy.error import warning
21from scapy.interfaces import (
22 network_name,
23 resolve_iface,
24 InterfaceProvider,
25 NetworkInterface,
26)
27from scapy.packet import Packet
28from scapy.supersocket import SuperSocket
29from scapy.utils import PcapReader, _create_fifo, _open_fifo
30
31# Typing
32from typing import (
33 cast,
34 Any,
35 Dict,
36 List,
37 NoReturn,
38 Optional,
39 Tuple,
40 Type,
41 Union,
42)
43
44
45def _extcap_call(prog: str,
46 args: List[str],
47 format: Dict[str, List[str]],
48 ) -> Dict[str, List[Tuple[str, ...]]]:
49 """
50 Function used to call a program using the extcap format,
51 then parse the results
52 """
53 p = subprocess.Popen(
54 [prog] + args,
55 # On Windows, we must be in the Wireshark/ folder.
56 cwd=pathlib.Path(prog).parent.parent,
57 stdout=subprocess.PIPE, stderr=subprocess.PIPE,
58 text=True
59 )
60 data, err = p.communicate()
61 if p.returncode != 0:
62 raise OSError("%s returned with error code %s: %s" % (prog, p.returncode, err))
63 res = collections.defaultdict(list)
64 for ifa in data.split("\n"):
65 ifa = ifa.strip()
66 for keyword, values in format.items():
67 if not ifa.startswith(keyword):
68 continue
69
70 def _match(val: str, ifa: str) -> str:
71 m = re.search(r"{%s=([^}]*)}" % val, ifa)
72 if m:
73 return m.group(1)
74 return ""
75 res[keyword].append(
76 tuple(
77 [_match(val, ifa) for val in values]
78 )
79 )
80 break
81 return cast(Dict[str, List[Tuple[str, ...]]], res)
82
83
class _ExtcapNetworkInterface(NetworkInterface):
    """
    Extcap NetworkInterface
    """

    def get_extcap_config(self) -> Dict[str, List[Tuple[str, ...]]]:
        """
        Query the extcap program for the configuration options of this
        interface.

        :returns: a dict of the sentences reported by the program. ``"arg"``
            maps to a list of ``(number, call, display, default, required)``
            tuples and ``"value"`` to a list of
            ``(arg, value, display, default)`` tuples. Fields missing from a
            sentence are empty strings.
        :raises OSError: if the extcap program exits with a non-zero code.
        """
        return _extcap_call(
            self.provider.cmdprog,  # type: ignore
            ["--extcap-interface", self.network_name, "--extcap-config"],
            {
                "arg": ["number", "call", "display", "default", "required"],
                "value": ["arg", "value", "display", "default"],
            },
        )

    def get_extcap_cmd(self, **kwarg: Any) -> List[str]:
        """
        Return the extcap command line options

        Every option advertised by the extcap program is looked up in
        ``kwarg`` under its call name, with the surrounding dashes stripped
        and the inner dashes replaced by underscores (``--some-opt`` is
        looked up as ``some_opt``). Keyword arguments that match no option
        are ignored.

        :param kwarg: option values; each one is converted with ``str()``.
        :returns: the list of command line tokens to append to the extcap
            call.
        :raises ValueError: if an option flagged as required is not provided.
        """
        cmds: List[str] = []
        for option in self.get_extcap_config().get("arg", []):
            call, default, required = option[1], option[3], option[4]
            key = call.strip("-").replace("-", "_")
            if key in kwarg:
                # Apply argument
                cmds += [call, str(kwarg[key])]
                continue
            # Not provided by the caller: fall back on the default
            if required == "true":
                raise ValueError(
                    "Missing required argument: '%s' on iface %s." % (
                        key,
                        self.network_name,
                    )
                )
            if not default or default == "false":  # no default (or false)
                continue
            if default == "true":
                # A default of "true" is emitted as a bare flag
                cmds.append(call)
            else:
                cmds += [call, default]
        return cmds
128
129
class _ExtcapSocket(SuperSocket):
    """
    Read packets at layer 2 using an extcap command
    """

    nonblocking_socket = True

    @staticmethod
    def select(sockets: List[SuperSocket],
               remain: Optional[float] = None) -> List[SuperSocket]:
        """
        Return all the given sockets unchanged; ``remain`` is ignored.
        """
        return sockets

    def __init__(self, *_: Any, **kwarg: Any) -> None:
        """
        Start the extcap program in capture mode and read its output through
        a fifo.

        :param cmdprog: (keyword, mandatory) path of the extcap program.
        :param iface: (keyword, mandatory) the extcap interface to capture on.
        :param kwarg: the remaining keyword arguments are handed to
            ``get_extcap_cmd`` of the interface to build the extcap options.
        :raises KeyError: if ``cmdprog`` is not provided.
        :raises NameError: if no interface is provided.
        :raises ValueError: if the interface is not an extcap interface.
        """
        cmdprog = kwarg.pop("cmdprog")
        iface = kwarg.pop("iface", None)
        if iface is None:
            raise NameError("Must select an interface for a extcap socket !")
        iface = resolve_iface(iface)
        if not isinstance(iface, _ExtcapNetworkInterface):
            raise ValueError("Interface should be an _ExtcapNetworkInterface")
        args = iface.get_extcap_cmd(**kwarg)
        iface = network_name(iface)
        self.outs = None  # extcap sockets can't write
        # open fifo
        fifo, fd = _create_fifo()
        args = ["--extcap-interface", iface, "--capture", "--fifo", fifo] + args
        self.proc = subprocess.Popen(
            [cmdprog] + args,
        )
        self.fd = _open_fifo(fd)
        self.reader = PcapReader(self.fd)  # type: ignore
        self.ins = self.reader  # type: ignore

    def recv(self, x: int = MTU, **kwargs: Any) -> Packet:
        """
        Read one packet from the capture stream of the extcap program.
        """
        return self.reader.recv(x, **kwargs)

    def close(self) -> None:
        """
        Kill the extcap program, then close the socket and the fifo.

        :raises subprocess.TimeoutExpired: if the killed program has still
            not exited after 2 seconds. The socket and the fifo are closed
            regardless.
        """
        try:
            self.proc.kill()
            self.proc.wait(timeout=2)
        finally:
            # Always release our own resources, even if the child could not
            # be reaped in time.
            try:
                SuperSocket.close(self)
            finally:
                self.fd.close()
171
172
class _ExtcapInterfaceProvider(InterfaceProvider):
    """
    Interface provider made to hook on a extcap binary
    """

    headers = ("Index", "Name", "Address")
    header_sort = 1

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        :param cmdprog: (keyword, mandatory) path of the extcap program this
            provider wraps. All other arguments go to the parent class.
        """
        self.cmdprog = kwargs.pop("cmdprog")
        super().__init__(*args, **kwargs)

    def load(self) -> Dict[str, NetworkInterface]:
        """
        List the interfaces exposed by the extcap program.

        :returns: the interfaces indexed by network name. If the extcap
            program fails, a warning is emitted and an empty dict is returned.
        """
        data: Dict[str, NetworkInterface] = {}
        try:
            interfaces = _extcap_call(
                self.cmdprog,
                ["--extcap-interfaces"],
                {"interface": ["value", "display"]},
            )["interface"]
        except OSError as ex:
            warning(
                "extcap %s failed to load: %s",
                self.name,
                # Only report the last line of the error
                str(ex).strip().split("\n")[-1]
            )
            return {}
        for netw_name, name in interfaces:
            # Use the last number found in the display name as index offset.
            # The whole run of digits must be captured: a greedy ".*" prefix
            # would only leave the final digit to the group ("if12" -> 2).
            _index = re.search(r"(\d+)\D*$", name)
            if _index:
                index = int(_index.group(1)) + 100
            else:
                index = 100
            if_data = {
                "name": name,
                "network_name": netw_name,
                "description": name,
                "index": index,
            }
            data[netw_name] = _ExtcapNetworkInterface(self, if_data)
        return data

    def _l2listen(self, _: Any) -> Type[SuperSocket]:
        """
        Return a factory of _ExtcapSocket bound to this provider's program.
        """
        return functools.partial(_ExtcapSocket, cmdprog=self.cmdprog)  # type: ignore

    def _l3socket(self, *_: Any) -> NoReturn:
        """
        Always raise: this provider only supports sniffing.
        """
        raise ValueError("Only sniffing is available for an extcap provider !")

    _l2socket = _l3socket  # type: ignore

    def _is_valid(self, dev: NetworkInterface) -> bool:
        """
        Every interface of this provider is reported as valid.
        """
        return True

    def _format(self,
                dev: NetworkInterface,
                **kwargs: Any
                ) -> Tuple[Union[str, List[str]], ...]:
        """Returns a tuple of the elements used by show()"""
        return (str(dev.index), dev.name, dev.network_name)
232
233
def load_extcap() -> None:
    """
    Load extcap folder from wireshark and populate Scapy's providers.

    Every regular file of the folders listed in ``conf.prog.extcap_folders``
    whose name has no extension (or ``.bat``/``.exe`` on Windows, ``.sh``
    elsewhere) is registered as an interface provider named after the file.

    Additional interfaces should appear in conf.ifaces.
    """
    if WINDOWS:
        pattern = re.compile(r"^[^.]+(?:\.bat|\.exe)?$")
    else:
        pattern = re.compile(r"^[^.]+(?:\.sh)?$")
    for fld in conf.prog.extcap_folders:
        root = pathlib.Path(fld)
        for _cmdprog in root.glob("*"):
            if not _cmdprog.is_file() or not pattern.match(_cmdprog.name):
                continue
            # glob() results already start with ``root``: joining them to
            # ``root`` again would duplicate a relative folder in the path.
            cmdprog = str(_cmdprog.absolute())
            # success
            provname = _cmdprog.name.rsplit(".", 1)[0]

            class _prov(_ExtcapInterfaceProvider):
                name = provname

            conf.ifaces.register_provider(
                functools.partial(_prov, cmdprog=cmdprog)  # type: ignore
            )