Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/nfstream/meter.py: 13%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

171 statements  

1""" 

2------------------------------------------------------------------------------------------------------------------------ 

3meter.py 

4Copyright (C) 2019-22 - NFStream Developers 

5This file is part of NFStream, a Flexible Network Data Analysis Framework (https://www.nfstream.org/). 

6NFStream is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public 

7License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later 

8version. 

9NFStream is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty 

10of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. 

11You should have received a copy of the GNU Lesser General Public License along with NFStream. 

12If not, see <http://www.gnu.org/licenses/>. 

13------------------------------------------------------------------------------------------------------------------------ 

14""" 

15 

16from .engine import create_engine, setup_capture, setup_dissector, activate_capture 

17from .utils import set_affinity, InternalError, InternalState, NFEvent, NFMode 

18from collections import OrderedDict 

19from .flow import NFlow 

20 

21 

# Message sent by meter_workflow when create_engine() returns no library handle.
# The URL is followed by a space so it is not fused with the next word.
ENGINE_LOAD_ERR = (
    "Error when loading engine library. This means that you are probably building nfstream from source "
    "and something went wrong during the engine compilation step. Please see: "
    "https://www.nfstream.org/docs/#building-nfstream-from-sources for more information"
)

# Message for a missing npcap library (not referenced in the visible part of this module).
NPCAP_LOAD_ERR = "Error finding npcap library. Please make sure you npcap is installed on your system."

# Message sent by meter_workflow when dissections are requested but the dissector is NULL.
NDPI_LOAD_ERR = "Error while loading Dissector. This means that you are building nfstream with an out of sync nDPI."

# Colon-separated template with nine placeholders (one per flow key component).
FLOW_KEY = "{}:{}:{}:{}:{}:{}:{}:{}:{}"

31 

32 

class NFCache(OrderedDict):
    """Ordered flow cache.

    Flows are kept in an OrderedDict. Each assignment pushes its key to the end, so the
    first key always belongs to the entry whose last assignment is the oldest (LRU) and the
    last key to the most recently assigned one (MRU). Reading an entry does not reorder
    the cache. This gives cheap access to the entries that are candidates for
    timeout-based expiration.
    """

    def __init__(self, *args, **kwds):
        super(NFCache, self).__init__(*args, **kwds)

    def __getitem__(self, key):
        # Plain lookup: the position of the entry is left untouched.
        entry = super(NFCache, self).__getitem__(key)
        return entry

    def __setitem__(self, key, value):
        super(NFCache, self).__setitem__(key, value)
        # Overwriting an existing key keeps its old position in an OrderedDict,
        # so the key is explicitly pushed to the MRU end.
        self.move_to_end(key, last=True)

    def __eq__(self, other):
        same = super(NFCache, self).__eq__(other)
        return same

    def get_lru_key(self):
        """Return the first (least recently assigned) key.

        Raises StopIteration when the cache is empty.
        """
        oldest_first = iter(self)
        return next(oldest_first)

56 

57 

def meter_scan(
    meter_tick,
    cache,
    idle_timeout,
    channel,
    udps,
    sync,
    n_dissections,
    statistics,
    splt,
    ffi,
    lib,
    dissector,
):
    """Expire idle flows found at the LRU end of the cache.

    Starting from the key returned by ``cache.get_lru_key()``, each flow reporting
    ``is_idle(meter_tick, idle_timeout)`` is expired, the result of ``flow.expire(...)`` is
    put on ``channel`` and the entry is removed from ``cache``. The scan stops at the first
    flow that is not idle, when the cache is empty, or once 1000 flows have been expired
    in this call (scan budget).

    The remaining arguments (``udps``, ``sync``, ``n_dissections``, ``statistics``,
    ``splt``, ``ffi``, ``lib``, ``dissector``) are forwarded untouched to ``flow.expire``.

    Returns:
        The number of flows expired by this call.
    """
    scanned = 0
    while scanned < 1000:  # idle scan budget: at most 1000 expirations per call
        # Only the LRU lookup may signal an empty cache. Keeping the try this narrow
        # prevents a StopIteration escaping from is_idle/expire/put from being mistaken
        # for "nothing left to scan", which would leave the same flow at the LRU
        # position and silently stall every following scan.
        try:
            flow_key = cache.get_lru_key()  # will return the LRU flow key.
        except StopIteration:  # Empty cache
            break
        flow = cache[flow_key]
        if not flow.is_idle(meter_tick, idle_timeout):
            break  # LRU flow is not yet idle.
        # idle, expire it.
        channel.put(
            flow.expire(udps, sync, n_dissections, statistics, splt, ffi, lib, dissector)
        )
        del cache[flow_key]
        scanned += 1
    return scanned

