Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/sessions.py: 17%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

218 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4 

5""" 

6Sessions: decode flow of packets when sniffing 

7""" 

8 

9from collections import defaultdict 

10import socket 

11import struct 

12 

13from scapy.compat import orb 

14from scapy.config import conf 

15from scapy.packet import Packet 

16from scapy.pton_ntop import inet_pton 

17 

18# Typing imports 

19from typing import ( 

20 Any, 

21 Callable, 

22 DefaultDict, 

23 Dict, 

24 Iterator, 

25 List, 

26 Optional, 

27 Tuple, 

28 Type, 

29 cast, 

30 TYPE_CHECKING, 

31) 

32from scapy.compat import Self 

33if TYPE_CHECKING: 

34 from scapy.supersocket import SuperSocket 

35 

36 

class DefaultSession(object):
    """Default session: no stream decoding.

    Packets are passed through unchanged, unless a ``supersession`` was
    provided, in which case every packet is delegated to its ``process``
    method.

    :param supersession: optional session to chain to. A truthy value that
        is not a ``DefaultSession`` instance (e.g. a session class) is called
        without arguments and the result is used as the supersession.
    """

    def __init__(self, supersession: Optional[Self] = None):
        if supersession and not isinstance(supersession, DefaultSession):
            # Not an instance yet (e.g. a class was given): build it.
            supersession = supersession()
        self.supersession = supersession

    def process(self, pkt: Packet) -> Optional[Packet]:
        """
        Called to pre-process the packet.

        :param pkt: the packet to process.
        :returns: the result of the supersession's ``process`` when one is
            set, otherwise ``pkt`` itself.
        """
        # Optionally handle supersession
        if self.supersession:
            return self.supersession.process(pkt)
        return pkt

    def recv(self, sock: 'SuperSocket') -> Iterator[Packet]:
        """
        Will be called by sniff() to ask for a packet.

        Reads one packet from ``sock`` and yields the processed result;
        yields nothing when the socket returned nothing or when
        ``process`` returned a falsy value.
        """
        pkt = sock.recv()
        if not pkt:
            return
        pkt = self.process(pkt)
        if pkt:
            yield pkt

64 

65 

class IPSession(DefaultSession):
    """Defragment IP packets on the fly.

    Usage:
    >>> sniff(session=IPSession)
    """

    def __init__(self, *args, **kwargs):
        # type: (*Any, **Any) -> None
        DefaultSession.__init__(self, *args, **kwargs)
        # Fragments seen so far; this mapping is handed to _defrag_ip_pkt
        # on every call so that state persists between packets.
        self.fragments = defaultdict(list)  # type: DefaultDict[Tuple[Any, ...], List[Packet]]  # noqa: E501

    def process(self, packet: Packet) -> Optional[Packet]:
        """
        Pre-process a packet, defragmenting IP when applicable.

        :param packet: the packet to process.
        :returns: ``None`` for a falsy packet, the packet itself when it has
            no IP layer, otherwise the second element of the tuple returned
            by ``_defrag_ip_pkt``.
        """
        # Imported lazily, as in the rest of this module.
        from scapy.layers.inet import IP, _defrag_ip_pkt
        if not packet:
            return None
        if IP not in packet:
            return packet
        return _defrag_ip_pkt(packet, self.fragments)[1]  # type: ignore

85 

86 

