Coverage for /pythoncovmergedfiles/medio/medio/usr/local/lib/python3.11/site-packages/scapy/utils.py: 35%

Shortcuts on this page

r m x   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

2028 statements  

1# SPDX-License-Identifier: GPL-2.0-only 

2# This file is part of Scapy 

3# See https://scapy.net/ for more information 

4# Copyright (C) Philippe Biondi <phil@secdev.org> 

5 

6""" 

7General utility functions. 

8""" 

9 

10 

11from decimal import Decimal 

12from io import StringIO 

13from itertools import zip_longest 

14from uuid import UUID 

15 

16import argparse 

17import array 

18import base64 

19import collections 

20import decimal 

21import difflib 

22import enum 

23import gzip 

24import inspect 

25import locale 

26import math 

27import os 

28import pickle 

29import random 

30import re 

31import shutil 

32import socket 

33import struct 

34import subprocess 

35import sys 

36import tempfile 

37import threading 

38import time 

39import traceback 

40import warnings 

41 

42from scapy.config import conf 

43from scapy.consts import DARWIN, OPENBSD, WINDOWS 

44from scapy.data import MTU, DLT_EN10MB, DLT_RAW 

45from scapy.compat import ( 

46 orb, 

47 plain_str, 

48 chb, 

49 hex_bytes, 

50 bytes_encode, 

51) 

52from scapy.error import ( 

53 log_interactive, 

54 log_runtime, 

55 Scapy_Exception, 

56 warning, 

57) 

58from scapy.pton_ntop import inet_pton 

59 

60# Typing imports 

61from typing import ( 

62 cast, 

63 Any, 

64 AnyStr, 

65 Callable, 

66 Dict, 

67 IO, 

68 Iterator, 

69 List, 

70 Optional, 

71 TYPE_CHECKING, 

72 Tuple, 

73 Type, 

74 Union, 

75 overload, 

76) 

77from scapy.compat import ( 

78 DecoratorCallable, 

79 Literal, 

80) 

81 

82if TYPE_CHECKING: 

83 from scapy.packet import Packet 

84 from scapy.plist import _PacketIterable, PacketList 

85 from scapy.supersocket import SuperSocket 

86 import prompt_toolkit 

87 

88_ByteStream = Union[IO[bytes], gzip.GzipFile] 

89 

90########### 

91# Tools # 

92########### 

93 

94 

def issubtype(x,  # type: Any
              t,  # type: Union[type, str]
              ):
    # type: (...) -> bool
    """issubtype(C, B) -> bool

    Tell whether ``x`` is a class deriving from ``t``.

    :param x: the candidate object.
    :param t: a class (or tuple of classes, as accepted by ``issubclass``),
        or a class name given as a string. When a string is given, it is
        compared against the names of the *direct* bases of ``x`` only.
    :return: True when ``x`` matches, False otherwise.
    """
    if isinstance(t, str):
        # Name-based lookup: only x.__bases__ is inspected, not the full MRO.
        return any(base.__name__ == t for base in x.__bases__)
    return isinstance(x, type) and issubclass(x, t)

110 

111 

_Decimal = Union[Decimal, int]


class EDecimal(Decimal):
    """Extended Decimal

    Decimal subclass kept for backward compatibility: every arithmetic
    operator first converts the other operand with ``Decimal(other)`` (so
    floats are accepted) and wraps the result back into an EDecimal.
    Equality against a non-Decimal value is evaluated on ``float(self)``.
    """

    def __add__(self, other, context=None):
        # type: (_Decimal, Any) -> EDecimal
        # ``context`` is accepted but not forwarded.
        return EDecimal(super().__add__(Decimal(other)))

    def __radd__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(super().__add__(Decimal(other)))

    def __sub__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(super().__sub__(Decimal(other)))

    def __rsub__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(super().__rsub__(Decimal(other)))

    def __mul__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(super().__mul__(Decimal(other)))

    def __rmul__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(super().__mul__(Decimal(other)))

    def __truediv__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(super().__truediv__(Decimal(other)))

    def __floordiv__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(super().__floordiv__(Decimal(other)))

    def __divmod__(self, other):
        # type: (_Decimal) -> Tuple[EDecimal, EDecimal]
        quotient, remainder = super().__divmod__(Decimal(other))
        return EDecimal(quotient), EDecimal(remainder)

    def __mod__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(super().__mod__(Decimal(other)))

    def __rmod__(self, other):
        # type: (_Decimal) -> EDecimal
        return EDecimal(super().__rmod__(Decimal(other)))

    def __pow__(self, other, modulo=None):
        # type: (_Decimal, Optional[_Decimal]) -> EDecimal
        return EDecimal(super().__pow__(Decimal(other), modulo))

    def __eq__(self, other):
        # type: (Any) -> bool
        if not isinstance(other, Decimal):
            # Anything that is not a Decimal is compared as a float
            return bool(float(self) == other)
        return Decimal.__eq__(self, other)

    def normalize(self, precision):  # type: ignore
        # type: (int) -> EDecimal
        """Normalize the value within a local context limited to
        ``precision`` significant digits.

        :param precision: value assigned to the local context ``prec``.
        :return: the normalized value, as an EDecimal.
        """
        with decimal.localcontext() as ctx:
            ctx.prec = precision
            rounded = Decimal.normalize(self, ctx)
            return EDecimal(rounded)

183 

184 

@overload
def get_temp_file(keep, autoext, fd):
    # type: (bool, str, Literal[True]) -> IO[bytes]
    pass


@overload
def get_temp_file(keep=False, autoext="", fd=False):
    # type: (bool, str, Literal[False]) -> str
    pass


def get_temp_file(keep=False, autoext="", fd=False):
    # type: (bool, str, bool) -> Union[IO[bytes], str]
    """Creates a temporary file whose name starts with "scapy".

    :param keep: If False, the file name is appended to ``conf.temp_files``.
    :param autoext: Suffix to add to the generated file name.
    :param fd: If True, the still-open file object is returned. If False
        (default), the file is closed and its path is returned.
    """
    handle = tempfile.NamedTemporaryFile(
        prefix="scapy", suffix=autoext, delete=False
    )
    if not keep:
        conf.temp_files.append(handle.name)

    if not fd:
        # Close the file so something else can take it.
        handle.close()
        return handle.name
    return handle

217 

218 

def get_temp_dir(keep=False):
    # type: (bool) -> str
    """Creates a temporary directory, and returns its name.

    :param keep: If False (default), the directory name is appended to
        ``conf.temp_files``.
    :return: The path returned by ``tempfile.mkdtemp(prefix="scapy")``.
    """
    path = tempfile.mkdtemp(prefix="scapy")
    if keep:
        return path
    conf.temp_files.append(path)
    return path

234 

235 

def _create_fifo() -> Tuple[str, Any]:
    """Creates a temporary fifo.

    The second element of the result must later be handed to _open_fifo(),
    once the client is connected.

    :returns: (client_file, server_fd)
    """
    if not WINDOWS:
        path = get_temp_file()
        # get_temp_file() left a regular file behind: drop it and reuse
        # the same name for the fifo.
        os.unlink(path)
        os.mkfifo(path)
        return path, path
    from scapy.arch.windows.structures import _get_win_fifo
    return _get_win_fifo()

252 

253 

def _open_fifo(fd: Any, mode: str = "rb") -> IO[bytes]:
    """Open the server_fd returned by _create_fifo().

    :param fd: the server side value returned by _create_fifo().
    :param mode: mode given to ``open()``. Not used in the Windows branch.
    """
    if not WINDOWS:
        return open(fd, mode)
    from scapy.arch.windows.structures import _win_fifo_open
    return _win_fifo_open(fd)

262 

263 

def sane(x, color=False):
    # type: (AnyStr, bool) -> str
    """Build a printable view of ``x``.

    Every item whose ordinal (as given by ``orb``) lies in [32, 127) is kept
    as the matching character; any other item is shown as ".".

    :param x: the data to render.
    :param color: if True, each "." placeholder is passed through
        ``conf.color_theme.not_printable``.
    :return: the rendered string.
    """
    pieces = []
    for item in x:
        code = orb(item)
        if 32 <= code < 127:
            pieces.append(chr(code))
        elif color:
            pieces.append(conf.color_theme.not_printable("."))
        else:
            pieces.append(".")
    return "".join(pieces)

277 

278 

@conf.commands.register
def restart():
    # type: () -> None
    """Restarts scapy

    :raises OSError: when ``conf.interactive`` is false or ``sys.argv[0]``
        is not an existing file.
    """
    started_from_console = conf.interactive and os.path.isfile(sys.argv[0])
    if not started_from_console:
        raise OSError("Scapy was not started from console")
    command = [sys.executable] + sys.argv
    if WINDOWS:
        # Run the new instance as a child, then leave with its return code
        # (1 if the call itself failed).
        exit_code = 1
        try:
            exit_code = subprocess.call(command)
        finally:
            os._exit(exit_code)
    os.execv(sys.executable, command)

292 

293 

def lhex(x):
    # type: (Any) -> str
    """Render ``x`` with its integers shown in hexadecimal.

    VolatileValue instances are rendered with ``repr``, integers with
    ``hex``, tuples and lists recursively (keeping their brackets), and
    anything else with ``str``.
    """
    from scapy.volatile import VolatileValue
    if isinstance(x, VolatileValue):
        return repr(x)
    if isinstance(x, int):
        return hex(x)
    if isinstance(x, (tuple, list)):
        inner = ", ".join(map(lhex, x))
        wrapper = "(%s)" if isinstance(x, tuple) else "[%s]"
        return wrapper % inner
    return str(x)

306 

307 

@conf.commands.register
def hexdump(p, dump=False):
    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
    """Build a tcpdump like hexadecimal view

    Each output row shows the offset, up to 16 bytes in hexadecimal and the
    printable view of those bytes (as produced by ``sane``).

    :param p: a Packet
    :param dump: define if the result must be printed or returned in a variable
    :return: a String only when dump=True
    """
    x = bytes_encode(p)
    rows = []
    for i in range(0, len(x), 16):
        chunk = x[i:i + 16]
        row = "%04x " % i
        row += "".join("%02X " % orb(b) for b in chunk)
        # A rendered byte is 3 columns wide ("%02X "): pad a short final
        # row by as much so that the printable view stays aligned.
        row += "   " * (16 - len(chunk))
        row += " %s" % sane(chunk, color=True)
        rows.append(row)
    s = "\n".join(rows)
    if dump:
        return s
    else:
        print(s)
        return None

337 

338 

@conf.commands.register
def linehexdump(p, onlyasc=0, onlyhex=0, dump=False):
    # type: (Union[Packet, AnyStr], int, int, bool) -> Optional[str]
    """Build an equivalent view of hexdump() on a single line

    Note that setting both onlyasc and onlyhex to 1 results in a empty output

    :param p: a Packet
    :param onlyasc: 1 to display only the ascii view
    :param onlyhex: 1 to display only the hexadecimal view
    :param dump: print the view if False
    :return: a String only when dump=True
    """
    # Colors are only requested when the view is printed.
    view = hexstr(p, onlyasc=onlyasc, onlyhex=onlyhex, color=not dump)
    if not dump:
        print(view)
        return None
    return view

359 

360 

@conf.commands.register
def chexdump(p, dump=False):
    # type: (Union[Packet, AnyStr], bool) -> Optional[str]
    """Build a per byte hexadecimal representation

    Example:
        >>> chexdump(IP())
        0x45, 0x00, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x40, 0x00, 0x7c, 0xe7, 0x7f, 0x00, 0x00, 0x01, 0x7f, 0x00, 0x00, 0x01  # noqa: E501

    :param p: a Packet
    :param dump: print the view if False
    :return: a String only if dump=True
    """
    raw = bytes_encode(p)
    rendered = ", ".join(["%#04x" % orb(octet) for octet in raw])
    if not dump:
        print(rendered)
        return None
    return rendered

381 

382 

@conf.commands.register
def hexstr(p, onlyasc=0, onlyhex=0, color=False):
    # type: (Union[Packet, AnyStr], int, int, bool) -> str
    """Build a fancy tcpdump like hex from bytes.

    :param p: a Packet or a string, converted with ``bytes_encode``.
    :param onlyasc: if true, the hexadecimal view is left out.
    :param onlyhex: if true, the printable view is left out.
    :param color: forwarded to ``sane`` for the printable view.
    :return: the selected views, joined by a space.
    """
    raw = bytes_encode(p)
    views = []
    if not onlyasc:
        hex_view = " ".join("%02X" % orb(b) for b in raw)
        views.append(hex_view)
    if not onlyhex:
        views.append(sane(raw, color=color))
    return " ".join(views)

394 

395 

def repr_hex(s):
    # type: (bytes) -> str
    """ Convert provided bitstring to a simple string of hex digits """
    digits = ["%02x" % orb(octet) for octet in s]
    return "".join(digits)

400 

401 

@conf.commands.register
def hexdiff(
    a: Union['Packet', AnyStr],
    b: Union['Packet', AnyStr],
    algo: Optional[str] = None,
    autojunk: bool = False,
) -> None:
    """
    Show differences between 2 binary strings, Packets...

    Available algorithms:
      - wagnerfischer: Use the Wagner and Fischer algorithm to compute the
        Levenstein distance between the strings then backtrack.
      - difflib: Use the difflib.SequenceMatcher implementation. This based on a
        modified version of the Ratcliff and Obershelp algorithm.
        This is much faster, but far less accurate.
        https://docs.python.org/3.8/library/difflib.html#difflib.SequenceMatcher

    The diff is printed; nothing is returned.

    :param a:
    :param b: The binary strings, packets... to compare
    :param algo: Force the algo to be 'wagnerfischer' or 'difflib'.
        By default, this is chosen depending on the complexity, optimistically
        preferring wagnerfischer unless really necessary.
    :param autojunk: (difflib only) See difflib documentation.
    :raises ValueError: if ``algo`` is not one of the known algorithms.
    """
    xb = bytes_encode(a)
    yb = bytes_encode(b)

    if algo is None:
        # Choose the best algorithm
        complexity = len(xb) * len(yb)
        if complexity < 1e7:
            # Comparing two (non-jumbos) Ethernet packets is ~2e6 which is manageable.
            # Anything much larger than this shouldn't be attempted by default.
            algo = "wagnerfischer"
            if complexity > 1e6:
                log_interactive.info(
                    "Complexity is a bit high. hexdiff will take a few seconds."
                )
        else:
            algo = "difflib"

    # Aligned views of a and b: same index = same diff position, an empty
    # bytes object marking a gap on that side.
    backtrackx = []
    backtracky = []

    if algo == "wagnerfischer":
        # Both inputs are reversed: the backtrack below walks from the end
        # of the reversed strings, hence emits items in forward order.
        xb = xb[::-1]
        yb = yb[::-1]

        # costs for the 3 operations
        INSERT = 1
        DELETE = 1
        SUBST = 1

        # Typically, d[i,j] will hold the distance between
        # the first i characters of xb and the first j characters of yb.
        # We change the Wagner Fischer to also store pointers to all
        # the intermediate steps taken while calculating the Levenstein distance.
        d = {(-1, -1): (0, (-1, -1))}
        for j in range(len(yb)):
            d[-1, j] = (j + 1) * INSERT, (-1, j - 1)
        for i in range(len(xb)):
            d[i, -1] = (i + 1) * INSERT + 1, (i - 1, -1)

        # Compute the Levenstein distance between the two strings, but
        # store all the steps to be able to backtrack at the end.
        for j in range(len(yb)):
            for i in range(len(xb)):
                d[i, j] = min(
                    (d[i - 1, j - 1][0] + SUBST * (xb[i] != yb[j]), (i - 1, j - 1)),
                    (d[i - 1, j][0] + DELETE, (i - 1, j)),
                    (d[i, j - 1][0] + INSERT, (i, j - 1)),
                )

        # Iterate through the steps backwards to create the diff
        i = len(xb) - 1
        j = len(yb) - 1
        while not (i == j == -1):
            i2, j2 = d[i, j][1]
            backtrackx.append(xb[i2 + 1:i + 1])
            backtracky.append(yb[j2 + 1:j + 1])
            i, j = i2, j2
    elif algo == "difflib":
        sm = difflib.SequenceMatcher(a=xb, b=yb, autojunk=autojunk)
        # One single-byte bytes object per position
        xarr = [xb[i:i + 1] for i in range(len(xb))]
        yarr = [yb[i:i + 1] for i in range(len(yb))]
        # Iterate through opcodes to build the backtrack
        for opcode in sm.get_opcodes():
            typ, x0, x1, y0, y1 = opcode
            if typ == 'delete':
                backtrackx += xarr[x0:x1]
                backtracky += [b''] * (x1 - x0)
            elif typ == 'insert':
                backtrackx += [b''] * (y1 - y0)
                backtracky += yarr[y0:y1]
            elif typ in ['equal', 'replace']:
                backtrackx += xarr[x0:x1]
                backtracky += yarr[y0:y1]
        # Some lines may have been considered as junk. Check the sizes
        if autojunk:
            lbx = len(backtrackx)
            lby = len(backtracky)
            backtrackx += [b''] * (max(lbx, lby) - lbx)
            backtracky += [b''] * (max(lbx, lby) - lby)
    else:
        raise ValueError("Unknown algorithm '%s'" % algo)

    # Print the diff

    x = y = i = 0
    # Keyed by (doy - dox): 0 = shared row, -1 = row of a, 1 = row of b
    colorize: Dict[int, Callable[[str], str]] = {
        0: lambda x: x,
        -1: conf.color_theme.left,
        1: conf.color_theme.right
    }

    # dox / doy: whether the row being printed shows a and/or b
    dox = 1
    doy = 0
    btx_len = len(backtrackx)
    while i < btx_len:
        linex = backtrackx[i:i + 16]
        liney = backtracky[i:i + 16]
        xx = sum(len(k) for k in linex)
        yy = sum(len(k) for k in liney)
        if dox and not xx:
            # Nothing from a in this chunk: go straight to b's row
            dox = 0
            doy = 1
        if dox and linex == liney:
            # Identical chunks share a single row
            doy = 1

        if dox:
            xd = y
            j = 0
            while not linex[j]:
                j += 1
                xd -= 1
            print(colorize[doy - dox]("%04x" % xd), end=' ')
            x += xx
            line = linex
        else:
            # Blank placeholder, as wide as the "%04x" offset
            print("    ", end=' ')
        if doy:
            yd = y
            j = 0
            while not liney[j]:
                j += 1
                yd -= 1
            print(colorize[doy - dox]("%04x" % yd), end=' ')
            y += yy
            line = liney
        else:
            # Blank placeholder, as wide as the "%04x" offset
            print("    ", end=' ')

        print(" ", end=' ')

        cl = ""
        for j in range(16):
            if i + j < min(len(backtrackx), len(backtracky)):
                if line[j]:
                    col = colorize[(linex[j] != liney[j]) * (doy - dox)]
                    print(col("%02X" % orb(line[j])), end=' ')
                    if linex[j] == liney[j]:
                        cl += sane(line[j], color=True)
                    else:
                        cl += col(sane(line[j]))
                else:
                    # Gap on this side: as wide as a "%02X" byte
                    print("  ", end=' ')
                    cl += " "
            else:
                # Past the end of the data: as wide as a "%02X" byte
                print("  ", end=' ')
            if j == 7:
                # Extra separator between the two groups of 8 bytes
                print("", end=' ')

        print(" ", cl)

        # Choose what the next row shows: b's view of the same chunk, or
        # a's view of the next chunk.
        if doy or not yy:
            doy = 0
            dox = 1
            i += 16
        else:
            if yy:
                dox = 0
                doy = 1
            else:
                i += 16

587 

588 

if struct.pack("H", 1) == b"\x00\x01":  # big endian
    def checksum_endian_transform(chk):
        # type: (int) -> int
        """Native words are already in network order: nothing to do."""
        return chk
