Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/sessions.py: 18%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
5"""
6Sessions: decode flow of packets when sniffing
7"""
9from collections import defaultdict
10import socket
11import struct
13from scapy.compat import orb
14from scapy.config import conf
15from scapy.packet import NoPayload, Packet
16from scapy.pton_ntop import inet_pton
18# Typing imports
19from typing import (
20 Any,
21 Callable,
22 DefaultDict,
23 Dict,
24 Iterator,
25 List,
26 Optional,
27 Tuple,
28 Type,
29 cast,
30 TYPE_CHECKING,
31)
32from scapy.compat import Self
33if TYPE_CHECKING:
34 from scapy.supersocket import SuperSocket
37class DefaultSession(object):
38 """Default session: no stream decoding"""
40 def __init__(self, supersession: Optional[Self] = None):
41 if supersession and not isinstance(supersession, DefaultSession):
42 supersession = supersession()
43 self.supersession = supersession
45 def process(self, pkt: Packet) -> Optional[Packet]:
46 """
47 Called to pre-process the packet
48 """
49 # Optionally handle supersession
50 if self.supersession:
51 return self.supersession.process(pkt)
52 return pkt
54 def recv(self, sock: 'SuperSocket') -> Iterator[Packet]:
55 """
56 Will be called by sniff() to ask for a packet
57 """
58 pkt = sock.recv()
59 if not pkt:
60 return
61 pkt = self.process(pkt)
62 if pkt:
63 yield pkt
66class IPSession(DefaultSession):
67 """Defragment IP packets 'on-the-flow'.
69 Usage:
70 >>> sniff(session=IPSession)
71 """
73 def __init__(self, *args, **kwargs):
74 # type: (*Any, **Any) -> None
75 DefaultSession.__init__(self, *args, **kwargs)
76 self.fragments = defaultdict(list) # type: DefaultDict[Tuple[Any, ...], List[Packet]] # noqa: E501
78 def process(self, packet: Packet) -> Optional[Packet]:
79 from scapy.layers.inet import IP, _defrag_ip_pkt
80 if not packet:
81 return None
82 if IP not in packet:
83 return packet
84 return _defrag_ip_pkt(packet, self.fragments)[1] # type: ignore
87class StringBuffer(object):
88 """StringBuffer is an object used to re-order data received during
89 a TCP transmission.
91 Each TCP fragment contains a sequence number, which marks
92 (relatively to the first sequence number) the index of the data contained
93 in the fragment.
95 If a TCP fragment is missed, this class will fill the missing space with
96 zeros.
97 """
99 def __init__(self):
100 # type: () -> None
101 self.content = bytearray(b"")
102 self.content_len = 0
103 self.noff = 0 # negative offset
104 self.incomplete = [] # type: List[Tuple[int, int]]
106 def append(self, data: bytes, seq: Optional[int] = None) -> None:
107 data_len = len(data)
108 if seq is None:
109 seq = self.content_len
110 seq = seq - 1 - self.noff
111 if seq < 0:
112 # Data is located before the start of the current buffer
113 # (e.g. the first fragment was missing)
114 self.content = bytearray(b"\x00" * (-seq)) + self.content
115 self.content_len += (-seq)
116 self.noff += seq
117 seq = 0
118 if seq + data_len > self.content_len:
119 # Data is located after the end of the current buffer
120 self.content += b"\x00" * (seq - self.content_len + data_len)
121 # As data was missing, mark it.
122 # self.incomplete.append((self.content_len, seq))
123 self.content_len = seq + data_len
124 assert len(self.content) == self.content_len
125 # XXX removes empty space marker.
126 # for ifrag in self.incomplete:
127 # if [???]:
128 # self.incomplete.remove([???])
129 memoryview(self.content)[seq:seq + data_len] = data
131 def shiftleft(self, i: int) -> None:
132 self.content = self.content[i:]
133 self.content_len -= i
135 def full(self):
136 # type: () -> bool
137 # Should only be true when all missing data was filled up,
138 # (or there never was missing data)
139 return True # XXX
141 def clear(self):
142 # type: () -> None
143 self.__init__() # type: ignore
145 def __bool__(self):
146 # type: () -> bool
147 return bool(self.content_len)
148 __nonzero__ = __bool__
150 def __len__(self):
151 # type: () -> int
152 return self.content_len
154 def __bytes__(self):
155 # type: () -> bytes
156 return bytes(self.content)
158 def __str__(self):
159 # type: () -> str
160 return cast(str, self.__bytes__())
163def streamcls(cls: Type[Packet]) -> Callable[
164 [bytes, Dict[str, Any], Dict[str, Any]],
165 Optional[Packet],
166]:
167 """
168 Wraps a class for use when dissecting streams.
169 """
170 if hasattr(cls, "tcp_reassemble"):
171 return cls.tcp_reassemble # type: ignore
172 else:
173 # There is no tcp_reassemble. Just dissect the packet
174 return lambda data, *_: data and cls(data)
177class TCPSession(IPSession):
178 """A Session that reconstructs TCP streams.
180 NOTE: this has the same effect as wrapping a real socket.socket into StreamSocket,
181 but for all concurrent TCP streams (can be used on pcaps or sniffed sessions).
183 NOTE: only protocols that implement a ``tcp_reassemble`` function will be processed
184 by this session. Other protocols will not be reconstructed.
186 DEV: implement a class-function `tcp_reassemble` in your Packet class::
188 @classmethod
189 def tcp_reassemble(cls, data, metadata, session):
190 # data = the reassembled data from the same request/flow
191 # metadata = empty dictionary, that can be used to store data
192 # during TCP reassembly
193 # session = a dictionary proper to the bidirectional TCP session,
194 # that can be used to store anything
195 [...]
196 # If the packet is available, return it. Otherwise don't.
197 # Whenever you return a packet, the buffer will be discarded.
198 return pkt
199 # Otherwise, maybe store stuff in metadata, and return None,
200 # as you need additional data.
201 return None
203 For more details and a real example, see:
204 https://scapy.readthedocs.io/en/latest/usage.html#how-to-use-tcpsession-to-defragment-tcp-packets
206 :param app: Whether the socket is on application layer = has no TCP
207 layer. This is identical to StreamSocket so only use this if your
208 underlying source of data isn't a socket.socket.
209 """
211 def __init__(self, app=False, *args, **kwargs):
212 # type: (bool, *Any, **Any) -> None
213 super(TCPSession, self).__init__(*args, **kwargs)
214 self.app = app
215 if app:
216 self.data = StringBuffer()
217 self.metadata = {} # type: Dict[str, Any]
218 self.session = {} # type: Dict[str, Any]
219 else:
220 # The StringBuffer() is used to build a global
221 # string from fragments and their seq nulber
222 self.tcp_frags = defaultdict(
223 lambda: (StringBuffer(), {})
224 ) # type: DefaultDict[bytes, Tuple[StringBuffer, Dict[str, Any]]]
225 self.tcp_sessions = defaultdict(
226 dict
227 ) # type: DefaultDict[bytes, Dict[str, Any]]
228 # Setup stopping dissection condition
229 from scapy.layers.inet import TCP
230 self.stop_dissection_after = TCP
232 def _get_ident(self, pkt, session=False):
233 # type: (Packet, bool) -> bytes
234 underlayer = pkt["TCP"].underlayer
235 af = socket.AF_INET6 if "IPv6" in pkt else socket.AF_INET
236 src = underlayer and inet_pton(af, underlayer.src) or b""
237 dst = underlayer and inet_pton(af, underlayer.dst) or b""
238 if session:
239 # Bidirectional
240 def xor(x, y):
241 # type: (bytes, bytes) -> bytes
242 return bytes(orb(a) ^ orb(b) for a, b in zip(x, y))
243 return struct.pack("!4sH", xor(src, dst), pkt.dport ^ pkt.sport)
244 else:
245 # Uni-directional
246 return src + dst + struct.pack("!HH", pkt.dport, pkt.sport)
248 def _strip_padding(self, pkt: Packet) -> Optional[bytes]:
249 """Strip the packet of any padding, and return the padding.
250 """
251 pad = pkt.getlayer(conf.padding_layer)
252 if pad is not None and pad.underlayer is not None:
253 # strip padding
254 del pad.underlayer.payload
255 return cast(bytes, pad.load)
256 return None
258 def process(self,
259 pkt: Packet,
260 cls: Optional[Type[Packet]] = None) -> Optional[Packet]:
261 """Process each packet: matches the TCP seq/ack numbers
262 to follow the TCP streams, and orders the fragments.
263 """
264 packet = None # type: Optional[Packet]
265 if self.app:
266 # Special mode: Application layer. Use on top of TCP
267 self.data.append(bytes(pkt))
268 if cls is None and not isinstance(pkt, bytes):
269 cls = pkt.__class__
270 if "tcp_reassemble" in self.metadata:
271 tcp_reassemble = self.metadata["tcp_reassemble"]
272 elif cls is not None:
273 self.metadata["tcp_reassemble"] = tcp_reassemble = streamcls(cls)
274 else:
275 return None
276 packet = tcp_reassemble(
277 bytes(self.data),
278 self.metadata,
279 self.session,
280 )
281 if packet:
282 padding = self._strip_padding(packet)
283 if padding:
284 # There is remaining data for the next payload.
285 self.data.shiftleft(len(self.data) - len(padding))
286 else:
287 # No padding (data) left. Clear
288 self.data.clear()
289 self.metadata.clear()
290 return packet
291 return None
293 _pkt = super(TCPSession, self).process(pkt)
294 if _pkt is None:
295 return None
296 else: # Python 3.8 := would be nice
297 pkt = _pkt
299 from scapy.layers.inet import IP, TCP
300 if not pkt:
301 return None
302 if TCP not in pkt:
303 return pkt
304 pay = pkt[TCP].payload
305 if isinstance(pay, (NoPayload, conf.padding_layer)):
306 return pkt
307 new_data = pay.original
308 # Match packets by a unique TCP identifier
309 ident = self._get_ident(pkt)
310 data, metadata = self.tcp_frags[ident]
311 tcp_session = self.tcp_sessions[self._get_ident(pkt, True)]
312 # Handle TCP sequence numbers
313 seq = pkt[TCP].seq
314 if "seq" not in metadata:
315 metadata["seq"] = seq
316 if "next_seq" in metadata and seq < metadata["next_seq"]:
317 # Retransmitted data (that we already returned)
318 new_data = new_data[metadata["next_seq"] - seq:]
319 if not new_data:
320 return None
321 seq = metadata["next_seq"]
322 # Let's guess which class is going to be used
323 if "pay_class" not in metadata:
324 metadata["pay_class"] = pay_class = pkt[TCP].guess_payload_class(new_data)
325 metadata["tcp_reassemble"] = tcp_reassemble = streamcls(pay_class)
326 else:
327 tcp_reassemble = metadata["tcp_reassemble"]
328 # Get a relative sequence number for a storage purpose
329 relative_seq = metadata.get("relative_seq", None)
330 if relative_seq is None:
331 relative_seq = metadata["relative_seq"] = seq - 1
332 seq = seq - relative_seq
333 # Add the data to the buffer
334 data.append(new_data, seq)
335 # Check TCP FIN or TCP RESET
336 if pkt[TCP].flags.F or pkt[TCP].flags.R:
337 metadata["tcp_end"] = True
339 # In case any app layer protocol requires it,
340 # allow the parser to inspect TCP PSH flag
341 if pkt[TCP].flags.P:
342 metadata["tcp_psh"] = True
343 # XXX TODO: check that no empty space is missing in the buffer.
344 # XXX Currently, if a TCP fragment was missing, we won't notice it.
345 if data.full():
346 # Reassemble using all previous packets
347 metadata["original"] = pkt
348 packet = tcp_reassemble(
349 bytes(data),
350 metadata,
351 tcp_session
352 )
353 # Stack the result on top of the previous frames
354 if packet:
355 if "seq" in metadata:
356 pkt[TCP].seq = metadata["seq"]
357 # Clear TCP reassembly metadata
358 metadata.clear()
359 # Check for padding
360 padding = self._strip_padding(packet)
361 if padding:
362 # There is remaining data for the next payload.
363 full_length = data.content_len - len(padding)
364 metadata["relative_seq"] = relative_seq + full_length
365 data.shiftleft(full_length)
366 else:
367 # No padding (data) left. Clear
368 data.clear()
369 del self.tcp_frags[ident]
370 # Minimum next seq
371 metadata["next_seq"] = pkt[TCP].seq + len(new_data)
372 # Rebuild resulting packet
373 pay.underlayer.remove_payload()
374 if IP in pkt:
375 pkt[IP].len = None
376 pkt[IP].chksum = None
377 pkt = pkt / packet
378 pkt.wirelen = None
379 return pkt
380 return None
382 def recv(self, sock: 'SuperSocket') -> Iterator[Packet]:
383 """
384 Will be called by sniff() to ask for a packet
385 """
386 pkt = sock.recv(stop_dissection_after=self.stop_dissection_after)
387 # Now handle TCP reassembly
388 while pkt is not None:
389 pkt = self.process(pkt)
390 if pkt:
391 yield pkt
392 # keep calling process as there might be more
393 pkt = b"" # type: ignore
394 return None