class StringBuffer(object):
    """StringBuffer is an object used to re-order data received during
    a TCP transmission.

    Each TCP fragment contains a sequence number, which marks
    (relatively to the first sequence number) the index of the data contained
    in the fragment.

    If a TCP fragment is missed, this class will fill the missing space with
    zeros.
    """

    def __init__(self):
        # type: () -> None
        self.content = bytearray(b"")
        # Kept equal to len(self.content) by every mutating method.
        self.content_len = 0
        self.noff = 0  # negative offset
        self.incomplete = []  # type: List[Tuple[int, int]]

    def append(self, data: bytes, seq: Optional[int] = None) -> None:
        """
        Store ``data`` in the buffer.

        :param data: the bytes to store. Empty data is ignored.
        :param seq: 1-based position of ``data`` in the stream. When None,
            the current buffer length is used as the sequence number.
        """
        if not data:
            return
        data_len = len(data)
        if seq is None:
            seq = self.content_len
        # Convert the 1-based sequence number into a buffer index, taking
        # into account what was previously prepended (noff).
        seq = seq - 1 - self.noff
        if seq < 0:
            # Data is located before the start of the current buffer
            # (e.g. the first fragment was missing)
            self.content = bytearray(b"\x00" * (-seq)) + self.content
            self.content_len += (-seq)
            self.noff += seq
            seq = 0
        if seq + data_len > self.content_len:
            # Data is located after the end of the current buffer
            self.content += b"\x00" * (seq - self.content_len + data_len)
            # As data was missing, mark it.
            # self.incomplete.append((self.content_len, seq))
            self.content_len = seq + data_len
            assert len(self.content) == self.content_len
        # XXX removes empty space marker.
        # for ifrag in self.incomplete:
        #     if [???]:
        #         self.incomplete.remove([???])
        memoryview(self.content)[seq:seq + data_len] = data

    def shiftleft(self, i: int) -> None:
        """
        Drop the first ``i`` bytes of the buffer.

        ``i`` is clamped to ``[0, len(self)]`` so that ``content_len`` can
        neither become negative nor diverge from ``len(self.content)``.
        """
        i = min(max(i, 0), self.content_len)
        self.content = self.content[i:]
        self.content_len -= i

    def full(self):
        # type: () -> bool
        # Should only be true when all missing data was filled up,
        # (or there never was missing data).
        # NOTE: gaps are not tracked yet, so this currently only reports
        # whether the buffer is non-empty.
        return bool(self)

    def clear(self):
        # type: () -> None
        # Re-running __init__ resets every attribute to its initial value.
        self.__init__()  # type: ignore

    def __bool__(self):
        # type: () -> bool
        return bool(self.content_len)
    __nonzero__ = __bool__

    def __len__(self):
        # type: () -> int
        return self.content_len

    def __bytes__(self):
        # type: () -> bytes
        return bytes(self.content)

    def __str__(self):
        # type: () -> str
        # NOTE(review): this returns bytes, not str (the cast only affects
        # type checkers), so the builtin str() rejects it with a TypeError.
        # Kept as is for callers invoking __str__() directly.
        return cast(str, self.__bytes__())

163 

164 

def streamcls(cls: Type[Packet]) -> Callable[
    [bytes, Dict[str, Any], Dict[str, Any]],
    Optional[Packet],
]:
    """
    Wraps a class for use when dissecting streams.

    :param cls: the class to wrap.
    :returns: ``cls.tcp_reassemble`` when the class defines it. Otherwise a
        function taking ``(data, metadata, session)`` that ignores the two
        dictionaries and returns ``cls(data)``, or ``None`` when ``data`` is
        empty.
    """
    if hasattr(cls, "tcp_reassemble"):
        return cls.tcp_reassemble  # type: ignore

    # There is no tcp_reassemble. Just dissect the packet
    def _dissect(data, *_):
        # type: (bytes, *Any) -> Optional[Packet]
        # Return None (not the empty input) when there is nothing to
        # dissect, as promised by the return annotation.
        return cls(data) if data else None
    return _dissect

177 

178 

