Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/sessions.py: 17%
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
Shortcuts on this page
r m x toggle line displays
j k next/prev highlighted chunk
0 (zero) top of page
1 (one) first highlighted chunk
1# SPDX-License-Identifier: GPL-2.0-only
2# This file is part of Scapy
3# See https://scapy.net/ for more information
5"""
6Sessions: decode flow of packets when sniffing
7"""
9from collections import defaultdict
10import socket
11import struct
13from scapy.compat import orb
14from scapy.config import conf
15from scapy.packet import Packet
16from scapy.pton_ntop import inet_pton
18# Typing imports
19from typing import (
20 Any,
21 Callable,
22 DefaultDict,
23 Dict,
24 Iterator,
25 List,
26 Optional,
27 Tuple,
28 Type,
29 cast,
30 TYPE_CHECKING,
31)
32from scapy.compat import Self
33if TYPE_CHECKING:
34 from scapy.supersocket import SuperSocket
class DefaultSession(object):
    """Pass-through session: packets are handed over without any stream
    decoding, optionally after being run through a chained "supersession".
    """

    def __init__(self, supersession: Optional[Self] = None):
        # A supersession may be given either as an instance or as something
        # callable (typically a session class) that builds one.
        if not supersession or isinstance(supersession, DefaultSession):
            self.supersession = supersession
        else:
            self.supersession = supersession()

    def process(self, pkt: Packet) -> Optional[Packet]:
        """
        Pre-process a packet.

        Delegates to the supersession when one is configured, otherwise
        returns the packet untouched.
        """
        parent = self.supersession
        if not parent:
            return pkt
        return parent.process(pkt)

    def recv(self, sock: 'SuperSocket') -> Iterator[Packet]:
        """
        Called by sniff() to ask for a packet.

        Reads once from ``sock`` and yields the processed packet, if any.
        """
        received = sock.recv()
        if received:
            processed = self.process(received)
            if processed:
                yield processed