else:
    def checksum_endian_transform(chk):
        # type: (int) -> int
        """Swap the two low bytes (the caller masks the result)."""
        return ((chk >> 8) & 0xff) | chk << 8


def checksum(pkt):
    # type: (bytes) -> int
    """Compute the 16-bit one's complement checksum of ``pkt``.

    The data is summed as native 16-bit words (a zero byte is appended when
    the length is odd), the carries are folded back, the sum is inverted and
    finally passed through ``checksum_endian_transform``.

    :param pkt: the bytes to checksum.
    :return: the checksum, in the range [0, 0xffff].
    """
    if len(pkt) % 2:
        pkt += b"\0"
    total = sum(array.array("H", pkt))
    total = (total & 0xffff) + (total >> 16)
    total += total >> 16
    return checksum_endian_transform(~total) & 0xffff

604 

605 

def _fletcher16(charbuf):
    # type: (bytes) -> Tuple[int, int]
    """Return the two Fletcher-16 running sums of ``charbuf``.

    :param charbuf: the bytes to process.
    :return: (c0, c1), both reduced modulo 255 once all bytes are summed.
    """
    # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>  # noqa: E501
    low = high = 0
    for octet in charbuf:
        low += octet
        high += low
    return (low % 255, high % 255)

617 

618 

@conf.commands.register
def fletcher16_checksum(binbuf):
    # type: (bytes) -> int
    """Calculates Fletcher-16 checksum of the given buffer.

       Note:
       If the buffer contains the two checkbytes derived from the Fletcher-16 checksum  # noqa: E501
       the result of this function has to be 0. Otherwise the buffer has been corrupted.  # noqa: E501

       :return: ``c1`` in the high byte and ``c0`` in the low byte.
    """
    low, high = _fletcher16(binbuf)
    return low | (high << 8)

630 

631 

@conf.commands.register
def fletcher16_checkbytes(binbuf, offset):
    # type: (bytes, int) -> bytes
    """Calculates the Fletcher-16 checkbytes returned as 2 byte binary-string.

       Including the bytes into the buffer (at the position marked by offset) the  # noqa: E501
       global Fletcher-16 checksum of the buffer will be 0. Thus it is easy to verify  # noqa: E501
       the integrity of the buffer on the receiver side.

       For details on the algorithm, see RFC 2328 chapter 12.1.7 and RFC 905 Annex B.  # noqa: E501

       :param binbuf: the buffer to protect.
       :param offset: position of the two checkbytes within the buffer.
       :raises Exception: when the buffer is shorter than ``offset``.
    """

    # This is based on the GPLed C implementation in Zebra <http://www.zebra.org/>  # noqa: E501
    if len(binbuf) < offset:
        raise Exception("Packet too short for checkbytes %d" % len(binbuf))

    # The sums are computed with the two checkbytes zeroed out
    zeroed = binbuf[:offset] + b"\x00\x00" + binbuf[offset + 2:]
    c0, c1 = _fletcher16(zeroed)

    x = ((len(zeroed) - offset - 1) * c0 - c1) % 255
    if x <= 0:
        x += 255

    y = 510 - c0 - x
    if y > 255:
        y -= 255

    return chb(x) + chb(y)

661 

662 

def mac2str(mac):
    # type: (str) -> bytes
    """Convert colon separated hexadecimal values into the matching bytes."""
    octets = plain_str(mac).split(':')
    return b"".join([chb(int(octet, 16)) for octet in octets])

666 

667 

def valid_mac(mac):
    # type: (str) -> bool
    """Tell whether ``mac`` converts (through mac2str) to exactly 6 bytes."""
    try:
        raw = mac2str(mac)
    except ValueError:
        return False
    return len(raw) == 6

675 

676 

def str2mac(s):
    # type: (bytes) -> str
    """Render ``s`` as colon separated, two-digit lowercase hex values.

    A ``str`` is also accepted: its characters are converted with ``ord``.
    """
    values = map(ord, s) if isinstance(s, str) else s
    return ":".join("%02x" % value for value in values)

682 

683 

def randstring(length):
    # type: (int) -> bytes
    """
    Returns a random string of length (length >= 0)
    """
    # One random.randint() draw per byte
    return bytes(random.randint(0, 255) for _ in range(length))

691 

692 

def zerofree_randstring(length):
    # type: (int) -> bytes
    """
    Returns a random string of length (length >= 0) without zero in it.
    """
    # One random.randint() draw per byte, never 0
    return bytes(random.randint(1, 255) for _ in range(length))

700 

701 

