Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/main.py: 15%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

507 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7Main module for interactive startup. 

8""" 

9 

10 

11import builtins 

12import pathlib 

13import sys 

14import os 

15import getopt 

16import code 

17import gzip 

18import glob 

19import importlib 

20import io 

21from itertools import zip_longest 

22import logging 

23import pickle 

24import types 

25import warnings 

26from random import choice 

27 

28# Never add any global import, in main.py, that would trigger a 

29# warning message before the console handlers gets added in interact() 

30from scapy.error import ( 

31 log_interactive, 

32 log_loading, 

33 Scapy_Exception, 

34) 

35from scapy.themes import DefaultTheme, BlackAndWhite, apply_ipython_style 

36from scapy.consts import WINDOWS 

37 

38from typing import ( 

39 Any, 

40 Dict, 

41 List, 

42 Optional, 

43 Union, 

44 overload, 

45) 

46from scapy.compat import ( 

47 Literal, 

48) 

49 

# Short layer names accepted by load_layer(), mapped to the module path
# (relative to "scapy.layers.") that is actually imported. Names that are
# not listed here are used unchanged.
LAYER_ALIASES = {
    "tls": "tls.all",
    "msrpce": "msrpce.all",
}

54 

# (quote, author) pairs. get_fancy_banner() picks one at random and renders
# it below the welcome text of the full-size banner.
QUOTES = [
    ("Craft packets like it is your last day on earth.", "Lao-Tze"),
    ("Craft packets like I craft my beer.", "Jean De Clerck"),
    ("Craft packets before they craft you.", "Socrate"),
    ("Craft me if you can.", "IPv6 layer"),
    ("To craft a packet, you have to be a packet, and learn how to swim in "
     "the wires and in the waves.", "Jean-Claude Van Damme"),
    ("We are in France, we say Skappee. OK? Merci.", "Sebastien Chabal"),
    ("Wanna support scapy? Star us on GitHub!", "Satoshi Nakamoto"),
    ("I'll be back.", "Python 2"),
]

66 

67 

68def _probe_xdg_folder(var, default, *cf): 

69 # type: (str, str, *str) -> Optional[pathlib.Path] 

70 path = pathlib.Path(os.environ.get(var, default)) 

71 try: 

72 if not path.exists(): 

73 # ~ folder doesn't exist. Create according to spec 

74 # https://specifications.freedesktop.org/basedir-spec/basedir-spec-latest.html 

75 # "If, when attempting to write a file, the destination directory is 

76 # non-existent an attempt should be made to create it with permission 0700." 

77 path.mkdir(mode=0o700, exist_ok=True) 

78 except Exception: 

79 # There is a gazillion ways this can fail. Most notably, a read-only fs or no 

80 # permissions to even check for folder to exist (e.x. privileges were dropped 

81 # before scapy was started). 

82 return None 

83 return path.joinpath(*cf).resolve() 

84 

85 

def _probe_config_folder(*cf):
    # type: (str) -> Optional[pathlib.Path]
    """
    Return a path inside the user's configuration folder.

    The base folder is $XDG_CONFIG_HOME, or ~/.config when unset.

    :param cf: path components appended to the configuration folder
    :returns: the resolved path, or None if the folder is unavailable
    """
    fallback = os.path.join(os.path.expanduser("~"), ".config")
    return _probe_xdg_folder("XDG_CONFIG_HOME", fallback, *cf)

93 

94 

def _probe_cache_folder(*cf):
    # type: (str) -> Optional[pathlib.Path]
    """
    Return a path inside the user's cache folder.

    The base folder is $XDG_CACHE_HOME, or ~/.cache when unset.

    :param cf: path components appended to the cache folder
    :returns: the resolved path, or None if the folder is unavailable
    """
    fallback = os.path.join(os.path.expanduser("~"), ".cache")
    return _probe_xdg_folder("XDG_CACHE_HOME", fallback, *cf)

102 

103 

def _check_perms(file: Union[pathlib.Path, str]) -> None:
    """
    Give a file back to the invoking user when Scapy runs through sudo.

    Nothing happens on Windows or when SUDO_UID / SUDO_GID are not both
    present in the environment. Any failure is silently ignored.

    :param file: path of the file or folder to chown
    """
    if WINDOWS:
        return
    if "SUDO_UID" not in os.environ or "SUDO_GID" not in os.environ:
        return
    # Started with sudo: hand ownership over to the real user anyway.
    try:
        uid = int(os.environ["SUDO_UID"])
        gid = int(os.environ["SUDO_GID"])
        os.chown(file, uid, gid)
    except Exception:
        pass

122 

123 

def _read_config_file(cf, _globals=globals(), _locals=locals(),
                      interactive=True, default=None):
    # type: (str, Dict[str, Any], Dict[str, Any], bool, Optional[str]) -> None
    """Execute a Python config file, typically while Scapy is loading.

    The file is compiled and run with ``exec`` using ``_globals`` and
    ``_locals`` as namespaces, so whatever it defines or modifies ends up
    in those dictionaries. This lets an external program reuse the
    function with its own namespaces.

    When the file does not exist and ``default`` is given, the file (and
    its parent folder if needed) is created with ``default`` as content
    and then executed. When it does not exist and ``default`` is None,
    nothing happens.

    :param cf: path of the config file
    :param _globals: the globals() vars
    :param _locals: the locals() vars
    :param interactive: if True, errors raised while reading or executing
        the file are re-raised; otherwise they are only logged
    :param default: if provided, content written to a missing config file

    Example, with a config.py file containing 'conf.verb = 42\n':

        >>> _read_config_file("./config.py")
        >>> conf.verb
        42

    """
    cf_path = pathlib.Path(cf)
    if not cf_path.exists():
        log_loading.debug("Config file [%s] does not exist.", cf)
        if default is None:
            return
        # A default content was supplied: write it out first.
        try:
            folder = cf_path.parent
            if not folder.exists():
                folder.mkdir(parents=True, exist_ok=True)
                _check_perms(folder)

            with cf_path.open("w") as fd:
                fd.write(default)

            _check_perms(cf_path)
            log_loading.debug("Config file [%s] created with default.", cf)
        except OSError:
            log_loading.warning("Config file [%s] could not be created.", cf,
                                exc_info=True)
            return

    log_loading.debug("Loading config file [%s]", cf)
    try:
        with open(cf) as cfgf:
            source = cfgf.read()
            bytecode = compile(source, cf, 'exec')
            exec(bytecode, _globals, _locals)
    except Exception as e:
        if interactive:
            raise
        if isinstance(e, IOError):
            # The file itself could not be read
            log_loading.warning("Cannot read config file [%s] [%s]", cf, e)
        else:
            # The file was read but its content failed
            log_loading.exception("Error during evaluation of config file [%s]",
                                  cf)

187 

188 

189def _validate_local(k): 

190 # type: (str) -> bool 

191 """Returns whether or not a variable should be imported.""" 

192 return k[0] != "_" and k not in ["range", "map"] 

193 

194 

# Per-user Scapy folders: by default ~/.config/scapy and ~/.cache/scapy.
# Either may be None when the folder could not be probed.
SCAPY_CONFIG_FOLDER = _probe_config_folder("scapy")
SCAPY_CACHE_FOLDER = _probe_cache_folder("scapy")

# Default config files live in the config folder, when there is one.
DEFAULT_PRESTART_FILE: Optional[str] = None
DEFAULT_STARTUP_FILE: Optional[str] = None
if SCAPY_CONFIG_FOLDER:
    DEFAULT_PRESTART_FILE = str(SCAPY_CONFIG_FOLDER / "prestart.py")
    DEFAULT_STARTUP_FILE = str(SCAPY_CONFIG_FOLDER / "startup.py")

205 

# Default content of the scapy prestart.py config file. interact() passes it
# to _read_config_file() as the content to write when the pre-start file
# does not exist yet.

DEFAULT_PRESTART = """
# Scapy CLI 'pre-start' config file
# see https://scapy.readthedocs.io/en/latest/api/scapy.config.html#scapy.config.Conf
# for all available options

