Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.8/site-packages/nfstream/streamer.py: 66%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

358 statements  

1""" 

2------------------------------------------------------------------------------------------------------------------------ 

3streamer.py 

4Copyright (C) 2019-22 - NFStream Developers 

5This file is part of NFStream, a Flexible Network Data Analysis Framework (https://www.nfstream.org/). 

6NFStream is free software: you can redistribute it and/or modify it under the terms of the GNU Lesser General Public 

7License as published by the Free Software Foundation, either version 3 of the License, or (at your option) any later 

8version. 

9NFStream is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty 

10of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details. 

11You should have received a copy of the GNU Lesser General Public License along with NFStream. 

12If not, see <http://www.gnu.org/licenses/>. 

13------------------------------------------------------------------------------------------------------------------------ 

14""" 

15 

16from multiprocessing import get_context 

17import threading 

18import pandas as pd 

19import time as tm 

20import os 

21import platform 

22import psutil 

23from collections.abc import Iterable 

24from os.path import isfile 

25from .meter import meter_workflow 

26from .anonymizer import NFAnonymizer 

27from .engine import is_interface 

28from .plugin import NFPlugin 

29from .utils import ( 

30 csv_converter, 

31 open_file, 

32 RepeatedTimer, 

33 update_performances, 

34 set_affinity, 

35 available_cpus_count, 

36) 

37from .utils import ( 

38 validate_flows_per_file, 

39 NFMode, 

40 create_csv_file_path, 

41 NFEvent, 

42 validate_rotate_files, 

43) 

44from .system import system_socket_worflow, match_flow_conn 

45 

46 

