Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/libs/extcap.py: 30%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

124 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Gabriel Potter 

5 

6""" 

7Wireshark extcap API utils 

8https://www.wireshark.org/docs/wsdg_html_chunked/ChCaptureExtcap.html 

9""" 

10 

11import collections 

12import functools 

13import pathlib 

14import re 

15import subprocess 

16 

17from scapy.config import conf 

18from scapy.consts import WINDOWS 

19from scapy.data import MTU 

20from scapy.error import warning 

21from scapy.interfaces import ( 

22 network_name, 

23 resolve_iface, 

24 InterfaceProvider, 

25 NetworkInterface, 

26) 

27from scapy.packet import Packet 

28from scapy.supersocket import SuperSocket 

29from scapy.utils import PcapReader, _create_fifo, _open_fifo 

30 

31# Typing 

32from typing import ( 

33 cast, 

34 Any, 

35 Dict, 

36 List, 

37 NoReturn, 

38 Optional, 

39 Tuple, 

40 Type, 

41 Union, 

42) 

43 

44 

def _extcap_call(prog: str,
                 args: List[str],
                 format: Dict[str, List[str]],
                 ) -> Dict[str, List[Tuple[str, ...]]]:
    """
    Call a program that speaks the extcap format, then parse its output.

    Each output line is stripped and compared against the keywords of
    ``format``. The first keyword the line starts with wins; for that
    keyword, one tuple is built holding the content of every requested
    ``{field=content}`` group, in the order the fields are listed.
    A field that is absent from the line yields an empty string.

    :param prog: path of the program to run
    :param args: command line arguments passed to the program
    :param format: maps a line keyword (e.g. "interface") to the list of
        field names to extract from lines starting with that keyword
    :returns: a dict mapping each keyword that was seen to the list of
        extracted tuples. The dict is a ``defaultdict(list)``, so looking up
        a keyword that never appeared gives an empty list.
    :raises OSError: if the program exits with a non-zero return code
    """
    p = subprocess.Popen(
        [prog] + args,
        stdout=subprocess.PIPE, stderr=subprocess.PIPE,
        text=True
    )
    data, err = p.communicate()
    if p.returncode != 0:
        raise OSError("%s returned with error code %s: %s" % (prog, p.returncode, err))

    def _match(val: str, line: str) -> str:
        # The field name is escaped so that it is always matched literally,
        # even if it contains regex metacharacters.
        m = re.search(r"\{%s=([^}]*)\}" % re.escape(val), line)
        return m.group(1) if m else ""

    res: Dict[str, List[Tuple[str, ...]]] = collections.defaultdict(list)
    for line in data.split("\n"):
        line = line.strip()
        for keyword, values in format.items():
            if line.startswith(keyword):
                res[keyword].append(
                    tuple(_match(val, line) for val in values)
                )
                # A line belongs to a single keyword only
                break
    return res

80 

81 

class _ExtcapNetworkInterface(NetworkInterface):
    """
    Extcap NetworkInterface
    """

    def get_extcap_config(self) -> Dict[str, List[Tuple[str, ...]]]:
        """
        Return the configuration options available on an extcap interface.

        The provider's program is called with ``--extcap-config`` for this
        interface.

        :returns: a dict with (when present in the program output):
            - "arg": list of (number, call, display, default, required)
            - "value": list of (arg, value, display, default)
        """
        return _extcap_call(
            self.provider.cmdprog,  # type: ignore
            ["--extcap-interface", self.network_name, "--extcap-config"],
            {
                "arg": ["number", "call", "display", "default", "required"],
                "value": ["arg", "value", "display", "default"],
            },
        )

    def get_extcap_cmd(self, **kwarg: Any) -> List[str]:
        """
        Return the extcap command line options

        Each keyword argument is matched against the "call" field of the
        interface's arguments, with leading/trailing dashes stripped and
        inner dashes turned into underscores (e.g. ``--my-opt`` is set with
        ``my_opt=...``). Arguments that are not provided fall back to their
        declared default, if any.

        :param kwarg: values for the extcap arguments (converted with str())
        :returns: the list of command line tokens to pass to the program
        :raises ValueError: if an argument flagged as required is missing
        """
        cmds = []  # type: List[str]
        for _number, call, _display, default, required in \
                self.get_extcap_config()["arg"]:
            key = call.strip("-").replace("-", "_")
            if key in kwarg:
                # Apply argument
                cmds += [call, str(kwarg[key])]
                continue
            # Apply default
            if required == "true":
                raise ValueError(
                    "Missing required argument: '%s' on iface %s." % (
                        key,
                        self.network_name,
                    )
                )
            if not default or default == "false":
                # no default (or false): the option is simply left out
                continue
            if default == "true":
                # flag enabled by default: passed without a value
                cmds += [call]
            else:
                cmds += [call, default]
        return cmds

126 

127 