class TCPSession(IPSession):
    """A Session that reconstructs TCP streams.

    NOTE: this has the same effect as wrapping a real socket.socket into StreamSocket,
    but for all concurrent TCP streams (can be used on pcaps or sniffed sessions).

    NOTE: only protocols that implement a ``tcp_reassemble`` function will be processed
    by this session. Other protocols will not be reconstructed.

    DEV: implement a class-function `tcp_reassemble` in your Packet class::

        @classmethod
        def tcp_reassemble(cls, data, metadata, session):
            # data = the reassembled data from the same request/flow
            # metadata = empty dictionary, that can be used to store data
            #            during TCP reassembly
            # session = a dictionary proper to the bidirectional TCP session,
            #           that can be used to store anything
            [...]
            # If the packet is available, return it. Otherwise don't.
            # Whenever you return a packet, the buffer will be discarded.
            return pkt
            # Otherwise, maybe store stuff in metadata, and return None,
            # as you need additional data.
            return None

    For more details and a real example, see:
    https://scapy.readthedocs.io/en/latest/usage.html#how-to-use-tcpsession-to-defragment-tcp-packets

    :param app: Whether the socket is on application layer = has no TCP
        layer. This is identical to StreamSocket so only use this if your
        underlying source of data isn't a socket.socket.
    """  # noqa: E501

    def __init__(self, app=False, *args, **kwargs):
        # type: (bool, *Any, **Any) -> None
        super(TCPSession, self).__init__(*args, **kwargs)
        self.app = app
        if app:
            # Application-layer mode: a single stream, a single buffer.
            self.data = StringBuffer()
            self.metadata = {}  # type: Dict[str, Any]
            self.session = {}  # type: Dict[str, Any]
        else:
            # The StringBuffer() is used to build a global
            # string from fragments and their seq number
            self.tcp_frags = defaultdict(
                lambda: (StringBuffer(), {})
            )  # type: DefaultDict[bytes, Tuple[StringBuffer, Dict[str, Any]]]
            self.tcp_sessions = defaultdict(
                dict
            )  # type: DefaultDict[bytes, Dict[str, Any]]
        # Setup stopping dissection condition
        from scapy.layers.inet import TCP
        self.stop_dissection_after = TCP

    def _get_ident(self, pkt, session=False):
        # type: (Packet, bool) -> bytes
        """
        Build the key identifying the TCP stream ``pkt`` belongs to.

        :param pkt: a packet containing a TCP layer.
        :param session: when True, return a key that is identical for both
            directions of the connection (addresses and ports are XORed
            together). Otherwise the key is direction-specific.
        """
        underlayer = pkt["TCP"].underlayer
        af = socket.AF_INET6 if "IPv6" in pkt else socket.AF_INET
        src = inet_pton(af, underlayer.src) if underlayer else b""
        dst = inet_pton(af, underlayer.dst) if underlayer else b""
        if session:
            # Bidirectional
            mixed = bytes(orb(a) ^ orb(b) for a, b in zip(src, dst))
            # Pack the whole XORed address: a fixed "4s" would keep only
            # the first 4 bytes of a 16-byte IPv6 address. The minimum of 4
            # keeps the IPv4 and no-underlayer keys as they always were.
            return struct.pack(
                "!%dsH" % max(len(mixed), 4),
                mixed,
                pkt.dport ^ pkt.sport,
            )
        # Uni-directional
        return src + dst + struct.pack("!HH", pkt.dport, pkt.sport)

    def _strip_padding(self, pkt: Packet) -> Optional[bytes]:
        """Strip the packet of any padding, and return the padding.

        :returns: the load of ``pkt`` itself when it is a padding layer, the
            load of its padding layer (which gets detached from the packet)
            when it has one, None otherwise.
        """
        if isinstance(pkt, conf.padding_layer):
            return cast(bytes, pkt.load)
        pad = pkt.getlayer(conf.padding_layer)
        if pad is not None and pad.underlayer is not None:
            # strip padding
            del pad.underlayer.payload
            return cast(bytes, pad.load)
        return None

    def process(self,
                pkt: Packet,
                cls: Optional[Type[Packet]] = None) -> Optional[Packet]:
        """Process each packet: matches the TCP seq/ack numbers
        to follow the TCP streams, and orders the fragments.

        :param pkt: the received packet (or raw bytes in ``app`` mode).
        :param cls: in ``app`` mode, the class used to dissect the stream.
            Defaults to the class of ``pkt`` when it is not bytes.
        :returns: a reassembled packet when one is available, the packet
            unchanged when it carries no TCP layer (or no payload and no
            FIN/RST), None otherwise.
        """
        packet = None  # type: Optional[Packet]
        if self.app:
            # Special mode: Application layer. Use on top of TCP
            self.data.append(bytes(pkt))
            if cls is None and not isinstance(pkt, bytes):
                cls = pkt.__class__
            if "tcp_reassemble" in self.metadata:
                tcp_reassemble = self.metadata["tcp_reassemble"]
            elif cls is not None:
                tcp_reassemble = streamcls(cls)
                self.metadata["tcp_reassemble"] = tcp_reassemble
            else:
                return None
            if self.data.full():
                packet = tcp_reassemble(
                    bytes(self.data),
                    self.metadata,
                    self.session,
                )
                if packet:
                    padding = self._strip_padding(packet)
                    if padding:
                        # There is remaining data for the next payload.
                        self.data.shiftleft(len(self.data) - len(padding))
                        # Skip full-padding
                        if isinstance(packet, conf.padding_layer):
                            return None
                    else:
                        # No padding (data) left. Clear
                        self.data.clear()
                        self.metadata.clear()
                    return packet
            return None

        # Let IPSession defragment first.
        _pkt = super(TCPSession, self).process(pkt)
        if _pkt is None:
            return None
        pkt = _pkt

        from scapy.layers.inet import IP, TCP
        if not pkt:
            return None
        if TCP not in pkt:
            return pkt
        pay = pkt[TCP].payload
        new_data = pay.original
        # Match packets by a unique TCP identifier
        ident = self._get_ident(pkt)
        data, metadata = self.tcp_frags[ident]
        tcp_session = self.tcp_sessions[self._get_ident(pkt, True)]
        # Handle TCP sequence numbers
        seq = pkt[TCP].seq
        if "seq" not in metadata:
            metadata["seq"] = seq
        if "next_seq" in metadata and seq < metadata["next_seq"]:
            # Retransmitted data (that we already returned)
            new_data = new_data[metadata["next_seq"] - seq:]
            if not new_data:
                return None
            seq = metadata["next_seq"]
        # Let's guess which class is going to be used
        if "pay_class" not in metadata:
            pay_class = pkt[TCP].guess_payload_class(new_data)
            metadata["pay_class"] = pay_class
            metadata["tcp_reassemble"] = tcp_reassemble = streamcls(pay_class)
        else:
            tcp_reassemble = metadata["tcp_reassemble"]

        # Read the stored base before branching: it is needed below even
        # when this packet has no payload (e.g. a bare FIN arriving while
        # data is still buffered).
        relative_seq = metadata.get("relative_seq", None)
        if pay:
            # Get a relative sequence number for a storage purpose
            if relative_seq is None:
                relative_seq = metadata["relative_seq"] = seq - 1
            seq = seq - relative_seq
            # Add the data to the buffer
            data.append(new_data, seq)

        # Check TCP FIN or TCP RESET
        if pkt[TCP].flags.F or pkt[TCP].flags.R:
            metadata["tcp_end"] = True
        elif not pay:
            # If there's no payload and the stream isn't ending, ignore.
            return pkt

        # In case any app layer protocol requires it,
        # allow the parser to inspect TCP PSH flag
        if pkt[TCP].flags.P:
            metadata["tcp_psh"] = True
        # XXX TODO: check that no empty space is missing in the buffer.
        # XXX Currently, if a TCP fragment was missing, we won't notice it.
        if data.full():
            # Reassemble using all previous packets
            metadata["original"] = pkt
            metadata["ident"] = ident
            packet = tcp_reassemble(
                bytes(data),
                metadata,
                tcp_session
            )
        # Stack the result on top of the previous frames
        if packet:
            if "seq" in metadata:
                pkt[TCP].seq = metadata["seq"]
            # Clear TCP reassembly metadata
            metadata.clear()
            # Check for padding
            padding = self._strip_padding(packet)
            while padding:
                # There is remaining data for the next payload.
                full_length = data.content_len - len(padding)
                if relative_seq is not None:
                    # The buffer start moves forward on every iteration, so
                    # the base has to accumulate every shift.
                    relative_seq += full_length
                    metadata["relative_seq"] = relative_seq
                data.shiftleft(full_length)
                # There might be a sub-payload hidden in the padding
                sub_packet = tcp_reassemble(
                    bytes(data),
                    metadata,
                    tcp_session
                )
                if sub_packet:
                    packet /= sub_packet
                    padding = self._strip_padding(sub_packet)
                else:
                    break
            else:
                # No padding (data) left. Clear
                data.clear()
                del self.tcp_frags[ident]
            # Minimum next seq
            metadata["next_seq"] = pkt[TCP].seq + len(new_data)
            # Skip full-padding
            if isinstance(packet, conf.padding_layer):
                return None
            # Rebuild resulting packet
            if pay:
                pay.underlayer.remove_payload()
            if IP in pkt:
                # Reset so that they get recomputed for the new payload.
                pkt[IP].len = None
                pkt[IP].chksum = None
            pkt = pkt / packet
            pkt.wirelen = None
            return pkt
        return None

    def recv(self, sock: 'SuperSocket') -> Iterator[Packet]:
        """
        Will be called by sniff() to ask for a packet
        """
        pkt = sock.recv(stop_dissection_after=self.stop_dissection_after)
        # Now handle TCP reassembly
        if self.app:
            while pkt is not None:
                pkt = self.process(pkt)
                if pkt:
                    yield pkt
                    # keep calling process as there might be more
                    pkt = b""  # type: ignore
        else:
            pkt = self.process(pkt)  # type: ignore
            if pkt:
                yield pkt
        return None