# default interpreter
conf.interactive_shell = "auto"

# color theme (DefaultTheme, BrightTheme, ColorOnBlackTheme, BlackAndWhite, ...)
conf.color_theme = DefaultTheme()

# disable INFO: tags related to dependencies missing
# log_loading.setLevel(logging.WARNING)

# force-use libpcap
# conf.use_pcap = True
""".strip()

225 

226 

227def _usage(): 

228 # type: () -> None 

229 print( 

230 "Usage: scapy.py [-s sessionfile] [-c new_startup_file] " 

231 "[-p new_prestart_file] [-C] [-P] [-H]\n" 

232 "Args:\n" 

233 "\t-H: header-less start\n" 

234 "\t-C: do not read startup file\n" 

235 "\t-P: do not read pre-startup file\n" 

236 ) 

237 sys.exit(0) 

238 

239 

240###################### 

241# Extension system # 

242###################### 

243 

244 

245def _load(module, globals_dict=None, symb_list=None): 

246 # type: (str, Optional[Dict[str, Any]], Optional[List[str]]) -> None 

247 """Loads a Python module to make variables, objects and functions 

248available globally. 

249 

250 The idea is to load the module using importlib, then copy the 

251symbols to the global symbol table. 

252 

253 """ 

254 if globals_dict is None: 

255 globals_dict = builtins.__dict__ 

256 try: 

257 mod = importlib.import_module(module) 

258 if '__all__' in mod.__dict__: 

259 # import listed symbols 

260 for name in mod.__dict__['__all__']: 

261 if symb_list is not None: 

262 symb_list.append(name) 

263 globals_dict[name] = mod.__dict__[name] 

264 else: 

265 # only import non-private symbols 

266 for name, sym in mod.__dict__.items(): 

267 if _validate_local(name): 

268 if symb_list is not None: 

269 symb_list.append(name) 

270 globals_dict[name] = sym 

271 except Exception: 

272 log_interactive.error("Loading module %s", module, exc_info=True) 

273 

274 

def load_module(name, globals_dict=None, symb_list=None):
    # type: (str, Optional[Dict[str, Any]], Optional[List[str]]) -> None
    """Load the ``scapy.modules.<name>`` module and make its variables,
    objects and functions available globally.

    :param name: module name, relative to ``scapy.modules``
    :param globals_dict: namespace receiving the symbols
    :param symb_list: optional list that receives each loaded name
    """
    dotted = "scapy.modules." + name
    _load(dotted, globals_dict=globals_dict, symb_list=symb_list)

283 

284 

def load_layer(name, globals_dict=None, symb_list=None):
    # type: (str, Optional[Dict[str, Any]], Optional[List[str]]) -> None
    """Load the ``scapy.layers.<name>`` module and make its variables,
    objects and functions available globally.

    Names found in LAYER_ALIASES are replaced by their alias first.

    :param name: layer name, relative to ``scapy.layers``
    :param globals_dict: namespace receiving the symbols
    :param symb_list: optional list that receives each loaded name
    """
    resolved = LAYER_ALIASES.get(name, name)
    dotted = "scapy.layers." + resolved
    _load(dotted, globals_dict=globals_dict, symb_list=symb_list)

293 

294 

def load_contrib(name, globals_dict=None, symb_list=None):
    # type: (str, Optional[Dict[str, Any]], Optional[List[str]]) -> None
    """Load the ``scapy.contrib.<name>`` module and make its variables,
    objects and functions available globally.

    When the contrib module cannot be imported, a layer module of the
    same name is tried instead, because a contrib module may have been
    promoted to a layer.

    :param name: contrib name, relative to ``scapy.contrib``
    :param globals_dict: namespace receiving the symbols
    :param symb_list: optional list that receives each loaded name
    """
    contrib_path = "scapy.contrib." + name
    try:
        # Import once directly so that an ImportError surfaces here
        importlib.import_module(contrib_path)
        _load(contrib_path, globals_dict=globals_dict, symb_list=symb_list)
    except ImportError as contrib_error:
        # Not found in contrib: fall back to the layers
        try:
            load_layer(name, globals_dict=globals_dict, symb_list=symb_list)
        except ImportError:
            # NOTE(review): load_layer() goes through _load(), which logs
            # and swallows exceptions, so this branch looks unreachable -
            # confirm whether the original error was meant to propagate.
            raise contrib_error  # original error, to avoid confusion

315 

316 

def list_contrib(name=None,  # type: Optional[str]
                 ret=False,  # type: bool
                 _debug=False  # type: bool
                 ):
    # type: (...) -> Optional[List[Dict[str, str]]]
    """Show the list of all existing contribs.

    Each ``.py`` file below the ``contrib`` folder next to this module is
    scanned for header comments of the form::

        # scapy.contrib.description = [...]
        # scapy.contrib.status = [...]
        # scapy.contrib.name = [...] (optional)

    A file is listed once both its description and status are known.
    A file declaring ``# scapy.contrib.status = skip`` is ignored.

    :param name: filter to search the contribs. Glob wildcards are
        accepted; ".py" is appended to a plain name.
    :param ret: whether the function should return a list of dicts instead
        of printing it
    :param _debug: if True, raise when a non-skipped contrib lacks its
        description or status
    :returns: None, or the list of result dicts (sorted by name) if
        ret=True
    :raises Scapy_Exception: in _debug mode, if a module is missing its
        contrib infos
    """
    if name is None:
        name = "*.py"
    elif "*" not in name and "?" not in name and not name.endswith(".py"):
        name += ".py"
    results = []  # type: List[Dict[str, str]]
    dir_path = os.path.join(os.path.dirname(__file__), "contrib")
    pattern = os.path.join(dir_path, "**", name)
    marker = "scapy.contrib."
    for f in glob.iglob(pattern, recursive=True):
        # Turn the file path into a dotted name relative to "contrib."
        mod = f.replace(os.path.sep, ".").partition("contrib.")[2]
        if mod.startswith("__"):
            continue
        if mod.endswith(".py"):
            mod = mod[:-3]
        desc = {"description": "", "status": "", "name": mod}
        with io.open(f, errors="replace") as fd:
            for line in fd:
                if line[0] != "#":
                    continue
                p = line.find(marker)
                if p < 0:
                    continue
                p += len(marker)
                q = line.find("=", p)
                if q < 0:
                    # The marker is mentioned without "key = value": not a
                    # contrib info line, do not store garbage in desc.
                    continue
                key = line[p:q].strip()
                value = line[q + 1:].strip()
                desc[key] = value
                if desc["status"] == "skip":
                    break
                if desc["description"] and desc["status"]:
                    results.append(desc)
                    break
        if _debug:
            if desc["status"] == "skip":
                pass
            elif not desc["description"] or not desc["status"]:
                raise Scapy_Exception("Module %s is missing its "
                                      "contrib infos !" % mod)
    results.sort(key=lambda x: x["name"])
    if ret:
        return results
    else:
        for desc in results:
            print("%(name)-20s: %(description)-40s status=%(status)s" % desc)
        return None

384 

385 

386############################## 

387# Session saving/restoring # 

388############################## 

389 

def update_ipython_session(session):
    # type: (Dict[str, Any]) -> None
    """Push a custom session into the running IPython shell, if any.

    When the session has no "_oh" entry, the "_oh", "Out" (same dict
    object) and "In" entries are created first. Failures, such as IPython
    being unavailable, are ignored.

    :param session: the session dictionary; it may be modified
    """
    if "_oh" not in session:
        history = {}  # type: Dict[Any, Any]
        session["_oh"] = history
        session["Out"] = history
        session["In"] = {}
    try:
        from IPython import get_ipython
        shell = get_ipython()
        shell.user_ns.update(session)
    except Exception:
        pass

401 

402 

def _scapy_prestart_builtins():
    # type: () -> Dict[str, Any]
    """Import scapy.config and return the symbols it exposes.

    :returns: a dict of the module's symbols accepted by _validate_local()
    """
    config_module = importlib.import_module(".config", "scapy")
    snapshot = config_module.__dict__.copy()
    exposed = {}  # type: Dict[str, Any]
    for key, value in snapshot.items():
        if _validate_local(key):
            exposed[key] = value
    return exposed

411 

412 

def _scapy_builtins():
    # type: () -> Dict[str, Any]
    """Import scapy.all and return the symbols it exposes.

    :returns: a dict of the module's symbols accepted by _validate_local()
    """
    all_module = importlib.import_module(".all", "scapy")
    snapshot = all_module.__dict__.copy()
    exposed = {}  # type: Dict[str, Any]
    for key, value in snapshot.items():
        if _validate_local(key):
            exposed[key] = value
    return exposed

421 

422 

def _scapy_exts():
    # type: () -> Dict[str, Any]
    """Collect the symbols exposed by the Scapy extensions flagged as default.

    For each entry of conf.exts.all_specs whose spec has ``default`` set,
    the module is looked up in sys.modules and its symbols accepted by
    _validate_local() are merged into the result.

    :returns: a dict of the collected symbols
    """
    from scapy.config import conf
    collected = {}
    for modname, spec in conf.exts.all_specs.items():
        if not spec.default:
            continue
        module_vars = sys.modules[modname].__dict__.copy()
        for key, value in module_vars.items():
            if _validate_local(key):
                collected[key] = value
    return collected

437 

438 

def save_session(fname="", session=None, pickleProto=-1):
    # type: (str, Optional[Dict[str, Any]], int) -> None
    """Save current Scapy session to the file specified in the fname arg.

    The session is filtered before being written as a gzip-compressed
    pickle. Names starting with "_", IPython objects, ConfClass instances,
    Scapy builtins, classes, modules, functions and any object that cannot
    be pickled are left out. An existing file is first renamed to
    ``fname + ".bak"``.

    :param fname: file to save the scapy session in. When empty,
        conf.session is used, or a new temporary file (then stored in
        conf.session).
    :param session: scapy session to use. If None, the console one will
        be used
    :param pickleProto: pickle proto version (default: -1 = latest)
    """
    from scapy import utils
    from scapy.config import conf, ConfClass
    if not fname:
        fname = conf.session
        if not fname:
            conf.session = fname = utils.get_temp_file(keep=True)
    log_interactive.info("Saving session into [%s]", fname)

    if not session:
        if conf.interactive_shell in ["ipython", "ptipython"]:
            from IPython import get_ipython
            session = get_ipython().user_ns
        else:
            # .get(): no initialised session is reported below instead of
            # surfacing as a KeyError
            session = builtins.__dict__.get("scapy_session")

    if not session:
        log_interactive.error("No session found ?!")
        return

    ignore = session.get("_scpybuiltins", [])
    hard_ignore = ["scapy_session", "In", "Out", "open"]
    to_be_saved = session.copy()

    for k in list(to_be_saved):
        i = to_be_saved[k]
        # __module__ may be missing or None: only test real strings
        module_name = getattr(i, "__module__", None)
        if k.startswith("_"):
            del to_be_saved[k]
        elif isinstance(module_name, str) and module_name.startswith("IPython"):
            del to_be_saved[k]
        elif isinstance(i, ConfClass):
            del to_be_saved[k]
        elif k in ignore or k in hard_ignore:
            del to_be_saved[k]
        elif isinstance(i, (type, types.ModuleType, types.FunctionType)):
            log_interactive.warning("[%s] (%s) can't be saved.", k, type(i))
            del to_be_saved[k]
        else:
            try:
                pickle.dumps(i)
            except Exception:
                log_interactive.warning("[%s] (%s) can't be saved.", k, type(i))
                # Drop it, otherwise the final dump below fails as a whole
                del to_be_saved[k]

    # Keep the previous session file as a backup, if there is one
    try:
        os.rename(fname, fname + ".bak")
    except OSError:
        pass

    # The context manager closes the file even if pickling fails
    with gzip.open(fname, "wb") as f:
        pickle.dump(to_be_saved, f, pickleProto)

498 

499 

def load_session(fname=None):
    # type: (Optional[Union[str, None]]) -> None
    """Load current Scapy session from the file specified in the fname arg.
    This will erase any existing session.

    The file is read as a gzip-compressed pickle first, then as a plain
    pickle if that fails with an IOError. The Scapy builtins of the
    current session are carried over into the loaded one.

    :param fname: file to load the scapy session from (conf.session when
        None)
    :raises IOError: if the file cannot be read
    """
    from scapy.config import conf
    if fname is None:
        fname = conf.session
    # NOTE: unpickling can execute arbitrary code; only load session files
    # that come from a trusted source.
    try:
        with gzip.open(fname, "rb") as fd:
            s = pickle.load(fd)
    except IOError:
        # Not gzip (or unreadable): retry as a plain pickle. A missing
        # file raises again from here.
        with open(fname, "rb") as fd:
            s = pickle.load(fd)

    scapy_session = builtins.__dict__["scapy_session"]
    s.update({k: scapy_session[k] for k in scapy_session["_scpybuiltins"]})
    scapy_session.clear()
    scapy_session.update(s)
    update_ipython_session(scapy_session)

    log_loading.info("Loaded session [%s]", fname)

526 

527 

def update_session(fname=None):
    # type: (Optional[Union[str, None]]) -> None
    """Update current Scapy session from the file specified in the fname arg.

    Unlike load_session(), existing entries are kept unless the file
    overrides them. The file is read as a gzip-compressed pickle first,
    then as a plain pickle if that fails with an IOError.

    :param fname: file to load the scapy session from (conf.session when
        None)
    :raises IOError: if the file cannot be read
    """
    from scapy.config import conf
    if fname is None:
        fname = conf.session
    # NOTE: unpickling can execute arbitrary code; only load session files
    # that come from a trusted source.
    try:
        with gzip.open(fname, "rb") as fd:
            s = pickle.load(fd)
    except IOError:
        with open(fname, "rb") as fd:
            s = pickle.load(fd)
    scapy_session = builtins.__dict__["scapy_session"]
    scapy_session.update(s)
    update_ipython_session(scapy_session)

544 

545 

@overload
def init_session(session_name,  # type: Optional[Union[str, None]]
                 mydict,  # type: Optional[Union[Dict[str, Any], None]]
                 ret,  # type: Literal[True]
                 ):
    # type: (...) -> Dict[str, Any]
    pass


@overload
def init_session(session_name,  # type: Optional[Union[str, None]]
                 mydict=None,  # type: Optional[Union[Dict[str, Any], None]]
                 ret=False,  # type: Literal[False]
                 ):
    # type: (...) -> None
    pass


def init_session(session_name,  # type: Optional[Union[str, None]]
                 mydict=None,  # type: Optional[Union[Dict[str, Any], None]]
                 ret=False,  # type: bool
                 ):
    # type: (...) -> Union[Dict[str, Any], None]
    """Build the Scapy session and store it as builtins "scapy_session".

    The session holds the Scapy builtins, the symbols of the default
    extensions, the content of the session file (if any) and ``mydict``.

    :param session_name: path of a session file to restore. A missing
        file starts a new session under that name; a file that cannot be
        unpickled is reported and ignored.
    :param mydict: extra variables added to the session
    :param ret: whether the session dictionary should be returned
    :returns: the session dictionary if ret=True, None otherwise
    """
    from scapy.config import conf
    SESSION = {}  # type: Optional[Dict[str, Any]]

    # Load Scapy
    scapy_builtins = _scapy_builtins()

    # Load exts
    scapy_builtins.update(_scapy_exts())

    if session_name:
        try:
            os.stat(session_name)
        except OSError:
            log_loading.info("New session [%s]", session_name)
        else:
            # NOTE: unpickling can execute arbitrary code; only use session
            # files that come from a trusted source.
            try:
                try:
                    with gzip.open(session_name, "rb") as fd:
                        SESSION = pickle.load(fd)
                except IOError:
                    # Not gzip: retry as a plain pickle
                    with open(session_name, "rb") as fd:
                        SESSION = pickle.load(fd)
                log_loading.info("Using existing session [%s]", session_name)
            except ValueError:
                msg = "Error opening Python3 pickled session on Python2 [%s]"
                log_loading.error(msg, session_name)
            except (EOFError, pickle.UnpicklingError):
                # Truncated or corrupt file: start from a fresh session
                log_loading.error("Error opening session [%s]", session_name)
            except AttributeError:
                log_loading.error("Error opening session [%s]. "
                                  "Attribute missing", session_name)

        if SESSION:
            if "conf" in SESSION:
                conf.configure(SESSION["conf"])
                conf.session = session_name
                SESSION["conf"] = conf
            else:
                conf.session = session_name
        else:
            conf.session = session_name
            SESSION = {"conf": conf}
    else:
        SESSION = {"conf": conf}

    SESSION.update(scapy_builtins)
    # Names to leave out when the session is saved and to carry over when
    # another session is loaded
    SESSION["_scpybuiltins"] = scapy_builtins.keys()
    builtins.__dict__["scapy_session"] = SESSION

    if mydict is not None:
        builtins.__dict__["scapy_session"].update(mydict)
        update_ipython_session(mydict)
    if ret:
        return SESSION
    return None

622 

623################ 

624# Main # 

625################ 

626 

627 

628def _prepare_quote(quote, author, max_len=78): 

629 # type: (str, str, int) -> List[str] 

630 """This function processes a quote and returns a string that is ready 

