Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/scapy/sessions.py: 18%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

196 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4 

5""" 

6Sessions: decode flow of packets when sniffing 

7""" 

8 

9from collections import defaultdict 

10import socket 

11import struct 

12 

13from scapy.compat import orb 

14from scapy.config import conf 

15from scapy.packet import NoPayload, Packet 

16from scapy.pton_ntop import inet_pton 

17 

18# Typing imports 

19from typing import ( 

20 Any, 

21 Callable, 

22 DefaultDict, 

23 Dict, 

24 Iterator, 

25 List, 

26 Optional, 

27 Tuple, 

28 Type, 

29 cast, 

30 TYPE_CHECKING, 

31) 

32from scapy.compat import Self 

33if TYPE_CHECKING: 

34 from scapy.supersocket import SuperSocket 

35 

36 

37class DefaultSession(object): 

38 """Default session: no stream decoding""" 

39 

40 def __init__(self, supersession: Optional[Self] = None): 

41 if supersession and not isinstance(supersession, DefaultSession): 

42 supersession = supersession() 

43 self.supersession = supersession 

44 

45 def process(self, pkt: Packet) -> Optional[Packet]: 

46 """ 

47 Called to pre-process the packet 

48 """ 

49 # Optionally handle supersession 

50 if self.supersession: 

51 return self.supersession.process(pkt) 

52 return pkt 

53 

54 def recv(self, sock: 'SuperSocket') -> Iterator[Packet]: 

55 """ 

56 Will be called by sniff() to ask for a packet 

57 """ 

58 pkt = sock.recv() 

59 if not pkt: 

60 return 

61 pkt = self.process(pkt) 

62 if pkt: 

63 yield pkt 

64 

65 

66class IPSession(DefaultSession): 

67 """Defragment IP packets 'on-the-flow'. 

68 

69 Usage: 

70 >>> sniff(session=IPSession) 

71 """ 

72 

73 def __init__(self, *args, **kwargs): 

74 # type: (*Any, **Any) -> None 

75 DefaultSession.__init__(self, *args, **kwargs) 

76 self.fragments = defaultdict(list) # type: DefaultDict[Tuple[Any, ...], List[Packet]] # noqa: E501 

77 

78 def process(self, packet: Packet) -> Optional[Packet]: 

79 from scapy.layers.inet import IP, _defrag_ip_pkt 

80 if not packet: 

81 return None 

82 if IP not in packet: 

83 return packet 

84 return _defrag_ip_pkt(packet, self.fragments)[1] # type: ignore 

85 

86 

87class StringBuffer(object): 

88 """StringBuffer is an object used to re-order data received during 

89 a TCP transmission. 

90 

91 Each TCP fragment contains a sequence number, which marks 

92 (relatively to the first sequence number) the index of the data contained 

93 in the fragment. 

94 

95 If a TCP fragment is missed, this class will fill the missing space with 

96 zeros. 

97 """ 

98 

99 def __init__(self): 

100 # type: () -> None 

101 self.content = bytearray(b"") 

102 self.content_len = 0 

103 self.noff = 0 # negative offset 

104 self.incomplete = [] # type: List[Tuple[int, int]] 

105 

106 def append(self, data: bytes, seq: Optional[int] = None) -> None: 

107 data_len = len(data) 

108 if seq is None: 

109 seq = self.content_len 

110 seq = seq - 1 - self.noff 

111 if seq < 0: 

112 # Data is located before the start of the current buffer 

113 # (e.g. the first fragment was missing) 

114 self.content = bytearray(b"\x00" * (-seq)) + self.content 

115 self.content_len += (-seq) 

116 self.noff += seq 

117 seq = 0 

118 if seq + data_len > self.content_len: 

119 # Data is located after the end of the current buffer 

120 self.content += b"\x00" * (seq - self.content_len + data_len) 

121 # As data was missing, mark it. 

122 # self.incomplete.append((self.content_len, seq)) 

123 self.content_len = seq + data_len 

124 assert len(self.content) == self.content_len 

125 # XXX removes empty space marker. 

126 # for ifrag in self.incomplete: 

127 # if [???]: 

128 # self.incomplete.remove([???]) 

129 memoryview(self.content)[seq:seq + data_len] = data 

130 

131 def shiftleft(self, i: int) -> None: 

132 self.content = self.content[i:] 

133 self.content_len -= i 

134 

135 def full(self): 

136 # type: () -> bool 

137 # Should only be true when all missing data was filled up, 

138 # (or there never was missing data) 

139 return True # XXX 

140 

141 def clear(self): 

142 # type: () -> None 

143 self.__init__() # type: ignore 

144 

145 def __bool__(self): 

146 # type: () -> bool 

147 return bool(self.content_len) 

148 __nonzero__ = __bool__ 

149 

150 def __len__(self): 

151 # type: () -> int 

152 return self.content_len 

153 

154 def __bytes__(self): 

155 # type: () -> bytes 