class _ExtcapSocket(SuperSocket):
    """
    Read packets at layer 2 using an extcap command
    """

    nonblocking_socket = True

    @staticmethod
    def select(sockets: List[SuperSocket],
               remain: Optional[float] = None) -> List[SuperSocket]:
        """Return the sockets unchanged; ``remain`` is ignored."""
        return sockets

    def __init__(self, *_: Any, **kwarg: Any) -> None:
        """
        Start the extcap program in capture mode and attach a pcap reader
        to the FIFO it writes to.

        :param cmdprog: (keyword, mandatory) path of the extcap program
        :param iface: (keyword, mandatory) the extcap interface to capture on
        :param kwarg: any remaining keyword is forwarded to
            ``get_extcap_cmd`` to build the extcap options
        :raises NameError: if no interface is given
        :raises ValueError: if the interface is not an extcap interface
        """
        cmdprog = kwarg.pop("cmdprog")
        iface = kwarg.pop("iface", None)
        if iface is None:
            raise NameError("Must select an interface for a extcap socket !")
        iface = resolve_iface(iface)
        if not isinstance(iface, _ExtcapNetworkInterface):
            raise ValueError("Interface should be an _ExtcapNetworkInterface")
        args = iface.get_extcap_cmd(**kwarg)
        iface = network_name(iface)
        self.outs = None  # extcap sockets can't write
        # open fifo
        fifo, fd = _create_fifo()
        args = ["--extcap-interface", iface, "--capture", "--fifo", fifo] + args
        self.proc = subprocess.Popen(
            [cmdprog] + args,
        )
        self.fd = _open_fifo(fd)
        self.reader = PcapReader(self.fd)  # type: ignore
        self.ins = self.reader  # type: ignore

    def recv(self, x: int = MTU, **kwargs: Any) -> Packet:
        """Receive a packet by delegating to the underlying pcap reader."""
        return self.reader.recv(x, **kwargs)

    def close(self) -> None:
        """
        Kill the extcap process, then close the socket and the FIFO handle.

        The cleanup runs even if the process fails to exit within the
        2 second timeout; in that case the ``subprocess.TimeoutExpired``
        exception is still propagated afterwards.
        """
        try:
            self.proc.kill()
            self.proc.wait(timeout=2)
        finally:
            SuperSocket.close(self)
            self.fd.close()

169 

170 

class _ExtcapInterfaceProvider(InterfaceProvider):
    """
    Interface provider made to hook on a extcap binary
    """

    headers = ("Index", "Name", "Address")
    header_sort = 1

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        """
        :param cmdprog: (keyword, mandatory) path of the extcap program.
            All other arguments are passed on to InterfaceProvider.
        """
        self.cmdprog = kwargs.pop("cmdprog")
        super(_ExtcapInterfaceProvider, self).__init__(*args, **kwargs)

    def load(self) -> Dict[str, NetworkInterface]:
        """
        Query the extcap program for its interfaces.

        :returns: a dict mapping each interface's network name to its
            _ExtcapNetworkInterface. If the program fails, a warning is
            emitted and an empty dict is returned.
        """
        data: Dict[str, NetworkInterface] = {}
        try:
            interfaces = _extcap_call(
                self.cmdprog,
                ["--extcap-interfaces"],
                {"interface": ["value", "display"]},
            )["interface"]
        except OSError as ex:
            # Only report the last line of the error output
            warning(
                "extcap %s failed to load: %s",
                self.name,
                str(ex).strip().split("\n")[-1]
            )
            return {}
        for netw_name, name in interfaces:
            # Use the last number found in the display name as index offset.
            # The whole digit run is captured: a greedy ".*(\d+)" would only
            # keep the final digit (e.g. "0" for "foo10").
            _index = re.search(r"(\d+)(?!.*\d)", name)
            if _index:
                index = int(_index.group(1)) + 100
            else:
                index = 100
            if_data = {
                "name": name,
                "network_name": netw_name,
                "description": name,
                "index": index,
            }
            data[netw_name] = _ExtcapNetworkInterface(self, if_data)
        return data

    def _l2listen(self, _: Any) -> Type[SuperSocket]:
        """Return an _ExtcapSocket factory bound to this provider's program."""
        return functools.partial(_ExtcapSocket, cmdprog=self.cmdprog)  # type: ignore

    def _l3socket(self, *_: Any) -> NoReturn:
        """Always raise: extcap interfaces can only be sniffed on."""
        raise ValueError("Only sniffing is available for an extcap provider !")

    # Layer 2 sockets are refused the same way as layer 3 ones
    _l2socket = _l3socket  # type: ignore

    def _is_valid(self, dev: NetworkInterface) -> bool:
        """Every interface of this provider is reported as valid."""
        return True

    def _format(self,
                dev: NetworkInterface,
                **kwargs: Any
                ) -> Tuple[Union[str, List[str]], ...]:
        """Returns a tuple of the elements used by show()"""
        return (str(dev.index), dev.name, dev.network_name)

230 

231 

def load_extcap() -> None:
    """
    Load extcap folder from wireshark and populate providers

    Every regular file found directly inside one of the folders listed in
    ``conf.prog.extcap_folders`` whose name matches the platform pattern
    (no extension, or ``.bat``/``.exe`` on Windows, ``.sh`` elsewhere) is
    registered as an interface provider named after the file, without its
    extension.
    """
    if WINDOWS:
        pattern = re.compile(r"^[^.]+(?:\.bat|\.exe)?$")
    else:
        pattern = re.compile(r"^[^.]+(?:\.sh)?$")
    for fld in conf.prog.extcap_folders:
        root = pathlib.Path(fld)
        for candidate in root.glob("*"):
            if not candidate.is_file() or not pattern.match(candidate.name):
                continue
            # glob() results already include the folder: joining them with
            # 'root' again would duplicate it when the folder is relative.
            cmdprog = str(candidate.absolute())
            # success
            provname = candidate.name.rsplit(".", 1)[0]

            class _prov(_ExtcapInterfaceProvider):
                name = provname

            conf.ifaces.register_provider(
                functools.partial(_prov, cmdprog=cmdprog)  # type: ignore
            )