class NFStreamer(object):
    """Network Flow Streamer.

    Aggregates packets read from a pcap file, a list of pcap files, or a live
    network interface into flow records. Iterating over a streamer starts the
    metering processes and yields flows as they are exported; ``to_csv`` and
    ``to_pandas`` are exporters built on top of that iteration.

    Example::

        from nfstream import NFStreamer
        # Streamer object for reading traffic from a PCAP
        streamer = NFStreamer(source='path/to/file.pcap')
        # Converting data to pandas dataframe
        df = streamer.to_pandas()
    """

    streamer_id = 0  # class id generator (incremented under glock in __init__)
    glock = threading.Lock()
    is_windows = "windows" in platform.system().lower()
    # Timeouts are configured in seconds but forwarded to the meters in
    # milliseconds; the millisecond value must fit in a uint64_t.
    _MAX_UINT64 = 18446744073709551615

    def __init__(
        self,
        source=None,
        decode_tunnels=True,
        bpf_filter=None,
        promiscuous_mode=True,
        snapshot_length=1536,
        socket_buffer_size=0,
        idle_timeout=120,  # https://www.kernel.org/doc/Documentation/networking/nf_conntrack-sysctl.txt
        active_timeout=1800,
        accounting_mode=0,
        udps=None,
        n_dissections=20,
        statistical_analysis=False,
        splt_analysis=0,
        n_meters=0,
        max_nflows=0,
        performance_report=0,
        system_visibility_mode=0,
        system_visibility_poll_ms=100,
    ):
        """Validate and store the streamer configuration.

        Every attribute is assigned through its validating property setter, so an
        invalid argument raises ``ValueError``. Order matters: ``source`` is
        assigned first because its setter sets ``self._mode``, which the
        ``n_meters`` and ``system_visibility_mode`` setters read.
        """
        with NFStreamer.glock:
            NFStreamer.streamer_id += 1
            self._idx = NFStreamer.streamer_id
        self._mode = NFMode.SINGLE_FILE
        self.source = source
        self.decode_tunnels = decode_tunnels
        self.bpf_filter = bpf_filter
        self.promiscuous_mode = promiscuous_mode
        self.snapshot_length = snapshot_length
        self.idle_timeout = idle_timeout
        self.active_timeout = active_timeout
        self.accounting_mode = accounting_mode
        self.udps = udps
        self.n_dissections = n_dissections
        self.statistical_analysis = statistical_analysis
        self.splt_analysis = splt_analysis
        self.n_meters = n_meters
        self.max_nflows = max_nflows
        self.performance_report = performance_report
        self.system_visibility_mode = system_visibility_mode
        self.system_visibility_poll_ms = system_visibility_poll_ms

        # NIC socket buffer size. Default is 0, which means that the pcap default value is used.
        # The default values may vary depending on the OS and CPU architecture.
        # Range: 0 - 2^31-1
        self.socket_buffer_size = socket_buffer_size

        # "fork" is unavailable on Windows, so fall back to "spawn" there.
        if NFStreamer.is_windows:
            self._mp_context = get_context("spawn")
        else:
            self._mp_context = get_context("fork")

    @property
    def source(self):
        """Capture source: a pcap path, a list of pcap paths, or an interface name."""
        return self._source

    @source.setter
    def source(self, value):
        """Validate the source and set the capture mode accordingly.

        A list (or tuple) is treated as several pcap files read as one; every
        entry must be path-like and point to an existing file. Anything else must
        be a path-like object naming an existing file or a network interface.
        The caller's list is not modified; a new list of ``str`` paths is stored.

        Raises:
            ValueError: if the source is empty, not path-like, or matches neither
                a file nor an interface.
        """
        if isinstance(value, (list, tuple)):  # List of pcap files to consider as a single one.
            if len(value) == 0:
                raise ValueError("Please provide a non-empty list of sources.")
            paths = []
            for i, item in enumerate(value):
                try:
                    path = str(os.fspath(item))
                except TypeError:
                    path = None
                if path is None or not isfile(path):
                    raise ValueError(
                        "Invalid pcap file path at index: " + str(i) + "."
                    )
                paths.append(path)
            self._mode = NFMode.MULTIPLE_FILES
            value = paths
        else:
            try:
                value = str(os.fspath(value))
            except TypeError:
                raise ValueError(
                    "Please specify a pcap file path or a valid network interface name as source."
                ) from None
            if isfile(value):
                self._mode = NFMode.SINGLE_FILE
            else:
                interface = is_interface(value)
                if interface is None:
                    raise ValueError(
                        "Please specify a pcap file path or a valid network interface name as source."
                    )
                self._mode = NFMode.INTERFACE
                value = interface  # store the name as returned by is_interface
        self._source = value

    @property
    def decode_tunnels(self):
        """Whether tunnel decoding is enabled (bool)."""
        return self._decode_tunnels

    @decode_tunnels.setter
    def decode_tunnels(self, value):
        if not isinstance(value, bool):
            raise ValueError(
                "Please specify a valid decode_tunnels parameter (possible values: True, False)."
            )
        self._decode_tunnels = value

    @property
    def bpf_filter(self):
        """BPF filter string, or None for no filter."""
        return self._bpf_filter

    @bpf_filter.setter
    def bpf_filter(self, value):
        if value is not None and not isinstance(value, str):
            raise ValueError("Please specify a valid bpf_filter format.")
        self._bpf_filter = value

    @property
    def promiscuous_mode(self):
        """Whether promiscuous mode is requested (bool)."""
        return self._promiscuous_mode

    @promiscuous_mode.setter
    def promiscuous_mode(self, value):
        if not isinstance(value, bool):
            raise ValueError(
                "Please specify a valid promiscuous_mode parameter (possible values: True, False)."
            )
        self._promiscuous_mode = value

    @property
    def snapshot_length(self):
        """Snapshot length (positive int)."""
        return self._snapshot_length

    @snapshot_length.setter
    def snapshot_length(self, value):
        if not isinstance(value, int) or value <= 0:
            raise ValueError(
                "Please specify a valid snapshot_length parameter (positive integer)."
            )
        self._snapshot_length = value

    @property
    def socket_buffer_size(self):
        """Socket buffer size in [0, 2^31-1]; 0 keeps the pcap default."""
        return self._socket_buffer_size

    @socket_buffer_size.setter
    def socket_buffer_size(self, value):
        if not isinstance(value, int) or not 0 <= value <= 2**31 - 1:
            raise ValueError(
                "Please specify a valid socket_buffer_size parameter (positive integer <= 2^31-1)."
            )
        self._socket_buffer_size = value

    def _validate_timeout(self, value, name):
        """Raise ValueError unless ``value`` is a non-negative int whose ms value fits uint64."""
        if not isinstance(value, int) or value < 0 or value * 1000 > self._MAX_UINT64:
            raise ValueError(
                "Please specify a valid {} parameter (positive integer in seconds).".format(
                    name
                )
            )

    @property
    def idle_timeout(self):
        """Idle timeout in seconds."""
        return self._idle_timeout

    @idle_timeout.setter
    def idle_timeout(self, value):
        self._validate_timeout(value, "idle_timeout")
        self._idle_timeout = value

    @property
    def active_timeout(self):
        """Active timeout in seconds."""
        return self._active_timeout

    @active_timeout.setter
    def active_timeout(self, value):
        self._validate_timeout(value, "active_timeout")
        self._active_timeout = value

    @property
    def accounting_mode(self):
        """Accounting mode, one of 0, 1, 2, 3."""
        return self._accounting_mode

    @accounting_mode.setter
    def accounting_mode(self, value):
        if not isinstance(value, int) or value not in (0, 1, 2, 3):
            raise ValueError(
                "Please specify a valid accounting_mode parameter (possible values: 0, 1, 2, 3)."
            )
        self._accounting_mode = value

    @property
    def udps(self):
        """User defined plugins (NFPlugin instances) handed to every meter."""
        return self._udps

    @udps.setter
    def udps(self, value):
        """Accept None, a single NFPlugin, or an iterable of NFPlugin instances.

        Iterables are materialised into a tuple before validation so that a
        one-shot iterator (e.g. a generator) is not exhausted by the check.
        """
        if value is None:
            self._udps = ()
        elif isinstance(value, NFPlugin):
            self._udps = (value,)
        elif isinstance(value, Iterable):
            plugins = tuple(value)
            if not all(isinstance(plugin, NFPlugin) for plugin in plugins):
                raise ValueError(
                    "User defined plugins must inherit from NFPlugin type."
                )
            self._udps = plugins
        else:
            raise ValueError("User defined plugins must inherit from NFPlugin type.")

    @property
    def n_dissections(self):
        """Number of dissections, in [0, 255]."""
        return self._n_dissections

    @n_dissections.setter
    def n_dissections(self, value):
        if not isinstance(value, int) or not 0 <= value <= 255:
            raise ValueError(
                "Please specify a valid n_dissections parameter (possible values in : [0,...,255])."
            )
        self._n_dissections = value

    @property
    def statistical_analysis(self):
        """Whether statistical analysis is enabled (bool)."""
        return self._statistical_analysis

    @statistical_analysis.setter
    def statistical_analysis(self, value):
        if not isinstance(value, bool):
            raise ValueError(
                "Please specify a valid statistical_analysis parameter (possible values: True, False)."
            )
        self._statistical_analysis = value

    @property
    def splt_analysis(self):
        """SPLT analysis size, in [0, 65535]; values above 255 only print a warning."""
        return self._splt_analysis

    @splt_analysis.setter
    def splt_analysis(self, value):
        if not isinstance(value, int) or not 0 <= value <= 65535:
            raise ValueError(
                "Please specify a valid splt_analysis parameter (possible values in : [0,...,65535])"
            )
        if value > 255:
            print(
                "[WARNING]: The specified splt_analysis parameter is higher than 255. High values can impact the performance of the tool."
            )
        self._splt_analysis = value

    @property
    def n_meters(self):
        """Number of metering processes actually used (always >= 1)."""
        return self._n_meters

    @n_meters.setter
    def n_meters(self, value):
        """Resolve the number of metering processes.

        ``0`` means auto-scaling from the detected CPU topology. A positive
        value is capped at one less than the available logical CPUs (with a
        printed warning) to avoid contention. The stored value is never below 1.
        Reads ``self._mode``, so ``source`` must be set first.
        """
        if not isinstance(value, int) or value < 0:
            raise ValueError(
                "Please specify a valid n_meters parameter (>=1 or 0 for auto scaling)."
            )
        c_cpus, c_cores = available_cpus_count(), psutil.cpu_count(logical=False)
        if c_cores is None:
            # Patch for platforms returning None (https://github.com/giampaolo/psutil/issues/1078)
            c_cores = c_cpus
        max_meters = max(c_cpus - 1, 1)  # leave one logical CPU to the streamer
        if value == 0:
            if platform.system() == "Linux" and self._mode == NFMode.INTERFACE:
                # We are in live capture mode and kernel fanout will be available
                # only on Linux, we set the n_meters to detected logical CPUs -1
                n_meters = c_cpus - 1
            elif c_cpus < c_cores:  # weird case, fallback on cpu count.
                n_meters = c_cpus - 1
            elif c_cpus in (c_cores, 2 * c_cores):  # multi-thread or single threaded
                n_meters = c_cores - 1
            else:
                n_meters = c_cpus // 2 - 1
        elif value > max_meters:  # avoid contention
            print(
                "WARNING: n_meters set to :{} in order to avoid contention.".format(
                    max_meters
                )
            )
            n_meters = max_meters
        else:
            n_meters = value
        # One CPU case (and guard against a non-positive count, which would
        # make __iter__ wait forever for meter terminations).
        self._n_meters = max(n_meters, 1)

    @property
    def max_nflows(self):
        """Zero-based id of the last flow to yield (configured value minus one).

        Setting 0 stores -1, which never matches a flow id, i.e. no limit.
        """
        return self._max_nflows

    @max_nflows.setter
    def max_nflows(self, value):
        if not isinstance(value, int) or value < 0:
            raise ValueError("Please specify a valid max_nflows parameter (>=0).")
        # Flow ids are assigned from 0 in __iter__, so N flows end at id N - 1.
        self._max_nflows = value - 1

    @property
    def performance_report(self):
        """Performance report interval in seconds (0 disables)."""
        return self._performance_report

    @performance_report.setter
    def performance_report(self, value):
        if not isinstance(value, int) or value < 0:
            raise ValueError(
                "Please specify a valid performance_report parameter (>=1 for reporting interval (seconds)"
                " or 0 to disable). [Available only for Live capture]"
            )
        self._performance_report = value

    @property
    def system_visibility_mode(self):
        """System visibility mode: 0 (disabled) or 1 (process information)."""
        return self._system_visibility_mode

    @system_visibility_mode.setter
    def system_visibility_mode(self, value):
        """Validate the mode; it is forced to 0 for any offline (file-based) source."""
        if not isinstance(value, int) or value not in (0, 1):
            raise ValueError(
                "Please specify a valid system_visibility_mode parameter\n"
                "0: disable\n"
                "1: process information\n"
                "[Available only for live capture on the system generating the traffic]"
            )
        if value > 0 and self._mode != NFMode.INTERFACE:
            print(
                "WARNING: system_visibility_mode switched to 0 in offline capture "
                "(available only for live capture)"
            )
            value = 0
        self._system_visibility_mode = value

    @property
    def system_visibility_poll_ms(self):
        """System visibility polling period in milliseconds."""
        return self._system_visibility_poll_ms

    @system_visibility_poll_ms.setter
    def system_visibility_poll_ms(self, value):
        if not isinstance(value, int) or value < 0:
            raise ValueError(
                "Please specify a valid system_visibility_poll_ms parameter "
                "(positive integer in milliseconds)"
            )
        self._system_visibility_poll_ms = value

    def __iter__(self):
        """Start the metering processes and yield flows as they arrive.

        Messages are read from a shared queue. A ``None`` message marks one
        meter as finished. An ERROR event aborts the run. Affinity and socket
        events are handled internally. Every other message is treated as a
        flow: it receives a unified, increasing id and is yielded.

        Iteration stops when every meter has finished, when ``max_nflows`` is
        reached, on ``KeyboardInterrupt``, or when the consumer closes the
        generator early (e.g. ``break``). In every case the child processes and
        helper timers are stopped before returning.

        Raises:
            ValueError: if a meter reports an error or if job initiation fails.
        """
        # NOTE(review): acquired here and not released in this method; it is
        # handed to every meter — presumably they coordinate on it, confirm in
        # meter_workflow.
        lock = self._mp_context.Lock()
        lock.acquire()
        meters = []
        n_terminated = 0
        child_error = None
        rt = None
        socket_listener = None
        conn_cache = {}
        live = self._mode == NFMode.INTERFACE
        with_performance_report = live and self.performance_report > 0
        with_system_visibility = live and bool(self.system_visibility_mode)

        def terminate_meters():
            # Break the meters' workflow loops.
            for meter in meters:
                meter.terminate()

        # To avoid issues on PyPy on Windows (See https://foss.heptapod.net/pypy/pypy/-/issues/3488), All
        # multiprocessing Value invocation must be performed before the call to Queue.
        n_meters = self.n_meters
        idx_generator = self._mp_context.Value("i", 0)
        performances = [
            [self._mp_context.Value("I", 0) for _ in range(3)] for _ in range(n_meters)
        ]
        # Backpressure strategy. We set it to (2^15-1) to cope with OSX max semaphore value.
        channel = self._mp_context.Queue(maxsize=32767)
        group_id = os.getpid() + self._idx  # Used for fanout on Linux systems
        try:
            for i in range(n_meters):
                meter = self._mp_context.Process(
                    target=meter_workflow,
                    args=(
                        self.source,
                        self.snapshot_length,
                        self.decode_tunnels,
                        self.bpf_filter,
                        self.promiscuous_mode,
                        n_meters,
                        i,
                        self._mode,
                        self.idle_timeout * 1000,
                        self.active_timeout * 1000,
                        self.accounting_mode,
                        self.udps,
                        self.n_dissections,
                        self.statistical_analysis,
                        self.splt_analysis,
                        channel,
                        performances[i],
                        lock,
                        group_id,
                        self.system_visibility_mode,
                        self.socket_buffer_size,
                    ),
                )
                meters.append(meter)
                meter.daemon = True  # demonize meter
                meter.start()
            if with_performance_report:
                rt = RepeatedTimer(
                    self.performance_report,
                    update_performances,
                    performances,
                    platform.system() == "Linux",
                    idx_generator,
                )
            if with_system_visibility:
                socket_listener = self._mp_context.Process(
                    target=system_socket_worflow,
                    args=(
                        channel,
                        self.idle_timeout * 1000,
                        self.system_visibility_poll_ms / 1000,
                    ),
                )
                socket_listener.daemon = True  # demonize socket_listener
                socket_listener.start()

            while True:
                try:
                    recv = channel.get()
                    if recv is None:  # termination and stats
                        n_terminated += 1
                        if n_terminated == n_meters:
                            break  # We finish up when all metering jobs are terminated
                    elif recv.id == NFEvent.ERROR:  # Error message
                        terminate_meters()
                        child_error = recv.message
                        break
                    elif recv.id == NFEvent.ALL_AFFINITY_SET:
                        # we pin streamer to core 0 as it's the less intensive task and several
                        # services runs by default on this core.
                        set_affinity(0)
                    elif recv.id == NFEvent.SOCKET_CREATE:
                        conn_cache[recv.key] = [recv.process_name, recv.process_pid]
                    elif recv.id == NFEvent.SOCKET_REMOVE:
                        # Tolerate removals for sockets we never saw created.
                        conn_cache.pop(recv.key, None)
                    else:  # NFEvent.FLOW
                        recv.id = idx_generator.value  # Unify ID
                        idx_generator.value = idx_generator.value + 1
                        if with_system_visibility:
                            recv = match_flow_conn(conn_cache, recv)
                        yield recv
                        if recv.id == self.max_nflows:
                            # We reached the maximum flows count defined by the user.
                            terminate_meters()
                            break
                except (KeyboardInterrupt, GeneratorExit):
                    # User interrupt, or the consumer closed the generator early:
                    # stop the meters and fall through to the shutdown sequence.
                    terminate_meters()
                    break
            for meter in meters:
                if meter.is_alive():
                    meter.join()  # Join metering jobs
            if rt is not None:
                rt.stop()
            if socket_listener is not None:
                socket_listener.terminate()
            channel.close()  # We close the queue
            channel.join_thread()  # and we join its thread
            if child_error is not None:
                raise ValueError(child_error)
        except ValueError as observer_error:
            # job initiation failed due to some bad observer parameters.
            raise ValueError(observer_error) from observer_error

    def to_csv(
        self, path=None, columns_to_anonymize=(), flows_per_file=0, rotate_files=0
    ):
        """Export flows to CSV and return the number of flows written.

        Args:
            path: output path (resolved by ``create_csv_file_path`` together
                with the source).
            columns_to_anonymize: column names passed to ``NFAnonymizer``.
            flows_per_file: when > 0, start a new chunk file (with its own
                header) every ``flows_per_file`` flows; 0 writes a single file.
            rotate_files: forwarded to ``open_file``.

        A ``KeyboardInterrupt`` stops the export; flows already written are
        kept. The output file is always closed, even if an error escapes.
        """
        validate_flows_per_file(flows_per_file)
        validate_rotate_files(rotate_files)
        chunked = flows_per_file != 0
        chunk_idx = -1
        output_path = create_csv_file_path(path, self.source)
        total_flows, chunk_flows = 0, 0
        anon = NFAnonymizer(cols_names=columns_to_anonymize)
        f = None
        try:
            for flow in self:
                try:
                    # chunk_flows is reset to 1 and incremented after each row, so
                    # it exceeds flows_per_file exactly when the chunk is full.
                    if total_flows == 0 or (chunked and chunk_flows > flows_per_file):
                        if f is not None:
                            f.close()
                        chunk_flows = 1
                        chunk_idx += 1
                        f = open_file(output_path, chunked, chunk_idx, rotate_files)
                        header = ",".join([str(i) for i in flow.keys()]) + "\n"
                        f.write(header.encode("utf-8"))
                    values = anon.process(flow)
                    csv_converter(values)
                    to_export = ",".join([str(i) for i in values]) + "\n"
                    f.write(to_export.encode("utf-8"))
                    total_flows += 1
                    chunk_flows += 1
                except KeyboardInterrupt:
                    # Stop exporting; leaving the loop releases the generator,
                    # whose cleanup terminates the meters.
                    break
        finally:
            if f is not None and not f.closed:
                f.close()
        return total_flows

    def to_pandas(self, columns_to_anonymize=()):
        """Export flows to a pandas DataFrame, or return None if no flow was seen.

        Flows are written to a temporary CSV file in the current working
        directory, loaded with pandas, and the file is always removed afterwards.
        A warning is printed when pandas loads a different number of rows than
        were written.
        """
        temp_file_path = "nfstream-{pid}-{iid}-{ts}.csv".format(
            pid=os.getpid(), iid=self._idx, ts=tm.time()
        )
        try:
            total_flows = self.to_csv(
                path=temp_file_path,
                columns_to_anonymize=columns_to_anonymize,
                flows_per_file=0,
            )
            if total_flows <= 0:  # If there is flows, return Dataframe else return None.
                return None
            # Use C engine for superior performance (non-experimental)
            df = pd.read_csv(temp_file_path, engine="c")
            if total_flows != df.shape[0]:
                print(
                    "WARNING: {} flows ignored by pandas type conversion. Consider using to_csv() "
                    "method if drops are critical.".format(
                        abs(df.shape[0] - total_flows)
                    )
                )
            return df
        finally:
            if os.path.exists(temp_file_path):
                os.remove(temp_file_path)