class IPSession(DefaultSession):
    """Session that defragments IP packets as they arrive.

    Usage:
    >>> sniff(session=IPSession)
    """

    def __init__(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        DefaultSession.__init__(self, *args, **kwargs)
        # Fragments collected so far, grouped under the key chosen by
        # _defrag_ip_pkt.
        self.fragments = defaultdict(list)  # type: DefaultDict[Tuple[Any, ...], List[Packet]]  # noqa: E501

    def process(self, packet: Packet) -> Optional[Packet]:
        """
        Feed a packet to the defragmenter.

        Falsy input yields None, packets without an IP layer are returned
        unchanged, and IP packets are replaced by the second element of
        what _defrag_ip_pkt returns.
        """
        from scapy.layers.inet import IP, _defrag_ip_pkt
        if not packet:
            return None
        if IP in packet:
            outcome = _defrag_ip_pkt(packet, self.fragments)
            return outcome[1]  # type: ignore
        return packet
class StringBuffer(object):
    """StringBuffer is an object used to re-order data received during
    a TCP transmission.

    Each TCP fragment contains a sequence number, which marks
    (relatively to the first sequence number) the index of the data contained
    in the fragment.

    If a TCP fragment is missed, this class will fill the missing space with
    zeros.
    """

    def __init__(self):
        # type: () -> None
        self.content = bytearray(b"")
        self.content_len = 0
        self.noff = 0  # negative offset
        self.incomplete = []  # type: List[Tuple[int, int]]

    def append(self, data: bytes, seq: Optional[int] = None) -> None:
        """
        Store ``data`` in the buffer.

        :param data: the bytes to store. Empty data is ignored.
        :param seq: 1-based position of the data, relative to the start of
            the stream (and corrected by ``self.noff``). When omitted, the
            data is appended at the current end of the buffer.
        """
        if not data:
            return
        data_len = len(data)
        if seq is None:
            # No sequence number: always append at the end, regardless of
            # the offset bookkeeping used for explicitly-positioned data.
            pos = self.content_len
        else:
            pos = seq - 1 - self.noff
        if pos < 0:
            # Data is located before the start of the current buffer
            # (e.g. the first fragment was missing): grow to the left.
            gap = -pos
            self.content = bytearray(gap) + self.content
            self.content_len += gap
            self.noff -= gap
            pos = 0
        end = pos + data_len
        if end > self.content_len:
            # Data is located after the end of the current buffer:
            # grow to the right, zero-filling any hole.
            # TODO: holes are not recorded in self.incomplete yet.
            self.content += bytes(end - self.content_len)
            self.content_len = end
        assert len(self.content) == self.content_len
        memoryview(self.content)[pos:end] = data

    def shiftleft(self, i: int) -> None:
        """Drop the first ``i`` bytes of the buffer."""
        self.content = self.content[i:]
        # Recompute rather than subtract, so that an out-of-range ``i``
        # cannot desynchronise content_len from the real length.
        self.content_len = len(self.content)

    def full(self):
        # type: () -> bool
        # Should only be true when all missing data was filled up,
        # (or there never was missing data).
        # Missing data is not tracked: this is currently "is non-empty".
        return bool(self)

    def clear(self):
        # type: () -> None
        self.__init__()  # type: ignore

    def __bool__(self):
        # type: () -> bool
        return bool(self.content_len)
    __nonzero__ = __bool__

    def __len__(self):
        # type: () -> int
        return self.content_len

    def __bytes__(self):
        # type: () -> bytes
        return bytes(self.content)

    def __str__(self):
        # type: () -> str
        # __str__ must return a real str; latin-1 maps every byte to the
        # code point of the same value, so nothing is lost.
        return self.content.decode("latin-1")
def streamcls(cls: Type[Packet]) -> Callable[
    [bytes, Dict[str, Any], Dict[str, Any]],
    Optional[Packet],
]:
    """
    Return the callable used to dissect a stream of bytes as ``cls``.

    This is ``cls.tcp_reassemble`` when the class provides it; otherwise a
    fallback that simply dissects whatever data it is given.
    """
    try:
        return cls.tcp_reassemble  # type: ignore
    except AttributeError:
        # There is no tcp_reassemble. Just dissect the packet
        pass

    def _dissect(data, *_unused):
        # Falsy data is handed back untouched instead of being dissected.
        return data and cls(data)

    return _dissect
class TCPSession(IPSession):
    """A Session that reconstructs TCP streams.

    NOTE: this has the same effect as wrapping a real socket.socket into StreamSocket,
    but for all concurrent TCP streams (can be used on pcaps or sniffed sessions).

    NOTE: only protocols that implement a ``tcp_reassemble`` function will be processed
    by this session. Other protocols will not be reconstructed.

    DEV: implement a class-function `tcp_reassemble` in your Packet class::

        @classmethod
        def tcp_reassemble(cls, data, metadata, session):
            # data = the reassembled data from the same request/flow
            # metadata = empty dictionary, that can be used to store data
            #            during TCP reassembly
            # session = a dictionary proper to the bidirectional TCP session,
            #           that can be used to store anything
            [...]
            # If the packet is available, return it. Otherwise don't.
            # Whenever you return a packet, the buffer will be discarded.
            return pkt
            # Otherwise, maybe store stuff in metadata, and return None,
            # as you need additional data.
            return None

    For more details and a real example, see:
    https://scapy.readthedocs.io/en/latest/usage.html#how-to-use-tcpsession-to-defragment-tcp-packets

    :param app: Whether the socket is on application layer = has no TCP
                layer. This is identical to StreamSocket so only use this if your
                underlying source of data isn't a socket.socket.
    """

    def __init__(self, app=False, *args, **kwargs):
        # type: (bool, *Any, **Any) -> None
        super(TCPSession, self).__init__(*args, **kwargs)
        self.app = app
        if app:
            # Application mode: a single stream, hence a single buffer.
            self.data = StringBuffer()
            self.metadata = {}  # type: Dict[str, Any]
            self.session = {}  # type: Dict[str, Any]
        else:
            # The StringBuffer() is used to build a global
            # string from fragments and their seq number
            self.tcp_frags = defaultdict(
                lambda: (StringBuffer(), {})
            )  # type: DefaultDict[bytes, Tuple[StringBuffer, Dict[str, Any]]]
            self.tcp_sessions = defaultdict(
                dict
            )  # type: DefaultDict[bytes, Dict[str, Any]]
        # Setup stopping dissection condition
        from scapy.layers.inet import TCP
        self.stop_dissection_after = TCP

    def _get_ident(self, pkt, session=False):
        # type: (Packet, bool) -> bytes
        """Build the key identifying the TCP flow ``pkt`` belongs to.

        :param pkt: a packet that contains a TCP layer.
        :param session: if True, return a key that is the same for both
            directions of the connection; otherwise the key is specific to
            the direction of ``pkt``.
        """
        underlayer = pkt["TCP"].underlayer
        af = socket.AF_INET6 if "IPv6" in pkt else socket.AF_INET
        src = inet_pton(af, underlayer.src) if underlayer else b""
        dst = inet_pton(af, underlayer.dst) if underlayer else b""
        if session:
            # Bidirectional: XOR is symmetric, so swapping source and
            # destination gives the same key. The whole XORed address is
            # kept (not only its first 4 bytes) so IPv6 peers do not collide.
            mixed = bytes(orb(a) ^ orb(b) for a, b in zip(src, dst))
            return mixed + struct.pack("!H", pkt.dport ^ pkt.sport)
        # Uni-directional
        return src + dst + struct.pack("!HH", pkt.dport, pkt.sport)

    def _strip_padding(self, pkt: Packet) -> Optional[bytes]:
        """Strip the packet of any padding, and return the padding.

        Returns None when the packet carries no padding layer. If the packet
        itself is a padding layer, its load is returned and nothing is
        stripped.
        """
        if isinstance(pkt, conf.padding_layer):
            return cast(bytes, pkt.load)
        pad = pkt.getlayer(conf.padding_layer)
        if pad is not None and pad.underlayer is not None:
            # strip padding
            del pad.underlayer.payload
            return cast(bytes, pad.load)
        return None

    def _process_app(self,
                     pkt: Packet,
                     cls: Optional[Type[Packet]] = None) -> Optional[Packet]:
        """Application-layer mode of process(): buffer raw data (no TCP
        layer involved) and try to dissect a packet out of it.
        """
        self.data.append(bytes(pkt))
        if cls is None and not isinstance(pkt, bytes):
            cls = pkt.__class__
        if "tcp_reassemble" in self.metadata:
            tcp_reassemble = self.metadata["tcp_reassemble"]
        elif cls is not None:
            self.metadata["tcp_reassemble"] = tcp_reassemble = streamcls(cls)
        else:
            # Nothing tells us how to dissect the data.
            return None
        packet = None  # type: Optional[Packet]
        if self.data.full():
            packet = tcp_reassemble(
                bytes(self.data),
                self.metadata,
                self.session,
            )
        if not packet:
            return None
        padding = self._strip_padding(packet)
        if padding:
            # There is remaining data for the next payload.
            self.data.shiftleft(len(self.data) - len(padding))
            # Skip full-padding
            if isinstance(packet, conf.padding_layer):
                return None
        else:
            # No padding (data) left. Clear
            self.data.clear()
            self.metadata.clear()
        return packet

    def process(self,
                pkt: Packet,
                cls: Optional[Type[Packet]] = None) -> Optional[Packet]:
        """Process each packet: matches the TCP seq/ack numbers
        to follow the TCP streams, and orders the fragments.

        :param pkt: the packet (or, in application mode, the data) received.
        :param cls: application mode only: the class used to dissect the
            stream when it cannot be derived from ``pkt``.
        :returns: a packet when one is available, None when more data is
            needed. Packets without a TCP layer, and payload-less TCP
            packets that do not end the stream, are returned as-is.
        """
        if self.app:
            # Special mode: Application layer. Use on top of TCP
            return self._process_app(pkt, cls)

        packet = None  # type: Optional[Packet]
        # IP defragmentation (and supersession) come first.
        _pkt = super(TCPSession, self).process(pkt)
        if _pkt is None:
            return None
        pkt = _pkt

        from scapy.layers.inet import IP, TCP
        if not pkt:
            return None
        if TCP not in pkt:
            return pkt
        pay = pkt[TCP].payload
        new_data = pay.original
        # Match packets by a unique TCP identifier
        ident = self._get_ident(pkt)
        data, metadata = self.tcp_frags[ident]
        tcp_session = self.tcp_sessions[self._get_ident(pkt, True)]
        # Handle TCP sequence numbers
        seq = pkt[TCP].seq
        if "seq" not in metadata:
            metadata["seq"] = seq
        if "next_seq" in metadata and seq < metadata["next_seq"]:
            # Retransmitted data (that we already returned)
            new_data = new_data[metadata["next_seq"] - seq:]
            if not new_data:
                return None
            seq = metadata["next_seq"]
        # Let's guess which class is going to be used
        if "pay_class" not in metadata:
            metadata["pay_class"] = pay_class = pkt[TCP].guess_payload_class(new_data)  # noqa: E501
            metadata["tcp_reassemble"] = tcp_reassemble = streamcls(pay_class)
        else:
            tcp_reassemble = metadata["tcp_reassemble"]

        # Relative sequence number, used for storage purposes. It is read
        # here, before the metadata may get cleared, and always bound so the
        # padding handling below can rely on it even without a payload.
        relative_seq = metadata.get("relative_seq", None)
        if pay:
            if relative_seq is None:
                relative_seq = metadata["relative_seq"] = seq - 1
            seq = seq - relative_seq
            # Add the data to the buffer
            data.append(new_data, seq)
        elif relative_seq is None:
            # Local fallback only: the stored metadata is left untouched.
            relative_seq = seq - 1

        # Check TCP FIN or TCP RESET
        if pkt[TCP].flags.F or pkt[TCP].flags.R:
            metadata["tcp_end"] = True
        elif not pay:
            # If there's no payload and the stream isn't ending, ignore.
            return pkt

        # In case any app layer protocol requires it,
        # allow the parser to inspect TCP PSH flag
        if pkt[TCP].flags.P:
            metadata["tcp_psh"] = True
        # XXX TODO: check that no empty space is missing in the buffer.
        # XXX Currently, if a TCP fragment was missing, we won't notice it.
        if data.full():
            # Reassemble using all previous packets
            metadata["original"] = pkt
            metadata["ident"] = ident
            packet = tcp_reassemble(
                bytes(data),
                metadata,
                tcp_session
            )
        if not packet:
            return None

        # Stack the result on top of the previous frames
        if "seq" in metadata:
            pkt[TCP].seq = metadata["seq"]
        # Clear TCP reassembly metadata
        metadata.clear()
        # Check for padding
        padding = self._strip_padding(packet)
        while padding:
            # There is remaining data for the next payload.
            full_length = data.content_len - len(padding)
            # The buffer start moves forward by full_length on every pass,
            # so the offset has to accumulate across iterations.
            relative_seq += full_length
            metadata["relative_seq"] = relative_seq
            data.shiftleft(full_length)
            # There might be a sub-payload hidden in the padding
            sub_packet = tcp_reassemble(
                bytes(data),
                metadata,
                tcp_session
            )
            if sub_packet:
                packet /= sub_packet
                padding = self._strip_padding(sub_packet)
            else:
                break
        else:
            # No padding (data) left. Clear
            data.clear()
            del self.tcp_frags[ident]
        # Minimum next seq
        metadata["next_seq"] = pkt[TCP].seq + len(new_data)
        # Skip full-padding
        if isinstance(packet, conf.padding_layer):
            return None
        # Rebuild resulting packet
        if pay:
            pay.underlayer.remove_payload()
        if IP in pkt:
            # Reset so these get recomputed for the rebuilt packet.
            pkt[IP].len = None
            pkt[IP].chksum = None
        pkt = pkt / packet
        pkt.wirelen = None
        return pkt

    def recv(self, sock: 'SuperSocket') -> Iterator[Packet]:
        """
        Will be called by sniff() to ask for a packet
        """
        pkt = sock.recv(stop_dissection_after=self.stop_dissection_after)
        # Now handle TCP reassembly
        if self.app:
            while pkt is not None:
                pkt = self.process(pkt)
                if pkt:
                    yield pkt
                    # keep calling process as there might be more
                    pkt = b""  # type: ignore
        else:
            pkt = self.process(pkt)  # type: ignore
            if pkt:
                yield pkt
        return None