Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/libs/extcap.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

125 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter 

5 

6""" 

7Wireshark extcap API utils 

8https://www.wireshark.org/docs/wsdg_html_chunked/ChCaptureExtcap.html 

9""" 

10 

11import collections 

12import functools 

13import pathlib 

14import re 

15import subprocess 

16 

17from scapy.config import conf 

18from scapy.consts import WINDOWS 

19from scapy.data import MTU 

20from scapy.error import warning 

21from scapy.interfaces import ( 

22 network_name, 

23 resolve_iface, 

24 InterfaceProvider, 

25 NetworkInterface, 

26) 

27from scapy.packet import Packet 

28from scapy.supersocket import SuperSocket 

29from scapy.utils import PcapReader, _create_fifo, _open_fifo 

30 

31# Typing 

32from typing import ( 

33 cast, 

34 Any, 

35 Dict, 

36 List, 

37 NoReturn, 

38 Optional, 

39 Tuple, 

40 Type, 

41 Union, 

42) 

43 

44 

def _extcap_call(prog: str,
                 args: List[str],
                 format: Dict[str, List[str]],
                 ) -> Dict[str, List[Tuple[str, ...]]]:
    """
    Function used to call a program using the extcap format,
    then parse the results

    :param prog: path of the extcap program to run
    :param args: command line arguments passed to the program
    :param format: maps a sentence keyword (the beginning of an output
        line) to the names of the ``{name=value}`` fields to extract from
        lines starting with that keyword
    :returns: a dict mapping each keyword that was seen to a list holding
        one tuple per matching line. Tuples follow the order of the field
        names in ``format``; a field missing from a line is returned as "".
        The dict is a defaultdict: an unseen keyword yields an empty list.
    :raises OSError: if the program exits with a non-zero return code
    """
    p = subprocess.Popen(
        [prog] + args,
        # On Windows, we must be in the Wireshark/ folder.
        cwd=pathlib.Path(prog).parent.parent,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        text=True
    )
    data, err = p.communicate()
    if p.returncode != 0:
        raise OSError("%s returned with error code %s: %s" % (prog, p.returncode, err))
    # Build the field patterns once rather than once per output line.
    # Field names are escaped so that they are always matched literally.
    patterns = {
        keyword: [
            re.compile(r"\{%s=([^}]*)\}" % re.escape(val))
            for val in values
        ]
        for keyword, values in format.items()
    }
    res = collections.defaultdict(list)
    for line in data.split("\n"):
        line = line.strip()
        for keyword, regexes in patterns.items():
            if not line.startswith(keyword):
                continue
            found = (regex.search(line) for regex in regexes)
            res[keyword].append(
                tuple(m.group(1) if m else "" for m in found)
            )
            # A line is only ever attributed to the first matching keyword
            break
    return cast(Dict[str, List[Tuple[str, ...]]], res)

82 

83 

class _ExtcapNetworkInterface(NetworkInterface):
    """
    Extcap NetworkInterface
    """

    def get_extcap_config(self) -> Dict[str, List[Tuple[str, ...]]]:
        """
        Return the available configuration options on an extcap interface

        The extcap program of the provider is called with
        ``--extcap-config`` for this interface.

        :returns: a dict with two possible keys:

            - "arg": list of (number, call, display, default, required)
            - "value": list of (arg, value, display, default)

            A field that is absent from the program output is "".
        """
        return _extcap_call(
            self.provider.cmdprog,  # type: ignore
            ["--extcap-interface", self.network_name, "--extcap-config"],
            {
                "arg": ["number", "call", "display", "default", "required"],
                "value": ["arg", "value", "display", "default"],
            },
        )

    def get_extcap_cmd(self, **kwarg: Any) -> List[str]:
        """
        Return the extcap command line options

        :param kwarg: values for the extcap arguments. The expected keyword
            is the argument's "call" with the leading/trailing dashes
            removed and the other dashes replaced by underscores
            (``--my-opt`` becomes ``my_opt``). Values are passed through
            ``str()``.
        :returns: the list of command line tokens to give to the program
        :raises ValueError: if an argument marked as required is not
            provided in kwarg
        """
        cmds: List[str] = []
        for arg in self.get_extcap_config()["arg"]:
            # Fields order: (number, call, display, default, required)
            call, default, required = arg[1], arg[3], arg[4]
            key = call.strip("-").replace("-", "_")
            if key in kwarg:
                # Apply argument
                cmds += [call, str(kwarg[key])]
            elif required == "true":
                raise ValueError(
                    "Missing required argument: '%s' on iface %s." % (
                        key,
                        self.network_name,
                    )
                )
            elif default == "true":
                # A "true" default is emitted as a bare flag
                cmds.append(call)
            elif default and default != "false":
                # Apply default
                cmds += [call, default]
            # else: no default (or false), nothing to add
        return cmds

128 

129 

