1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
4# Copyright (C) Philippe Biondi <phil@secdev.org>
5
6"""
7Answering machines.
8"""
9
10########################
11# Answering machines #
12########################
13
14import abc
15import functools
16import threading
17import socket
18import warnings
19
20from scapy.arch import get_if_addr
21from scapy.config import conf
22from scapy.sendrecv import sendp, sniff, AsyncSniffer
23from scapy.packet import Packet
24from scapy.plist import PacketList
25
26from typing import (
27 Any,
28 Callable,
29 Dict,
30 Generic,
31 Optional,
32 Tuple,
33 Type,
34 TypeVar,
35 cast,
36)
37
38_T = TypeVar("_T", Packet, PacketList)
39
40
41class ReferenceAM(type):
42 def __new__(cls,
43 name, # type: str
44 bases, # type: Tuple[type, ...]
45 dct # type: Dict[str, Any]
46 ):
47 # type: (...) -> Type['AnsweringMachine[_T]']
48 obj = cast('Type[AnsweringMachine[_T]]',
49 super(ReferenceAM, cls).__new__(cls, name, bases, dct))
50 try:
51 import inspect
52 obj.__signature__ = inspect.signature( # type: ignore
53 obj.parse_options
54 )
55 except (ImportError, AttributeError):
56 pass
57 if obj.function_name:
58 func = lambda obj=obj, *args, **kargs: obj(*args, **kargs)() # type: ignore # noqa: E501
59 # Inject signature
60 func.__name__ = func.__qualname__ = obj.function_name
61 func.__doc__ = obj.__doc__ or obj.parse_options.__doc__
62 try:
63 func.__signature__ = obj.__signature__ # type: ignore
64 except (AttributeError):
65 pass
66 globals()[obj.function_name] = func
67 return obj
68
69
70class AnsweringMachine(Generic[_T], metaclass=ReferenceAM):
71 function_name = ""
72 filter = None # type: Optional[str]
73 sniff_options = {"store": 0} # type: Dict[str, Any]
74 sniff_options_list = ["store", "iface", "count", "promisc", "filter",
75 "type", "prn", "stop_filter", "opened_socket"]
76 send_options = {"verbose": 0} # type: Dict[str, Any]
77 send_options_list = ["iface", "inter", "loop", "verbose", "socket"]
78 send_function = staticmethod(sendp)
79
80 def __init__(self, **kargs):
81 # type: (Any) -> None
82 self.mode = 0
83 self.verbose = kargs.get("verbose", conf.verb >= 0)
84 if self.filter:
85 kargs.setdefault("filter", self.filter)
86 kargs.setdefault("prn", self.reply)
87 self.optam1 = {} # type: Dict[str, Any]
88 self.optam2 = {} # type: Dict[str, Any]
89 self.optam0 = {} # type: Dict[str, Any]
90 doptsend, doptsniff = self.parse_all_options(1, kargs)
91 self.defoptsend = self.send_options.copy()
92 self.defoptsend.update(doptsend)
93 self.defoptsniff = self.sniff_options.copy()
94 self.defoptsniff.update(doptsniff)
95 self.optsend = {} # type: Dict[str, Any]
96 self.optsniff = {} # type: Dict[str, Any]
97
98 def __getattr__(self, attr):
99 # type: (str) -> Any
100 for dct in [self.optam2, self.optam1]:
101 if attr in dct:
102 return dct[attr]
103 raise AttributeError(attr)
104
105 def __setattr__(self, attr, val):
106 # type: (str, Any) -> None
107 mode = self.__dict__.get("mode", 0)
108 if mode == 0:
109 self.__dict__[attr] = val
110 else:
111 [self.optam1, self.optam2][mode - 1][attr] = val
112
113 def parse_options(self):
114 # type: () -> None
115 pass
116
117 def parse_all_options(self, mode, kargs):
118 # type: (int, Any) -> Tuple[Dict[str, Any], Dict[str, Any]]
119 sniffopt = {} # type: Dict[str, Any]
120 sendopt = {} # type: Dict[str, Any]
121 for k in list(kargs): # use list(): kargs is modified in the loop
122 if k in self.sniff_options_list:
123 sniffopt[k] = kargs[k]
124 if k in self.send_options_list:
125 sendopt[k] = kargs[k]
126 if k in self.sniff_options_list + self.send_options_list:
127 del kargs[k]
128 if mode != 2 or kargs:
129 if mode == 1:
130 self.optam0 = kargs
131 elif mode == 2 and kargs:
132 k = self.optam0.copy()
133 k.update(kargs)
134 self.parse_options(**k)
135 kargs = k
136 omode = self.__dict__.get("mode", 0)
137 self.__dict__["mode"] = mode
138 self.parse_options(**kargs)
139 self.__dict__["mode"] = omode
140 return sendopt, sniffopt
141
142 def is_request(self, req):
143 # type: (Packet) -> int
144 return 1
145
146 @abc.abstractmethod
147 def make_reply(self, req):
148 # type: (Packet) -> _T
149 pass
150
151 def send_reply(self, reply, send_function=None):
152 # type: (_T, Optional[Callable[..., None]]) -> None
153 if send_function:
154 send_function(reply)
155 else:
156 self.send_function(reply, **self.optsend)
157
158 def print_reply(self, req, reply):
159 # type: (Packet, _T) -> None
160 if isinstance(reply, PacketList):
161 print("%s ==> %s" % (req.summary(),
162 [res.summary() for res in reply]))
163 else:
164 print("%s ==> %s" % (req.summary(), reply.summary()))
165
166 def reply(self, pkt, send_function=None, address=None):
167 # type: (Packet, Optional[Callable[..., None]], Optional[Any]) -> None
168 if not self.is_request(pkt):
169 return
170 if address:
171 # Only on AnsweringMachineTCP
172 reply = self.make_reply(pkt, address=address) # type: ignore
173 else:
174 reply = self.make_reply(pkt)
175 if not reply:
176 return
177 if send_function:
178 self.send_reply(reply, send_function=send_function)
179 else:
180 # Retro-compability. Remove this if eventually
181 self.send_reply(reply)
182 if self.verbose:
183 self.print_reply(pkt, reply)
184
185 def run(self, *args, **kargs):
186 # type: (Any, Any) -> None
187 warnings.warn(
188 "run() method deprecated. The instance is now callable",
189 DeprecationWarning
190 )
191 self(*args, **kargs)
192
193 def bg(self, *args, **kwargs):
194 # type: (Any, Any) -> AsyncSniffer
195 kwargs.setdefault("bg", True)
196 self(*args, **kwargs)
197 return self.sniffer
198
199 def __call__(self, *args, **kargs):
200 # type: (Any, Any) -> None
201 bg = kargs.pop("bg", False)
202 optsend, optsniff = self.parse_all_options(2, kargs)
203 self.optsend = self.defoptsend.copy()
204 self.optsend.update(optsend)
205 self.optsniff = self.defoptsniff.copy()
206 self.optsniff.update(optsniff)
207
208 if bg:
209 self.sniff_bg()
210 else:
211 try:
212 self.sniff()
213 except KeyboardInterrupt:
214 print("Interrupted by user")
215
216 def sniff(self):
217 # type: () -> None
218 sniff(**self.optsniff)
219
220 def sniff_bg(self):
221 # type: () -> None
222 self.sniffer = AsyncSniffer(**self.optsniff)
223 self.sniffer.start()
224
225
226class AnsweringMachineTCP(AnsweringMachine[Packet]):
227 """
228 An answering machine that use the classic socket.socket to
229 answer multiple TCP clients
230 """
231 TYPE = socket.SOCK_STREAM
232
233 def parse_options(self, port=80, cls=conf.raw_layer):
234 # type: (int, Type[Packet]) -> None
235 self.port = port
236 self.cls = cls
237
238 def close(self):
239 # type: () -> None
240 pass
241
242 def sniff(self):
243 # type: () -> None
244 from scapy.supersocket import StreamSocket
245 ssock = socket.socket(socket.AF_INET, self.TYPE)
246 try:
247 ssock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
248 except OSError:
249 pass
250 ssock.bind(
251 (get_if_addr(self.optsniff.get("iface", conf.iface)), self.port))
252 ssock.listen()
253 sniffers = []
254 try:
255 while True:
256 clientsocket, address = ssock.accept()
257 print("%s connected" % repr(address))
258 sock = StreamSocket(clientsocket, self.cls)
259 optsniff = self.optsniff.copy()
260 optsniff["prn"] = functools.partial(self.reply,
261 send_function=sock.send,
262 address=address)
263 del optsniff["iface"]
264 sniffer = AsyncSniffer(opened_socket=sock, **optsniff)
265 sniffer.start()
266 sniffers.append((sniffer, sock))
267 finally:
268 for (sniffer, sock) in sniffers:
269 try:
270 sniffer.stop()
271 except Exception:
272 pass
273 sock.close()
274 self.close()
275 ssock.close()
276
277 def sniff_bg(self):
278 # type: () -> None
279 self.sniffer = threading.Thread(target=self.sniff) # type: ignore
280 self.sniffer.start()
281
282 def make_reply(self, req, address=None):
283 # type: (Packet, Optional[Any]) -> Packet
284 return req
285
286
287class AnsweringMachineUDP(AnsweringMachineTCP):
288 """
289 An answering machine that use the classic socket.socket to
290 answer multiple UDP clients
291 """
292 TYPE = socket.SOCK_DGRAM