98 

99 

def get_flow_key(src_ip, src_port, dst_ip, dst_port, protocol, vlan_id, tunnel_id):
    """Build a flow key that is the same for both directions of a conversation.

    Each endpoint is ranked by ``(ip[1], ip[0], port)``. The endpoint with the lower rank
    is placed first in the key; when both ranks are equal the source stays first. The
    resulting 9-tuple is ``(ip[0], ip[1], port)`` of the first endpoint, the same three
    fields of the second endpoint, then ``protocol``, ``vlan_id`` and ``tunnel_id``.
    """
    src_endpoint = (src_ip[0], src_ip[1], src_port)
    dst_endpoint = (dst_ip[0], dst_ip[1], dst_port)
    # Lexicographic ranking: ip[1] first, then ip[0], ports only break address ties.
    src_rank = (src_ip[1], src_ip[0], src_port)
    dst_rank = (dst_ip[1], dst_ip[0], dst_port)
    if src_rank <= dst_rank:
        first, second = src_endpoint, dst_endpoint
    else:
        first, second = dst_endpoint, src_endpoint
    return first + second + (protocol, vlan_id, tunnel_id)

153 

154 

def get_flow_key_from_pkt(packet):
    """Derive the flow key of a packet from its 7-tuple.

    The key is computed by get_flow_key from the packet's source ip, source port,
    destination ip, destination port, protocol, VLAN ID and tunnel ID.
    """
    endpoints = (packet.src_ip, packet.src_port, packet.dst_ip, packet.dst_port)
    scope = (packet.protocol, packet.vlan_id, packet.tunnel_id)
    return get_flow_key(*endpoints, *scope)

171 

172 

def consume(
    packet,
    cache,
    active_timeout,
    idle_timeout,
    channel,
    ffi,
    lib,
    udps,
    sync,
    accounting_mode,
    n_dissections,
    statistics,
    splt,
    dissector,
    decode_tunnels,
    system_visibility_mode,
):
    """Consume a packet: update the flow it belongs to, or create a new one.

    The flow key is computed from the packet. When the key is already cached, the cached
    flow is updated; if ``update`` returns a flow, that flow is put on ``channel`` and
    removed from ``cache``. For an active/inactive expiration (``expiration_id >= 0``) a
    fresh flow is then created from the same packet. When the key is not cached, a new
    flow is created. With ``sync`` enabled, a new flow whose ``expiration_id`` is -1
    (a user plugin forced expiration on the first packet) is expired and put on
    ``channel`` right away instead of being cached.

    Returns:
        The state used by the caller for active flows computation:
        1 for creation, 0 for update/cut, -1 for custom expire.
    """
    flow_key = get_flow_key_from_pkt(packet)
    # Argument bundles shared by every flow construction / expiration below.
    creation_args = (
        packet,
        ffi,
        lib,
        udps,
        sync,
        accounting_mode,
        n_dissections,
        statistics,
        splt,
        dissector,
        decode_tunnels,
        system_visibility_mode,
    )
    expiration_args = (udps, sync, n_dissections, statistics, splt, ffi, lib, dissector)

    # Only the cache lookup decides between "update" and "create". A KeyError raised
    # while updating or building a flow must not be treated as a cache miss, as that
    # would silently overwrite the cached flow without ever exporting it.
    try:
        cached_flow = cache[flow_key]
    except KeyError:
        cached_flow = None

    if cached_flow is None:  # create flow
        try:
            flow = NFlow(*creation_args)
            if sync and flow.expiration_id == -1:
                # A user Plugin forced expiration on the first packet
                channel.put(flow.expire(*expiration_args))
                return 0
            cache[flow_key] = flow
            return 1
        except OSError:
            print(
                "WARNING: Failed to allocate memory space for flow creation. Flow creation aborted."
            )
            return 0

    # update flow
    flow = cached_flow.update(
        packet,
        idle_timeout,
        active_timeout,
        ffi,
        lib,
        udps,
        sync,
        accounting_mode,
        n_dissections,
        statistics,
        splt,
        dissector,
    )
    del cached_flow
    if flow is None:
        return 0

    custom_expiration = flow.expiration_id < 0
    channel.put(flow)
    del cache[flow_key]
    del flow  # drop the expired flow before a replacement is allocated
    if custom_expiration:
        return -1

    # active/inactive expiration: the packet starts a new flow under the same key.
    try:
        cache[flow_key] = NFlow(*creation_args)
        if cache[flow_key].expiration_id == -1:
            # A user Plugin forced expiration on the first packet
            channel.put(cache[flow_key].expire(*expiration_args))
            del cache[flow_key]
    except OSError:
        print(
            "WARNING: Failed to allocate memory space for flow creation. Flow creation aborted."
        )
    return 0