631to be used in the fancy banner. 

632 

633 """ 

634 _quote = quote.split(' ') 

635 max_len -= 6 

636 lines = [] 

637 cur_line = [] # type: List[str] 

638 

639 def _len(line): 

640 # type: (List[str]) -> int 

641 return sum(len(elt) for elt in line) + len(line) - 1 

642 while _quote: 

643 if not cur_line or (_len(cur_line) + len(_quote[0]) - 1 <= max_len): 

644 cur_line.append(_quote.pop(0)) 

645 continue 

646 lines.append(' | %s' % ' '.join(cur_line)) 

647 cur_line = [] 

648 if cur_line: 

649 lines.append(' | %s' % ' '.join(cur_line)) 

650 cur_line = [] 

651 lines.append(' | %s-- %s' % (" " * (max_len - len(author) - 5), author)) 

652 return lines 

653 

654 

def get_fancy_banner(mini: Optional[bool] = None) -> str:
    """
    Generates the fancy Scapy banner

    The banner is built by pairing, row by row, a logo (coloured with
    conf.color_theme.logo) with a text column (coloured with
    conf.color_theme.success). The full-size banner also shows a random
    entry of QUOTES; the mini banner only keeps the welcome text.

    :param mini: if set, force a mini banner or not. Otherwise detect:
        the mini banner is used when the terminal width (84 if unknown)
        is at most 75 columns.
    :returns: the banner as a single newline-joined string
    """
    from scapy.config import conf
    from scapy.utils import get_terminal_width
    if mini is None:
        mini_banner = (get_terminal_width() or 84) <= 75
    else:
        mini_banner = mini

    # NOTE(review): runs of spaces inside the literals below appear to have
    # been collapsed in the copy of the file this was reviewed from - check
    # the logo and banner strings against the real source before relying
    # on their alignment.
    the_logo = [
        " ",
        " aSPY//YASa ",
        " apyyyyCY//////////YCa ",
        " sY//////YSpcs scpCY//Pp ",
        " ayp ayyyyyyySCP//Pp syY//C ",
        " AYAsAYYYYYYYY///Ps cY//S",
        " pCCCCY//p cSSps y//Y",
        " SPPPP///a pP///AC//Y",
        " A//A cyP////C",
        " p///Ac sC///a",
        " P////YCpc A//A",
        " scccccp///pSP///p p//Y",
        " sY/////////y caa S//P",
        " cayCyayP//Ya pY/Ya",
        " sY/PsY////YCc aC//Yp ",
        " sc sccaCY//PCypaapyCP//YSs ",
        " spCPY//////YPSps ",
        " ccaacs ",
        " ",
    ]

    # Used on mini screens
    the_logo_mini = [
        " .SYPACCCSASYY ",
        "P /SCS/CCS ACS",
        " /A AC",
        " A/PS /SPPS",
        " YP (SC",
        " SPS/A. SC",
        " Y/PACC PP",
        " PY*AYC CAA",
        " YYCY//SCYP ",
    ]

    the_banner = [
        "",
        "",
        " |",
        " | Welcome to Scapy",
        " | Version %s" % conf.version,
        " |",
        " | https://github.com/secdev/scapy",
        " |",
        " | Have fun!",
        " |",
    ]

    if mini_banner:
        the_logo = the_logo_mini
        # Drop the first three rows and the last one, and cut the first two
        # characters of each remaining row
        the_banner = [x[2:] for x in the_banner[3:-1]]
        the_banner = [""] + the_banner + [""]
    else:
        quote, author = choice(QUOTES)
        the_banner.extend(_prepare_quote(quote, author, max_len=39))
        the_banner.append(" |")
    # zip_longest pads the shorter column with "" so no row is lost
    return "\n".join(
        logo + banner for logo, banner in zip_longest(
            (conf.color_theme.logo(line) for line in the_logo),
            (conf.color_theme.success(line) for line in the_banner),
            fillvalue=""
        )
    )

731 

732 

def interact(mydict=None, argv=None, mybanner=None, loglevel=logging.INFO):
    # type: (Optional[Any], Optional[Any], Optional[Any], int) -> None
    """
    Starts Scapy's console.

    Steps: parse the command line, run the pre-start config file, build
    the session, run the startup config file, pick an interactive shell
    (conf.interactive_shell, with auto-detection and fallback to the
    standard Python shell), run it, then save the session if conf.session
    is set.

    Command line options: -h (usage), -H (no fancy banner, quieter),
    -s <file> (session file), -c <file> / -C (set / skip the startup
    file), -p <file> / -P (set / skip the pre-start file), -d (lower the
    log level by 10).

    :param mydict: extra variables added to the session
    :param argv: command line to parse (sys.argv when None)
    :param mybanner: extra text appended to the banner
    :param loglevel: value assigned to conf.logLevel (left alone if None)
    """
    # We're in interactive mode, let's throw the DeprecationWarnings
    warnings.simplefilter("always")

    # Set interactive mode, load the color scheme
    from scapy.config import conf
    conf.interactive = True
    conf.color_theme = DefaultTheme()
    if loglevel is not None:
        conf.logLevel = loglevel

    STARTUP_FILE = DEFAULT_STARTUP_FILE
    PRESTART_FILE = DEFAULT_PRESTART_FILE

    session_name = None

    if argv is None:
        argv = sys.argv

    try:
        # NOTE(review): "d:" makes getopt expect an argument after -d,
        # although the value is not used below - confirm this is intended.
        opts = getopt.getopt(argv[1:], "hs:Cc:Pp:d:H")
        for opt, param in opts[0]:
            if opt == "-h":
                _usage()
            elif opt == "-H":
                conf.fancy_banner = False
                conf.verb = 1
                conf.logLevel = logging.WARNING
            elif opt == "-s":
                session_name = param
            elif opt == "-c":
                STARTUP_FILE = param
            elif opt == "-C":
                STARTUP_FILE = None
            elif opt == "-p":
                PRESTART_FILE = param
            elif opt == "-P":
                PRESTART_FILE = None
            elif opt == "-d":
                conf.logLevel = max(1, conf.logLevel - 10)

        if len(opts[1]) > 0:
            raise getopt.GetoptError(
                "Too many parameters : [%s]" % " ".join(opts[1])
            )

    except getopt.GetoptError as msg:
        log_loading.error(msg)
        sys.exit(1)

    # Reset sys.argv, otherwise IPython thinks it is for him
    sys.argv = sys.argv[:1]

    if PRESTART_FILE:
        _read_config_file(
            PRESTART_FILE,
            interactive=True,
            _locals=_scapy_prestart_builtins(),
            default=DEFAULT_PRESTART,
        )

    SESSION = init_session(session_name, mydict=mydict, ret=True)

    if STARTUP_FILE:
        _read_config_file(
            STARTUP_FILE,
            interactive=True,
            _locals=SESSION
        )

    if conf.fancy_banner:
        banner_text = get_fancy_banner()
    else:
        banner_text = "Welcome to Scapy (%s)" % conf.version
    if mybanner is not None:
        banner_text += "\n"
        banner_text += mybanner

    # Make sure the history file has proper permissions
    try:
        if not pathlib.Path(conf.histfile).exists():
            pathlib.Path(conf.histfile).touch()
        _check_perms(conf.histfile)
    except OSError:
        pass

    # Configure interactive terminal

    if conf.interactive_shell not in [
            "ipython",
            "python",
            "ptpython",
            "ptipython",
            "bpython",
            "auto"]:
        log_loading.warning("Unknown conf.interactive_shell ! Using 'auto'")
        conf.interactive_shell = "auto"

    # Auto detect available shells.
    # Order:
    # 1. IPython
    # 2. bpython
    # 3. ptpython

    # Modules each shell needs in order to be usable
    _IMPORTS = {
        "ipython": ["IPython"],
        "bpython": ["bpython"],
        "ptpython": ["ptpython"],
        "ptipython": ["IPython", "ptpython"],
    }

    if conf.interactive_shell == "auto":
        # Auto detect
        for imp in ["IPython", "bpython", "ptpython"]:
            try:
                importlib.import_module(imp)
                conf.interactive_shell = imp.lower()
                break
            except ImportError:
                continue
        else:
            # No candidate could be imported
            log_loading.warning(
                "No alternative Python interpreters found ! "
                "Using standard Python shell instead."
            )
            conf.interactive_shell = "python"

    if conf.interactive_shell in _IMPORTS:
        # Check import
        for imp in _IMPORTS[conf.interactive_shell]:
            try:
                importlib.import_module(imp)
            except ImportError:
                log_loading.warning("%s requested but not found !", imp)
                conf.interactive_shell = "python"

    # Default shell
    if conf.interactive_shell == "python":
        disabled = ["History"]
        if WINDOWS:
            disabled.append("Colors")
            conf.color_theme = BlackAndWhite()
        else:
            try:
                # Bad completer.. but better than nothing
                import rlcompleter
                import readline
                readline.set_completer(
                    rlcompleter.Completer(namespace=SESSION).complete
                )
                readline.parse_and_bind('tab: complete')
            except ImportError:
                disabled.insert(0, "AutoCompletion")
        # Display warning when using the default REPL
        log_loading.info(
            "Using the default Python shell: %s %s disabled.",
            ",".join(disabled),
            "is" if len(disabled) == 1 else "are"
        )

    # ptpython configure function
    def ptpython_configure(repl):
        # type: (Any) -> None
        # Hide status bar
        repl.show_status_bar = False
        # Complete while typing (versus only when pressing tab)
        repl.complete_while_typing = False
        # Enable auto-suggestions
        repl.enable_auto_suggest = True
        # Disable exit confirmation
        repl.confirm_exit = False
        # Show signature
        repl.show_signature = True
        # Apply Scapy color theme: TODO
        # repl.install_ui_colorscheme("scapy",
        #                             Style.from_dict(_custom_ui_colorscheme))
        # repl.use_ui_colorscheme("scapy")

    # Extend banner text
    if conf.interactive_shell in ["ipython", "ptipython"]:
        import IPython
        if conf.interactive_shell == "ptipython":
            banner = banner_text + " using IPython %s" % IPython.__version__
            try:
                from importlib.metadata import version
                ptpython_version = " " + version('ptpython')
            except ImportError:
                ptpython_version = ""
            banner += " and ptpython%s" % ptpython_version
        else:
            banner = banner_text + " using IPython %s" % IPython.__version__
    elif conf.interactive_shell == "ptpython":
        try:
            from importlib.metadata import version
            ptpython_version = " " + version('ptpython')
        except ImportError:
            ptpython_version = ""
        banner = banner_text + " using ptpython%s" % ptpython_version
    elif conf.interactive_shell == "bpython":
        import bpython
        banner = banner_text + " using bpython %s" % bpython.__version__

    # Start IPython or ptipython
    if conf.interactive_shell in ["ipython", "ptipython"]:
        banner += "\n"
        if conf.interactive_shell == "ptipython":
            from ptpython.ipython import embed
        else:
            from IPython import embed
        try:
            from traitlets.config.loader import Config
        except ImportError:
            log_loading.warning(
                "traitlets not available. Some Scapy shell features won't be "
                "available."
            )
            try:
                embed(
                    display_banner=False,
                    user_ns=SESSION,
                    exec_lines=["print(\"\"\"" + banner + "\"\"\")"]
                )
            except Exception:
                code.interact(banner=banner_text, local=SESSION)
        else:
            cfg = Config()
            try:
                from IPython import get_ipython
                if not get_ipython():
                    raise ImportError
            except ImportError:
                # Set "classic" prompt style when launched from
                # run_scapy(.bat) files Register and apply scapy
                # color+prompt style
                apply_ipython_style(shell=cfg.InteractiveShellEmbed)
                cfg.InteractiveShellEmbed.confirm_exit = False
                cfg.InteractiveShellEmbed.separate_in = u''
            # Parse the whole major component: reading only the first
            # character would turn a two-digit major version into 1.
            ipython_major = int(IPython.__version__.split(".", 1)[0])
            if ipython_major >= 6:
                cfg.InteractiveShellEmbed.term_title = True
                cfg.InteractiveShellEmbed.term_title_format = ("Scapy %s" %
                                                               conf.version)
                # As of IPython 6-7, the jedi completion module is a dumpster
                # of fire that should be scrapped never to be seen again.
                # This is why the following defaults to False. Feel free to hurt
                # yourself (#GH4056) :P
                cfg.Completer.use_jedi = conf.ipython_use_jedi
            else:
                cfg.InteractiveShellEmbed.term_title = False
            cfg.HistoryAccessor.hist_file = conf.histfile
            cfg.InteractiveShell.banner1 = banner
            if conf.verb < 2:
                cfg.InteractiveShellEmbed.enable_tip = False
            # configuration can thus be specified here.
            _kwargs = {}
            if conf.interactive_shell == "ptipython":
                _kwargs["configure"] = ptpython_configure
            try:
                embed(config=cfg, user_ns=SESSION, **_kwargs)
            except (AttributeError, TypeError):
                code.interact(banner=banner_text, local=SESSION)
    # Start ptpython
    elif conf.interactive_shell == "ptpython":
        # ptpython has special, non-default handling of __repr__ which breaks Scapy.
        # For instance: >>> IP()
        log_loading.warning("ptpython support is currently partially broken")
        from ptpython.repl import embed
        # ptpython has no banner option
        banner += "\n"
        print(banner)
        embed(
            locals=SESSION,
            history_filename=conf.histfile,
            title="Scapy %s" % conf.version,
            configure=ptpython_configure
        )
    # Start bpython
    elif conf.interactive_shell == "bpython":
        from bpython.curtsies import main as embed
        embed(
            args=["-q", "-i"],
            locals_=SESSION,
            banner=banner,
            welcome_message=""
        )
    # Start Python
    elif conf.interactive_shell == "python":
        code.interact(banner=banner_text, local=SESSION)
    else:
        raise ValueError("Invalid conf.interactive_shell")

    # The shell has exited: persist the session if one is configured
    if conf.session:
        save_session(conf.session, SESSION)

1030 

1031 

# Start the interactive console when this module is run as a script.
if __name__ == "__main__":
    interact()