Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/ansmachine.py: 32%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

185 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Answering machines. 

8""" 

9 

10######################## 

11# Answering machines # 

12######################## 

13 

14import abc 

15import functools 

16import threading 

17import socket 

18import warnings 

19 

20from scapy.arch import get_if_addr 

21from scapy.config import conf 

22from scapy.sendrecv import sendp, sniff, AsyncSniffer 

23from scapy.packet import Packet 

24from scapy.plist import PacketList 

25 

26from typing import ( 

27 Any, 

28 Callable, 

29 Dict, 

30 Generic, 

31 Optional, 

32 Tuple, 

33 Type, 

34 TypeVar, 

35 cast, 

36) 

37 

38_T = TypeVar("_T", Packet, PacketList) 

39 

40 

41class ReferenceAM(type): 

42 def __new__(cls, 

43 name, # type: str 

44 bases, # type: Tuple[type, ...] 

45 dct # type: Dict[str, Any] 

46 ): 

47 # type: (...) -> Type['AnsweringMachine[_T]'] 

48 obj = cast('Type[AnsweringMachine[_T]]', 

49 super(ReferenceAM, cls).__new__(cls, name, bases, dct)) 

50 try: 

51 import inspect 

52 obj.__signature__ = inspect.signature( # type: ignore 

53 obj.parse_options 

54 ) 

55 except (ImportError, AttributeError): 

56 pass 

57 if obj.function_name: 

58 func = lambda obj=obj, *args, **kargs: obj(*args, **kargs)() # type: ignore # noqa: E501 

59 # Inject signature 

60 func.__name__ = func.__qualname__ = obj.function_name 

61 func.__doc__ = obj.__doc__ or obj.parse_options.__doc__ 

62 try: 

63 func.__signature__ = obj.__signature__ # type: ignore 

64 except (AttributeError): 

65 pass 

66 globals()[obj.function_name] = func 

67 return obj 

68 

69 

70class AnsweringMachine(Generic[_T], metaclass=ReferenceAM): 

71 function_name = "" 

72 filter = None # type: Optional[str] 

73 sniff_options = {"store": 0} # type: Dict[str, Any] 

74 sniff_options_list = ["store", "iface", "count", "promisc", "filter", 

75 "type", "prn", "stop_filter", "opened_socket"] 

76 send_options = {"verbose": 0} # type: Dict[str, Any] 

77 send_options_list = ["iface", "inter", "loop", "verbose", "socket"] 

78 send_function = staticmethod(sendp) 

79 

80 def __init__(self, **kargs): 

81 # type: (Any) -> None 

82 self.mode = 0 

83 self.verbose = kargs.get("verbose", conf.verb >= 0) 

84 if self.filter: 

85 kargs.setdefault("filter", self.filter) 

86 kargs.setdefault("prn", self.reply) 

87 self.optam1 = {} # type: Dict[str, Any] 

88 self.optam2 = {} # type: Dict[str, Any] 

89 self.optam0 = {} # type: Dict[str, Any] 

90 doptsend, doptsniff = self.parse_all_options(1, kargs) 

91 self.defoptsend = self.send_options.copy() 

92 self.defoptsend.update(doptsend) 

93 self.defoptsniff = self.sniff_options.copy() 

94 self.defoptsniff.update(doptsniff) 

95 self.optsend = {} # type: Dict[str, Any] 

96 self.optsniff = {} # type: Dict[str, Any] 

97 

98 def __getattr__(self, attr): 

99 # type: (str) -> Any 

100 for dct in [self.optam2, self.optam1]: 

101 if attr in dct: 

102 return dct[attr] 

103 raise AttributeError(attr) 

104 

105 def __setattr__(self, attr, val): 

106 # type: (str, Any) -> None 

107 mode = self.__dict__.get("mode", 0) 

108 if mode == 0: 

109 self.__dict__[attr] = val 

110 else: 

111 [self.optam1, self.optam2][mode - 1][attr] = val 

112 

113 def parse_options(self): 

114 # type: () -> None 

115 pass 

116 

117 def parse_all_options(self, mode, kargs): 

118 # type: (int, Any) -> Tuple[Dict[str, Any], Dict[str, Any]] 

119 sniffopt = {} # type: Dict[str, Any] 

120 sendopt = {} # type: Dict[str, Any] 

121 for k in list(kargs): # use list(): kargs is modified in the loop 

122 if k in self.sniff_options_list: 

123 sniffopt[k] = kargs[k] 

124 if k in self.send_options_list: 

125 sendopt[k] = kargs[k] 

126 if k in self.sniff_options_list + self.send_options_list: 

127 del kargs[k] 

128 if mode != 2 or kargs: 

129 if mode == 1: 

130 self.optam0 = kargs 

131 elif mode == 2 and kargs: 

132 k = self.optam0.copy() 

133 k.update(kargs) 

134 self.parse_options(**k) 

135 kargs = k 

136 omode = self.__dict__.get("mode", 0) 

137 self.__dict__["mode"] = mode 

138 self.parse_options(**kargs) 

139 self.__dict__["mode"] = omode 

140 return sendopt, sniffopt 

141 

142 def is_request(self, req): 

143 # type: (Packet) -> int 

144 return 1 

145 

146 @abc.abstractmethod 

147 def make_reply(self, req): 

148 # type: (Packet) -> _T 

149 pass 

150 

151 def send_reply(self, reply, send_function=None): 

152 # type: (_T, Optional[Callable[..., None]]) -> None 

153 if send_function: 

154 send_function(reply) 

155 else: 

156 self.send_function(reply, **self.optsend) 

157 

158 def print_reply(self, req, reply): 

159 # type: (Packet, _T) -> None 

160 if isinstance(reply, PacketList): 

161 print("%s ==> %s" % (req.summary(), 

162 [res.summary() for res in reply])) 

163 else: 

164 print("%s ==> %s" % (req.summary(), reply.summary())) 

165 

166 def reply(self, pkt, send_function=None, address=None): 

167 # type: (Packet, Optional[Callable[..., None]], Optional[Any]) -> None 

168 if not self.is_request(pkt): 

169 return 

170 if address: 

171 # Only on AnsweringMachineTCP 

172 reply = self.make_reply(pkt, address=address) # type: ignore 

173 else: 

174 reply = self.make_reply(pkt) 

175 if not reply: 

176 return 

177 if send_function: 

178 self.send_reply(reply, send_function=send_function) 

179 else: 

180 # Retro-compability. Remove this if eventually 

181 self.send_reply(reply) 

182 if self.verbose: 

183 self.print_reply(pkt, reply) 

184 

185 def run(self, *args, **kargs): 

186 # type: (Any, Any) -> None 

187 warnings.warn( 

188 "run() method deprecated. The instance is now callable", 

189 DeprecationWarning 

190 ) 

191 self(*args, **kargs) 

192 

193 def bg(self, *args, **kwargs): 

194 # type: (Any, Any) -> AsyncSniffer 

195 kwargs.setdefault("bg", True) 

196 self(*args, **kwargs) 

197 return self.sniffer 

198 

199 def __call__(self, *args, **kargs): 

200 # type: (Any, Any) -> None 

201 bg = kargs.pop("bg", False) 

202 optsend, optsniff = self.parse_all_options(2, kargs) 

203 self.optsend = self.defoptsend.copy() 

204 self.optsend.update(optsend) 

205 self.optsniff = self.defoptsniff.copy() 

206 self.optsniff.update(optsniff) 

207 

208 if bg: 

209 self.sniff_bg() 

210 else: 

211 try: 

212 self.sniff() 

213 except KeyboardInterrupt: 

214 print("Interrupted by user") 

215 

216 def sniff(self): 

217 # type: () -> None 

218 sniff(**self.optsniff) 

219 

220 def sniff_bg(self): 

221 # type: () -> None 

222 self.sniffer = AsyncSniffer(**self.optsniff) 

223 self.sniffer.start() 

224 

225 

226class AnsweringMachineTCP(AnsweringMachine[Packet]): 

227 """ 