class _ExtcapSocket(SuperSocket):
    """
    Read packets at layer 2 using an extcap command
    """

    nonblocking_socket = True

    @staticmethod
    def select(sockets: List[SuperSocket],
               remain: Optional[float] = None) -> List[SuperSocket]:
        """
        Return every socket it is given, unfiltered. ``remain`` is unused.
        """
        return sockets

    def __init__(self, *_: Any, **kwarg: Any) -> None:
        """
        Start the extcap program in capture mode and read from its fifo.

        :param cmdprog: (keyword, mandatory) path of the extcap program
        :param iface: (keyword, mandatory) the extcap interface to capture on
        :param kwarg: any other keyword is forwarded to
            ``_ExtcapNetworkInterface.get_extcap_cmd``
        :raises NameError: if no interface is provided
        :raises ValueError: if the interface is not an extcap interface
        """
        cmdprog = kwarg.pop("cmdprog")
        iface = kwarg.pop("iface", None)
        if iface is None:
            raise NameError("Must select an interface for a extcap socket !")
        iface = resolve_iface(iface)
        if not isinstance(iface, _ExtcapNetworkInterface):
            raise ValueError("Interface should be an _ExtcapNetworkInterface")
        args = iface.get_extcap_cmd(**kwarg)
        iface = network_name(iface)
        self.outs = None  # extcap sockets can't write
        # open fifo
        fifo, fd = _create_fifo()
        args = ["--extcap-interface", iface, "--capture", "--fifo", fifo] + args
        self.proc = subprocess.Popen(
            [cmdprog] + args,
        )
        self.fd = _open_fifo(fd)
        self.reader = PcapReader(self.fd)  # type: ignore
        self.ins = self.reader  # type: ignore

    def recv(self, x: int = MTU, **kwargs: Any) -> Packet:
        """
        Receive a packet by delegating to the underlying PcapReader.
        """
        return self.reader.recv(x, **kwargs)

    def close(self) -> None:
        """
        Kill the extcap program, then close the socket and the fifo.

        :raises subprocess.TimeoutExpired: if the program has not exited
            2 seconds after being killed. The socket and the fifo are
            closed nonetheless.
        """
        self.proc.kill()
        try:
            self.proc.wait(timeout=2)
        finally:
            # Release our resources even if waiting for the process (or
            # closing the socket) fails.
            try:
                SuperSocket.close(self)
            finally:
                self.fd.close()

171 

172 

class _ExtcapInterfaceProvider(InterfaceProvider):
    """
    Interface provider made to hook on a extcap binary
    """

    headers = ("Index", "Name", "Address")
    header_sort = 1

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        :param cmdprog: (keyword, mandatory) path of the extcap program.
            Other arguments are forwarded to InterfaceProvider.
        """
        self.cmdprog = kwargs.pop("cmdprog")
        super(_ExtcapInterfaceProvider, self).__init__(*args, **kwargs)

    def load(self) -> Dict[str, NetworkInterface]:
        """
        Ask the extcap program for its interfaces.

        :returns: a dict mapping the network name of each interface to its
            _ExtcapNetworkInterface. Empty (with a warning) if the program
            could not be called.
        """
        data: Dict[str, NetworkInterface] = {}
        try:
            interfaces = _extcap_call(
                self.cmdprog,
                ["--extcap-interfaces"],
                {"interface": ["value", "display"]},
            )["interface"]
        except OSError as ex:
            warning(
                "extcap %s failed to load: %s",
                self.name,
                str(ex).strip().split("\n")[-1]
            )
            return {}
        for netw_name, name in interfaces:
            # The index is 100 + the last number found in the display name.
            # Take the whole digit run: a greedy ".*(\d+)" would only leave
            # its final digit to the group ("if12" -> 2).
            numbers = re.findall(r"\d+", name)
            index = (int(numbers[-1]) + 100) if numbers else 100
            if_data = {
                "name": name,
                "network_name": netw_name,
                "description": name,
                "index": index,
            }
            data[netw_name] = _ExtcapNetworkInterface(self, if_data)
        return data

    def _l2listen(self, _: Any) -> Type[SuperSocket]:
        """
        Return a factory of _ExtcapSocket bound to this provider's program.
        """
        return functools.partial(_ExtcapSocket, cmdprog=self.cmdprog)  # type: ignore

    def _l3socket(self, *_: Any) -> NoReturn:
        """
        Always raise: this provider does not offer sending sockets.
        """
        raise ValueError("Only sniffing is available for an extcap provider !")

    _l2socket = _l3socket  # type: ignore

    def _is_valid(self, dev: NetworkInterface) -> bool:
        """
        Every interface of this provider is reported as valid.
        """
        return True

    def _format(self,
                dev: NetworkInterface,
                **kwargs: Any
                ) -> Tuple[Union[str, List[str]], ...]:
        """Returns a tuple of the elements used by show()"""
        return (str(dev.index), dev.name, dev.network_name)

232 

233 

def load_extcap() -> None:
    """
    Load extcap folder from wireshark and populate Scapy's providers.

    Additional interfaces should appear in conf.ifaces.

    Each regular file found directly in one of ``conf.prog.extcap_folders``
    whose name has no extension (or .bat/.exe on Windows, .sh elsewhere) is
    registered as an interface provider, named after the file without its
    extension.
    """
    if WINDOWS:
        pattern = re.compile(r"^[^.]+(?:\.bat|\.exe)?$")
    else:
        pattern = re.compile(r"^[^.]+(?:\.sh)?$")
    for fld in conf.prog.extcap_folders:
        root = pathlib.Path(fld)
        for _cmdprog in root.glob("*"):
            if not _cmdprog.is_file() or not pattern.match(_cmdprog.name):
                continue
            # Paths yielded by glob() already start with root: joining them
            # to root again would duplicate a relative folder ("a/a/prog").
            cmdprog = str(_cmdprog.absolute())
            # success
            provname = pathlib.Path(cmdprog).name.rsplit(".", 1)[0]

            # One provider class per program, so that each has its own name
            class _prov(_ExtcapInterfaceProvider):
                name = provname

            conf.ifaces.register_provider(
                functools.partial(_prov, cmdprog=cmdprog)  # type: ignore
            )