def stror(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Returns the binary OR of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.
    """
    return bytes(left | right for left, right in zip(s1, s2))

709 

710 

def strxor(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Returns the binary XOR of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.
    """
    return bytes(left ^ right for left, right in zip(s1, s2))

718 

719 

def strand(s1, s2):
    # type: (bytes, bytes) -> bytes
    """
    Returns the binary AND of the 2 provided strings s1 and s2. s1 and s2
    must be of same length.
    """
    return bytes(left & right for left, right in zip(s1, s2))

727 

728 

def strrot(s1, count, right=True):
    # type: (bytes, int, bool) -> bytes
    """
    Rotate the binary by 'count' bytes

    :param s1: the bytes to rotate. An empty value is returned unchanged.
    :param count: number of positions; taken modulo ``len(s1)``.
    :param right: rotate to the right if True (default), to the left
        otherwise.
    :return: the rotated bytes.
    """
    if not s1:
        # Nothing to rotate, and ``count % 0`` would raise ZeroDivisionError
        return s1
    off = count % len(s1)
    if right:
        return s1[-off:] + s1[:-off]
    return s1[off:] + s1[:off]

739 

740 

# Workaround bug 643005 : https://sourceforge.net/tracker/?func=detail&atid=105470&aid=643005&group_id=5470  # noqa: E501
# Probe the platform once: only if socket.inet_aton() rejects the broadcast
# address is a wrapper handling that one value installed.
try:
    socket.inet_aton("255.255.255.255")
except socket.error:
    def inet_aton(ip_string):
        # type: (str) -> bytes
        """socket.inet_aton() with "255.255.255.255" special-cased."""
        if ip_string != "255.255.255.255":
            return socket.inet_aton(ip_string)
        return b"\xff" * 4
else:
    inet_aton = socket.inet_aton  # type: ignore

inet_ntoa = socket.inet_ntoa

755 

756 

def atol(x):
    # type: (str) -> int
    """Convert a printable IPv4 address into its 32-bit integer value.

    :raises ValueError: when ``inet_aton`` rejects the address.
    """
    try:
        packed = inet_aton(x)
    except socket.error:
        raise ValueError("Bad IP format: %s" % x)
    (value,) = struct.unpack("!I", packed)
    return cast(int, value)

764 

765 

def valid_ip(addr):
    # type: (str) -> bool
    """Tell whether ``addr`` is accepted by atol() as an IPv4 address.

    Values that plain_str() cannot decode are reported as invalid.
    """
    try:
        text = plain_str(addr)
    except UnicodeDecodeError:
        return False
    try:
        atol(text)
    except (OSError, ValueError, socket.error):
        return False
    else:
        return True

777 

778 

def valid_net(addr):
    # type: (str) -> bool
    """Tell whether ``addr`` is a valid IPv4 address or "address/length"
    network, with a decimal length in [0, 32].
    """
    try:
        text = plain_str(addr)
    except UnicodeDecodeError:
        return False
    ip, slash, mask = text.partition('/')
    if not slash:
        return valid_ip(text)
    return valid_ip(ip) and mask.isdigit() and 0 <= int(mask) <= 32

789 

790 

def valid_ip6(addr):
    # type: (str) -> bool
    """Tell whether ``addr`` is accepted by inet_pton() as an IPv6 address.

    Values that plain_str() cannot decode are reported as invalid.
    """
    try:
        text = plain_str(addr)
    except UnicodeDecodeError:
        return False
    try:
        inet_pton(socket.AF_INET6, text)
    except socket.error:
        return False
    else:
        return True

802 

803 

def valid_net6(addr):
    # type: (str) -> bool
    """Tell whether ``addr`` is a valid IPv6 address or "address/length"
    network, with a decimal length in [0, 128].
    """
    try:
        text = plain_str(addr)
    except UnicodeDecodeError:
        return False
    ip, slash, mask = text.partition('/')
    if not slash:
        return valid_ip6(text)
    return valid_ip6(ip) and mask.isdigit() and 0 <= int(mask) <= 128

814 

815 

def ltoa(x):
    # type: (int) -> str
    """Convert an integer into a printable IPv4 address.

    Only the low 32 bits of ``x`` are used.
    """
    packed = struct.pack("!I", x & 0xffffffff)
    return inet_ntoa(packed)

819 

820 

def itom(x):
    # type: (int) -> int
    """Convert a prefix length into the matching 32-bit netmask value.

    For instance, 24 gives 0xffffff00.
    """
    # Shifting 32 set bits followed by 32 cleared bits to the right by x
    # leaves x set bits at the top of the low 32-bit word.
    shifted = 0xffffffff00000000 >> x
    return shifted & 0xffffffff

824 

825 

def in4_cidr2mask(m):
    # type: (int) -> bytes
    """
    Return the mask (bitstring) associated with provided length
    value. For instance if function is called on 20, return value is
    b'\xff\xff\xf0\x00'.

    :raises Scapy_Exception: when ``m`` is outside [0, 32].
    """
    if m < 0 or m > 32:
        raise Scapy_Exception("value provided to in4_cidr2mask outside [0, 32] domain (%d)" % m)  # noqa: E501

    # The mask is the complement of the (32 - m) low "host" bits
    host_bits = struct.pack(">I", 2**(32 - m) - 1)
    return strxor(b"\xff" * 4, host_bits)

840 

841 

def in4_isincluded(addr, prefix, mask):
    # type: (str, str, int) -> bool
    """
    Returns True when 'addr' belongs to prefix/mask. False otherwise.

    :param addr: printable IPv4 address to test.
    :param prefix: printable IPv4 network address.
    :param mask: prefix length, as accepted by in4_cidr2mask().
    """
    packed_addr = inet_pton(socket.AF_INET, addr)
    netmask = in4_cidr2mask(mask)
    packed_prefix = inet_pton(socket.AF_INET, prefix)
    # addr is in the network when its masked value equals the prefix
    return packed_prefix == strand(packed_addr, netmask)

851 

852 

def in4_ismaddr(str):
    # type: (str) -> bool
    """
    Returns True if provided address in printable format belongs to
    allocated Multicast address space (224.0.0.0/4).
    """
    multicast_net, multicast_len = "224.0.0.0", 4
    return in4_isincluded(str, multicast_net, multicast_len)

860 

861 

def in4_ismlladdr(str):
    # type: (str) -> bool
    """
    Returns True if address belongs to link-local multicast address
    space (224.0.0.0/24)
    """
    link_local_net, link_local_len = "224.0.0.0", 24
    return in4_isincluded(str, link_local_net, link_local_len)

869 

870 

def in4_ismgladdr(str):
    # type: (str) -> bool
    """
    Returns True if address belongs to global multicast address
    space (224.0.1.0-238.255.255.255).
    """
    # Must be multicast at all...
    if not in4_isincluded(str, "224.0.0.0", 4):
        return False
    # ...but neither in 224.0.0.0/24...
    if in4_isincluded(str, "224.0.0.0", 24):
        return False
    # ...nor in 239.0.0.0/8
    return not in4_isincluded(str, "239.0.0.0", 8)

882 

883 

def in4_ismlsaddr(str):
    # type: (str) -> bool
    """
    Returns True if address belongs to limited scope multicast address
    space (239.0.0.0/8).
    """
    limited_scope_net, limited_scope_len = "239.0.0.0", 8
    return in4_isincluded(str, limited_scope_net, limited_scope_len)

891 

892 

def in4_isaddrllallnodes(str):
    # type: (str) -> bool
    """
    Returns True if address is the link-local all-nodes multicast
    address (224.0.0.1).
    """
    # Compare packed forms, not the printable strings
    all_nodes = inet_pton(socket.AF_INET, "224.0.0.1")
    candidate = inet_pton(socket.AF_INET, str)
    return all_nodes == candidate

901 

902 

def in4_getnsmac(a):
    # type: (bytes) -> str
    """
    Return the multicast mac address associated with provided
    IPv4 address. Passed address must be in network format.
    """
    # "01:00:5e" followed by the address' 2nd byte (top bit cleared) and
    # its 3rd and 4th bytes.
    tail = (a[1] & 0x7f, a[2], a[3])
    return "01:00:5e:" + ":".join("%.2x" % part for part in tail)

911 

912 

def decode_locale_str(x):
    # type: (bytes) -> str
    """
    Decode bytes into a string using the system locale.
    Useful on Windows where it can be unusual (e.g. cp1252)

    "utf-8" is used when ``locale.getlocale()`` reports no encoding, and
    undecodable bytes are replaced rather than raising.
    """
    codec = locale.getlocale()[1] or "utf-8"
    return x.decode(encoding=codec, errors="replace")

920 

921 

class ContextManagerSubprocess(object):
    """
    Context manager that eases checking for unknown command, without
    crashing.

    Example:
        >>> with ContextManagerSubprocess("tcpdump"):
        >>>     subprocess.Popen(["tcpdump", "--version"])
        ERROR: Could not execute tcpdump, is it installed?

    """

    def __init__(self, prog, suppress=True):
        # type: (str, bool) -> None
        """
        :param prog: name of the program, used in the error messages.
        :param suppress: if True (default), errors are logged and swallowed.
            If False, a new exception of the same type is raised with the
            generated message.
        """
        self.prog = prog
        self.suppress = suppress

    def __enter__(self):
        # type: () -> None
        pass

    def __exit__(self,
                 exc_type,  # type: Optional[type]
                 exc_value,  # type: Optional[Exception]
                 traceback,  # type: Optional[Any]
                 ):
        # type: (...) -> Optional[bool]
        if exc_value is None or exc_type is None:
            return None
        # Errored
        if isinstance(exc_value, EnvironmentError):
            msg = "Could not execute %s, is it installed?" % self.prog
        else:
            # exc_type already is the exception class: its own __name__ is
            # wanted here, not the name of its metaclass ("type").
            msg = "%s: execution failed (%s)" % (
                self.prog,
                exc_type.__name__
            )
        if not self.suppress:
            raise exc_type(msg)
        log_runtime.error(msg, exc_info=True)
        return True  # Suppress the exception

963 

964 

class ContextManagerCaptureOutput(object):
    """
    Context manager that intercept the console's output.

    Everything written to ``sys.stdout`` inside the ``with`` block is
    accumulated and made available through get_output().

    Example:
        >>> with ContextManagerCaptureOutput() as cmco:
        ...     print("hey")
        ...     assert cmco.get_output() == "hey"
    """

    def __init__(self):
        # type: () -> None
        # Text captured so far
        self.result_export_object = ""

    def __enter__(self):
        # type: () -> ContextManagerCaptureOutput
        from unittest import mock

        def write(s, decorator=self):
            # type: (str, ContextManagerCaptureOutput) -> None
            decorator.result_export_object += s

        # A Mock accepts any other file method (flush, ...) silently
        fake_stdout = mock.Mock()
        fake_stdout.write = write
        self.bck_stdout, sys.stdout = sys.stdout, fake_stdout
        return self

    def __exit__(self, *exc):
        # type: (*Any) -> Literal[False]
        # Put the real stdout back; exceptions are never swallowed.
        sys.stdout = self.bck_stdout
        return False

    def get_output(self, eval_bytes=False):
        # type: (bool) -> str
        """Return the captured text.

        :param eval_bytes: if True and the captured text starts with ``b'``,
            it is evaluated as a Python expression and the result is
            converted with plain_str().
        """
        captured = self.result_export_object
        if eval_bytes and captured.startswith("b'"):
            # NOTE(review): eval() runs whatever was printed; only safe
            # when the captured output is trusted.
            return plain_str(eval(captured))
        return captured

1002 

1003 

def do_graph(
    graph,  # type: str
    prog=None,  # type: Optional[str]
    format=None,  # type: Optional[str]
    target=None,  # type: Optional[Union[IO[bytes], str]]
    type=None,  # type: Optional[str]
    string=None,  # type: Optional[bool]
    options=None  # type: Optional[List[str]]
):
    # type: (...) -> Optional[str]
    """Processes graph description using an external software.
    This method is used to convert a graphviz format to an image.

    :param graph: GraphViz graph description
    :param prog: which graphviz program to use. Defaults to conf.prog.dot
    :param format: output type (svg, ps, gif, jpg, etc.), passed to dot's "-T"
        option. Defaults to "svg"
    :param string: if not None, simply return the graph string
    :param target: filename or redirect. Defaults pipe to Imagemagick's
        display program. A string starting with "|" is run as a shell
        command fed with the output; one starting with ">" (or any other
        string) is opened as a file.
    :param type: deprecated alias of format
    :param options: options to be passed to prog, as a list of strings
        (a single pre-formatted string is accepted as well)
    :return: the graph string if ``string`` is set, None otherwise
    :raises OSError: if the GraphViz program exits with a non-zero status
    """

    if format is None:
        format = "svg"
    if string:
        return graph
    if type is not None:
        warnings.warn(
            "type is deprecated, and was renamed format",
            DeprecationWarning
        )
        format = type
    if prog is None:
        prog = conf.prog.dot
    start_viewer = False
    if target is None:
        if WINDOWS:
            target = get_temp_file(autoext="." + format)
            start_viewer = True
        else:
            with ContextManagerSubprocess(conf.prog.display):
                target = subprocess.Popen([conf.prog.display],
                                          stdin=subprocess.PIPE).stdin
    if format is not None:
        format = "-T%s" % format
    if isinstance(target, str):
        if target.startswith('|'):
            target = subprocess.Popen(target[1:].lstrip(), shell=True,
                                      stdin=subprocess.PIPE).stdin
        elif target.startswith('>'):
            target = open(target[1:].lstrip(), "wb")
        else:
            target = open(os.path.abspath(target), "wb")
    target = cast(IO[bytes], target)
    # Build the option string for the command line: a list must be joined,
    # formatting it directly would insert its repr ("['-Gdpi=96']").
    if not options:
        opts = ""
    elif isinstance(options, str):
        opts = options
    else:
        opts = " ".join(options)
    proc = subprocess.Popen(
        "\"%s\" %s %s" % (prog, opts, format or ""),
        shell=True, stdin=subprocess.PIPE, stdout=target,
        stderr=subprocess.PIPE
    )
    try:
        _, stderr = proc.communicate(bytes_encode(graph))
    finally:
        # Release the target even if GraphViz failed (best effort: the
        # target may be a pipe that is already gone).
        try:
            target.close()
        except Exception:
            pass
    if proc.returncode != 0:
        raise OSError(
            "GraphViz call failed (is it installed?):\n" +
            plain_str(stderr)
        )
    if start_viewer:
        # Workaround for file not found error: We wait until tempfile is written.  # noqa: E501
        waiting_start = time.time()
        while not os.path.exists(target.name):
            time.sleep(0.1)
            if time.time() - waiting_start > 3:
                warning("Temporary file '%s' could not be written. Graphic will not be displayed.", target.name)  # noqa: E501
                break
        else:
            if WINDOWS and conf.prog.display == conf.prog._default:
                os.startfile(target.name)
            else:
                with ContextManagerSubprocess(conf.prog.display):
                    subprocess.Popen([conf.prog.display, target.name])
    return None

1089 

1090 

# Replacement text for each character needing an escape in TeX output
_TEX_TR = {
    "{": "{\\tt\\char123}",
    "}": "{\\tt\\char125}",
    "\\": "{\\tt\\char92}",
    "^": "\\^{}",
    "$": "\\$",
    "#": "\\#",
    "_": "\\_",
    "&": "\\&",
    "%": "\\%",
    "|": "{\\tt\\char124}",
    "~": "{\\tt\\char126}",
    "<": "{\\tt\\char60}",
    ">": "{\\tt\\char62}",
}


def tex_escape(x):
    # type: (str) -> str
    """Return ``x`` with every character listed in _TEX_TR replaced by its
    escaped form; other characters are left as they are.
    """
    return "".join(_TEX_TR.get(char, char) for char in x)

1114 

1115 

def colgen(*lstcol,  # type: Any
           **kargs  # type: Any
           ):
    # type: (...) -> Iterator[Any]
    """Returns a generator that mixes provided quantities forever

    Triples are built from every index combination (i, j, k) except the ones
    where the three indexes are equal. A single quantity is duplicated so that
    at least two are available.

    :param lstcol: the quantities to mix (at least one)
    :param trans: a function to convert the three arguments into a color.
        lambda x,y,z:(x,y,z) by default
    :raises ValueError: on the first iteration, if no quantity was given
        (the generator would otherwise loop forever without yielding)
    """
    if not lstcol:
        raise ValueError("colgen() needs at least one quantity to mix")
    if len(lstcol) < 2:
        lstcol *= 2
    trans = kargs.get("trans", lambda x, y, z: (x, y, z))
    size = len(lstcol)
    while True:
        for i in range(size):
            for j in range(size):
                for k in range(size):
                    if i == j == k:
                        continue
                    yield trans(lstcol[(i + j) % size],
                                lstcol[(j + k) % size],
                                lstcol[(k + i) % size])

1131 

1132 

def incremental_label(label="tag%05i", start=0):
    # type: (str, int) -> Iterator[str]
    """Yield ``label % n`` forever, for n = start, start + 1, ...

    :param label: a %-format string taking one integer
    :param start: the first integer to format
    """
    index = start
    while True:
        current = label % index
        index += 1
        yield current

1138 

1139 

def binrepr(val):
    # type: (int) -> str
    """Return the binary digits of an integer, without the ``0b`` prefix.

    Negative values keep their sign (``binrepr(-5) == "-101"``); slicing the
    first two characters off ``bin()`` used to return ``"b101"`` for them.

    :param val: the integer to convert
    :returns: the base-2 representation of val
    """
    # Drop only the "0b" marker so that a leading "-" is preserved.
    return bin(val).replace("0b", "", 1)

1143 

1144 

def long_converter(s):
    # type: (str) -> int
    """Parse a hexadecimal number that may be split by spaces and newlines.

    :param s: hexadecimal digits, possibly containing ' ' and '\\n'
    :returns: the integer value
    """
    digits = "".join(ch for ch in s if ch != '\n' and ch != ' ')
    return int(digits, 16)

1148 

1149######################### 

1150# Enum management # 

1151######################### 

1152 

1153 

class EnumElement:
    """A named integer constant.

    The element compares and hashes like its integer value, prints as its
    key, and forwards any other attribute access to the value.
    """

    def __init__(self, key, value):
        # type: (str, int) -> None
        self._key = key
        self._value = value

    def __repr__(self):
        # type: () -> str
        return "<%s %s[%r]>" % (self.__dict__.get("_name", self.__class__.__name__), self._key, self._value)  # noqa: E501

    def __getattr__(self, attr):
        # type: (str) -> Any
        # Only called when normal lookup fails. If "_key"/"_value" themselves
        # are missing (e.g. on an instance being rebuilt by copy or pickle),
        # delegating would re-enter this method forever.
        if attr in ("_key", "_value"):
            raise AttributeError(attr)
        return getattr(self._value, attr)

    def __str__(self):
        # type: () -> str
        return self._key

    def __bytes__(self):
        # type: () -> bytes
        return bytes_encode(self.__str__())

    def __hash__(self):
        # type: () -> int
        return self._value

    def __int__(self):
        # type: () -> int
        return int(self._value)

    def __eq__(self, other):
        # type: (Any) -> bool
        # Operands that cannot be converted to an integer are simply
        # "not equal" instead of raising from inside a comparison.
        try:
            return self._value == int(other)
        except (TypeError, ValueError):
            return NotImplemented

    def __ne__(self, other):
        # type: (Any) -> bool
        result = self.__eq__(other)
        if result is NotImplemented:
            return result
        return not result

    def __neq__(self, other):
        # type: (Any) -> bool
        # Historical (misspelled) name, kept for callers that use it directly.
        return not (self == other)

1191 

1192 

class Enum_metaclass(type):
    """Metaclass turning the integer attributes of a class into elements.

    Each integer attribute is wrapped in ``element_class`` and recorded in a
    reverse ``__rdict__`` mapping (element -> attribute name) that backs the
    item access, membership and ``get`` helpers below.
    """

    element_class = EnumElement

    def __new__(cls, name, bases, dct):
        # type: (Any, str, Any, Dict[str, Any]) -> Any
        reverse = {}
        for attr_name in list(dct):
            attr_value = dct[attr_name]
            if not isinstance(attr_value, int):
                continue
            element = cls.element_class(attr_name, attr_value)
            dct[attr_name] = element
            reverse[element] = attr_name
        dct["__rdict__"] = reverse
        return super(Enum_metaclass, cls).__new__(cls, name, bases, dct)

    def __getitem__(self, attr):
        # type: (int) -> Any
        reverse = self.__rdict__  # type: ignore
        return reverse[attr]

    def __contains__(self, val):
        # type: (int) -> bool
        reverse = self.__rdict__  # type: ignore
        return val in reverse

    def get(self, attr, val=None):
        # type: (str, Optional[Any]) -> Any
        reverse = self.__rdict__  # type: ignore
        return reverse.get(attr, val)

    def __repr__(self):
        # type: () -> str
        shown = self.__dict__.get("name", self.__name__)
        return "<%s>" % shown

1222 

1223 

1224################### 

1225# Object saving # 

1226################### 

1227 

1228 

def export_object(obj):
    # type: (Any) -> None
    """Print an object as text: pickled (protocol 2), zlib-compressed
    (level 9) and base64-encoded.

    :param obj: the object to export
    """
    import zlib
    pickled = pickle.dumps(obj, 2)
    packed = zlib.compress(pickled, 9)
    encoded = base64.b64encode(packed)
    print(encoded.decode())

1233 

1234 

def import_object(obj=None):
    # type: (Optional[str]) -> Any
    """Rebuild an object from its base64 / zlib / pickle text form.

    WARNING: the data is unpickled, which can execute arbitrary code. Only
    feed this function text that comes from a trusted source.

    :param obj: the encoded text; read from standard input when None
    :returns: the decoded object
    """
    import zlib
    text = sys.stdin.read() if obj is None else obj
    packed = base64.b64decode(text.strip())
    pickled = zlib.decompress(packed)
    return pickle.loads(pickled)

1241 

1242 

def save_object(fname, obj):
    # type: (str, Any) -> None
    """Pickle a Python object into a gzip-compressed file.

    :param fname: path of the file to write
    :param obj: the object to pickle
    """
    # The context manager closes the file even if pickling fails.
    with gzip.open(fname, "wb") as fd:
        pickle.dump(obj, fd)

1250 

1251 

def load_object(fname):
    # type: (str) -> Any
    """Unpickle a Python object from a gzip-compressed file.

    WARNING: unpickling can execute arbitrary code. Only load files that
    come from a trusted source.

    :param fname: path of the file to read
    :returns: the unpickled object
    """
    # The context manager closes the file once the object is loaded.
    with gzip.open(fname, "rb") as fd:
        return pickle.load(fd)

1256 

1257 

@conf.commands.register
def corrupt_bytes(data, p=0.01, n=None):
    # type: (str, float, Optional[int]) -> bytes
    """
    Corrupt a given percentage (at least one byte) or number of bytes
    from a string

    :param data: the data to corrupt
    :param p: ratio of bytes to corrupt, used when n is None
    :param n: exact number of bytes to corrupt
    :returns: the corrupted data (empty input is returned empty)
    :raises ValueError: if an explicit n exceeds the number of bytes
    """
    s = array.array("B", bytes_encode(data))
    s_len = len(s)
    if n is None:
        # Never ask for more positions than exist: empty data (or p > 1)
        # would otherwise make random.sample() raise.
        n = min(s_len, max(1, int(s_len * p)))
    for i in random.sample(range(s_len), n):
        # Adding 1..255 modulo 256 guarantees the byte actually changes.
        s[i] = (s[i] + random.randint(1, 255)) % 256
    return s.tobytes()

1272 

1273 

@conf.commands.register
def corrupt_bits(data, p=0.01, n=None):
    # type: (str, float, Optional[int]) -> bytes
    """
    Flip a given percentage (at least one bit) or number of bits
    from a string

    :param data: the data to corrupt
    :param p: ratio of bits to flip, used when n is None
    :param n: exact number of bits to flip
    :returns: the corrupted data (empty input is returned empty)
    :raises ValueError: if an explicit n exceeds the number of bits
    """
    s = array.array("B", bytes_encode(data))
    s_len = len(s) * 8
    if n is None:
        # Never ask for more positions than exist: empty data (or p > 1)
        # would otherwise make random.sample() raise.
        n = min(s_len, max(1, int(s_len * p)))
    for i in random.sample(range(s_len), n):
        # i is a bit index: byte i // 8, bit i % 8.
        s[i // 8] ^= 1 << (i % 8)
    return s.tobytes()

1288 

1289 

1290############################# 

1291# pcap capture file stuff # 

1292############################# 

1293 

@conf.commands.register
def wrpcap(filename,  # type: Union[IO[bytes], str]
           pkt,  # type: _PacketIterable
           *args,  # type: Any
           **kargs  # type: Any
           ):
    # type: (...) -> None
    """Write a list of packets to a pcap file

    :param filename: the name of the file to write packets to, or an open,
        writable file-like object. The file descriptor will be
        closed at the end of the call, so do not use an object you
        do not want to close (e.g., running wrpcap(sys.stdout, [])
        in interactive mode will crash Scapy).
    :param pkt: packets to write
    :param gz: set to 1 to save a gzipped capture
    :param linktype: force linktype value
    :param endianness: "<" or ">", force endianness
    :param sync: do not bufferize writes to the capture file

    Extra positional and keyword arguments are passed unchanged to
    PcapWriter.
    """
    # The writer is used as a context manager: it is exited (and thus
    # released) as soon as the packets are written.
    with PcapWriter(filename, *args, **kargs) as fdesc:
        fdesc.write(pkt)

1315 

1316 

@conf.commands.register
def wrpcapng(filename,  # type: str
             pkt,  # type: _PacketIterable
             ):
    # type: (...) -> None
    """Write a list of packets to a pcapng file

    :param filename: the name of the file to write packets to, or an open,
        writable file-like object. The file descriptor will be
        closed at the end of the call, so do not use an object you
        do not want to close (e.g., running wrpcapng(sys.stdout, [])
        in interactive mode will crash Scapy).
    :param pkt: packets to write
    """
    # NOTE(review): the type comment only declares ``str`` for filename while
    # the docstring also mentions file-like objects - confirm against
    # PcapNgWriter.
    with PcapNgWriter(filename) as fdesc:
        fdesc.write(pkt)

1333 

1334 

@conf.commands.register
def rdpcap(filename, count=-1):
    # type: (Union[IO[bytes], str], int) -> PacketList
    """Read a pcap or pcapng file and return a packet list

    :param filename: the name of the file to read, or an open file-like
        object
    :param count: read only <count> packets
    :returns: the result of the reader's read_all()
    """
    # Rant: Our complicated use of metaclasses and especially the
    # __call__ function is, of course, not supported by MyPy.
    # One day we should simplify this mess and use a much simpler
    # layout that will actually be supported and properly dissected.
    with PcapReader(filename) as fdesc:  # type: ignore
        return fdesc.read_all(count=count)

1348 

1349 

1350# NOTE: Type hinting 

1351# Mypy doesn't understand the following metaclass, and thinks each 

1352# constructor (PcapReader...) needs 3 arguments each. To avoid this, 

1353# we add a fake (=None) to the last 2 arguments then force the value 

1354# to not be None in the signature and pack the whole thing in an ignore. 

1355# This allows to not have # type: ignore every time we call those 

1356# constructors. 

1357 

class PcapReader_metaclass(type):
    """Metaclass for (Raw)Pcap(Ng)Readers

    Calling a reader class opens the capture, reads its magic and tries to
    initialise the class; if that fails, the class registered as its
    ``alternative`` is tried on the same descriptor.
    """

    def __new__(cls, name, bases, dct):
        # type: (Any, str, Any, Dict[str, Any]) -> Any
        """The `alternative` class attribute is declared in the PcapNg
        variant, and set here to the Pcap variant.

        """
        newcls = super(PcapReader_metaclass, cls).__new__(
            cls, name, bases, dct
        )
        if 'alternative' in dct:
            # Link both ways: the Pcap variant falls back to the new class.
            dct['alternative'].alternative = newcls
        return newcls

    def __call__(cls, filename):
        # type: (Union[IO[bytes], str]) -> Any
        """Creates a cls instance, use the `alternative` if that
        fails.

        :param filename: a file name, or an already opened binary stream
        :raises Scapy_Exception: if nothing could be read or no reader
            accepts the file. A descriptor opened here from a file name is
            closed before the error propagates; a stream provided by the
            caller is left untouched.
        """
        opened_here = isinstance(filename, str)
        filename, fdesc, magic = cls.open(filename)
        try:
            if not magic:
                raise Scapy_Exception(
                    "No data could be read!"
                )
            candidates = [cls]
            if "alternative" in cls.__dict__:
                candidates.append(cls.__dict__["alternative"])
            for candidate in candidates:
                i = candidate.__new__(
                    candidate,
                    candidate.__name__,
                    candidate.__bases__,
                    candidate.__dict__  # type: ignore
                )
                try:
                    i.__init__(filename, fdesc, magic)
                    return i
                except (Scapy_Exception, EOFError):
                    pass
            raise Scapy_Exception("Not a supported capture file")
        except BaseException:
            if opened_here:
                PcapReader_metaclass._close_fdesc(fdesc)
            raise

    @staticmethod
    def _close_fdesc(fdesc):
        # type: (Any) -> None
        """Best-effort close of a descriptor returned by open()."""
        # Closing a GzipFile built with fileobj= leaves the wrapped file
        # open, so remember it and close it as well.
        raw = None
        if isinstance(fdesc, gzip.GzipFile):
            raw = getattr(fdesc, "fileobj", None)
        try:
            fdesc.close()
            if raw is not None:
                raw.close()
        except OSError:
            # Cleanup on an error path: do not mask the original exception.
            pass

    @staticmethod
    def open(fname  # type: Union[IO[bytes], str]
             ):
        # type: (...) -> Tuple[str, _ByteStream, bytes]
        """Open (if necessary) filename, and read the magic.

        :param fname: a file name, or an already opened binary stream
        :returns: (filename, stream, magic) where magic holds the first 4
            bytes of the (decompressed, when gzip is detected) content.
            For a stream without a ``name`` attribute, filename is
            "No name".
        """
        if isinstance(fname, str):
            filename = fname
            fdesc = open(filename, "rb")  # type: _ByteStream
            try:
                magic = fdesc.read(2)
                if magic == b"\x1f\x8b":
                    # GZIP header detected.
                    fdesc.seek(0)
                    fdesc = gzip.GzipFile(fileobj=fdesc)
                    magic = fdesc.read(2)
                magic += fdesc.read(2)
            except BaseException:
                # Do not leak the descriptor if the magic cannot be read.
                PcapReader_metaclass._close_fdesc(fdesc)
                raise
        else:
            fdesc = fname
            filename = getattr(fdesc, "name", "No name")
            magic = fdesc.read(4)
        return filename, fdesc, magic

1433 

1434 

class RawPcapReader(metaclass=PcapReader_metaclass):
    """A stateful pcap reader. Each packet is returned as raw data together
    with its per-packet metadata."""

    # TODO: use Generics to properly type the various readers.
    # As of right now, RawPcapReader is typed as if it returned packets
    # because all of its child do. Fix that

    nonblocking_socket = True
    PacketMetadata = collections.namedtuple("PacketMetadata",
                                            ["sec", "usec", "wirelen", "caplen"])  # noqa: E501

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, _ByteStream, bytes) -> None
        """Check the magic, then parse the rest of the global header.

        :raises Scapy_Exception: on an unknown magic or a short header
        """
        self.filename = filename
        self.f = fdesc
        # (magic, byte order, nanosecond precision)
        known_magics = (
            (b"\xa1\xb2\xc3\xd4", ">", False),  # big endian
            (b"\xd4\xc3\xb2\xa1", "<", False),  # little endian
            (b"\xa1\xb2\x3c\x4d", ">", True),  # big endian, nanosecond-precision  # noqa: E501
            (b"\x4d\x3c\xb2\xa1", "<", True),  # little endian, nanosecond-precision  # noqa: E501
        )
        for signature, byte_order, nanosecond in known_magics:
            if magic == signature:
                self.endian = byte_order
                self.nano = nanosecond
                break
        else:
            raise Scapy_Exception(
                "Not a pcap capture file (bad magic: %r)" % magic
            )
        header = self.f.read(20)
        if len(header) < 20:
            raise Scapy_Exception("Invalid pcap file (too short)")
        # Fields: major, minor, timezone, sigfigs, snaplen, linktype
        fields = struct.unpack(self.endian + "HHIIII", header)
        self.linktype = fields[5]
        self.snaplen = fields[4]

    def __enter__(self):
        # type: () -> RawPcapReader
        return self

    def __iter__(self):
        # type: () -> RawPcapReader
        return self

    def __next__(self):
        # type: () -> Tuple[bytes, RawPcapReader.PacketMetadata]
        """
        implement the iterator protocol on a set of packets in a pcap file
        """
        try:
            return self._read_packet()
        except EOFError:
            raise StopIteration

    def _read_packet(self, size=MTU):
        # type: (int) -> Tuple[bytes, RawPcapReader.PacketMetadata]
        """return a single packet read from the file as a tuple containing
        (pkt_data, pkt_metadata)

        raise EOFError when no more packets are available
        """
        record_header = self.f.read(16)
        if len(record_header) < 16:
            raise EOFError
        sec, usec, caplen, wirelen = struct.unpack(
            self.endian + "IIII", record_header
        )

        try:
            payload = self.f.read(caplen)
        except OverflowError as e:
            warning(f"Pcap: {e}")
            raise EOFError
        payload = payload[:size]

        metadata = RawPcapReader.PacketMetadata(
            sec=sec, usec=usec, wirelen=wirelen, caplen=caplen
        )
        return (payload, metadata)

    def read_packet(self, size=MTU):
        # type: (int) -> Packet
        """Not available on raw readers: always raises."""
        raise Exception(
            "Cannot call read_packet() in RawPcapReader. Use "
            "_read_packet()"
        )

    def dispatch(self,
                 callback  # type: Callable[[Tuple[bytes, RawPcapReader.PacketMetadata]], Any]  # noqa: E501
                 ):
        # type: (...) -> None
        """call the specified callback routine for each packet read

        This is just a convenience function for the main loop
        that allows for easy launching of packet processing in a
        thread.
        """
        for item in self:
            callback(item)

    def _read_all(self, count=-1):
        # type: (int) -> List[Packet]
        """return a list of all packets in the pcap file

        At most count packets are read; any negative count reads until
        read_packet() raises EOFError.
        """
        packets = []  # type: List[Packet]
        remaining = count
        while remaining != 0:
            remaining -= 1
            try:
                packets.append(self.read_packet())
            except EOFError:
                break
        return packets

    def recv(self, size=MTU):
        # type: (int) -> bytes
        """ Emulate a socket
        """
        data, _metadata = self._read_packet(size=size)
        return data

    def fileno(self):
        # type: () -> int
        if WINDOWS:
            return -1
        return self.f.fileno()

    def close(self):
        # type: () -> None
        """Close the capture, including the file wrapped by a GzipFile."""
        if isinstance(self.f, gzip.GzipFile):
            self.f.fileobj.close()  # type: ignore
        self.f.close()

    def __exit__(self, exc_type, exc_value, tracback):
        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
        self.close()

    # emulate SuperSocket
    @staticmethod
    def select(sockets,  # type: List[SuperSocket]
               remain=None,  # type: Optional[float]
               ):
        # type: (...) -> List[SuperSocket]
        """Return every given socket unchanged."""
        return sockets

1576 

1577 

class PcapReader(RawPcapReader):
    """A pcap reader that dissects each record into a packet."""

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        RawPcapReader.__init__(self, filename, fdesc, magic)
        layers = conf.l2types.num2layer
        try:
            self.LLcls = layers[
                self.linktype
            ]  # type: Type[Packet]
        except KeyError:
            warning("PcapReader: unknown LL type [%i]/[%#x]. Using Raw packets" % (self.linktype, self.linktype))  # noqa: E501
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            self.LLcls = conf.raw_layer

    def __enter__(self):
        # type: () -> PcapReader
        return self

    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """Read the next record and dissect it with the link-layer class.

        When dissection fails the data is wrapped in conf.raw_layer, unless
        conf.debug_dissector is set, in which case the error is re-raised.
        """
        record = super(PcapReader, self)._read_packet(size=size)
        if record is None:
            raise EOFError
        data, pkt_info = record

        # KeyboardInterrupt is not an Exception subclass, so it propagates
        # untouched through the handler below.
        try:
            pkt = self.LLcls(data, **kwargs)  # type: Packet
        except Exception:
            if conf.debug_dissector:
                from scapy.sendrecv import debug
                debug.crashed_on = (self.LLcls, data)
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            pkt = conf.raw_layer(data)
        # The second field of the record holds nano- or microseconds.
        exponent = -9 if self.nano else -6
        power = Decimal(10) ** Decimal(exponent)
        pkt.time = EDecimal(pkt_info.sec + power * pkt_info.usec)
        pkt.wirelen = pkt_info.wirelen
        return pkt

    def recv(self, size=MTU, **kwargs):  # type: ignore
        # type: (int, **Any) -> Packet
        return self.read_packet(size=size, **kwargs)

    def __iter__(self):
        # type: () -> PcapReader
        return self

    def __next__(self):  # type: ignore
        # type: () -> Packet
        try:
            return self.read_packet()
        except EOFError:
            raise StopIteration

    def read_all(self, count=-1):
        # type: (int) -> PacketList
        """Read up to count packets (all when negative) into a PacketList
        named after the capture file."""
        from scapy import plist
        packets = self._read_all(count)
        list_name = os.path.basename(self.filename)
        return plist.PacketList(packets, name=list_name)

1642 

1643 

class RawPcapNgReader(RawPcapReader):
    """A stateful pcapng reader. Each packet is returned as
    bytes.

    Blocks are read one at a time; blocks that describe the capture
    (interfaces, secrets, process information) update the reader state and
    blocks that carry a packet produce a (data, PacketMetadata) tuple.
    """

    alternative = RawPcapReader  # type: Type[Any]

    PacketMetadata = collections.namedtuple("PacketMetadataNg",  # type: ignore
                                            ["linktype", "tsresol",
                                             "tshigh", "tslow", "wirelen",
                                             "comment", "ifname", "direction",
                                             "process_information"])

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        """Check the magic and parse the first Section Header Block.

        :raises Scapy_Exception: on a bad magic or a malformed first SHB
        """
        self.filename = filename
        self.f = fdesc
        # A list of (linktype, snaplen, options) tuples, where options is a
        # dict holding at least "tsresol"; will be populated by IDBs.
        self.interfaces = []  # type: List[Tuple[int, int, Dict[str, Any]]]
        self.default_options = {
            "tsresol": 1000000
        }
        # Block type -> handler. Block types that are not listed here are
        # read and ignored by _read_block().
        self.blocktypes: Dict[
            int,
            Callable[
                [bytes, int],
                Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]
            ]] = {
                1: self._read_block_idb,
                2: self._read_block_pkt,
                3: self._read_block_spb,
                6: self._read_block_epb,
                10: self._read_block_dsb,
                0x80000001: self._read_block_pib,
        }
        self.endian = "!"  # Will be overwritten by first SHB
        # Filled by Process Information Blocks, referenced by index in EPBs.
        self.process_information = []  # type: List[Dict[str, Any]]

        if magic != b"\x0a\x0d\x0d\x0a":  # PcapNg:
            raise Scapy_Exception(
                "Not a pcapng capture file (bad magic: %r)" % magic
            )

        try:
            self._read_block_shb()
        except EOFError:
            raise Scapy_Exception(
                "The first SHB of the pcapng file is malformed !"
            )

    def _read_block(self, size=MTU):
        # type: (int) -> Optional[Tuple[bytes, RawPcapNgReader.PacketMetadata]]  # noqa: E501
        """Read one block and hand its body to the matching handler.

        :returns: whatever the handler returns; None for a Section Header
            Block and for unknown block types
        :raises EOFError: when the block type or length cannot be read, or
            the length is below 12
        :raises Scapy_Exception: when the body is shorter than announced
        """
        try:
            blocktype = struct.unpack(self.endian + "I", self.f.read(4))[0]
        except struct.error:
            raise EOFError
        if blocktype == 0x0A0D0D0A:
            # This function updates the endianness based on the block content.
            self._read_block_shb()
            return None
        try:
            blocklen = struct.unpack(self.endian + "I", self.f.read(4))[0]
        except struct.error:
            warning("PcapNg: Error reading blocklen before block body")
            raise EOFError
        if blocklen < 12:
            warning("PcapNg: Invalid block length !")
            raise EOFError

        # 12 = block type (4) + leading length (4) + trailing length (4)
        _block_body_length = blocklen - 12
        block = self.f.read(_block_body_length)
        if len(block) != _block_body_length:
            raise Scapy_Exception("PcapNg: Invalid Block body length "
                                  "(too short)")
        self._read_block_tail(blocklen)
        if blocktype in self.blocktypes:
            return self.blocktypes[blocktype](block, size)
        return None

    def _read_block_tail(self, blocklen):
        # type: (int) -> None
        """Read the trailing length of a block and check that it equals
        blocklen.

        :raises EOFError: if the trailing length is missing or different
        """
        if blocklen % 4:
            # Skip the bytes that align the block on a 4-byte boundary.
            pad = self.f.read(-blocklen % 4)
            warning("PcapNg: bad blocklen %d (MUST be a multiple of 4. "
                    "Ignored padding %r" % (blocklen, pad))
        try:
            if blocklen != struct.unpack(self.endian + 'I',
                                         self.f.read(4))[0]:
                raise EOFError("PcapNg: Invalid pcapng block (bad blocklen)")
        except struct.error:
            warning("PcapNg: Could not read blocklen after block body")
            raise EOFError

    def _read_block_shb(self):
        # type: () -> None
        """Section Header Block

        Called once the block type has been consumed. Sets self.endian from
        the byte-order magic, then validates the length and major version.

        :raises EOFError: on any malformed field
        :raises Scapy_Exception: if the options are shorter than announced
        """
        # The length can only be decoded once the byte order is known.
        _blocklen = self.f.read(4)
        endian = self.f.read(4)
        if endian == b"\x1a\x2b\x3c\x4d":
            self.endian = ">"
        elif endian == b"\x4d\x3c\x2b\x1a":
            self.endian = "<"
        else:
            warning("PcapNg: Bad magic in Section Header Block"
                    " (not a pcapng file?)")
            raise EOFError

        try:
            blocklen = struct.unpack(self.endian + "I", _blocklen)[0]
        except struct.error:
            warning("PcapNg: Could not read blocklen")
            raise EOFError
        if blocklen < 28:
            warning(f"PcapNg: Invalid Section Header Block length ({blocklen})!")  # noqa: E501
            raise EOFError

        # Major version must be 1
        _major = self.f.read(2)
        try:
            major = struct.unpack(self.endian + "H", _major)[0]
        except struct.error:
            warning("PcapNg: Could not read major value")
            raise EOFError
        if major != 1:
            warning(f"PcapNg: SHB Major version {major} unsupported !")
            raise EOFError

        # Skip minor version & section length
        skipped = self.f.read(10)
        if len(skipped) != 10:
            warning("PcapNg: Could not read minor value & section length")
            raise EOFError

        # 28 bytes of the block are fixed fields; the rest holds options.
        _options_len = blocklen - 28
        options = self.f.read(_options_len)
        if len(options) != _options_len:
            raise Scapy_Exception("PcapNg: Invalid Section Header Block "
                                  " options (too short)")
        self._read_block_tail(blocklen)
        self._read_options(options)

    def _read_packet(self, size=MTU):  # type: ignore
        # type: (int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Read blocks until one of them yields a packet, and return it as
        (packet, metadata), where packet is bytes and metadata is a
        PacketMetadata tuple.

        Blocks that do not carry a packet are skipped. The loop only ends
        with a packet or with an exception raised by _read_block() (EOFError
        at the end of the file).
        """
        while True:
            res = self._read_block(size=size)
            if res is not None:
                return res

    def _read_options(self, options):
        # type: (bytes) -> Dict[int, bytes]
        """Parse a list of options into a {code: value} dict.

        Each option has a 2-byte code, a 2-byte length and a value padded to
        a multiple of 4 bytes. Parsing stops at the end-of-options code (0)
        or when fewer than 4 bytes remain.
        """
        opts = dict()
        while len(options) >= 4:
            try:
                code, length = struct.unpack(self.endian + "HH", options[:4])
            except struct.error:
                warning("PcapNg: options header is too small "
                        "%d !" % len(options))
                raise EOFError
            # Options whose announced length exceeds the data are dropped.
            if code != 0 and 4 + length <= len(options):
                opts[code] = options[4:4 + length]
            if code == 0:
                if length != 0:
                    warning("PcapNg: invalid option "
                            "length %d for end-of-option" % length)
                break
            if length % 4:
                length += (4 - (length % 4))
            options = options[4 + length:]
        return opts

    def _read_block_idb(self, block, _):
        # type: (bytes, int) -> None
        """Interface Description Block

        Appends a (linktype, snaplen, options) tuple to self.interfaces.
        """
        # 2 bytes LinkType + 2 bytes Reserved
        # 4 bytes Snaplen
        options_raw = self._read_options(block[8:])
        options = self.default_options.copy()  # type: Dict[str, Any]
        for c, v in options_raw.items():
            if c == 9:
                length = len(v)
                if length == 1:
                    tsresol = orb(v)
                    # High bit set: power of 2; otherwise power of 10.
                    options["tsresol"] = (2 if tsresol & 128 else 10) ** (
                        tsresol & 127
                    )
                else:
                    warning("PcapNg: invalid options "
                            "length %d for IDB tsresol" % length)
            elif c == 2:
                options["name"] = v
            elif c == 1:
                options["comment"] = v
        try:
            interface: Tuple[int, int, Dict[str, Any]] = struct.unpack(
                self.endian + "HxxI",
                block[:8]
            ) + (options,)
        except struct.error:
            warning("PcapNg: IDB is too small %d/8 !" % len(block))
            raise EOFError
        self.interfaces.append(interface)

    def _check_interface_id(self, intid):
        # type: (int) -> None
        """Check the interface id value and raise EOFError if invalid."""
        tmp_len = len(self.interfaces)
        if intid >= tmp_len:
            warning("PcapNg: invalid interface id %d/%d" % (intid, tmp_len))
            raise EOFError

    def _read_block_epb(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Enhanced Packet Block

        :returns: (data truncated to size, metadata)
        :raises EOFError: on a short block, a malformed option or an unknown
            interface id
        """
        try:
            intid, tshigh, tslow, caplen, wirelen = struct.unpack(
                self.endian + "5I",
                block[:20],
            )
        except struct.error:
            warning("PcapNg: EPB is too small %d/20 !" % len(block))
            raise EOFError

        # Compute the options offset taking padding into account
        if caplen % 4:
            opt_offset = 20 + caplen + (-caplen) % 4
        else:
            opt_offset = 20 + caplen

        # Parse options
        options = self._read_options(block[opt_offset:])

        process_information = {}
        for code, value in options.items():
            if code in [0x8001, 0x8003]:  # PCAPNG_EPB_PIB_INDEX, PCAPNG_EPB_E_PIB_INDEX
                try:
                    proc_index = struct.unpack(self.endian + "I", value)[0]
                except struct.error:
                    warning("PcapNg: EPB invalid proc index"
                            "(expected 4 bytes, got %d) !" % len(value))
                    raise EOFError
                if proc_index < len(self.process_information):
                    key = "proc" if code == 0x8001 else "eproc"
                    process_information[key] = self.process_information[proc_index]
                else:
                    warning("PcapNg: EPB invalid process information index "
                            "(%d/%d) !" % (proc_index, len(self.process_information)))

        comment = options.get(1, None)
        epb_flags_raw = options.get(2, None)
        if epb_flags_raw:
            try:
                epb_flags, = struct.unpack(self.endian + "I", epb_flags_raw)
            except struct.error:
                warning("PcapNg: EPB invalid flags size"
                        "(expected 4 bytes, got %d) !" % len(epb_flags_raw))
                raise EOFError
            # Only the two lowest bits of the flags are kept.
            direction = epb_flags & 3

        else:
            direction = None

        self._check_interface_id(intid)
        ifname = self.interfaces[intid][2].get('name', None)

        return (block[20:20 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=tshigh,
                                               tslow=tslow,
                                               wirelen=wirelen,
                                               comment=comment,
                                               ifname=ifname,
                                               direction=direction,
                                               process_information=process_information))

    def _read_block_spb(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """Simple Packet Block

        The block carries no timestamp: tshigh and tslow are None in the
        returned metadata.
        """
        # "it MUST be assumed that all the Simple Packet Blocks have
        # been captured on the interface previously specified in the
        # first Interface Description Block."
        intid = 0
        self._check_interface_id(intid)

        try:
            wirelen, = struct.unpack(self.endian + "I", block[:4])
        except struct.error:
            warning("PcapNg: SPB is too small %d/4 !" % len(block))
            raise EOFError

        # The captured length is bounded by the snaplen of the interface.
        caplen = min(wirelen, self.interfaces[intid][1])
        return (block[4:4 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=None,
                                               tslow=None,
                                               wirelen=wirelen,
                                               comment=None,
                                               ifname=None,
                                               direction=None,
                                               process_information={}))

    def _read_block_pkt(self, block, size):
        # type: (bytes, int) -> Tuple[bytes, RawPcapNgReader.PacketMetadata]
        """(Obsolete) Packet Block

        Options, if any, are not parsed for this block type.
        """
        try:
            intid, drops, tshigh, tslow, caplen, wirelen = struct.unpack(
                self.endian + "HH4I",
                block[:20],
            )
        except struct.error:
            warning("PcapNg: PKT is too small %d/20 !" % len(block))
            raise EOFError

        self._check_interface_id(intid)
        return (block[20:20 + caplen][:size],
                RawPcapNgReader.PacketMetadata(linktype=self.interfaces[intid][0],  # noqa: E501
                                               tsresol=self.interfaces[intid][2]['tsresol'],  # noqa: E501
                                               tshigh=tshigh,
                                               tslow=tslow,
                                               wirelen=wirelen,
                                               comment=None,
                                               ifname=None,
                                               direction=None,
                                               process_information={}))

    def _read_block_dsb(self, block, size):
        # type: (bytes, int) -> None
        """Decryption Secrets Block

        For a TLS Key Log, the secrets are written to a temporary file,
        parsed with load_nss_keys() and stored in conf. Other secrets types
        only produce a warning.
        """

        # Parse the secrets type and length fields
        try:
            secrets_type, secrets_length = struct.unpack(
                self.endian + "II",
                block[:8],
            )
            block = block[8:]
        except struct.error:
            warning("PcapNg: DSB is too small %d!", len(block))
            raise EOFError

        # Compute the secrets length including the padding
        padded_secrets_length = secrets_length + (-secrets_length) % 4
        if len(block) < padded_secrets_length:
            warning("PcapNg: invalid DSB secrets length!")
            raise EOFError

        # Extract secrets data and options
        secrets_data = block[:padded_secrets_length][:secrets_length]
        if block[padded_secrets_length:]:
            warning("PcapNg: DSB options are not supported!")

        # TLS Key Log
        if secrets_type == 0x544c534b:
            if getattr(conf, "tls_sessions", False) is False:
                warning("PcapNg: TLS Key Log available, but "
                        "the TLS layer is not loaded! Scapy won't be able "
                        "to decrypt the packets.")
            else:
                from scapy.layers.tls.session import load_nss_keys

                # Write Key Log to a file and parse it
                filename = get_temp_file()
                with open(filename, "wb") as fd:
                    fd.write(secrets_data)
                    fd.close()

                keys = load_nss_keys(filename)
                if not keys:
                    warning("PcapNg: invalid TLS Key Log in DSB!")
                else:
                    # Note: these attributes are only available when the TLS
                    # layer is loaded.
                    conf.tls_nss_keys = keys
                    conf.tls_session_enable = True
        else:
            warning("PcapNg: Unknown DSB secrets type (0x%x)!", secrets_type)

    def _read_block_pib(self, block, _):
        # type: (bytes, int) -> None
        """Apple Process Information Block

        Appends a dict with the process "id", and "name" / "uuid" when the
        matching options are present, to self.process_information.
        """

        # Get the Process ID
        try:
            dpeb_pid = struct.unpack(self.endian + "I", block[:4])[0]
            process_information = {"id": dpeb_pid}
            block = block[4:]
        except struct.error:
            warning("PcapNg: DPEB is too small (%d). Cannot get PID!",
                    len(block))
            raise EOFError

        # Get Options
        options = self._read_options(block)
        for code, value in options.items():
            if code == 2:
                process_information["name"] = value.decode("ascii", "backslashreplace")
            elif code == 4:
                if len(value) == 16:
                    process_information["uuid"] = str(UUID(bytes=value))
                else:
                    warning("PcapNg: DPEB UUID length is invalid (%d)!",
                            len(value))

        # Store process information
        self.process_information.append(process_information)

2056 

2057 

class PcapNgReader(RawPcapNgReader, PcapReader):
    """A pcapng reader that dissects each packet block into a packet."""

    alternative = PcapReader

    def __init__(self, filename, fdesc=None, magic=None):  # type: ignore
        # type: (str, IO[bytes], bytes) -> None
        RawPcapNgReader.__init__(self, filename, fdesc, magic)

    def __enter__(self):
        # type: () -> PcapNgReader
        return self

    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """Read the next packet and dissect it with the class registered for
        its link type.

        When dissection fails the data is wrapped in conf.raw_layer, unless
        conf.debug_dissector is set, in which case the error is re-raised.
        """
        record = super(PcapNgReader, self)._read_packet(size=size)
        if record is None:
            raise EOFError
        data, metadata = record
        (linktype, tsresol, tshigh, tslow, wirelen,
         comment, ifname, direction, process_information) = metadata
        # KeyboardInterrupt is not an Exception subclass, so it propagates
        # untouched through the handler below.
        try:
            layer = conf.l2types.num2layer[linktype]  # type: Type[Packet]
            pkt = layer(data, **kwargs)  # type: Packet
        except Exception:
            if conf.debug_dissector:
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            pkt = conf.raw_layer(data)
        if tshigh is not None:
            # The timestamp is a 64-bit count of 1/tsresol seconds.
            ticks = (tshigh << 32) + tslow
            pkt.time = EDecimal(ticks) / tsresol
        pkt.wirelen = wirelen
        pkt.comment = comment
        pkt.direction = direction
        pkt.process_information = process_information.copy()
        if ifname is not None:
            pkt.sniffed_on = ifname.decode('utf-8', 'backslashreplace')
        return pkt

    def recv(self, size: int = MTU, **kwargs: Any) -> 'Packet':  # type: ignore
        return self.read_packet(size=size, **kwargs)

2100 

2101 

class GenericPcapWriter(object):
    """Common logic shared by the capture-file writers.

    Subclasses implement :meth:`_write_header` and :meth:`_write_packet`;
    this class works out the link type, timestamp and lengths passed to
    them.
    """
    # When True, sub-second values are expressed in nanoseconds instead of
    # microseconds.
    nano = False
    # Link type of the capture. Only declared here: it stays unset until
    # write_header() (or a subclass) assigns it.
    linktype: int

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Write the file header. Must be implemented by subclasses."""
        raise NotImplementedError

    def _write_packet(self,
                      packet,  # type: Union[bytes, Packet]
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        """Write one record. Must be implemented by subclasses."""
        raise NotImplementedError

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """Return the ``(sec, usec)`` pair to store for ``packet``.

        When ``sec`` is None and the packet has a ``time`` attribute, both
        values are derived from it (``usec`` is then in nanoseconds if
        ``self.nano`` is set). When ``sec`` is given, it is kept and a
        missing ``usec`` becomes 0. If neither source is available,
        ``(None, None)`` is returned.
        """
        if hasattr(packet, "time"):
            if sec is None:
                packet_time = packet.time
                whole = int(packet_time)
                scale = 1000000000 if self.nano else 1000000
                usec = int(round((packet_time - whole) * scale))
                sec = float(packet_time)
                if usec >= scale:
                    # Rounding turned the fraction into a full second
                    # (e.g. x.9999996 with microsecond precision): carry
                    # it so the sub-second field stays within its range.
                    sec = float(whole + 1)
                    usec -= scale
        if sec is not None and usec is None:
            usec = 0
        return sec, usec  # type: ignore

    def write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Determine the link type if needed, then write the header.

        If ``self.linktype`` is not set yet, it is looked up in
        ``conf.l2types`` from the class of ``pkt``. When that is not
        possible (``pkt`` is None, bytes, or of an unregistered class) a
        warning is emitted and Ethernet is used.
        """
        if not hasattr(self, 'linktype'):
            try:
                if pkt is None or isinstance(pkt, bytes):
                    # Can't guess LL
                    raise KeyError
                self.linktype = conf.l2types.layer2num[
                    pkt.__class__
                ]
            except KeyError:
                msg = "%s: unknown LL type for %s. Using type 1 (Ethernet)"
                warning(msg, self.__class__.__name__, pkt.__class__.__name__)
                self.linktype = DLT_EN10MB
        self._write_header(pkt)

    def write_packet(self,
                     packet,  # type: Union[bytes, Packet]
                     sec=None,  # type: Optional[float]
                     usec=None,  # type: Optional[int]
                     caplen=None,  # type: Optional[int]
                     wirelen=None,  # type: Optional[int]
                     ):
        # type: (...) -> None
        """
        Writes a single packet to the pcap file.

        :param packet: Packet, or bytes for a single packet
        :type packet: scapy.packet.Packet or bytes
        :param sec: time the packet was captured, in seconds since epoch. If
                    not supplied, ``packet.time`` is used when available.
        :type sec: float
        :param usec: If ``nano=True``, then number of nanoseconds after the
                     second that the packet was captured. If ``nano=False``,
                     then the number of microseconds after the second the
                     packet was captured. If ``sec`` is not specified,
                     this value is ignored.
        :type usec: int or long
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(raw(packet))``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, tries ``packet.wirelen``, otherwise uses
                        ``caplen``.
        :type wirelen: int
        :return: None
        :rtype: None
        """
        f_sec, usec = self._get_time(packet, sec, usec)

        rawpkt = bytes_encode(packet)
        caplen = len(rawpkt) if caplen is None else caplen

        if wirelen is None:
            if hasattr(packet, "wirelen"):
                wirelen = packet.wirelen
        if wirelen is None:
            wirelen = caplen

        # Optional metadata carried by Packet objects (absent on bytes)
        comment = getattr(packet, "comment", None)
        ifname = getattr(packet, "sniffed_on", None)
        direction = getattr(packet, "direction", None)
        if not isinstance(packet, bytes):
            linktype: int = conf.l2types.layer2num[
                packet.__class__
            ]
        else:
            linktype = self.linktype
        if ifname is not None:
            ifname = str(ifname).encode('utf-8')
        self._write_packet(
            rawpkt,
            sec=f_sec, usec=usec,
            caplen=caplen, wirelen=wirelen,
            comment=comment,
            ifname=ifname,
            direction=direction,
            linktype=linktype
        )

2220 

2221 

class GenericRawPcapWriter(GenericPcapWriter):
    """File-backed writer: adds stream handling on top of the generic one."""
    # Set once the file header has been emitted
    header_present = False
    nano = False
    # When True, subclasses flush the stream after every packet
    sync = False
    f = None  # type: Union[IO[bytes], gzip.GzipFile]

    def fileno(self):
        # type: () -> int
        """Return the descriptor of the underlying file (-1 on Windows)."""
        if WINDOWS:
            return -1
        return self.f.fileno()

    def flush(self):
        # type: () -> Optional[Any]
        """Flush the underlying file."""
        return self.f.flush()

    def close(self):
        # type: () -> Optional[Any]
        """Close the file, writing the header first if none was written."""
        if not self.header_present:
            self.write_header(None)
        return self.f.close()

    def __enter__(self):
        # type: () -> GenericRawPcapWriter
        return self

    def __exit__(self, exc_type, exc_value, tracback):
        # type: (Optional[Any], Optional[Any], Optional[Any]) -> None
        self.flush()
        self.close()

    def write(self, pkt):
        # type: (Union[_PacketIterable, bytes]) -> None
        """
        Writes a Packet, a SndRcvList object, or bytes to a pcap file.

        :param pkt: Packet(s) to write (one record for each Packet), or raw
                    bytes to write (as one record).
        :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet or bytes
        """
        if isinstance(pkt, bytes):
            # Raw bytes are stored as a single record
            if not self.header_present:
                self.write_header(pkt)
            self.write_packet(pkt)
            return

        # Import here to avoid circular dependency
        from scapy.supersocket import IterSocket
        for item in IterSocket(pkt).iter:
            if not self.header_present:
                self.write_header(item)

            mismatch = (
                not isinstance(item, bytes) and
                self.linktype != conf.l2types.get(type(item), None)
            )
            if mismatch:
                warning("Inconsistent linktypes detected!"
                        " The resulting file might contain"
                        " invalid packets."
                        )

            self.write_packet(item)

2279 

2280 

class RawPcapWriter(GenericRawPcapWriter):
    """A stream PCAP writer with more control than wrpcap()"""

    def __init__(self,
                 filename,  # type: Union[IO[bytes], str]
                 linktype=None,  # type: Optional[int]
                 gz=False,  # type: bool
                 endianness="",  # type: str
                 append=False,  # type: bool
                 sync=False,  # type: bool
                 nano=False,  # type: bool
                 snaplen=MTU,  # type: int
                 bufsz=4096,  # type: int
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to, or an open,
            writable file-like object.
        :param linktype: force linktype to a given value. If None, linktype is
            taken from the first written packet
        :param gz: compress the capture on the fly
        :param endianness: force an endianness (little:"<", big:">").
            Default is native
        :param append: append packets to the capture file instead of
            truncating it
        :param sync: do not bufferize writes to the capture file
        :param nano: use nanosecond-precision (requires libpcap >= 1.5.0)
        :param snaplen: snapshot length stored in the file header
        :param bufsz: buffer size used when opening ``filename`` (forced to
            0 when ``sync`` is set, unused for gzip files)
        """

        # Compare against None: 0 is a valid link type and must not be
        # mistaken for "not specified".
        if linktype is not None:
            self.linktype = linktype
        self.snaplen = snaplen
        self.append = append
        self.gz = gz
        self.endian = endianness
        self.sync = sync
        self.nano = nano
        if sync:
            bufsz = 0

        if isinstance(filename, str):
            self.filename = filename
            mode = "ab" if append else "wb"
            if gz:
                self.f = cast(_ByteStream, gzip.open(
                    filename, mode, 9
                ))
            else:
                self.f = open(filename, mode, bufsz)
        else:
            self.f = filename
            self.filename = getattr(filename, "name", "No name")

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Write the pcap global header unless the file already has data.

        :raises ValueError: if no link type is known at this point
        """
        self.header_present = True

        if self.append:
            # Even if prone to race conditions, this seems to be
            # safest way to tell whether the header is already present
            # because we have to handle compressed streams that
            # are not as flexible as basic files
            if self.gz:
                g = gzip.open(self.filename, "rb")  # type: _ByteStream
            else:
                g = open(self.filename, "rb")
            try:
                if g.read(16):
                    return
            finally:
                g.close()

        if not hasattr(self, 'linktype'):
            raise ValueError(
                "linktype could not be guessed. "
                "Please pass a linktype while creating the writer"
            )

        # magic, version 2.4, two zero fields, snaplen, linktype
        self.f.write(struct.pack(self.endian + "IHHIIII", 0xa1b23c4d if self.nano else 0xa1b2c3d4,  # noqa: E501
                                 2, 4, 0, 0, self.snaplen, self.linktype))
        self.f.flush()

    def _write_packet(self,
                      packet,  # type: Union[bytes, Packet]
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        """
        Writes a single packet to the pcap file.

        :param packet: bytes for a single packet
        :type packet: bytes
        :param linktype: linktype value associated with the packet
            (not stored in the record)
        :type linktype: int
        :param sec: time the packet was captured, in seconds since epoch. If
                    not supplied, defaults to now.
        :type sec: float
        :param usec: number of microseconds (nanoseconds if ``nano=True``)
                     after the second that the packet was captured.
                     Defaults to 0 when only ``sec`` is given.
        :type usec: int or long
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(packet)``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, uses ``caplen``.
        :type wirelen: int
        :param comment: unused by this format
        :param ifname: unused by this format
        :param direction: unused by this format
        :return: None
        :rtype: None
        """
        if caplen is None:
            caplen = len(packet)
        if wirelen is None:
            wirelen = caplen
        if sec is None or usec is None:
            now = time.time()
            whole = int(now)
            if sec is None:
                scale = 1000000000 if self.nano else 1000000
                sec = whole
                usec = int(round((now - whole) * scale))
                if usec >= scale:
                    # Rounding reached a full second: carry it so the
                    # sub-second field stays within its range.
                    sec += 1
                    usec -= scale
            elif usec is None:
                usec = 0

        self.f.write(struct.pack(self.endian + "IIII",
                                 int(sec), usec, caplen, wirelen))
        self.f.write(bytes(packet))
        if self.sync:
            self.f.flush()

2416 

2417 

class RawPcapNgWriter(GenericRawPcapWriter):
    """A stream pcapng writer with more control than wrpcapng()"""

    def __init__(self,
                 filename,  # type: str
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to
        """

        self.header_present = False
        # Timestamp units per second used when encoding EPB timestamps
        self.tsresol = 1000000
        # A dict to keep if_name to IDB id mapping.
        # unknown if_name(None) id=0
        self.interfaces2id: Dict[Optional[bytes], int] = {None: 0}

        # tcpdump only support little-endian in PCAPng files
        self.endian = "<"
        self.endian_magic = b"\x4d\x3c\x2b\x1a"

        self.filename = filename
        self.f = open(filename, "wb", 4096)

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """Return ``(sec, usec)``; ``sec`` keeps its fractional part.

        ``sec`` falls back to ``float(packet.time)`` when available and
        ``usec`` to 0.
        """
        if hasattr(packet, "time"):
            if sec is None:
                sec = float(packet.time)

        if usec is None:
            usec = 0

        return sec, usec  # type: ignore

    def _add_padding(self, raw_data):
        # type: (bytes) -> bytes
        """Pad ``raw_data`` with zero bytes to a multiple of 4 bytes."""
        raw_data += ((-len(raw_data)) % 4) * b"\x00"
        return raw_data

    def build_block(self, block_type, block_body, options=None):
        # type: (bytes, bytes, Optional[bytes]) -> bytes
        """Assemble a block: type, total length, body, total length.

        :param block_type: the 4 bytes identifying the block
        :param block_body: block body; padded to 32 bits here
        :param options: already encoded options appended after the body
        :return: the complete block
        """

        # Pad Block Body to 32 bits
        block_body = self._add_padding(block_body)

        if options:
            block_body += options

        # An empty block is 12 bytes long
        block_total_length = 12 + len(block_body)

        # Block Type
        block = block_type
        # Block Total Length
        block += struct.pack(self.endian + "I", block_total_length)
        # Block Body
        block += block_body
        # Block Total Length (repeated after the body)
        block += struct.pack(self.endian + "I", block_total_length)

        return block

    def _write_header(self, pkt):
        # type: (Optional[Union[Packet, bytes]]) -> None
        """Write the section header and the default interface block once."""
        if not self.header_present:
            self.header_present = True
            self._write_block_shb()
            self._write_block_idb(linktype=self.linktype)

    def _write_block_shb(self):
        # type: () -> None
        """Write a Section Header Block."""

        # Block Type
        block_type = b"\x0A\x0D\x0D\x0A"
        # Byte-Order Magic
        block_shb = self.endian_magic
        # Major Version
        block_shb += struct.pack(self.endian + "H", 1)
        # Minor Version
        block_shb += struct.pack(self.endian + "H", 0)
        # Section Length
        block_shb += struct.pack(self.endian + "q", -1)

        self.f.write(self.build_block(block_type, block_shb))

    def _write_block_idb(self,
                         linktype,  # type: int
                         ifname=None  # type: Optional[bytes]
                         ):
        # type: (...) -> None
        """Write an Interface Description Block.

        :param linktype: link type of the interface
        :param ifname: interface name, stored as the if_name option
        """

        # Block Type
        block_type = struct.pack(self.endian + "I", 1)
        # LinkType
        block_idb = struct.pack(self.endian + "H", linktype)
        # Reserved
        block_idb += struct.pack(self.endian + "H", 0)
        # SnapLen
        block_idb += struct.pack(self.endian + "I", 262144)

        # if_name option
        opts = None
        if ifname is not None:
            opts = struct.pack(self.endian + "HH", 2, len(ifname))
            # Pad Option Value to 32 bits
            opts += self._add_padding(ifname)
            # End of options
            opts += struct.pack(self.endian + "HH", 0, 0)

        self.f.write(self.build_block(block_type, block_idb, options=opts))

    def _write_block_spb(self, raw_pkt):
        # type: (bytes) -> None
        """Write a Simple Packet Block holding ``raw_pkt``."""

        # Block Type
        block_type = struct.pack(self.endian + "I", 3)
        # Original Packet Length
        block_spb = struct.pack(self.endian + "I", len(raw_pkt))
        # Packet Data
        block_spb += raw_pkt

        self.f.write(self.build_block(block_type, block_spb))

    def _write_block_epb(self,
                         raw_pkt,  # type: bytes
                         ifid,  # type: int
                         timestamp=None,  # type: Optional[Union[EDecimal, float]]  # noqa: E501
                         caplen=None,  # type: Optional[int]
                         orglen=None,  # type: Optional[int]
                         comment=None,  # type: Optional[bytes]
                         flags=None,  # type: Optional[int]
                         ):
        # type: (...) -> None
        """Write an Enhanced Packet Block.

        :param raw_pkt: packet data
        :param ifid: id of the interface block the packet refers to
        :param timestamp: capture time in seconds; a missing or zero value
            is stored as a zero timestamp
        :param caplen: captured length, defaults to ``len(raw_pkt)``
        :param orglen: original length, defaults to ``len(raw_pkt)``
        :param comment: stored as option 1 when not None
        :param flags: stored as option 2 when it is an integer
        """

        if timestamp:
            # Split the scaled timestamp into two 32-bit halves
            tmp_ts = int(timestamp * self.tsresol)
            ts_high = tmp_ts >> 32
            ts_low = tmp_ts & 0xFFFFFFFF
        else:
            ts_high = ts_low = 0

        if not caplen:
            caplen = len(raw_pkt)

        if not orglen:
            orglen = len(raw_pkt)

        # Block Type
        block_type = struct.pack(self.endian + "I", 6)
        # Interface ID
        block_epb = struct.pack(self.endian + "I", ifid)
        # Timestamp (High)
        block_epb += struct.pack(self.endian + "I", ts_high)
        # Timestamp (Low)
        block_epb += struct.pack(self.endian + "I", ts_low)
        # Captured Packet Length
        block_epb += struct.pack(self.endian + "I", caplen)
        # Original Packet Length
        block_epb += struct.pack(self.endian + "I", orglen)
        # Packet Data
        block_epb += raw_pkt

        # Options
        opts = b''
        if comment is not None:
            comment = bytes_encode(comment)
            opts += struct.pack(self.endian + "HH", 1, len(comment))
            # Pad Option Value to 32 bits
            opts += self._add_padding(comment)
        # isinstance() also accepts int subclasses (e.g. IntEnum members),
        # which an exact type comparison would silently drop.
        if isinstance(flags, int):
            opts += struct.pack(self.endian + "HH", 2, 4)
            opts += struct.pack(self.endian + "I", flags)
        if opts:
            # End of options
            opts += struct.pack(self.endian + "HH", 0, 0)

        self.f.write(self.build_block(block_type, block_epb,
                                      options=opts))

    def _write_packet(self,  # type: ignore
                      packet,  # type: bytes
                      linktype,  # type: int
                      sec=None,  # type: Optional[float]
                      usec=None,  # type: Optional[int]
                      caplen=None,  # type: Optional[int]
                      wirelen=None,  # type: Optional[int]
                      comment=None,  # type: Optional[bytes]
                      ifname=None,  # type: Optional[bytes]
                      direction=None,  # type: Optional[int]
                      ):
        # type: (...) -> None
        """
        Writes a single packet to the pcapng file.

        :param packet: bytes for a single packet
        :type packet: bytes
        :param linktype: linktype value associated with the packet
        :type linktype: int
        :param sec: time the packet was captured, in seconds since epoch
                    (fractional part included). If not supplied, the block
                    is written with a zero timestamp.
        :type sec: float
        :param usec: not used with pcapng
        :type usec: int or long
        :param caplen: The length of the packet in the capture file. If not
                       specified, uses ``len(packet)``.
        :type caplen: int
        :param wirelen: The length of the packet on the wire. If not
                        specified, uses ``caplen``.
        :type wirelen: int
        :param comment: UTF-8 string containing human-readable comment text
                        that is associated to the current block. Line separators
                        SHOULD be a carriage-return + linefeed ('\r\n') or
                        just linefeed ('\n'); either form may appear and
                        be considered a line separator. The string is not
                        zero-terminated.
        :type comment: bytes
        :param ifname: UTF-8 string containing the
                       name of the device used to capture data.
                       The string is not zero-terminated.
        :type ifname: bytes
        :param direction: 0 = information not available,
                          1 = inbound,
                          2 = outbound
        :type direction: int
        :return: None
        :rtype: None
        """
        if caplen is None:
            caplen = len(packet)
        if wirelen is None:
            wirelen = caplen

        # Interfaces seen for the first time get their own IDB
        ifid = self.interfaces2id.get(ifname, None)
        if ifid is None:
            ifid = max(self.interfaces2id.values()) + 1
            self.interfaces2id[ifname] = ifid
            self._write_block_idb(linktype=linktype, ifname=ifname)

        # EPB flags (32 bits).
        # currently only direction is implemented (least 2 significant bits)
        if isinstance(direction, int):
            flags = direction & 0x3
        else:
            flags = None

        self._write_block_epb(packet, timestamp=sec, caplen=caplen,
                              orglen=wirelen, comment=comment, ifid=ifid, flags=flags)
        if self.sync:
            self.f.flush()

2665 

2666 

class PcapWriter(RawPcapWriter):
    """A stream PCAP writer with more control than wrpcap()"""
    # Inherits everything from RawPcapWriter; nothing is overridden here.

2670 

2671 

class PcapNgWriter(RawPcapNgWriter):
    """A stream pcapng writer with more control than wrpcapng()"""

    def _get_time(self,
                  packet,  # type: Union[bytes, Packet]
                  sec,  # type: Optional[float]
                  usec  # type: Optional[int]
                  ):
        # type: (...) -> Tuple[float, int]
        """Return ``(sec, usec)`` for ``packet``.

        A missing ``sec`` is taken from ``packet.time`` when the packet has
        one; a missing ``usec`` becomes 0.
        """
        if hasattr(packet, "time") and sec is None:
            sec = float(packet.time)

        return sec, (0 if usec is None else usec)  # type: ignore

2689 

2690 

@conf.commands.register
def rderf(filename, count=-1):
    # type: (Union[IO[bytes], str], int) -> PacketList
    """Read an ERF file and return a packet list

    :param filename: passed to :class:`ERFEthernetReader`
    :param count: read only <count> packets
    """
    with ERFEthernetReader(filename) as reader:
        return reader.read_all(count=count)

2700 

2701 

class ERFEthernetReader_metaclass(PcapReader_metaclass):
    """Metaclass that opens the capture before building the reader."""

    def __call__(cls, filename):
        # type: (Union[IO[bytes], str]) -> Any
        """Create a reader for ``filename``.

        The class itself is tried first, then its ``alternative`` attribute
        if it defines one. A candidate is rejected when its ``__init__``
        raises ``Scapy_Exception`` or ``EOFError``.

        :raises Scapy_Exception: if no candidate accepted the file
        """
        reader = cls.__new__(cls, cls.__name__, cls.__bases__, cls.__dict__)  # type: ignore
        filename, fdesc = cls.open(filename)
        try:
            reader.__init__(filename, fdesc)
        except (Scapy_Exception, EOFError):
            pass
        else:
            return reader

        if "alternative" in cls.__dict__:
            cls = cls.__dict__["alternative"]
            reader = cls.__new__(
                cls,
                cls.__name__,
                cls.__bases__,
                cls.__dict__  # type: ignore
            )
            try:
                reader.__init__(filename, fdesc)
            except (Scapy_Exception, EOFError):
                pass
            else:
                return reader

        raise Scapy_Exception("Not a supported capture file")

    @staticmethod
    def open(fname  # type: ignore
             ):
        # type: (...) -> Tuple[str, _ByteStream]
        """Open (if necessary) filename"""
        if not isinstance(fname, str):
            # Already a file-like object: use it as is
            return getattr(fname, "name", "No name"), fname

        try:
            # Probe with gzip first; fall back to a plain file on failure
            with gzip.open(fname, "rb") as probe:
                probe.read(1)
            fdesc = gzip.open(fname, "rb")  # type: _ByteStream
        except IOError:
            fdesc = open(fname, "rb")
        return fname, fdesc

2747 

2748 

class ERFEthernetReader(PcapReader,
                        metaclass=ERFEthernetReader_metaclass):
    """Reader for ERF capture files holding Ethernet records."""

    def __init__(self, filename, fdesc=None):  # type: ignore
        # type: (Union[IO[bytes], str], IO[bytes]) -> None
        self.filename = filename  # type: ignore
        self.f = fdesc
        # One nanosecond, used to scale the converted fraction
        self.power = Decimal(10) ** Decimal(-9)

    # time is in 64-bits Endace's format which can be see here:
    # https://www.endace.com/erf-extensible-record-format-types.pdf
    def _convert_erf_timestamp(self, t):
        # type: (int) -> EDecimal
        """Convert a 64-bit ERF timestamp to seconds.

        The upper 32 bits are whole seconds, the lower 32 bits a binary
        fraction of a second that is converted to nanoseconds.
        """
        sec = t >> 32
        frac_sec = t & 0xffffffff
        frac_sec *= 10**9
        # Round to nearest before dropping the low 32 bits
        frac_sec += (frac_sec & 0x80000000) << 1
        frac_sec >>= 32
        return EDecimal(sec + self.power * frac_sec)

    # The details of ERF Packet format can be see here:
    # https://www.endace.com/erf-extensible-record-format-types.pdf
    def read_packet(self, size=MTU, **kwargs):
        # type: (int, **Any) -> Packet
        """Read the next ERF record and return it as a packet.

        :param size: maximum offset kept from the record payload
        :param kwargs: forwarded to the ``Ether`` constructor
        :raises EOFError: if fewer than 16 header bytes are left
        :raises Scapy_Exception: if the record type is not accepted or its
            length is smaller than its headers
        """

        # General ERF Header have exactly 16 bytes
        hdr = self.f.read(16)
        if len(hdr) < 16:
            raise EOFError

        # The timestamp is in little-endian byte-order.
        timestamp = struct.unpack('<Q', hdr[:8])[0]
        # The rest is in big-endian byte-order.
        # Ignoring flags and lctr (loss counter) since they are ERF specific
        # header fields which Packet object does not support.
        erf_type, _, rlen, _, wlen = struct.unpack('>BBHHH', hdr[8:])
        # Reject records whose type does not have the 0x02 bit set
        # (TYPE_ETH is 0x02).
        if erf_type & 0x02 == 0:
            raise Scapy_Exception("Invalid ERF Type (Not TYPE_ETH)")

        # If there are extended headers, ignore it because Packet object does
        # not support it. Extended headers size is 8 bytes before the payload.
        has_ext_header = bool(erf_type & 0x80)
        header_len = 24 if has_ext_header else 16
        if rlen < header_len:
            # A negative size would make read() consume the rest of the file
            raise Scapy_Exception(
                "Invalid ERF record length (%d)" % rlen
            )
        if has_ext_header:
            self.f.read(8)
        s = self.f.read(rlen - header_len)

        # Ethernet has 2 bytes of padding containing `offset` and `pad`. Both
        # of the fields are disregarded by Endace.
        pb = s[2:size]
        from scapy.layers.l2 import Ether
        try:
            p = Ether(pb, **kwargs)  # type: Packet
        except KeyboardInterrupt:
            raise
        except Exception:
            if conf.debug_dissector:
                from scapy.sendrecv import debug
                # Record exactly the bytes that were being dissected
                debug.crashed_on = (Ether, pb)
                raise
            if conf.raw_layer is None:
                # conf.raw_layer is set on import
                import scapy.packet  # noqa: F401
            # Fall back on the same bytes that were handed to Ether
            p = conf.raw_layer(pb)

        p.time = self._convert_erf_timestamp(timestamp)
        p.wirelen = wlen

        return p

2819 

2820 

@conf.commands.register
def wrerf(filename,  # type: Union[IO[bytes], str]
          pkt,  # type: _PacketIterable
          *args,  # type: Any
          **kargs  # type: Any
          ):
    # type: (...) -> None
    """Write a list of packets to an ERF file

    Extra arguments are forwarded to :class:`ERFEthernetWriter`.

    :param filename: the name of the file to write packets to, or an open,
        writable file-like object. The file descriptor will be
        closed at the end of the call, so do not use an object you
        do not want to close (e.g., running wrerf(sys.stdout, [])
        in interactive mode will crash Scapy).
    :param pkt: packet(s) to write
    :param gz: set to 1 to save a gzipped capture
    :param append: append packets to the capture file instead of
        truncating it
    :param sync: do not bufferize writes to the capture file
    """
    with ERFEthernetWriter(filename, *args, **kargs) as writer:
        writer.write(pkt)

2842 

2843 

class ERFEthernetWriter(PcapWriter):
    """A stream ERF Ethernet writer with more control than wrerf()"""

    def __init__(self,
                 filename,  # type: Union[IO[bytes], str]
                 gz=False,  # type: bool
                 append=False,  # type: bool
                 sync=False,  # type: bool
                 ):
        # type: (...) -> None
        """
        :param filename: the name of the file to write packets to, or an open,
            writable file-like object.
        :param gz: compress the capture on the fly
        :param append: append packets to the capture file instead of
            truncating it
        :param sync: do not bufferize writes to the capture file
        """
        super(ERFEthernetWriter, self).__init__(filename,
                                                gz=gz,
                                                append=append,
                                                sync=sync)

    def write(self, pkt):  # type: ignore
        # type: (_PacketIterable) -> None
        """
        Writes a Packet or a SndRcvList object to an ERF file.

        :param pkt: Packet(s) to write (one record for each Packet)
        :type pkt: iterable[scapy.packet.Packet], scapy.packet.Packet
        """
        # Import here to avoid circular dependency
        from scapy.supersocket import IterSocket
        for p in IterSocket(pkt).iter:
            self.write_packet(p)

    def write_packet(self, pkt):  # type: ignore
        # type: (Packet) -> None
        """Write one packet as an ERF record and flush the file.

        The record timestamp comes from ``pkt.time`` when available,
        otherwise from the current time (whole seconds only). The wire
        length comes from ``pkt.wirelen`` and falls back to the record
        length when it is missing or None.
        """

        if hasattr(pkt, "time"):
            sec = int(pkt.time)
            # Fraction of a second, scaled from nanoseconds to 1/2**32 units
            usec = int((int(round((pkt.time - sec) * 10**9)) << 32) / 10**9)
            t = (sec << 32) + usec
        else:
            t = int(time.time()) << 32

        # There are 16 bytes of headers + 2 bytes of padding before the packets
        # payload.
        rlen = len(pkt) + 18

        # getattr() keeps wirelen defined for objects without that attribute
        wirelen = getattr(pkt, "wirelen", None)
        if wirelen is None:
            wirelen = rlen

        # Timestamp is little-endian, the rest of the header big-endian
        self.f.write(struct.pack("<Q", t))
        self.f.write(struct.pack(">BBHHHH", 2, 0, rlen, 0, wirelen, 0))
        self.f.write(bytes(pkt))
        self.f.flush()

    def close(self):
        # type: () -> Optional[Any]
        """Close the file without writing any pcap header."""
        return self.f.close()

2907 

2908 

@conf.commands.register
def import_hexcap(input_string=None):
    # type: (Optional[str]) -> bytes
    """Imports a tcpdump like hexadecimal view

    e.g: exported via hexdump() or tcpdump or wireshark's "export as hex"

    Lines are consumed until the first empty one (or end of input); the hex
    byte columns of each line are collected and decoded.

    :param input_string: String containing the hexdump input to parse. If None,
        read from standard input.
    :return: the decoded bytes
    """
    re_extract_hexcap = re.compile(r"^((0x)?[0-9a-fA-F]{2,}[ :\t]{,3}|) *(([0-9a-fA-F]{2} {,2}){,16})")  # noqa: E501
    hex_chunks = []
    try:
        if input_string:
            read_line = StringIO(input_string).readline
        else:
            read_line = input
        while True:
            line = read_line().strip()
            if not line:
                break
            found = re_extract_hexcap.match(line)
            if found is None:
                warning("Parsing error during hexcap")
                continue
            # Third group: the hex byte columns of the line
            hex_chunks.append(found.group(3))
    except EOFError:
        # input() signals the end of standard input this way
        pass

    return hex_bytes("".join(hex_chunks).replace(" ", ""))

2940 

2941 

@conf.commands.register
def wireshark(pktlist, wait=False, **kwargs):
    # type: (List[Packet], bool, **Any) -> Optional[Any]
    """
    Runs Wireshark on a list of packets.

    This is :func:`tcpdump` called with ``prog=conf.prog.wireshark``; see
    that function for the other parameters.

    Note: this defaults to wait=False, to run Wireshark in the background.
    """
    result = tcpdump(pktlist, prog=conf.prog.wireshark, wait=wait, **kwargs)
    return result

2953 

2954 

@conf.commands.register
def tdecode(
    pktlist,  # type: Union[IO[bytes], None, str, _PacketIterable]
    args=None,  # type: Optional[List[str]]
    **kwargs  # type: Any
):
    # type: (...) -> Any
    """
    Run tshark on a list of packets.

    :param args: arguments passed to tshark. If not specified, defaults
        to ``["-V"]``.

    See :func:`tcpdump` for more parameters.
    """
    tshark_args = ["-V"] if args is None else args
    return tcpdump(pktlist, prog=conf.prog.tshark, args=tshark_args, **kwargs)

2972 

2973 

def _guess_linktype_name(value):
    # type: (int) -> str
    """Return the DLT name matching a DLT value.

    The lookup is delegated to ``pcap_datalink_val_to_name``.
    """
    from scapy.libs.winpcapy import pcap_datalink_val_to_name
    raw_name = cast(bytes, pcap_datalink_val_to_name(value))
    return raw_name.decode()

2979 

2980 

def _guess_linktype_value(name):
    # type: (str) -> int
    """Return the DLT value matching a DLT name.

    The lookup is delegated to ``pcap_datalink_name_to_val``. When it
    returns -1, a warning is emitted and ``DLT_EN10MB`` is returned.
    """
    from scapy.libs.winpcapy import pcap_datalink_name_to_val
    val = cast(int, pcap_datalink_name_to_val(name.encode()))
    if val != -1:
        return val
    warning("Unknown linktype: %s. Using EN10MB", name)
    return DLT_EN10MB

2990 

2991 

2992@conf.commands.register 

2993def tcpdump( 

2994 pktlist=None, # type: Union[IO[bytes], None, str, _PacketIterable] 

2995 dump=False, # type: bool 

2996 getfd=False, # type: bool 

2997 args=None, # type: Optional[List[str]] 

2998 flt=None, # type: Optional[str] 

2999 prog=None, # type: Optional[Any] 

3000 getproc=False, # type: bool 

3001 quiet=False, # type: bool 

3002 use_tempfile=None, # type: Optional[Any] 

3003 read_stdin_opts=None, # type: Optional[Any] 

3004 linktype=None, # type: Optional[Any] 

3005 wait=True, # type: bool 

3006 _suppress=False # type: bool 

3007): 

3008 # type: (...) -> Any 

3009 """Run tcpdump or tshark on a list of packets. 

3010 

3011 When using ``tcpdump`` on OSX (``prog == conf.prog.tcpdump``), this uses a 

3012 temporary file to store the packets. This works around a bug in Apple's 

3013 version of ``tcpdump``: http://apple.stackexchange.com/questions/152682/ 

3014 

3015 Otherwise, the packets are passed in stdin. 

3016 

3017 This function can be explicitly enabled or disabled with the 

3018 ``use_tempfile`` parameter. 

3019 

3020 When using ``wireshark``, it will be called with ``-ki -`` to start 

3021 immediately capturing packets from stdin. 

3022 

3023 Otherwise, the command will be run with ``-r -`` (which is correct for 

3024 ``tcpdump`` and ``tshark``). 

3025 

3026 This can be overridden with ``read_stdin_opts``. This has no effect when 

3027 ``use_tempfile=True``, or otherwise reading packets from a regular file. 

3028 

3029 :param pktlist: a Packet instance, a PacketList instance or a list of 

3030 Packet instances. Can also be a filename (as a string), an open 

3031 file-like object that must be a file format readable by 

3032 tshark (Pcap, PcapNg, etc.) or None (to sniff) 

3033 :param flt: a filter to use with tcpdump 

3034 :param dump: when set to True, returns a string instead of displaying it. 

3035 :param getfd: when set to True, returns a file-like object to read data 

3036 from tcpdump or tshark from. 

3037 :param getproc: when set to True, the subprocess.Popen object is returned 

3038 :param args: arguments (as a list) to pass to tshark (example for tshark: 

3039 args=["-T", "json"]). 

3040 :param prog: program to use (defaults to tcpdump, will work with tshark) 

3041 :param quiet: when set to True, the process stderr is discarded 

3042 :param use_tempfile: When set to True, always use a temporary file to store 

3043 packets. 

3044 When set to False, pipe packets through stdin. 

3045 When set to None (default), only use a temporary file with 

3046 ``tcpdump`` on OSX. 

3047 :param read_stdin_opts: When set, a list of arguments needed to capture 

3048 from stdin. Otherwise, attempts to guess. 

3049 :param linktype: A custom DLT value or name, to overwrite the default 

3050 values. 

3051 :param wait: If True (default), waits for the process to terminate before 

3052 returning to Scapy. If False, the process will be detached to the 

3053 background. If dump, getproc or getfd is True, these have the same 

3054 effect as ``wait=False``. 

3055 

3056 Examples:: 

3057 

3058 >>> tcpdump([IP()/TCP(), IP()/UDP()]) 

3059 reading from file -, link-type RAW (Raw IP) 

3060 16:46:00.474515 IP 127.0.0.1.20 > 127.0.0.1.80: Flags [S], seq 0, win 8192, length 0 # noqa: E501 

3061 16:46:00.475019 IP 127.0.0.1.53 > 127.0.0.1.53: [|domain] 

3062 

3063 >>> tcpdump([IP()/TCP(), IP()/UDP()], prog=conf.prog.tshark) 

3064 1 0.000000 127.0.0.1 -> 127.0.0.1 TCP 40 20->80 [SYN] Seq=0 Win=8192 Len=0 # noqa: E501 

3065 2 0.000459 127.0.0.1 -> 127.0.0.1 UDP 28 53->53 Len=0 

3066 

3067 To get a JSON representation of a tshark-parsed PacketList(), one can:: 

3068 

3069 >>> import json, pprint 

3070 >>> json_data = json.load(tcpdump(IP(src="217.25.178.5", 

3071 ... dst="45.33.32.156"), 

3072 ... prog=conf.prog.tshark, 

3073 ... args=["-T", "json"], 

3074 ... getfd=True)) 

3075 >>> pprint.pprint(json_data) 

3076 [{u'_index': u'packets-2016-12-23', 

3077 u'_score': None, 

3078 u'_source': {u'layers': {u'frame': {u'frame.cap_len': u'20', 

3079 u'frame.encap_type': u'7', 

3080 [...] 

3081 }, 

3082 u'ip': {u'ip.addr': u'45.33.32.156', 

3083 u'ip.checksum': u'0x0000a20d', 

3084 [...] 

3085 u'ip.ttl': u'64', 

3086 u'ip.version': u'4'}, 

3087 u'raw': u'Raw packet data'}}, 

3088 u'_type': u'pcap_file'}] 

3089 >>> json_data[0]['_source']['layers']['ip']['ip.ttl'] 

3090 u'64' 

3091 """ 

3092 getfd = getfd or getproc 

3093 if prog is None: 

3094 if not conf.prog.tcpdump: 

3095 raise Scapy_Exception( 

3096 "tcpdump is not available" 

3097 ) 

3098 prog = [conf.prog.tcpdump] 

3099 elif isinstance(prog, str): 

3100 prog = [prog] 

3101 else: 

3102 raise ValueError("prog must be a string") 

3103 

3104 if linktype is not None: 

3105 if isinstance(linktype, int): 

3106 # Guess name from value 

3107 try: 

3108 linktype_name = _guess_linktype_name(linktype) 

3109 except StopIteration: 

3110 linktype = -1 

3111 else: 

3112 # Guess value from name 

3113 if linktype.startswith("DLT_"): 

3114 linktype = linktype[4:] 

3115 linktype_name = linktype 

3116 try: 

3117 linktype = _guess_linktype_value(linktype) 

3118 except KeyError: 

3119 linktype = -1 

3120 if linktype == -1: 

3121 raise ValueError( 

3122 "Unknown linktype. Try passing its datalink name instead" 

3123 ) 

3124 prog += ["-y", linktype_name] 

3125 

3126 # Build Popen arguments 

3127 if args is None: 

3128 args = [] 

3129 else: 

3130 # Make a copy of args 

3131 args = list(args) 

3132 

3133 if flt is not None: 

3134 # Check the validity of the filter 

3135 if linktype is None and isinstance(pktlist, str): 

3136 # linktype is unknown but required. Read it from file 

3137 with PcapReader(pktlist) as rd: 

3138 if isinstance(rd, PcapNgReader): 

3139 # Get the linktype from the first packet 

3140 try: 

3141 _, metadata = rd._read_packet() 

3142 linktype = metadata.linktype 

3143 if OPENBSD and linktype == 228: 

3144 linktype = DLT_RAW 

3145 except EOFError: 

3146 raise ValueError( 

3147 "Cannot get linktype from a PcapNg packet." 

3148 ) 

3149 else: 

3150 linktype = rd.linktype 

3151 from scapy.arch.common import compile_filter 

3152 compile_filter(flt, linktype=linktype) 

3153 args.append(flt) 

3154 

3155 stdout = subprocess.PIPE if dump or getfd else None 

3156 stderr = open(os.devnull) if quiet else None 

3157 proc = None 

3158 

3159 if use_tempfile is None: 

3160 # Apple's tcpdump cannot read from stdin, see: 

3161 # http://apple.stackexchange.com/questions/152682/ 

3162 use_tempfile = DARWIN and prog[0] == conf.prog.tcpdump 

3163 

3164 if read_stdin_opts is None: 

3165 if prog[0] == conf.prog.wireshark: 

3166 # Start capturing immediately (-k) from stdin (-i -) 

3167 read_stdin_opts = ["-ki", "-"] 

3168 elif prog[0] == conf.prog.tcpdump and not OPENBSD: 

3169 # Capture in packet-buffered mode (-U) from stdin (-r -) 

3170 read_stdin_opts = ["-U", "-r", "-"] 

3171 else: 

3172 read_stdin_opts = ["-r", "-"] 

3173 else: 

3174 # Make a copy of read_stdin_opts 

3175 read_stdin_opts = list(read_stdin_opts) 

3176 

3177 if pktlist is None: 

3178 # sniff 

3179 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3180 proc = subprocess.Popen( 

3181 prog + args, 

3182 stdout=stdout, 

3183 stderr=stderr, 

3184 ) 

3185 elif isinstance(pktlist, str): 

3186 # file 

3187 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3188 proc = subprocess.Popen( 

3189 prog + ["-r", pktlist] + args, 

3190 stdout=stdout, 

3191 stderr=stderr, 

3192 ) 

3193 elif use_tempfile: 

3194 tmpfile = get_temp_file( # type: ignore 

3195 autoext=".pcap", 

3196 fd=True 

3197 ) # type: IO[bytes] 

3198 try: 

3199 tmpfile.writelines( 

3200 iter(lambda: pktlist.read(1048576), b"") # type: ignore 

3201 ) 

3202 except AttributeError: 

3203 pktlist = cast("_PacketIterable", pktlist) 

3204 wrpcap(tmpfile, pktlist, linktype=linktype) 

3205 else: 

3206 tmpfile.close() 

3207 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3208 proc = subprocess.Popen( 

3209 prog + ["-r", tmpfile.name] + args, 

3210 stdout=stdout, 

3211 stderr=stderr, 

3212 ) 

3213 else: 

3214 try: 

3215 pktlist.fileno() # type: ignore 

3216 # pass the packet stream 

3217 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3218 proc = subprocess.Popen( 

3219 prog + read_stdin_opts + args, 

3220 stdin=pktlist, # type: ignore 

3221 stdout=stdout, 

3222 stderr=stderr, 

3223 ) 

3224 except (AttributeError, ValueError): 

3225 # write the packet stream to stdin 

3226 with ContextManagerSubprocess(prog[0], suppress=_suppress): 

3227 proc = subprocess.Popen( 

3228 prog + read_stdin_opts + args, 

3229 stdin=subprocess.PIPE, 

3230 stdout=stdout, 

3231 stderr=stderr, 

3232 ) 

3233 if proc is None: 

3234 # An error has occurred 

3235 return 

3236 try: 

3237 proc.stdin.writelines( # type: ignore 

3238 iter(lambda: pktlist.read(1048576), b"") # type: ignore 

3239 ) 

3240 except AttributeError: 

3241 wrpcap(proc.stdin, pktlist, linktype=linktype) # type: ignore 

3242 except UnboundLocalError: 

3243 # The error was handled by ContextManagerSubprocess 

3244 pass 

3245 else: 

3246 proc.stdin.close() # type: ignore 

3247 if proc is None: 

3248 # An error has occurred 

3249 return 

3250 if dump: 

3251 data = b"".join( 

3252 iter(lambda: proc.stdout.read(1048576), b"") # type: ignore 

3253 ) 

3254 proc.terminate() 

3255 return data 

3256 if getproc: 

3257 return proc 

3258 if getfd: 

3259 return proc.stdout 

3260 if wait: 

3261 proc.wait() 

3262 

3263 

@conf.commands.register
def hexedit(pktlist):
    # type: (_PacketIterable) -> PacketList
    """Edit a list of packets with hexedit and return the result.

    The packets are written to a temporary file, the program configured as
    ``conf.prog.hexedit`` is run on that file, and the file is then read
    back and removed.

    :param pktlist: the packets to edit
    :returns: the packets read back from the file once the editor exited
    """
    tmp_path = get_temp_file()
    wrpcap(tmp_path, pktlist)
    editor = conf.prog.hexedit
    with ContextManagerSubprocess(editor):
        subprocess.call([editor, tmp_path])
    edited = rdpcap(tmp_path)
    os.unlink(tmp_path)
    return edited

3275 

3276 

def get_terminal_width():
    # type: () -> Optional[int]
    """Return the width of the terminal, in number of characters.

    Detection methods are tried in this order, the first usable one wins:

    1. ``shutil.get_terminal_size``
    2. on Windows: ``GetConsoleScreenBufferInfo`` on the handle returned by
       ``GetStdHandle(-12)``; 0 is returned when that call fails
    3. elsewhere: the ``COLUMNS`` environment variable, then the
       ``TIOCGWINSZ`` ioctl on file descriptor 1
    4. 79 when the ioctl (or the import of its modules) fails
    """
    columns = shutil.get_terminal_size(fallback=(0, 0)).columns
    if columns:
        return columns

    if WINDOWS:
        from ctypes import windll, create_string_buffer
        # http://code.activestate.com/recipes/440694-determine-size-of-console-window-on-windows/
        handle = windll.kernel32.GetStdHandle(-12)
        info = create_string_buffer(22)
        if windll.kernel32.GetConsoleScreenBufferInfo(handle, info):
            # Unpacked layout: bufx, bufy, curx, cury, wattr,
            # left, top, right, bottom, maxx, maxy
            fields = struct.unpack("hhhhHhhhhhh", info.raw)
            left, right = fields[5], fields[7]
            return right - left + 1
        # Nothing worked on Windows: report the (zero) width found so far
        return columns

    # COLUMNS is set on some terminals
    try:
        columns = int(os.environ['COLUMNS'])
    except Exception:
        columns = 0
    if columns:
        return columns

    # Last resort: query TIOCGWINSZ
    try:
        import fcntl
        import termios
        request = struct.pack('HHHH', 0, 0, 0, 0)
        reply = fcntl.ioctl(1, termios.TIOCGWINSZ, request)
        # The second unsigned short of the reply is the column count
        return struct.unpack('HHHH', reply)[1]
    except (IOError, ModuleNotFoundError):
        # If everything failed, return default terminal size
        return 79

3320 

3321 

def pretty_list(rtlst,  # type: List[Tuple[Union[str, List[str]], ...]]
                header,  # type: List[Tuple[str, ...]]
                sortBy=0,  # type: Optional[int]
                borders=False,  # type: bool
                ):
    # type: (...) -> str
    """
    Pretty list to fit the terminal, and add header.

    :param rtlst: a list of tuples. each tuple contains a value which can
        be either a string or a list of string. A row holding list values
        is expanded into several rows, one per list element. The list
        passed by the caller is left untouched.
    :param header: a list of tuples put before the rows; the length of its
        first tuple gives the number of columns
    :param sortBy: the column id (starting with 0) which will be used for
        ordering, or None to keep the order of ``rtlst``
    :param borders: whether to put borders on the table or not
    :returns: the table, as a single string with one row per line
    """
    _space = "|" if borders else "  "
    cols = len(header[0])
    # Windows has a fat terminal border
    _spacelen = len(_space) * (cols - 1) + int(WINDOWS)
    _croped = False
    # Work on a copy so that sorting does not reorder the caller's list
    rows = list(rtlst)
    if sortBy is not None:
        # Sort correctly
        rows.sort(key=lambda x: x[sortBy])
    # Resolve multi-values
    expanded = []  # type: List[Any]
    for line in rows:
        ids = [j for j, val in enumerate(line) if isinstance(val, list)]
        if not ids:
            expanded.append(line)
            continue
        # An empty list still takes one (blank) cell
        values = [line[j] or " " for j in ids]
        for k, ex_vals in enumerate(zip_longest(*values, fillvalue=" ")):
            # Only the first generated row keeps the single-valued cells
            extra_line = [" "] * cols if k else list(line)
            for h, ex_val in zip(ids, ex_vals):
                extra_line[h] = ex_val
            expanded.append(tuple(extra_line))
    # Append tag
    rtslst = header + cast(List[Tuple[str, ...]], expanded)
    # Detect column's width
    colwidth = [max(len(y) for y in x) for x in zip(*rtslst)]
    # Make text fit in box (if required)
    width = get_terminal_width()
    if conf.auto_crop_tables and width:
        width = width - _spacelen
        while sum(colwidth) > width:
            # Get the widest column
            i = colwidth.index(max(colwidth))
            if colwidth[i] <= 1:
                # A cell of one character cannot be shortened any further
                # (cropping replaces the last two characters by "_"):
                # give up instead of looping forever.
                break
            # Needs to be cropped
            _croped = True
            # Get all elements of this column
            row = [len(x[i]) for x in rtslst]
            # Get biggest element of this column: biggest of the array
            j = row.index(max(row))
            # Re-build the row tuple with the edited element
            t = list(rtslst[j])
            t[i] = t[i][:-2] + "_"
            rtslst[j] = tuple(t)
            # Update max size
            row[j] = len(t[i])
            colwidth[i] = max(row)
    if _croped:
        log_runtime.info("Table cropped to fit the terminal (conf.auto_crop_tables==True)")  # noqa: E501
    # Generate padding scheme
    fmt = _space.join(["%%-%ds" % x for x in colwidth])
    # Append separation line if needed
    if borders:
        rtslst.insert(1, tuple("-" * x for x in colwidth))
    # Compile
    return "\n".join(fmt % x for x in rtslst)

3402 

3403 

def human_size(x, fmt=".1f"):
    # type: (int, str) -> str
    """
    Convert a size in octets to a human string representation

    The size is expressed with the largest binary unit (K, M, G, T, P, E,
    i.e. powers of 1024) that keeps the value at or above 1. Sizes below
    1024 are returned unscaled with a "B" suffix, and negative sizes are
    rendered as the positive size preceded by "-".

    :param x: the size, in octets
    :param fmt: the format specification applied to the scaled value
    :returns: the human readable size, e.g. "1.5K"
    """
    units = ['K', 'M', 'G', 'T', 'P', 'E']
    if not x:
        return "0B"
    if x < 0:
        return "-" + human_size(-x, fmt)
    # Pick the unit by exact comparisons rather than a floating point
    # logarithm, and allow the last unit of the list to be selected.
    i = 0
    while i < len(units) and x >= 2 ** (10 * (i + 1)):
        i += 1
    if i:
        return format(x / 2 ** (10 * i), fmt) + units[i - 1]
    return str(x) + "B"

3416 

3417 

def __make_table(
    yfmtfunc,  # type: Callable[[int], str]
    fmtfunc,  # type: Callable[[int], str]
    endline,  # type: str
    data,  # type: List[Tuple[Packet, Packet]]
    fxyz,  # type: Callable[[Packet, Packet], Tuple[Any, Any, Any]]
    sortx=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
    sorty=None,  # type: Optional[Callable[[str], Tuple[Any, ...]]]
    seplinefunc=None,  # type: Optional[Callable[[int, List[int]], str]]
    dump=False  # type: bool
):
    # type: (...) -> Optional[str]
    """Core function of the make_table suite, which generates the table

    :param yfmtfunc: given the width of the widest row label, returns the
        %-format used for the first column
    :param fmtfunc: given the width of a column, returns the %-format used
        for the cells of that column
    :param endline: text appended at the end of every row
    :param data: the entries; each one is unpacked as arguments of ``fxyz``
    :param fxyz: returns, for an entry, the (column label, row label, cell)
        triplet. The three values are converted with ``str``.
    :param sortx: key used to order the column labels. Without it, labels
        are ordered as integers, else with ``atol``, else as strings.
    :param sorty: same as ``sortx``, for the row labels
    :param seplinefunc: when set, called with the row label width and the
        list of column widths; the line it returns is printed above the
        header, below the header and below the last row
    :param dump: return the table instead of printing it
    """
    col_width = {}  # type: Dict[str, int]
    row_seen = {}  # type: Dict[str, Optional[int]]
    cells = {}  # type: Dict[Tuple[str, str], str]

    label_width = 0
    for entry in data:
        col, row, cell = map(str, fxyz(*entry))
        label_width = max(label_width, len(row))
        col_width[col] = max(col_width.get(col, 0), len(col), len(cell))
        # Only the keys matter: the dict is used as an ordered set
        row_seen[row] = None
        cells[col, row] = cell

    def _ordered(labels, keyfunc):
        # type: (List[str], Optional[Callable[[str], Any]]) -> List[str]
        """Sort labels in place with keyfunc, or with the best default."""
        if keyfunc:
            labels.sort(key=keyfunc)
            return labels
        try:
            labels.sort(key=int)
        except Exception:
            try:
                labels.sort(key=atol)
            except Exception:
                labels.sort()
        return labels

    columns = _ordered(list(col_width), sortx)
    rows = _ordered(list(row_seen), sorty)

    pieces = []  # type: List[str]
    if seplinefunc:
        rule = seplinefunc(label_width, [col_width[c] for c in columns])
        pieces.append(rule + "\n")

    # Header row: an empty row label followed by the column labels
    row_fmt = yfmtfunc(label_width)
    pieces.append(row_fmt % "" + ' ')
    col_fmt = {}  # type: Dict[str, str]
    for c in columns:
        col_fmt[c] = fmtfunc(col_width[c])
        pieces.append(col_fmt[c] % c + ' ')
    pieces.append(endline + "\n")
    if seplinefunc:
        pieces.append(rule + "\n")

    # Body: one line per row label, "-" where no cell was provided
    for r in rows:
        pieces.append(row_fmt % r + ' ')
        pieces.extend(
            col_fmt[c] % cells.get((c, r), "-") + ' ' for c in columns
        )
        pieces.append(endline + "\n")
    if seplinefunc:
        pieces.append(rule + "\n")

    table = "".join(pieces)
    if dump:
        return table
    print(table, end="")
    return None

3497 

3498 

def make_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[Any]
    """Generate a plain table: left-aligned cells, no border.

    All arguments are handed over to ``__make_table`` after the two
    formatting callbacks and the (empty) end of line.
    """
    def _left_aligned(width):
        # type: (int) -> str
        return "%%-%is" % width

    return __make_table(_left_aligned, _left_aligned, "", *args, **kargs)

3508 

3509 

def make_lined_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[str]
    """Generate a table whose cells are separated by "|" and "-+-" lines.

    All arguments are handed over to ``__make_table`` after the two
    formatting callbacks and the (empty) end of line. ``seplinefunc`` is
    provided by this function.
    """
    def _cell(width):
        # type: (int) -> str
        return "%%-%is |" % width

    def _rule(label_width, col_widths):
        # type: (int, List[int]) -> str
        # One run of dashes per column (each 2 wider than its width value),
        # joined by "+"; the final -2 gives an empty run, i.e. a trailing "+"
        runs = [label_width - 1] + col_widths + [-2]
        return "+".join('-' * (w + 2) for w in runs)

    return __make_table(  # type: ignore
        _cell,
        _cell,
        "",
        *args,
        seplinefunc=_rule,
        **kargs
    )

3522 

3523 

def make_tex_table(*args, **kargs):
    # type: (*Any, **Any) -> Optional[str]
    """Generate the rows of a TeX table.

    Cells are separated by "&", rows end with "\\\\" and the separation
    line is "\\hline". All arguments are handed over to ``__make_table``;
    ``seplinefunc`` is provided by this function.
    """
    def _label(_width):
        # type: (int) -> str
        return "%s"

    def _cell(_width):
        # type: (int) -> str
        return "& %s"

    def _hline(_label_width, _col_widths):
        # type: (int, List[int]) -> str
        return "\\hline"

    return __make_table(  # type: ignore
        _label,
        _cell,
        "\\\\",
        *args,
        seplinefunc=_hline,
        **kargs
    )

3534 

3535#################### 

3536# WHOIS CLIENT # 

3537#################### 

3538 

3539 

def whois(ip_address):
    # type: (str) -> bytes
    """Whois client for Python

    The query is sent to ``whois.ripe.net`` on TCP port 43. When
    ``ip_address`` can be resolved with ``socket.gethostbyname``, the
    resolved address is queried, otherwise the value is sent as is.

    :param ip_address: the address (or name) to look up
    :returns: the answer, without its first three lines, without the lines
        starting with ``remarks:`` and without trailing empty lines
    """
    whois_ip = str(ip_address)
    try:
        query = socket.gethostbyname(whois_ip)
    except Exception:
        query = whois_ip
    chunks = []  # type: List[bytes]
    # The context manager closes the socket even if the exchange fails
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.connect(("whois.ripe.net", 43))
        # sendall: a plain send() may transmit only part of the query
        s.sendall(query.encode("utf8") + b"\r\n")
        while True:
            d = s.recv(4096)
            if not d:
                break
            chunks.append(d)
    answer = b"".join(chunks)
    ignore_tag = b"remarks:"
    # ignore all lines starting with the ignore_tag
    lines = [
        line for line in answer.split(b"\n")
        if not line.startswith(ignore_tag)
    ]
    # remove empty lines at the bottom
    while lines and not lines[-1].strip():
        lines.pop()
    return b"\n".join(lines[3:])

3568 

3569#################### 

3570# CLI utils # 

3571#################### 

3572 

3573 

class _CLIUtilMetaclass(type):
    """
    Metaclass that indexes the members of a class body tagged with a
    ``cliutil_type`` attribute into the ``commands``, ``commands_output``
    and ``commands_complete`` class attributes.
    """

    class TYPE(enum.Enum):
        # The kinds of tagged members
        COMMAND = 0
        OUTPUT = 1
        COMPLETE = 2

    def __new__(cls,  # type: Type[_CLIUtilMetaclass]
                name,  # type: str
                bases,  # type: Tuple[type, ...]
                dct  # type: Dict[str, Any]
                ):
        # type: (...) -> Type[CLIUtil]
        kinds = _CLIUtilMetaclass.TYPE
        commands = {}  # type: Dict[str, Any]
        outputs = {}  # type: Dict[str, Any]
        completers = {}  # type: Dict[str, Any]
        # Snapshot the values: dct is updated right after the scan
        for member in list(dct.values()):
            kind = getattr(member, "cliutil_type", None)
            if kind == kinds.COMMAND:
                # Commands are indexed by their own name
                commands[member.__name__] = member
            elif kind == kinds.OUTPUT:
                # Processors are indexed by the name of the command
                # they refer to
                outputs[member.cliutil_ref.__name__] = member
            elif kind == kinds.COMPLETE:
                completers[member.cliutil_ref.__name__] = member
        dct["commands"] = commands
        dct["commands_output"] = outputs
        dct["commands_complete"] = completers
        return cast(Type['CLIUtil'], type.__new__(cls, name, bases, dct))

3603 

3604 

class CLIUtil(metaclass=_CLIUtilMetaclass):
    """
    Provides a Util class to easily create simple CLI tools in Scapy,
    that can still be used as an API.

    Doc:
        - override the ps1() function
        - register commands with the @CLIUtil.addcommand() decorator
        - call the loop() function when ready
    """

    def _depcheck(self) -> None:
        """
        Check that all dependencies are installed

        :raises ImportError: if prompt_toolkit cannot be imported
        """
        try:
            import prompt_toolkit  # noqa: F401
        except ImportError:
            # okay we lie but prompt_toolkit is a dependency...
            raise ImportError("You need to have IPython installed to use the CLI")

    # Okay let's do nice code
    commands: Dict[str, Callable[..., Any]] = {}
    # print output of command
    commands_output: Dict[str, Callable[..., str]] = {}
    # provides completion to command
    commands_complete: Dict[str, Callable[..., List[str]]] = {}

    def __init__(self, cli: bool = True, debug: bool = False) -> None:
        """
        DEV: overwrite

        :param cli: when True, check the dependencies and start the
            interactive loop right away
        :param debug: passed to loop()
        """
        if cli:
            self._depcheck()
            self.loop(debug=debug)

    @staticmethod
    def _inspectkwargs(func: DecoratorCallable) -> None:
        """
        Internal function to parse arguments from the kwargs of the functions

        The keyword-only parameters of func are stored in func._flagnames
        and the matching CLI flags ("-x" for one-letter names, "--name"
        otherwise) in func._flags.
        """
        func._flagnames = [  # type: ignore
            x.name for x in
            inspect.signature(func).parameters.values()
            if x.kind == inspect.Parameter.KEYWORD_ONLY
        ]
        func._flags = [  # type: ignore
            ("-%s" % x) if len(x) == 1 else ("--%s" % x)
            for x in func._flagnames  # type: ignore
        ]

    @staticmethod
    def _parsekwargs(
        func: DecoratorCallable,
        args: List[str]
    ) -> Tuple[List[str], Dict[str, Literal[True]]]:
        """
        Internal function to parse CLI arguments of a function.

        The leading arguments that are flags of func are consumed, up to
        the first argument that is not one.

        :returns: the remaining arguments and the flags that were set
        """
        kwargs: Dict[str, Literal[True]] = {}
        if func._flags:  # type: ignore
            i = 0
            for arg in args:
                if arg in func._flags:  # type: ignore
                    i += 1
                    kwargs[func._flagnames[func._flags.index(arg)]] = True  # type: ignore  # noqa: E501
                    continue
                break
            args = args[i:]
        return args, kwargs

    @classmethod
    def _parseallargs(
        cls,
        func: DecoratorCallable,
        cmd: str, args: List[str]
    ) -> Tuple[List[str], Dict[str, Literal[True]], Dict[str, Literal[True]]]:
        """
        Internal function to parse CLI arguments of both the function
        and its output function.

        :returns: the remaining arguments, the flags of the command and the
            flags of its output processor
        """
        args, kwargs = cls._parsekwargs(func, args)
        outkwargs: Dict[str, Literal[True]] = {}
        if cmd in cls.commands_output:
            args, outkwargs = cls._parsekwargs(cls.commands_output[cmd], args)
        return args, kwargs, outkwargs

    @classmethod
    def addcommand(
        cls,
        spaces: bool = False,
        globsupport: bool = False,
    ) -> Callable[[DecoratorCallable], DecoratorCallable]:
        """
        Decorator to register a command

        :param spaces: the command receives everything typed after its
            flags as a single argument, spaces included
        :param globsupport: a single "*" in the argument is expanded with
            the completer of the command, which is then called once per
            match. Requires spaces.
        """
        def func(cmd: DecoratorCallable) -> DecoratorCallable:
            cmd.cliutil_type = _CLIUtilMetaclass.TYPE.COMMAND  # type: ignore
            cmd._spaces = spaces  # type: ignore
            cmd._globsupport = globsupport  # type: ignore
            cls._inspectkwargs(cmd)
            if cmd._globsupport and not cmd._spaces:  # type: ignore
                raise ValueError("Cannot use globsupport without spaces.")
            return cmd
        return func

    @classmethod
    def addoutput(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]:  # noqa: E501
        """
        Decorator to register a command output processor

        :param cmd: the command whose result is processed
        """
        def func(processor: DecoratorCallable) -> DecoratorCallable:
            processor.cliutil_type = _CLIUtilMetaclass.TYPE.OUTPUT  # type: ignore
            processor.cliutil_ref = cmd  # type: ignore
            cls._inspectkwargs(processor)
            return processor
        return func

    @classmethod
    def addcomplete(cls, cmd: DecoratorCallable) -> Callable[[DecoratorCallable], DecoratorCallable]:  # noqa: E501
        """
        Decorator to register a command completor

        :param cmd: the command whose argument is completed
        """
        def func(processor: DecoratorCallable) -> DecoratorCallable:
            processor.cliutil_type = _CLIUtilMetaclass.TYPE.COMPLETE  # type: ignore
            processor.cliutil_ref = cmd  # type: ignore
            return processor
        return func

    def ps1(self) -> str:
        """
        Return the PS1 of the shell
        """
        return "> "

    def close(self) -> None:
        """
        Function called on exiting
        """
        print("Exited")

    def help(self, cmd: Optional[str] = None) -> None:
        """
        Return the help related to this CLI util

        :param cmd: print the help of this command only. Without it, a
            table of all the commands is printed.
        """
        def _args(func: Any) -> str:
            # Flags of the command and of its output processor come first,
            # then the parameters ("?" marks the optional ones)
            flags = func._flags.copy()
            if func.__name__ in self.commands_output:
                flags += self.commands_output[func.__name__]._flags  # type: ignore
            return " %s%s" % (
                (
                    "%s " % " ".join("[%s]" % x for x in flags)
                    if flags else ""
                ),
                " ".join(
                    "<%s%s>" % (
                        x.name,
                        "?" if
                        (x.default is None or x.default != inspect.Parameter.empty)
                        else ""
                    )
                    # [1:] skips the first parameter (self)
                    for x in list(inspect.signature(func).parameters.values())[1:]
                    if x.name not in func._flagnames and x.name[0] != "_"
                )
            )

        if cmd:
            if cmd not in self.commands:
                print("Unknown command '%s'" % cmd)
                return
            # help for one command
            func = self.commands[cmd]
            print("%s%s: %s" % (
                cmd,
                _args(func),
                func.__doc__ and func.__doc__.strip()
            ))
        else:
            header = "│ %s - Help │" % self.__class__.__name__
            print("┌" + "─" * (len(header) - 2) + "┐")
            print(header)
            print("└" + "─" * (len(header) - 2) + "┘")
            print(
                pretty_list(
                    [
                        (
                            cmd,
                            _args(func),
                            func.__doc__ and func.__doc__.strip().split("\n")[0] or ""
                        )
                        for cmd, func in self.commands.items()
                    ],
                    [("Command", "Arguments", "Description")]
                )
            )

    def _completer(self) -> 'prompt_toolkit.completion.Completer':
        """
        Returns a prompt_toolkit custom completer
        """
        from prompt_toolkit.completion import Completer, Completion

        class CLICompleter(Completer):
            def get_completions(cmpl, document, complete_event):  # type: ignore
                if not complete_event.completion_requested:
                    # Only activate when the user does <TAB>
                    return
                parts = document.text.split(" ")
                cmd = parts[0].lower()
                if cmd not in self.commands:
                    # We are trying to complete the command
                    for possible_cmd in (x for x in self.commands if x.startswith(cmd)):
                        yield Completion(possible_cmd, start_position=-len(cmd))
                else:
                    # We are trying to complete the command content
                    if len(parts) == 1:
                        return
                    args, _, _ = self._parseallargs(self.commands[cmd], cmd, parts[1:])
                    arg = " ".join(args)
                    if cmd in self.commands_complete:
                        for possible_arg in self.commands_complete[cmd](self, arg):
                            yield Completion(possible_arg, start_position=-len(arg))
                return
        return CLICompleter()

    def loop(self, debug: int = 0) -> None:
        """
        Main command handling loop

        Commands are read until "exit" is entered or the input is closed;
        close() is called in the latter case.

        :param debug: when set, print the traceback of failing commands
        """
        from prompt_toolkit import PromptSession
        session = PromptSession(completer=self._completer())

        while True:
            try:
                cmd = session.prompt(self.ps1()).strip()
            except KeyboardInterrupt:
                continue
            except EOFError:
                self.close()
                break
            args = cmd.split(" ")[1:]
            cmd = cmd.split(" ")[0].strip().lower()
            if not cmd:
                continue
            if cmd in ["help", "h", "?"]:
                self.help(" ".join(args))
                continue
            # Exact comparison: a substring test (cmd in "exit") would also
            # quit on "e", "x", "it"... and hide commands bearing such names
            if cmd == "exit":
                break
            if cmd not in self.commands:
                print("Unknown command. Type help or ?")
            else:
                # check the number of arguments
                func = self.commands[cmd]
                args, kwargs, outkwargs = self._parseallargs(func, cmd, args)
                if func._spaces:  # type: ignore
                    args = [" ".join(args)]
                    # if globsupport is set, we might need to do several calls
                    if func._globsupport and "*" in args[0]:  # type: ignore
                        if args[0].count("*") > 1:
                            print("More than 1 glob star (*) is currently unsupported.")
                            continue
                        before, after = args[0].split("*", 1)
                        # Both sides of the star are literal text
                        reg = re.compile(
                            re.escape(before) + r".*" + re.escape(after)
                        )
                        calls = [
                            [x] for x in
                            self.commands_complete[cmd](self, before)
                            if reg.match(x)
                        ]
                    else:
                        calls = [args]
                else:
                    calls = [args]
                # now iterate if required, call the function and print its output
                for args in calls:
                    # Reset per call, so that the result of a previous call
                    # is never printed again after a failure
                    res = None
                    try:
                        res = func(self, *args, **kwargs)
                    except TypeError:
                        print("Bad number of arguments !")
                        self.help(cmd=cmd)
                        continue
                    except Exception as ex:
                        print("Command failed with error: %s" % ex)
                        if debug:
                            # Three-argument form: the single-argument one
                            # is only accepted by Python 3.10+
                            traceback.print_exception(
                                type(ex), ex, ex.__traceback__
                            )
                        continue
                    try:
                        if res and cmd in self.commands_output:
                            self.commands_output[cmd](self, res, **outkwargs)
                    except Exception as ex:
                        print("Output processor failed with error: %s" % ex)

3896 

3897 

def AutoArgparse(
    func: DecoratorCallable,
    _parseonly: bool = False,
) -> Optional[Tuple[List[str], List[str]]]:
    """
    Generate an Argparse call from a function, then call this function.

    Notes:

    - for the arguments to have a description, the sphinx docstring format
      must be used. See
      https://sphinx-rtd-tutorial.readthedocs.io/en/latest/docstrings.html
    - the arguments must be typed in Python (we ignore Sphinx-specific types)
      untyped arguments are ignored.
    - only types that would be supported by argparse are supported. The others
      are omitted.

    Parameters without a default value become positional arguments, the
    others become ``--options`` (underscores are turned into dashes).
    ``bool`` parameters become flags, named ``--no-xxx`` when their default
    is True. ``bytes`` parameters are read as hexadecimal strings.

    :param func: the function to expose, called with the parsed arguments
    :param _parseonly: internal. Do not parse nor call anything, only
        return the names of the options.
    :returns: None, or when ``_parseonly`` is set a tuple made of the list
        of all the options and the list of the options taking no value
    """
    argsdoc = {}
    if func.__doc__:
        # Sphinx doc format parser
        m = re.match(
            r"((?:.|\n)*?)(\n\s*:(?:param|type|raises|return|rtype)(?:.|\n)*)",
            func.__doc__.strip(),
        )
        if not m:
            desc = func.__doc__.strip()
        else:
            desc = m.group(1)
            sphinxargs = re.findall(
                r"\s*:(param|type|raises|return|rtype)\s*([^:]*):(.*)",
                m.group(2),
            )
            for argtype, argparam, argdesc in sphinxargs:
                argparam = argparam.strip()
                argdesc = argdesc.strip()
                if argtype == "param":
                    if not argparam:
                        raise ValueError(":param: without a name !")
                    argsdoc[argparam] = argdesc
    else:
        desc = ""

    # Process the parameters
    # positional, hexarguments and negated hold argparse "dest" names, i.e.
    # the keys under which the parsed values are found after parsing
    positional = []
    noargument = []
    hexarguments = []
    negated = set()
    parameters = {}
    for param in inspect.signature(func).parameters.values():
        if not param.annotation:
            continue
        noarg = False
        isnegated = False
        parname = param.name.replace("_", "-")
        paramkwargs: Dict[str, Any] = {}
        if param.annotation is bool:
            if param.default is True:
                parname = "no-" + parname
                paramkwargs["action"] = "store_false"
                isnegated = True
            else:
                paramkwargs["action"] = "store_true"
            noarg = True
        elif param.annotation is bytes:
            paramkwargs["type"] = str
        elif param.annotation in [str, int, float]:
            paramkwargs["type"] = param.annotation
        else:
            continue
        if param.default != inspect.Parameter.empty:
            if param.kind == inspect.Parameter.POSITIONAL_ONLY:
                ispositional = True
                paramkwargs["nargs"] = '?'
            else:
                ispositional = False
                parname = "--" + parname
            # Also set for optional positionals, which would otherwise
            # get None when omitted
            paramkwargs["default"] = param.default
        else:
            ispositional = True
        if ispositional:
            # argparse stores a positional under its name, verbatim
            dest = parname
            positional.append(dest)
        else:
            # argparse stores "--some-opt" under "some_opt"
            dest = parname[2:].replace("-", "_")
            if isnegated:
                negated.add(dest)
        if param.annotation is bytes:
            hexarguments.append(dest)
        if param.kind == inspect.Parameter.VAR_POSITIONAL:
            paramkwargs["action"] = "append"
        if param.name in argsdoc:
            paramkwargs["help"] = argsdoc[param.name]
            if param.annotation is bytes:
                paramkwargs["help"] = "(hex) " + paramkwargs["help"]
            elif param.annotation is bool:
                paramkwargs["help"] = "(flag) " + paramkwargs["help"]
            else:
                paramkwargs["help"] = (
                    "(%s) " % param.annotation.__name__ + paramkwargs["help"]
                )
        # Add to the parameter list
        parameters[parname] = paramkwargs
        if noarg:
            noargument.append(parname)

    if _parseonly:
        # An internal mode used to generate bash autocompletion, do it then exit.
        return (
            [x for x in parameters if x not in positional] + ["--help"],
            [x for x in noargument if x not in positional] + ["--help"],
        )

    # Now build the argparse.ArgumentParser
    parser = argparse.ArgumentParser(
        prog=func.__name__,
        description=desc,
        formatter_class=argparse.ArgumentDefaultsHelpFormatter,
    )

    # Add parameters to parser
    for parname, paramkwargs in parameters.items():
        parser.add_argument(parname, **paramkwargs)

    # Now parse the sys.argv parameters
    params = vars(parser.parse_args())

    # Convert hex parameters if provided
    for p in hexarguments:
        value = params[p]
        try:
            # Only what was typed on the command line is hexadecimal text:
            # a default value of the function is passed through untouched
            if isinstance(value, str):
                params[p] = bytes.fromhex(value)
            elif isinstance(value, list):
                params[p] = [bytes.fromhex(v) for v in value]
        except ValueError:
            print(
                conf.color_theme.fail(
                    "ERROR: the value of parameter %s "
                    "'%s' is not valid hexadecimal !" % (p, value)
                )
            )
            return None

    # Act as in interactive mode
    conf.logLevel = 20
    from scapy.themes import DefaultTheme
    conf.color_theme = DefaultTheme()
    # And call the function
    try:
        func(
            *[params.pop(x) for x in positional],
            **{
                # Only the flags negated above lose their "no_" prefix
                (k[3:] if k in negated else k): v
                for k, v in params.items()
            }
        )
    except AssertionError as ex:
        print(conf.color_theme.fail("ERROR: " + str(ex)))
        parser.print_help()
    return None

4043 

4044 

4045####################### 

4046# PERIODIC SENDER # 

4047####################### 

4048 

4049 

class PeriodicSenderThread(threading.Thread):
    """Thread that keeps sending one or more packets on a socket.

    The packets are sent in order, one per ``interval`` seconds, and the
    sequence is repeated until :meth:`stop` is called or the socket reports
    itself as closed. Sending can be paused and resumed with
    :meth:`disable` / :meth:`enable` without stopping the thread.
    """

    def __init__(self, sock, pkt, interval=0.5, ignore_exceptions=True):
        # type: (Any, _PacketIterable, float, bool) -> None
        """ Thread to send packets periodically

        Args:
            sock: socket where packet is sent periodically; it must provide
                  a ``send()`` method and a ``closed`` attribute
            pkt: packet or list of packets to send
            interval: interval between two packets
            ignore_exceptions: if True, an OSError raised while sending
                               silently ends the thread; if False, the
                               exception is re-raised
        """
        # Anything that is not a list is treated as one single packet
        if not isinstance(pkt, list):
            self._pkts = [cast("Packet", pkt)]  # type: _PacketIterable
        else:
            self._pkts = pkt
        self._socket = sock
        self._stopped = threading.Event()
        # Sending is enabled by default
        self._enabled = threading.Event()
        self._enabled.set()
        self._interval = interval
        self._ignore_exceptions = ignore_exceptions
        threading.Thread.__init__(self)

    def enable(self):
        # type: () -> None
        """Resume sending packets."""
        self._enabled.set()

    def disable(self):
        # type: () -> None
        """Pause sending packets; the thread itself keeps running."""
        self._enabled.clear()

    def run(self):
        # type: () -> None
        """Send the packets in a loop until stopped or the socket is closed."""
        while not self._stopped.is_set() and not self._socket.closed:
            if not self._pkts:
                # Nothing to send: without this wait the outer loop would
                # spin at full speed, as the pacing below is never reached.
                self._stopped.wait(timeout=self._interval)
                continue
            for p in self._pkts:
                try:
                    if self._enabled.is_set():
                        self._socket.send(p)
                except OSError:
                    # TimeoutError is a subclass of OSError, so it is
                    # covered here as well.
                    if self._ignore_exceptions:
                        return
                    raise
                # Doubles as the inter-packet delay and as an
                # interruptible sleep: stop() wakes it up immediately.
                self._stopped.wait(timeout=self._interval)
                if self._stopped.is_set() or self._socket.closed:
                    break

    def stop(self):
        # type: () -> None
        """Ask the thread to stop and wait up to two intervals for it."""
        self._stopped.set()
        # join() raises RuntimeError on a thread that was never started
        if self.is_alive():
            self.join(self._interval * 2)

4100 

4101 

class SingleConversationSocket(object):
    """Proxy around a socket-like object that serializes transmissions.

    ``sr1()``, ``sr()`` and ``send()`` are forwarded to the wrapped object
    while holding a re-entrant lock, so that only one of them runs at a
    time. Any attribute not defined on the proxy is looked up on the
    wrapped object.
    """

    def __init__(self, o):
        # type: (Any) -> None
        """
        :param o: the object to wrap
        """
        self._inner = o
        self._tx_mutex = threading.RLock()

    @property
    def __dict__(self):  # type: ignore
        # Expose the attributes of the wrapped object instead of our own
        return self._inner.__dict__

    def __getattr__(self, name):
        # type: (str) -> Any
        """Delegate the lookup of unknown attributes to the wrapped object."""
        # __getattr__ is only called when the regular lookup failed. If
        # '_inner' itself is missing (instance built without __init__, e.g.
        # through copy or pickle), reading self._inner below would call
        # __getattr__ again and recurse until RecursionError.
        if name == "_inner":
            raise AttributeError(name)
        return getattr(self._inner, name)

    def sr1(self, *args, **kargs):
        # type: (*Any, **Any) -> Any
        """Call ``sr1()`` of the wrapped object while holding the lock."""
        with self._tx_mutex:
            return self._inner.sr1(*args, **kargs)

    def sr(self, *args, **kargs):
        # type: (*Any, **Any) -> Any
        """Call ``sr()`` of the wrapped object while holding the lock."""
        with self._tx_mutex:
            return self._inner.sr(*args, **kargs)

    def send(self, x):
        # type: (Packet) -> Any
        """Call ``send()`` of the wrapped object while holding the lock.

        If sending fails with an OSError, the wrapped object is closed and
        the exception is re-raised.
        """
        with self._tx_mutex:
            try:
                return self._inner.send(x)
            except OSError:
                # ConnectionError is a subclass of OSError
                self._inner.close()
                raise

4133 raise e