228 An answering machine that use the classic socket.socket to 

229 answer multiple TCP clients 

230 """ 

231 TYPE = socket.SOCK_STREAM 

232 

233 def parse_options(self, port=80, cls=conf.raw_layer): 

234 # type: (int, Type[Packet]) -> None 

235 self.port = port 

236 self.cls = cls 

237 

238 def close(self): 

239 # type: () -> None 

240 pass 

241 

242 def sniff(self): 

243 # type: () -> None 

244 from scapy.supersocket import StreamSocket 

245 ssock = socket.socket(socket.AF_INET, self.TYPE) 

246 try: 

247 ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 

248 except OSError: 

249 pass 

250 ssock.bind( 

251 (get_if_addr(self.optsniff.get("iface", conf.iface)), self.port)) 

252 ssock.listen() 

253 sniffers = [] 

254 try: 

255 while True: 

256 clientsocket, address = ssock.accept() 

257 print("%s connected" % repr(address)) 

258 sock = StreamSocket(clientsocket, self.cls) 

259 optsniff = self.optsniff.copy() 

260 optsniff["prn"] = functools.partial(self.reply, 

261 send_function=sock.send, 

262 address=address) 

263 del optsniff["iface"] 

264 sniffer = AsyncSniffer(opened_socket=sock, **optsniff) 

265 sniffer.start() 

266 sniffers.append((sniffer, sock)) 

267 finally: 

268 for (sniffer, sock) in sniffers: 

269 try: 

270 sniffer.stop() 

271 except Exception: 

272 pass 

273 sock.close() 

274 self.close() 

275 ssock.close() 

276 

277 def sniff_bg(self): 

278 # type: () -> None 

279 self.sniffer = threading.Thread(target=self.sniff) # type: ignore 

280 self.sniffer.start() 

281 

282 def make_reply(self, req, address=None): 

283 # type: (Packet, Optional[Any]) -> Packet 

284 return req 

285 

286 

287class AnsweringMachineUDP(AnsweringMachineTCP): 

288 """ 

289 An answering machine that use the classic socket.socket to 

290 answer multiple UDP clients 

291 """ 

292 TYPE = socket.SOCK_DGRAM