317 

318 

def meter_cleanup(
    cache, channel, udps, sync, n_dissections, statistics, splt, ffi, lib, dissector
):
    """Flush the cache: expire every entry, push it on the channel, then drop it."""
    # The keys are snapshotted first because entries are deleted while walking them.
    pending_keys = list(cache.keys())
    for pending_key in pending_keys:
        expired = cache[pending_key].expire(
            udps, sync, n_dissections, statistics, splt, ffi, lib, dissector
        )
        channel.put(expired)
        del cache[pending_key]

333 

334 

def capture_track(lib, capture, mode, interface_stats, tracker, processed, ignored):
    """Refresh the capture statistics and publish the performance counters.

    ``lib.capture_stats`` is called first to fill ``interface_stats``; then the first
    three ``tracker`` slots receive, in order, the dropped count read from
    ``interface_stats``, the processed count and the ignored count.
    """
    lib.capture_stats(capture, interface_stats, mode)
    counters = (interface_stats.dropped, processed, ignored)
    for slot, count in enumerate(counters):
        tracker[slot].value = count

341 

342 

def send_error(root_idx, channel, msg):
    """Put ``msg`` on the channel as an InternalError, only when ``root_idx`` is 0.

    Calls with any other ``root_idx`` do nothing, so an error is reported once.
    """
    if root_idx != 0:
        return
    error = InternalError(NFEvent.ERROR, msg)
    channel.put(error)

346 

347 