156 return bytes(self.content) 

157 

158 def __str__(self): 

159 # type: () -> str 

160 return cast(str, self.__bytes__()) 

161 

162 

163def streamcls(cls: Type[Packet]) -> Callable[ 

164 [bytes, Dict[str, Any], Dict[str, Any]], 

165 Optional[Packet], 

166]: 

167 """ 

168 Wraps a class for use when dissecting streams. 

169 """ 

170 if hasattr(cls, "tcp_reassemble"): 

171 return cls.tcp_reassemble # type: ignore 

172 else: 

173 # There is no tcp_reassemble. Just dissect the packet 

174 return lambda data, *_: data and cls(data) 

175 

176 

177class TCPSession(IPSession): 

178 """A Session that reconstructs TCP streams. 

179 

180 NOTE: this has the same effect as wrapping a real socket.socket into StreamSocket, 

181 but for all concurrent TCP streams (can be used on pcaps or sniffed sessions). 

182 

183 NOTE: only protocols that implement a ``tcp_reassemble`` function will be processed 

184 by this session. Other protocols will not be reconstructed. 

185 

186 DEV: implement a class-function `tcp_reassemble` in your Packet class:: 

187 

188 @classmethod 

189 def tcp_reassemble(cls, data, metadata, session): 

190 # data = the reassembled data from the same request/flow 

191 # metadata = empty dictionary, that can be used to store data 

192 # during TCP reassembly 

193 # session = a dictionary proper to the bidirectional TCP session, 

194 # that can be used to store anything 

195 [...] 

196 # If the packet is available, return it. Otherwise don't. 

197 # Whenever you return a packet, the buffer will be discarded. 

198 return pkt 

199 # Otherwise, maybe store stuff in metadata, and return None, 

200 # as you need additional data. 

201 return None 

202 

203 For more details and a real example, see: 

204 https://scapy.readthedocs.io/en/latest/usage.html#how-to-use-tcpsession-to-defragment-tcp-packets 

205 

206 :param app: Whether the socket is on application layer = has no TCP 

207 layer. This is identical to StreamSocket so only use this if your 

208 underlying source of data isn't a socket.socket. 

209 """ 

210 

211 def __init__(self, app=False, *args, **kwargs): 

212 # type: (bool, *Any, **Any) -> None 

213 super(TCPSession, self).__init__(*args, **kwargs) 

214 self.app = app 

215 if app: 

216 self.data = StringBuffer() 

217 self.metadata = {} # type: Dict[str, Any] 

218 self.session = {} # type: Dict[str, Any] 

219 else: 

220 # The StringBuffer() is used to build a global 

221 # string from fragments and their seq nulber 

222 self.tcp_frags = defaultdict( 

223 lambda: (StringBuffer(), {}) 

224 ) # type: DefaultDict[bytes, Tuple[StringBuffer, Dict[str, Any]]] 

225 self.tcp_sessions = defaultdict( 

226 dict 

227 ) # type: DefaultDict[bytes, Dict[str, Any]] 

228 # Setup stopping dissection condition 

229 from scapy.layers.inet import TCP 

230 self.stop_dissection_after = TCP 

231 

232 def _get_ident(self, pkt, session=False): 

233 # type: (Packet, bool) -> bytes 

234 underlayer = pkt["TCP"].underlayer 

235 af = socket.AF_INET6 if "IPv6" in pkt else socket.AF_INET 

236 src = underlayer and inet_pton(af, underlayer.src) or b"" 

237 dst = underlayer and inet_pton(af, underlayer.dst) or b"" 

238 if session: 

239 # Bidirectional 

240 def xor(x, y): 

241 # type: (bytes, bytes) -> bytes 

242 return bytes(orb(a) ^ orb(b) for a, b in zip(x, y)) 

243 return struct.pack("!4sH", xor(src, dst), pkt.dport ^ pkt.sport) 

244 else: 

245 # Uni-directional 

246 return src + dst + struct.pack("!HH", pkt.dport, pkt.sport) 

247 

248 def _strip_padding(self, pkt: Packet) -> Optional[bytes]: 

249 """Strip the packet of any padding, and return the padding. 

250 """ 

251 pad = pkt.getlayer(conf.padding_layer) 

252 if pad is not None and pad.underlayer is not None: 

253 # strip padding 

254 del pad.underlayer.payload 

255 return cast(bytes, pad.load) 

256 return None 

257 

258 def process(self, 

259 pkt: Packet, 

260 cls: Optional[Type[Packet]] = None) -> Optional[Packet]: 

261 """Process each packet: matches the TCP seq/ack numbers 

262 to follow the TCP streams, and orders the fragments. 

263 """ 

264 packet = None # type: Optional[Packet] 

265 if self.app: 

266 # Special mode: Application layer. Use on top of TCP 

267 self.data.append(bytes(pkt)) 

268 if cls is None and not isinstance(pkt, bytes): 

269 cls = pkt.__class__ 

270 if "tcp_reassemble" in self.metadata: 

271 tcp_reassemble = self.metadata["tcp_reassemble"] 

272 elif cls is not None: 

273 self.metadata["tcp_reassemble"] = tcp_reassemble = streamcls(cls) 

274 else: 

275 return None 

276 packet = tcp_reassemble( 

277 bytes(self.data), 

278 self.metadata, 

279 self.session, 

280 ) 

281 if packet: 

282 padding = self._strip_padding(packet) 

283 if padding: 

284 # There is remaining data for the next payload. 

285 self.data.shiftleft(len(self.data) - len(padding)) 

286 else: 

287 # No padding (data) left. Clear 

288 self.data.clear() 

289 self.metadata.clear() 

290 return packet 

291 return None 

292 

293 _pkt = super(TCPSession, self).process(pkt) 

294 if _pkt is None: 

295 return None 

296 else: # Python 3.8 := would be nice 

297 pkt = _pkt 

298 

299 from scapy.layers.inet import IP, TCP 

300 if not pkt: 

301 return None 

302 if TCP not in pkt: 

303 return pkt 

304 pay = pkt[TCP].payload 

305 if isinstance(pay, (NoPayload, conf.padding_layer)): 

306 return pkt 

307 new_data = pay.original 

308 # Match packets by a unique TCP identifier 

309 ident = self._get_ident(pkt) 

310 data, metadata = self.tcp_frags[ident] 

311 tcp_session = self.tcp_sessions[self._get_ident(pkt, True)] 

312 # Handle TCP sequence numbers 

313 seq = pkt[TCP].seq 

314 if "seq" not in metadata: 

315 metadata["seq"] = seq 

316 if "next_seq" in metadata and seq < metadata["next_seq"]: 

317 # Retransmitted data (that we already returned) 

318 new_data = new_data[metadata["next_seq"] - seq:] 

319 if not new_data: 

320 return None 

321 seq = metadata["next_seq"] 

322 # Let's guess which class is going to be used 

323 if "pay_class" not in metadata: 

324 metadata["pay_class"] = pay_class = pkt[TCP].guess_payload_class(new_data) 

325 metadata["tcp_reassemble"] = tcp_reassemble = streamcls(pay_class) 

326 else: 

327 tcp_reassemble = metadata["tcp_reassemble"] 

328 # Get a relative sequence number for a storage purpose 

329 relative_seq = metadata.get("relative_seq", None) 

330 if relative_seq is None: 

331 relative_seq = metadata["relative_seq"] = seq - 1 

332 seq = seq - relative_seq 

333 # Add the data to the buffer 

334 data.append(new_data, seq) 

335 # Check TCP FIN or TCP RESET 

336 if pkt[TCP].flags.F or pkt[TCP].flags.R: 

337 metadata["tcp_end"] = True 

338 

339 # In case any app layer protocol requires it, 

340 # allow the parser to inspect TCP PSH flag 

341 if pkt[TCP].flags.P: 

342 metadata["tcp_psh"] = True 

343 # XXX TODO: check that no empty space is missing in the buffer. 

344 # XXX Currently, if a TCP fragment was missing, we won't notice it. 

345 if data.full(): 

346 # Reassemble using all previous packets 

347 metadata["original"] = pkt 

348 packet = tcp_reassemble( 

349 bytes(data), 

350 metadata, 

351 tcp_session 

352 ) 

353 # Stack the result on top of the previous frames 

354 if packet: 

355 if "seq" in metadata: 

356 pkt[TCP].seq = metadata["seq"] 

357 # Clear TCP reassembly metadata 

358 metadata.clear() 

359 # Check for padding 

360 padding = self._strip_padding(packet) 

361 if padding: 

362 # There is remaining data for the next payload. 

363 full_length = data.content_len - len(padding) 

364 metadata["relative_seq"] = relative_seq + full_length 

365 data.shiftleft(full_length) 

366 else: 

367 # No padding (data) left. Clear 

368 data.clear() 

369 del self.tcp_frags[ident] 

370 # Minimum next seq 

371 metadata["next_seq"] = pkt[TCP].seq + len(new_data) 

372 # Rebuild resulting packet 

373 pay.underlayer.remove_payload() 

374 if IP in pkt: 

375 pkt[IP].len = None 

376 pkt[IP].chksum = None 

377 pkt = pkt / packet 

378 pkt.wirelen = None 

379 return pkt 

380 return None 

381 

382 def recv(self, sock: 'SuperSocket') -> Iterator[Packet]: 

383 """ 

384 Will be called by sniff() to ask for a packet 

385 """ 

386 pkt = sock.recv(stop_dissection_after=self.stop_dissection_after) 

387 # Now handle TCP reassembly 

388 while pkt is not None: 

389 pkt = self.process(pkt) 

390 if pkt: 

391 yield pkt 

392 # keep calling process as there might be more 

393 pkt = b"" # type: ignore 

394 return None