def meter_workflow(
    source,
    snaplen,
    decode_tunnels,
    bpf_filter,
    promisc,
    n_roots,
    root_idx,
    mode,
    idle_timeout,
    active_timeout,
    accounting_mode,
    udps,
    n_dissections,
    statistics,
    splt,
    channel,
    tracker,
    lock,
    group_id,
    system_visibility_mode,
    socket_buffer_size,
):
    """Metering workflow

    Runs one meter: loads the engine, sets up the dissector, waits on the start barrier,
    then for each source opens and activates a capture and reads packets until the
    capture reports its end. Processed packets are turned into flows through consume();
    idle flows are expired through meter_scan() at most once per scan interval of
    packet time; performance counters are published through capture_track() at most
    once per track interval. When every source is exhausted, all cached flows are
    expired, the dissector is cleaned up and None is put on the channel.

    Errors (engine not loaded, NULL dissector while dissections are requested, capture
    setup or activation failure) are reported through send_error() and end the workflow.

    NOTE(review): the early returns on errors skip meter_cleanup(), dissector_cleanup()
    and the final channel.put(None); the activation failure path also returns without
    calling capture_close(). Confirm the consumer and the engine cope with that.
    """
    set_affinity(root_idx + 1)
    ffi, lib = create_engine()
    if lib is None:
        send_error(root_idx, channel, ENGINE_LOAD_ERR)
        return
    meter_tick, meter_scan_tick, meter_track_tick = (
        0,
        0,
        0,
    )  # meter, idle scan and perf track timelines
    meter_scan_interval, meter_track_interval = (
        10,
        1000,
    )  # we scan each 10 msecs and update perf each sec.
    cache = NFCache()
    dissector = setup_dissector(ffi, lib, n_dissections)
    if dissector == ffi.NULL and n_dissections:
        send_error(root_idx, channel, NDPI_LOAD_ERR)
        return
    # NOTE(review): active_flows is accumulated below but never read in this function.
    active_flows, ignored_packets, processed_packets = 0, 0, 0
    sync = False
    if len(udps) > 0:  # streamer started with udps: sync internal structures on update.
        sync = True
    interface_stats = ffi.new("struct nf_stat *")
    # We ensure that processes start at the same time
    # The meter with the last index releases the lock and announces that all affinities
    # are set; every other meter acquires the lock and releases it right away.
    if root_idx == n_roots - 1:
        lock.release()
        channel.put(InternalState(NFEvent.ALL_AFFINITY_SET))
    else:
        lock.acquire()
        lock.release()

    # In MULTIPLE_FILES mode source is already a collection of sources.
    if mode == NFMode.MULTIPLE_FILES:
        sources = source
    else:
        sources = [source]

    # The cache and the timelines are shared across all sources.
    # NOTE(review): source_idx is unused and the loop variable shadows the parameter.
    for source_idx, source in enumerate(sources):
        error_child = ffi.new("char[256]")  # buffer receiving the capture error message
        capture = setup_capture(
            ffi,
            lib,
            source,
            snaplen,
            promisc,
            mode,
            error_child,
            group_id,
            socket_buffer_size,
        )
        if capture is None:
            send_error(
                root_idx,
                channel,
                ffi.string(error_child).decode("utf-8", errors="ignore"),
            )
            return
        # Here the last operation, BPF filtering setup and activation.
        if not activate_capture(capture, lib, error_child, bpf_filter, mode):
            send_error(
                root_idx,
                channel,
                ffi.string(error_child).decode("utf-8", errors="ignore"),
            )
            return

        remaining_packets = True
        while remaining_packets:
            # A new packet structure is allocated for every read.
            nf_packet = ffi.new("struct nf_packet *")
            ret = lib.capture_next(
                capture, nf_packet, decode_tunnels, n_roots, root_idx, int(mode)
            )
            if ret > 0:  # Valid must be processed by meter
                # The meter timeline never goes backwards: a packet older than the
                # current tick is re-stamped with the current tick.
                packet_time = nf_packet.time
                if packet_time > meter_tick:
                    meter_tick = packet_time
                else:
                    nf_packet.time = meter_tick  # Force time order
                if ret == 1:  # Must be processed
                    processed_packets += 1
                    # The scan decision is taken before consuming the packet, but the
                    # scan itself runs after it.
                    go_scan = False
                    if meter_tick - meter_scan_tick >= meter_scan_interval:
                        go_scan = True  # Activate scan
                        meter_scan_tick = meter_tick
                    # Consume packet and return diff
                    diff = consume(
                        nf_packet,
                        cache,
                        active_timeout,
                        idle_timeout,
                        channel,
                        ffi,
                        lib,
                        udps,
                        sync,
                        accounting_mode,
                        n_dissections,
                        statistics,
                        splt,
                        dissector,
                        decode_tunnels,
                        system_visibility_mode,
                    )
                    active_flows += diff
                    if go_scan:
                        idles = meter_scan(
                            meter_tick,
                            cache,
                            idle_timeout,
                            channel,
                            udps,
                            sync,
                            n_dissections,
                            statistics,
                            splt,
                            ffi,
                            lib,
                            dissector,
                        )
                        active_flows -= idles
                else:  # time ticker
                    # Not consumed: only advances the timeline and may trigger a scan.
                    if meter_tick - meter_scan_tick >= meter_scan_interval:
                        idles = meter_scan(
                            meter_tick,
                            cache,
                            idle_timeout,
                            channel,
                            udps,
                            sync,
                            n_dissections,
                            statistics,
                            splt,
                            ffi,
                            lib,
                            dissector,
                        )
                        active_flows -= idles
                        meter_scan_tick = meter_tick
            elif ret == 0:  # Ignored packet
                ignored_packets += 1
            elif ret == -1:  # Read error or empty buffer
                pass
            else:  # End of file
                remaining_packets = False  # end of loop
            if (
                meter_tick - meter_track_tick >= meter_track_interval
            ):  # Performance tracking
                capture_track(
                    lib,
                    capture,
                    mode,
                    interface_stats,
                    tracker,
                    processed_packets,
                    ignored_packets,
                )
                meter_track_tick = meter_tick
        # Close capture
        lib.capture_close(capture)

    # Expire all remaining flows in the cache.
    meter_cleanup(
        cache, channel, udps, sync, n_dissections, statistics, splt, ffi, lib, dissector
    )
    # Clean dissector
    lib.dissector_cleanup(dissector)
    # Release engine library
    # None is the last item put on the channel by this meter.
    channel.put(None)